In [None]:
import pandas as pd
import itertools
import numpy as np
from datetime import date, time, datetime

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Battery properties shared by the dispatch helper functions in this notebook
BATTERY_POWER = 300  # cap on the magnitude of raw power in a period (presumably MW -- TODO confirm units)
BATTERY_CAP = 580  # cap on the stored capacity (presumably MWh -- TODO confirm units)
CHARGE_EFF = 90  # charging efficiency expressed as a percentage (divided by 100 where it is used)
DISCHARGE_EFF = 90  # discharging efficiency expressed as a percentage (divided by 100 where it is used)
MLF = 0.991  # loss factor applied to the spot price in the revenue calculation (presumably the marginal loss factor -- TODO confirm)

In [None]:
def raw_power(charge_forecast, discharge_forecast, opening_cap):
    '''Return the raw power for a period given its forecast flags and opening capacity.

    Parameters:
        charge_forecast: 1 when the period is forecast to charge.
        discharge_forecast: 1 when the period is forecast to discharge.
        opening_cap: capacity stored in the battery at the start of the period.

    Returns:
        A non-positive value when charging, a non-negative value when discharging,
        and 0 when neither flag is set. A charge flag takes precedence over a
        discharge flag.
    '''

    if charge_forecast == 1:
        # Power needed to fill the remaining headroom, grossed up for charging losses.
        # The factor 2 presumably converts energy per half-hour period into power -- confirm.
        headroom = BATTERY_CAP - opening_cap
        fill_power = headroom / (CHARGE_EFF / 100) * 2
        return -min(BATTERY_POWER, fill_power)

    if discharge_forecast == 1:
        # Cannot discharge more than what is currently stored.
        return min(BATTERY_POWER, opening_cap * 2)

    return 0

In [None]:
def market_dispatch(raw_power):
    '''Convert raw power into the power dispatched to the market.

    Parameters:
        raw_power: negative when charging, positive when discharging, 0 when idle.

    Returns:
        Half of the raw power when charging; half of the raw power scaled by the
        discharge efficiency when discharging; 0 otherwise.
    '''

    halved = raw_power / 2

    if raw_power > 0:
        # Discharging: only the efficiency-adjusted share reaches the market
        return halved * DISCHARGE_EFF / 100
    if raw_power < 0:
        # Charging: the full halved amount is drawn from the market
        return halved
    return 0

In [None]:
def market_revenue(market_dispatch, spot_price):
    '''Return the market revenue for a period.

    Parameters:
        market_dispatch: dispatched power; negative when charging, positive when discharging.
        spot_price: spot price of the period.

    Returns:
        dispatch * price scaled by MLF when discharging, or by 1 / MLF when charging
        (a negative value, i.e. a cost); 0 when nothing is dispatched.
    '''

    if market_dispatch > 0:
        loss_adjustment = MLF
    elif market_dispatch < 0:
        loss_adjustment = 1 / MLF
    else:
        return 0

    return market_dispatch * spot_price * loss_adjustment

In [None]:
def closing_capacity(market_dispatch, opening_cap):
    '''Return the capacity stored at the end of a period.

    Parameters:
        market_dispatch: dispatched power; negative when charging, positive when discharging.
        opening_cap: capacity stored at the start of the period.

    Returns:
        The opening capacity adjusted for the energy stored or released,
        clamped to the range [0, BATTERY_CAP]. Returns 0 if the dispatch
        value cannot be compared with 0 (e.g. NaN).
    '''

    if market_dispatch < 0:
        # Charging: only the efficiency-adjusted share of the drawn energy is stored
        drawn_from_store = market_dispatch * (CHARGE_EFF / 100)
    elif market_dispatch >= 0:
        # Discharging: more energy leaves the store than reaches the market
        drawn_from_store = market_dispatch * (100 / DISCHARGE_EFF)
    else:
        return 0

    candidate = opening_cap - drawn_from_store
    return max(0, min(candidate, BATTERY_CAP))

