# In [2]:
import pandas as pd
import torch as t

from abc import ABC, abstractmethod
from typing import Callable, Dict, List

from m4.pipeline.adaptors import dataframe_to_tensor, tensor_to_dataframe

class Model(ABC):
    """Abstract interface for a forecasting model.

    Concrete models implement :meth:`update` (fit on observed data) and
    :meth:`predict` (produce a forecast frame).
    """

    @abstractmethod
    def update(self, time_series: pd.DataFrame) -> None:
        """Fit or refresh the model using the observed ``time_series``."""
        pass

    @abstractmethod
    def predict(self, time_series: pd.DataFrame, forecast_horizon: int) -> pd.DataFrame:
        """Return predictions for ``time_series`` extended by ``forecast_horizon`` steps.

        ``Trainer.train`` asserts that the returned frame has
        ``len(time_series) + forecast_horizon`` rows, i.e. one row per
        historical step followed by one row per forecast step.
        """
        pass


class Trainer:
    """Fit several models on a history window and score their forecasts.

    Attributes:
        models: Mapping of model name to model instance.
        history: Observed rows the models are fitted on.
        future: Held-out rows used for the test loss.
        reality: ``history`` followed by ``future`` with a fresh 0..n-1 index.
        time_series: Frame with two-level columns ``(source, column)``, where
            ``source`` is ``'reality'`` or a model name.
        losses: Frame with two-level columns ``(model name, 'train' | 'test')``;
            each call to :meth:`train` fills one more row per model.
    """

    def __init__(self,
                 models: Dict[str, "Model"],
                 history: pd.DataFrame,
                 future: pd.DataFrame,
                 training_loss_fn: Callable[[pd.DataFrame, pd.DataFrame], float],
                 test_loss_fn: Callable[[pd.DataFrame, pd.DataFrame], float]
                 ):
        """Store the inputs and allocate the result frames.

        Args:
            models: Models to train, keyed by a display name.
            history: Observed data passed to ``update`` and ``predict``.
            future: Held-out data; its length is the forecast horizon.
            training_loss_fn: Called as ``fn(prediction_part, history_part)``.
            test_loss_fn: Called as ``fn(prediction_part, future)``.
        """
        self.models = models
        self.history = history
        self.future = future
        self.reality = pd.concat([history, future], axis=0, ignore_index=True)
        self.training_loss_fn = training_loss_fn
        self.test_loss_fn = test_loss_fn

        # One empty placeholder per model; train() fills them in later.
        time_series_dataframes = {'reality': self.reality}
        time_series_dataframes.update(
            (model_name, pd.DataFrame(columns=self.reality.columns))
            for model_name in self.models
        )
        loss_dataframes = {
            model_name: pd.DataFrame(columns=['train', 'test'])
            for model_name in self.models
        }

        # Passing dicts to concat turns the keys into the outer column level.
        self.time_series = pd.concat(time_series_dataframes, axis=1)
        self.losses = pd.concat(loss_dataframes, axis=1)

    def train(self):
        """Update every model, record its prediction and append its losses.

        Raises:
            AssertionError: If a model's prediction does not have exactly
                ``len(history) + len(future)`` rows.
        """
        # Split by the history length rather than by ``-len(future)``: a
        # negative-zero slice would select the wrong rows for an empty future.
        history_len = len(self.history)

        for model_name, model in self.models.items():
            # train
            model.update(self.history)

            # predict
            prediction = model.predict(self.history, len(self.future))
            assert len(prediction) == len(self.reality), (
                f"model {model_name!r} returned {len(prediction)} rows, "
                f"expected {len(self.reality)}"
            )
            self.time_series[model_name] = prediction

            # calculate losses; row 0 is skipped on the training side
            training_loss = self.training_loss_fn(prediction[1:history_len], self.history[1:])
            test_loss = self.test_loss_fn(prediction[history_len:], self.future)

            # Next free row for this model = number of rows already filled.
            next_row = len(self.losses[model_name].dropna())
            self.losses.loc[next_row, model_name] = [training_loss, test_loss]
            
#
# Base Models
#
    
class Naive(Model):
    """Naive baseline: each step is predicted to equal the previous observation."""

    def update(self, time_series: pd.DataFrame):
        """No-op; this baseline has nothing to fit."""
        pass

    def predict(self, time_series: pd.DataFrame, forecast_horizon: int) -> pd.DataFrame:
        """Return the one-step-lagged series, extended by repeating the last row.

        The result has ``len(time_series) + forecast_horizon`` rows on a fresh
        0..n-1 index. Row 0 is NaN (nothing precedes it), each historical row
        holds the previous observation, and every forecast row holds the last
        observed value.
        """
        last_row = time_series[-1:]
        # DataFrame.append was removed in pandas 2.0; pd.concat is the
        # supported way to stack the history and the repeated last row.
        extended = pd.concat(
            [time_series] + [last_row] * forecast_horizon,
            ignore_index=True,
        )
        return extended.shift(1)

#
# Utility Functions
#
def L1(df1: pd.DataFrame, df2: pd.DataFrame) -> float:
    """Mean absolute error between two frames, for the first column only.

    ``df2`` is compared by position (its raw values are used), so the two
    frames' indexes and column labels do not need to match.
    """
    absolute_error = (df1 - df2.values).abs()
    per_column_mean = absolute_error.mean()
    return per_column_mean.values[0]

def df_to_tensor(df):
    """Convert ``df`` with ``dataframe_to_tensor`` and add a leading dimension."""
    converted = dataframe_to_tensor(df)
    # Insert a new axis of size 1 at position 0.
    return t.unsqueeze(converted, 0)

def tensor_to_df(tensor, df):
    """Drop the leading dimension of ``tensor`` and rebuild a frame with ``df``'s columns.

    ``df`` is only read for its column labels.
    """
    # torch.squeeze removes axis 0 only when that axis has size 1.
    without_leading_dim = t.squeeze(tensor, 0)
    return tensor_to_dataframe(without_leading_dim, df.columns)

def minmax_scale(df):
    """Rescale each column of ``df`` linearly so its values span [0, 1].

    Every column is scaled with its own minimum and maximum, so the function
    works for frames with any number of columns. A constant column has a zero
    range and comes out as NaN.

    Args:
        df: Numeric DataFrame.

    Returns:
        A new DataFrame with the same index and columns as ``df``.
    """
    column_min = df.min(axis=0)
    column_range = df.max(axis=0) - column_min
    # Series operands align on column labels, so each column is shifted and
    # divided by its own statistics without converting them to Python floats.
    return (df - column_min) / column_range
