# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Load the actual test-set prices and the model's predicted prices.
test_data = pd.read_csv(r"D:\Downloads\Germany_Volatile_Test_Data.csv")
predictions = pd.read_csv(r"D:\Downloads\kalman_predictions.csv")

# Parse the timestamp column of each frame from that frame's own data, so the
# merge below joins on real datetimes rather than raw strings.
test_data['Datetime'] = pd.to_datetime(test_data['Datetime'])
predictions['Datetime'] = pd.to_datetime(predictions['Datetime'])

# Join actual and predicted rows on their timestamp (merge's default is an
# inner join, so only timestamps present in both files are kept).
data = test_data.merge(
    predictions,
    on='Datetime',
    suffixes=('_actual', '_predicted'),
)

data.rename(
    columns={
        'Price (EUR/MWHE)': 'market_price',
        'Predicted Price': 'market_price_predicted',
    },
    inplace=True,
)

# Scale both price columns by 1/1000 -- presumably EUR/MWh -> EUR/kWh, which
# matches the "EUR/kWh" axis labels used when plotting (confirm source units).
data['market_price'] /= 1000
data['market_price_predicted'] /= 1000


# --- Battery and account parameters ---
battery_capacity = 55      # capacity used to convert SoC percent into kWh
starting_soc = 50          # initial state of charge, in percent
initial_budget = 100       # starting cash balance (reported in euros)
charge_efficiency = discharge_efficiency = 0.95

# --- State discretisation and learning parameters ---
n_soc_states = n_price_states = 10   # number of bins per state dimension
actions = ["Charge", "Discharge", "Hold"]
gamma = 0.875              # discount factor applied to the next state's value

def get_state(soc, price):
    """Map a continuous (SoC, price) pair onto the discrete state grid.

    Args:
        soc: State of charge in percent.
        price: Price in the same unit as ``data['market_price']``.

    Returns:
        Tuple ``(soc_state, price_state)``; each index is clamped to the
        range ``0 .. n_soc_states - 1`` / ``0 .. n_price_states - 1``.
    """
    soc_bin_width = 100 / n_soc_states
    price_bin_width = data['market_price'].max() / n_price_states

    # Clamp on both sides: int() truncates toward zero, so a sufficiently
    # negative price would otherwise yield a negative index that lies outside
    # the declared grid.
    soc_state = max(0, min(n_soc_states - 1, int(soc / soc_bin_width)))

    if price_bin_width > 0:
        price_state = max(
            0, min(n_price_states - 1, int(price / price_bin_width))
        )
    else:
        # No positive price range to bin against; collapse to the lowest bin
        # instead of dividing by a zero or negative width.
        price_state = 0
    return soc_state, price_state

# Every discrete (soc_state, price_state) pair, SoC-major order.
_state_grid = [
    (soc_idx, price_idx)
    for soc_idx in range(n_soc_states)
    for price_idx in range(n_price_states)
]

# Per (state, action) tables: an empty dict per transition entry and a zero
# reward; per state table: a zero initial value.
transition_probabilities = {
    (soc_idx, price_idx, act): {}
    for soc_idx, price_idx in _state_grid
    for act in actions
}
reward_function = {key: 0 for key in transition_probabilities}
value_function = {
    (soc_idx, price_idx): 0
    for soc_idx, price_idx, _act in transition_probabilities
}

def calculate_reward(soc, action, current_price, predicted_future_price, *,
                     capacity=None, charge_eff=None, discharge_eff=None,
                     max_step_kwh=10):
    """Return the immediate reward and resulting SoC for taking ``action``.

    Args:
        soc: Current state of charge in percent.
        action: One of ``"Charge"``, ``"Discharge"`` or ``"Hold"``.
        current_price: Price at the current step.
        predicted_future_price: Predicted price for the next step.
        capacity: Battery capacity in kWh; defaults to the module-level
            ``battery_capacity``.
        charge_eff: Charging efficiency; defaults to ``charge_efficiency``.
        discharge_eff: Discharging efficiency; defaults to
            ``discharge_efficiency``.
        max_step_kwh: Maximum energy moved in a single step.

    Returns:
        Tuple ``(reward, new_soc)``. When the action is unavailable (charging
        a full battery, discharging an empty one) or unrecognised, the reward
        is 0 and the SoC is returned unchanged.
    """
    if capacity is None:
        capacity = battery_capacity
    if charge_eff is None:
        charge_eff = charge_efficiency
    if discharge_eff is None:
        discharge_eff = discharge_efficiency

    reward = 0
    soc_kwh = (soc / 100) * capacity

    if action == "Charge" and soc < 100:
        # Limited by remaining headroom and by the per-step cap.
        amount = min(capacity - soc_kwh, max_step_kwh)
        reward = (predicted_future_price - current_price) * amount * charge_eff
        soc += (amount / capacity) * 100
    elif action == "Discharge" and soc > 0:
        # Limited by stored energy and by the per-step cap.
        amount = min(soc_kwh, max_step_kwh)
        reward = (
            (current_price - predicted_future_price) * amount * discharge_eff
        )
        soc -= (amount / capacity) * 100
    elif action == "Hold":
        price_ratio_lower = 0.975
        price_ratio_upper = 1.025
        # Holding is rewarded only when the predicted price stays within
        # +/-2.5% of the current one. A zero current price has no meaningful
        # ratio, so it is treated as outside the band instead of dividing.
        in_band = (
            current_price != 0
            and price_ratio_lower
            < predicted_future_price / current_price
            < price_ratio_upper
        )
        reward = current_price * 0.1 if in_band else -0.1
    return reward, soc