In [None]:
def weighted_future_avg(df, index, num_future_periods):
    '''Return the weighted average of the spot prices that follow the current period.

    Parameters:
        df: frame with a "Spot Price" column. Future rows are looked up by label
            as index + 1, index + 2, ..., so the frame is expected to carry
            consecutive integer labels starting at 0.
        index: label of the current period.
        num_future_periods: size of the look-ahead window.

    Returns:
        The weighted average, rounded to 2 decimals, with weights
        num_future_periods, num_future_periods - 1, ... so that nearer periods
        count more. For the last period the period's own spot price is returned.
    '''

    total_periods = len(df)
    first_short_window = total_periods - num_future_periods

    # A full look-ahead window is available
    if index < first_short_window:
        upcoming = df.loc[(index + 1):(index + num_future_periods), "Spot Price"]
        weights = np.arange(num_future_periods, 0, -1)
        return round(np.average(upcoming, weights=weights), 2)

    # Fewer periods than the window remain (but at least one): use all of them,
    # keeping the largest weights for the nearest periods
    if (index >= first_short_window) and (index != (total_periods - 1)):
        upcoming = df.loc[(index + 1):total_periods, "Spot Price"]
        lowest_weight_excl = num_future_periods - (total_periods - index) + 1
        weights = np.arange(num_future_periods, lowest_weight_excl, -1)
        return round(np.average(upcoming, weights=weights), 2)

    # Last period: nothing lies ahead, fall back to its own spot price
    if index == (total_periods - 1):
        return df.loc[index, "Spot Price"]

    return 0

In [None]:
def battery_forecast(df, index, comparison_threshold):
    '''Sets the forecasted charge and discharge behaviour of the current period.

    Parameters:
        df: frame holding the "Spot Price" and "Future Average" columns; the
            "Discharge Forecast" and "Charge Forecast" cells of row `index`
            are written in place.
        index: label of the current period.
        comparison_threshold: minimum absolute gap between the current price and
            the weighted future average required to act.

    Returns:
        None. The period is flagged to discharge (1) when the current price is
        above the future average by at least the threshold, to charge (1) when it
        is below by at least the threshold, and both flags are 0 otherwise.
    '''

    current_price = df.loc[index, "Spot Price"]
    weighted_avg_future = df.loc[index, "Future Average"]

    # Absolute difference between the current price and weighted future average price
    current_future_diff = abs(weighted_avg_future - current_price)
    gap_is_large_enough = current_future_diff >= comparison_threshold

    discharge = gap_is_large_enough and (current_price > weighted_avg_future)
    charge = gap_is_large_enough and (current_price < weighted_avg_future)

    # Write with a single .loc call: chained assignment (df[col][index] = value)
    # may update a temporary copy and is a no-op under pandas Copy-on-Write.
    df.loc[index, "Discharge Forecast"] = int(discharge)
    df.loc[index, "Charge Forecast"] = int(charge)

    return

In [None]:
# Technical variables grouped by the type of value their columns hold
floats_vars = [
    "Future Average",
    "Raw Power",
    "Market Dispatch",
    "Market Revenue",
    "Opening Capacity",
    "Closing Capacity",
]
ints_vars = [
    "Charge Forecast",
    "Discharge Forecast",
]

In [None]:
def battery(df, threshold, num_future_periods):
    '''Create a dataframe with all the technical variables.

    The frame is modified in place: every column listed in `ints_vars` and
    `floats_vars` is (re)initialised to zero and then filled period by period,
    so calling the function again on the same frame starts from scratch.

    Parameters:
        df: frame with a "Spot Price" column. The previous period is looked up
            as index - 1 and index 0 is treated as the first period, so the
            frame is expected to carry consecutive integer labels starting at 0.
        threshold: comparison threshold passed to `battery_forecast`.
        num_future_periods: look-ahead window passed to `weighted_future_avg`.

    Returns:
        None.
    '''

    # Initialise all entries as 0
    for var in ints_vars:
        df[var] = 0
    for var in floats_vars:
        df[var] = 0.0

    # Update the values of the technical variables in each period.
    # Cells are written through df.at (a single indexing call): chained
    # assignment such as df[col][index] = value may update a temporary copy
    # and is a no-op under pandas Copy-on-Write.
    for index in df.index:
        # Compute the weighted future average price
        df.at[index, "Future Average"] = weighted_future_avg(df, index, num_future_periods)

        # Forecast the charge and discharge behaviour of the battery
        battery_forecast(df, index, threshold)

        # Set the opening capacity of the current period as the previous period's closing capacity,
        # excluding the first period which keeps its initial value of 0 (starts discharged)
        if index != 0:
            df.at[index, "Opening Capacity"] = df.at[index - 1, "Closing Capacity"]
        opening_cap = df.at[index, "Opening Capacity"]

        power = raw_power(df.at[index, "Charge Forecast"], df.at[index, "Discharge Forecast"], opening_cap)
        dispatch = market_dispatch(power)

        df.at[index, "Raw Power"] = power
        df.at[index, "Market Dispatch"] = dispatch
        df.at[index, "Market Revenue"] = market_revenue(dispatch, df.at[index, "Spot Price"])
        df.at[index, "Closing Capacity"] = closing_capacity(dispatch, opening_cap)

    return

Section 1: Data Preprocessing

In [None]:
# Read the raw market dataset from the Excel workbook (the path is relative to the working directory)
data = pd.read_excel("../../data/market_data.xlsx") 

In [None]:
# Keep only the timestamp and the VIC trading price, under shorter column names
vic = (
    data
    .filter(items=["Time (UTC+10)", "Regions VIC Trading Price ($/MWh)"])
    .rename(columns={"Time (UTC+10)": "Datetime", "Regions VIC Trading Price ($/MWh)": "Spot Price"})
)

In [None]:
# Recast the "Datetime" column to pandas datetime dtype, which the .dt accessor
# and the datetime comparisons used later in the notebook rely on
vic["Datetime"] = pd.to_datetime(vic["Datetime"])

Section 2: Cross-validation for Hyperparameter Tuning

In [None]:
# Take the records dated 1 Jan 2021 to 30 Jun 2021 (both inclusive) as the
# cross-validation set, renumbered from 0
cv = (
    vic.copy()
    .loc[vic["Datetime"].dt.date.between(date(2021, 1, 1), date(2021, 6, 30))]
    .reset_index(drop=True)
)

In [None]:
# Best combination found so far. The revenue is seeded with -inf (not 0) so that a
# combination is still selected when no candidate earns a positive revenue; with a
# seed of 0 the optimum would stay at 0 periods / 0 threshold in that case.
opt_threshold = None
opt_future_periods = None
max_rev = float("-inf")

# Determine the optimal number of future periods and optimal comparison threshold by grid search
# (future periods 8..11 in steps of 1, thresholds 6..9.5 in steps of 0.5)
for future_periods in np.arange(8, 12, 1):
    for threshold in np.arange(6, 10, 0.5):
        # battery() recomputes every technical column of `cv` in place
        battery(cv, threshold, future_periods)

        # Calculate the market revenue with the current combination of hyperparameters
        curr_rev = cv["Market Revenue"].sum()

        if curr_rev > max_rev:
            max_rev = curr_rev
            opt_threshold = threshold
            opt_future_periods = future_periods

# Fail here rather than passing unset hyperparameters on to later cells
if opt_future_periods is None:
    raise ValueError("Grid search did not find a valid hyperparameter combination")

print("Optimal Number of Future Periods: " + str(opt_future_periods)) 
print("Optimal Comparison Threshold: " + str(opt_threshold)) 

Section 3: Run the Weighted Future Average Algorithm

In [None]:
# Create the dataframe with technical variables for the full sample period,
# using the hyperparameters selected by the grid search (battery() fills `vic` in place)
battery(vic, opt_threshold, opt_future_periods)

In [None]:
# Locate the first period of the test set (midnight on 1 July 2021)
index_test = vic.index[vic["Datetime"].eq(datetime(2021, 7, 1, 0, 0, 0))][0]

# Everything before that period is training + CV, the remainder is the test set
train_cv, test = vic.iloc[:index_test], vic.iloc[index_test:]

# Report the market revenue of each period, rounded to 2 decimals
print("Total Revenue for Training + CV Period: ", round(sum(train_cv["Market Revenue"]), 2), sep="")
print("Total Revenue for Test Period: ", round(sum(test["Market Revenue"]), 2), sep="")
print("Total Revenue for Full Period: ", round(sum(vic["Market Revenue"]), 2), sep="")

Section 4: Generate Output Files

In [None]:
# Build the submission frame: timestamp, raw power and opening capacity,
# renamed to the submission column names
output = (
    vic.loc[:, ['Datetime', 'Raw Power', 'Opening Capacity']]
    .rename(columns={"Datetime": "datetime", "Raw Power": "power", "Opening Capacity": "capacity"})
)

In [None]:
# Write the submission frame to CSV, leaving out the row index
output.to_csv("../../results/mandatory_submission.csv", index=False)