# Value-function training: sweep the price series 70 times, each time starting
# from the initial SoC, acting greedily and backing up the best action value.
# Prices are pulled into arrays once so the inner loop avoids a pandas .iloc
# lookup on every step of every sweep.
training_prices = data['market_price'].to_numpy()
training_forecasts = data['market_price_predicted'].to_numpy()

for episode in range(70):
    soc = starting_soc
    for t in range(len(data) - 1):
        current_price = training_prices[t]
        predicted_future_price = training_forecasts[t + 1]
        state = get_state(soc, current_price)

        # Evaluate every action once and keep its (reward, new_soc) outcome
        # so the chosen one can be applied without recomputing it.
        outcomes = [
            calculate_reward(soc, action, current_price, predicted_future_price)
            for action in actions
        ]
        action_values = [
            step_reward + gamma * value_function.get(
                get_state(next_soc, predicted_future_price), 0
            )
            for step_reward, next_soc in outcomes
        ]

        # Ties resolve to the first action in `actions` (list.index).
        best_action_value = max(action_values)
        best_action_index = action_values.index(best_action_value)
        best_action = actions[best_action_index]

        value_function[state] = best_action_value

        reward, soc = outcomes[best_action_index]

        # Record the observed reward in the reward table; the transition
        # table's entries are dicts and are left untouched.
        reward_function[(state[0], state[1], best_action)] = reward

# Greedy roll-out of the learned value function over the price series,
# tracking SoC, chosen actions and the cash balance.
cumulative_profits = []
soc_values = [starting_soc]
decisions = []

soc = starting_soc
budget = initial_budget

# Time-step indices at which each action was taken (used for plotting).
charge_indices = []
discharge_indices = []
hold_indices = []

# Pull prices into arrays once instead of using .iloc on every step.
simulation_prices = data['market_price'].to_numpy()
simulation_forecasts = data['market_price_predicted'].to_numpy()

for t in range(len(data) - 1):
    current_price = simulation_prices[t]
    predicted_future_price = simulation_forecasts[t + 1]

    # Evaluate each action once; keep (reward, new_soc) so the chosen
    # outcome can be applied without a second calculate_reward call.
    outcomes = [
        calculate_reward(soc, action, current_price, predicted_future_price)
        for action in actions
    ]
    action_values = [
        step_reward + gamma * value_function.get(
            get_state(next_soc, predicted_future_price), 0
        )
        for step_reward, next_soc in outcomes
    ]

    # Ties resolve to the first action in `actions` (list.index).
    best_action_index = action_values.index(max(action_values))
    best_action = actions[best_action_index]

    decisions.append(best_action)

    if best_action == "Charge":
        charge_indices.append(t)
    elif best_action == "Discharge":
        discharge_indices.append(t)
    else:
        hold_indices.append(t)

    soc_before = soc
    _, soc = outcomes[best_action_index]
    soc_values.append(soc)

    # Energy actually moved this step, taken from the SoC change itself.
    # Deriving it from the post-action SoC alone would book 0 kWh whenever a
    # charge fills the battery or a discharge empties it.
    energy_moved = abs(soc - soc_before) / 100 * battery_capacity

    if best_action == "Charge":
        # Buying enough grid energy to store `energy_moved` after losses.
        budget -= current_price * energy_moved / charge_efficiency
    elif best_action == "Discharge":
        # Realised revenue is booked at the actual market price of this step,
        # not at the forecast, so the profit curve reflects real cash flow.
        budget += current_price * energy_moved * discharge_efficiency

    cumulative_profits.append(budget - initial_budget)

# Two stacked panels: prices with the chosen actions on top, cumulative profit
# below. Separate axes keep each chart's labels and y-scale from overwriting
# or distorting the other.
fig, (ax_price, ax_profit) = plt.subplots(2, 1, figsize=(12, 6))

x = np.arange(len(data))
ax_price.plot(x, data["market_price"], label="Market Price (EUR/kWh)", color="blue", linestyle="--")
ax_price.scatter(charge_indices, data["market_price"].iloc[charge_indices], color="green", label="Charge Actions")
ax_price.scatter(discharge_indices, data["market_price"].iloc[discharge_indices], color="red", label="Discharge Actions")
ax_price.scatter(hold_indices, data["market_price"].iloc[hold_indices], color="orange", label="Hold Actions")
ax_price.plot(x, data["market_price_predicted"], label="Predicted Price (EUR/kWh)", color="purple", linestyle=":")
ax_price.set_xlabel("Index")
ax_price.set_ylabel("Price (EUR/kWh)")
ax_price.legend()
ax_price.grid()

ax_profit.plot(cumulative_profits, label="Cumulative Profit (€)", color="orange")
ax_profit.axhline(0, color="red", linestyle="--", label="Break-even")
ax_profit.set_xlabel("Time Step")
ax_profit.set_ylabel("Profit (€)")
ax_profit.set_title("Cumulative Profit over Time")
ax_profit.legend()
ax_profit.grid()

fig.tight_layout()
plt.show()

# Final cash position and the net change relative to the starting budget.
print(f"Final budget: €{budget:.2f}")
print(f"Net profit: €{budget - initial_budget:.2f}")
