In [None]:
# default_exp core

# core

> API details.

In [None]:
#hide
#export
import pandas as pd
from nbdev.showdoc import *
from fastai.data.external import *
from fastcore.all import *
from pathlib import PosixPath
from fastcore.test import *
from fastai.tabular.all import *
import fastai
from fastai.tabular.core import _maybe_expand

In [None]:
#hide
#export
def str_to_path(file: str):
    """Converts a string or path-like object to a `Path`, expanding a leading `~`.

    file: a string or a `Path`.

    Returns a `Path`; a leading `~` or `~user` is replaced by the home directory.
    """
    # `os.path.expanduser` accepts strings as well as path-like objects and
    # returns its argument unchanged when there is no leading `~`, so neither a
    # type check nor a substring check is required before calling it.
    return Path(os.path.expanduser(file))

In [None]:
#hide
# `str_to_path` must give back a `Path` of the same type for string and `Path` input alike
test_eq_type(Path(""), str_to_path(""))
test_eq_type(Path(""), str_to_path(Path("")))

In [None]:
#export
def read_hdf(file:PosixPath, key: str = "/powerdata", key_metadata=None):
    """Reads a hdf5 table based on the given key.

    file: path (or string) of the HDF5 file; it is passed through `str_to_path`.
    key: name of the table; a leading "/" is added when it is missing.
    key_metadata: optional key of a second table. The first row of each of its
        columns is added to the result as a constant column.

    Returns the stored table, or an empty `DataFrame` when `key` is not in the file.
    """
    file = str_to_path(file)
    # `HDFStore.keys()` lists absolute names, so only a missing *leading* slash has
    # to be added; a nested key such as "group/table" needs the prefix as well.
    if not key.startswith("/"): key = "/" + key
    with pd.HDFStore(file, "r") as store:
        if key not in store.keys(): return pd.DataFrame()
        df = store[key]
        if key_metadata is not None:
            df_meta = store[key_metadata]
            # broadcast the first metadata row over all rows of the table
            for c in df_meta: df[c] = df_meta[c].values[0]
    return df

In [None]:
#hide
# round trip: write a small frame to an HDF5 file and read it back
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},
                  index=['a', 'b', 'c'])
df.to_hdf('data.h5', key='df', mode='w')
# the key is given without a leading slash on purpose; `read_hdf` adds it
test_eq(df, read_hdf("data.h5", key="df"))

In [None]:
#export
def read_csv(file:PosixPath, sep:str =";"):
    "Loads a csv file into a `DataFrame`, discarding an `Unnamed: 0` column when there is one."
    csv_path = str(str_to_path(file))
    frame = pd.read_csv(csv_path, sep=sep)
    # a missing "Unnamed: 0" column is not an error, hence errors="ignore"
    return frame.drop(columns=["Unnamed: 0"], errors="ignore")

In [None]:
#hide
# round trip: the index is written to the file as well, `read_csv` has to drop it again
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},)
df.to_csv('data.csv', sep=";")
test_eq(df, read_csv("data.csv", sep=";"))

In [None]:
#export
def read_files(
    files:PosixPath,
    key:str ="/powerdata",
    key_metadata=None,
    sep:str=";",
    add_task_id=True
) -> pd.DataFrame:
    """Reads a number of CSV or HDF5 files depending on file ending.

    files: a single path/string or a list of them.
    key, key_metadata: forwarded to `read_hdf` for `.h5` files.
    sep: column separator forwarded to `read_csv` for `.csv` files.
    add_task_id: if true, every data frame gets a `TaskID` column holding the
        position of its file in `files`.

    Returns an `L` with one `DataFrame` per file, in the order of `files`.
    NOTE(review): the return annotation says `pd.DataFrame`, but a list of
    data frames is returned; the annotation is kept unchanged here.

    Raises `ValueError` for a file ending other than `.h5` or `.csv`.
    """
    dfs = L()
    for task_id,file in enumerate(listify(files)):
        # `str_to_path` handles strings and `Path` objects alike
        file = str_to_path(file)

        if file.suffix == ".h5":
            df = read_hdf(file, key, key_metadata=key_metadata)
        elif file.suffix == ".csv":
            # honour the caller's separator instead of a fixed ";"
            df = read_csv(file, sep=sep)
        else:
            # a bare string cannot be raised; use a proper exception type
            raise ValueError(f"File ending of file {file} not supported.")
        if add_task_id: df["TaskID"] = task_id
        dfs.append(df)

    return dfs

In [None]:
# a single HDF5 file: the result is a list, so the frame is compared with its first element
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},
                  index=['a', 'b', 'c'])
df.to_hdf('data.h5', key='df', mode='w')
test_eq(df, read_files("data.h5", key="df", add_task_id=False)[0])

# the same check for a single csv file
df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},)
df.to_csv('data.csv', sep=";")
test_eq(df, read_files("data.csv", add_task_id=False)[0])

In [None]:
#export
# marker class: distinguishes renewable pre-processing functionality from fastai processing
class RenewablesTabularProc(TabularProc):
    "Base class for renewable pre-processing steps; adds nothing to `TabularProc`."

In [None]:
# a subclass instance must be recognised as a `RenewablesTabularProc`
class TestProc(RenewablesTabularProc): pass
test_eq(isinstance(TestProc(), RenewablesTabularProc), True)

In [None]:
#export
class CreateTimeStampIndex(RenewablesTabularProc):
    """Turns the timestamp column `col_name` into a UTC datetime index named "TimeUTC".

    col_name: name of the column that holds the timestamps.
    offset_correction: optional number of hours; row `i` (counted from 0) is
        shifted by `i * offset_correction` hours.

    The data frame in `to.items` is modified in place. A warning is issued when
    `col_name` already is the name of the index or is not a column at all.
    """
    order=0
    def __init__(self, col_name, offset_correction=None):
        self.col_name = col_name
        self.offset_correction = offset_correction

    def encodes(self, to):
        df = to.items

        if self.col_name in df.columns:
            df.reset_index(drop=True, inplace=True)
            df.rename({self.col_name: "TimeUTC"}, axis=1, inplace=True)
            # in case the timestamps carry a placeholder year, give them a proper one, e.g., in GermanSolarFarm dataset.
            # Only the first entry is inspected; the guards keep an empty frame or a
            # non-string (e.g. already parsed) column from raising here.
            first = df.TimeUTC.iloc[0] if len(df) > 0 else None
            if isinstance(first, str) and "0000-" in first:
                df.TimeUTC = df.TimeUTC.apply(
                    lambda x: x.replace("0000-", "2015-").replace("0001-", "2016-")
                )
            # NOTE(review): `infer_datetime_format` is deprecated in pandas 2.0 - confirm the supported pandas versions
            df.TimeUTC = pd.to_datetime(df.TimeUTC, infer_datetime_format=True, utc=True)
            # `set_index` already names the index "TimeUTC"
            df.set_index("TimeUTC", inplace=True)

            # for GermanSolarFarm, the index is not correct. Should have a three hour resolution but is one...
            if self.offset_correction is not None:
                # shift row i by i * offset_correction hours in one vectorised step
                shift = pd.to_timedelta(list(range(len(df))), unit="h") * self.offset_correction
                # keep the index name, which a plain list of timestamps would drop
                df.index = (df.index + shift).rename("TimeUTC")
        elif to.items.index.name == self.col_name:
            warnings.warn(f"Timetamps column {self.col_name} already set as index.")
        else:  warnings.warn(f"Timetamps column {self.col_name} not in columns {df.columns}")

In [None]:
type(CreateTimeStampIndex(col_name="TimeStamps"))

__main__.CreateTimeStampIndex

In [None]:
def get_test_data(index=None):
    "Builds a 5-row `TabularPandas` with columns A, B and C filled with 0; `index`, if given, is stored in a `TimeStamps` column."
    df = pd.DataFrame(index=range(0,5), columns = ['A', 'B', 'C'] ).fillna(0)
    if index is not None: df["TimeStamps"] = index
    return TabularPandas(df)
# tests basic functionality to set a proper timestamp based index
index = ['2015-01-01-01', '2015-01-01-02', '2015-01-02-03', '2015-02-01-23', '2015-02-01-13'] 
to = get_test_data(index)
test_eq(CreateTimeStampIndex(col_name="TimeStamps")(to).items.index, pd.to_datetime(index, utc=True))

# corrects missing year: the placeholder year "0000" has to end up as 2015, i.e. equal to `index` above
index_missing_year = ['0000-01-01-01', '0000-01-01-02', '0000-01-02-03', '0000-02-01-23', '0000-02-01-13'] 
to = get_test_data(index_missing_year)
test_eq(CreateTimeStampIndex(col_name="TimeStamps")(to).items.index, pd.to_datetime(index, utc=True))

# check if warning is triggered, due to wrong column name
to = get_test_data(index)
test_call = lambda: CreateTimeStampIndex(col_name="FalseColumnName")(to)
test_warns(test_call)

In [None]:
#export
class AddSeasonalFeatures(RenewablesTabularProc):
    "Adds the columns `Month`, `Day` and `Hour`, read from the index of `to.items`."
    order=0
    def encodes(self, to):
        # new column name -> attribute of the index it is copied from
        for col, field in (("Month", "month"), ("Day", "day"), ("Hour", "hour")):
            to.items[col] = getattr(to.items.index, field)

In [None]:
# the seasonal columns must match month, day and hour of the timestamps below
to = get_test_data(index=['2018-01-01-01', '2018-01-01-02', '2018-01-02-03', '2018-02-01-23', '2018-02-01-13'] )
# the timestamp index has to exist before the features can be derived from it
CreateTimeStampIndex("TimeStamps")(to)
AddSeasonalFeatures()(to)
test_eq(np.array([1,1,1,2,2]), to.items.Month.values)
test_eq(np.array([1,1,2,1,1]), to.items.Day.values)
test_eq(np.array([1,2,3,23,13]), to.items.Hour.values)

In [None]:
#export
class FilterYear(RenewablesTabularProc):
    "Filter a list of years. By default the years are dropped."
    order = 10
    def __init__(self, year, drop=True):
        """year(s) to filter, whether to drop or keep the years.

        year: a single year or a list of years; every entry is converted with `int`.
        drop: if true, rows of the given years are removed; otherwise all other rows are removed.
        """
        self.year = L(int(y) for y in listify(year))
        self.drop = drop

    def encodes(self, to):
        # True for every row whose index year is one of `self.year`. `isin` also
        # covers an empty year list (nothing matches), for which a mask built up
        # year by year would never be created.
        mask = to.items.index.year.isin(list(self.year))
        # the mask marks the rows to remove, so it is inverted when the years are to be kept
        if not self.drop: mask = ~mask
        to.items.drop(to.items[mask].index, inplace=True)

In [None]:
def test_data_filter_year():
    "Builds a `TabularPandas` with one row for each of the years 2018, 2019 and 2020."
    index = ['2018-01-01-01', '2019-01-01-02', '2020-01-02-03',] 
    return TabularPandas(pd.DataFrame(index=pd.to_datetime(index),
                      columns = ['A', 'B', 'C'] ).fillna(0))
    
# drop a single year (default behaviour)
to = test_data_filter_year()
FilterYear(year=2018)(to)
test_eq(np.array([2019,2020]), to.items.index.year)
# keep a single year
to = test_data_filter_year()
FilterYear(year=2020, drop=False)(to)
test_eq(np.array([2020]), to.items.index.year)
# drop several years
to = test_data_filter_year()
FilterYear(year=[2018,2020], drop=True)(to)
test_eq(np.array([2019]), to.items.index.year)
# keep several years
to = test_data_filter_year()
FilterYear(year=[2018,2020], drop=False)(to)
test_eq(np.array([2018,2020]), to.items.index.year)

In [None]:
#export
class DropCols(RenewablesTabularProc):
    "Drops the columns named in `cols`; names that are not present are ignored."
    order = 10
    def __init__(self, cols):
        self.cols = listify(cols)

    def encodes(self, to):
        to.items.drop(columns=self.cols, inplace=True, errors="ignore")

In [None]:
# nothing to drop: `None` and an empty list leave the columns untouched
to = get_test_data()
DropCols(None)(to)
test_eq(to.items.columns, ["A", "B", "C"])
to = get_test_data()
DropCols([])(to)
test_eq(to.items.columns, ["A", "B", "C"])
# dropping one and several columns
to = get_test_data()
DropCols(["C"])(to)
test_eq(to.items.columns, ["A", "B"])
to = get_test_data()
DropCols(["A", "B"])(to)
test_eq(to.items.columns, ["C"])
# NOTE(review): this last assignment is not used within this cell
to = get_test_data()

In [None]:
#export
class FilterByCol(TabularProc):
    "Removes the rows flagged by the column `col_name` (or the unflagged ones if `drop` is false)."
    order = 0
    def __init__(self, col_name, drop=True, drop_col_after_filter=True):
        self.col_name, self.drop, self.drop_col_after_filter = col_name, drop, drop_col_after_filter

    def encodes(self, to):
        df = to.items
        # the column is interpreted as boolean flags, one per row
        flagged = df[self.col_name].astype(bool).values
        rows_to_remove = flagged if self.drop else ~flagged
        # NOTE(review): rows are removed by index label; with duplicate labels in the
        # index, rows sharing a label would be removed together - confirm the index is unique here
        df.drop(df[rows_to_remove].index, inplace=True)
        if self.drop_col_after_filter:
            df.drop(self.col_name, axis=1, inplace=True, errors="ignore")



In [None]:
# drop the flagged rows (index 2 and 3) and remove the flag column afterwards
to = get_test_data()
to.loc[:,"C"] = [0,0,1,1,0]
FilterByCol(col_name="C", drop_col_after_filter=True, drop=True)(to)
test_eq(list(to.items.index),[0,1,4])
test_eq(to.items.columns,["A","B"])

# keep only the flagged rows and keep the flag column
to = get_test_data()
to.loc[:,"C"] = [0,0,1,1,0]
FilterByCol(col_name="C", drop_col_after_filter=False, drop=False)(to)
test_eq(list(to.items.index),[2,3])
test_eq(to.items.columns,["A","B", "C"])

In [None]:
#export
class FilterMonths(TabularProc):
    "Keeps (default) or drops the rows whose index month is one of `months`."
    order = 10
    def __init__(self, months=range(1,13), drop=False):
        self.months, self.drop = listify(months), drop

    def encodes(self, to):
        in_months = to.items.index.month.isin(self.months)
        # drop=False keeps the listed months, so the remaining rows are the ones to remove
        rows_to_remove = in_months if self.drop else ~in_months
        to.items.drop(to.items[rows_to_remove].index, inplace=True)

In [None]:
def get_test_data_filter_month():
    "Builds test data with one row for each of the months January to May 2018, indexed by timestamp."
    to = get_test_data(index=['2018-01-01-01', '2018-02-01-02', '2018-03-02-03', '2018-04-01-23', '2018-05-01-13'])
    CreateTimeStampIndex("TimeStamps")(to)
    return to

def test_filter_month(months,drop,expected_result):
    "Applies `FilterMonths(months, drop)` to fresh test data and compares the remaining months with `expected_result`."
    to = get_test_data_filter_month()
    FilterMonths(months,drop)(to)
    test_eq(to.items.index.month, expected_result)
    
# keep the given months (list, range and single month), then drop them
test_filter_month([1,2], False, [1,2])
test_filter_month(range(1,3), False, [1,2])
test_filter_month([1], False, [1])
test_filter_month([1,2], True, [3,4,5])

In [None]:
# export
class TabularRenewables(TabularPandas):
    """A `TabularPandas` that can run a separate pre-processing pipeline before the regular `procs`.

    When `pre_process` is given, `dfs` is first wrapped in a plain `TabularPandas`
    that applies `pre_process`; the resulting data frame is then handed to
    `TabularPandas.__init__` together with `procs`. `splits`, if given, is called
    with the row positions of that data frame and its result is used as the splits.
    `add_y_to_x` and `add_x_to_y` are accepted but not used by this class.
    """
    def __init__(self, dfs, procs=None, cat_names=None, cont_names=None, do_setup=True, reduce_memory=False,
                 y_names=None, add_y_to_x=False, add_x_to_y=False, pre_process=None, device=None, splits=None, y_block=RegressionBlock()):

        self.pre_process = pre_process
        if pre_process is not None:
            # run the pre-processing on its own, so that `procs` only see the prepared data
            self.prepared_to = TabularPandas(dfs, y_names=y_names, procs=pre_process, cont_names=cont_names,
                                          do_setup=True, reduce_memory=False)
            prepared_df = self.prepared_to.items
        else:
            prepared_df = dfs
        # `splits` is a callable; it is evaluated on the prepared data because pre-processing may remove rows
        if splits is not None: splits = splits(range_of(prepared_df))
        super().__init__(prepared_df,
            procs=procs,
            cat_names=cat_names,
            cont_names=cont_names,
            y_names=y_names,
            splits=splits,
            do_setup=do_setup,
            # forward the device; `new` passes it on and it would be lost otherwise
            device=device,
            inplace=True,
            y_block=y_block,
            reduce_memory=reduce_memory)

    def new(self, df, pre_process=None, splits=None):
        "Creates an object of the same type around `df` without running setup, reusing this object's procs, column names and device."
        return type(self)(df, do_setup=False, reduce_memory=False, y_block=TransformBlock(),
                          pre_process=pre_process, splits=splits,
                          **attrdict(self, 'procs','cat_names','cont_names','y_names', 'device'))

    def show(self, max_n=10, **kwargs):
        "Displays the decoded first `max_n` rows together with their `TaskID`."
        to_tmp = self.new(self.all_cols[:max_n])
        # the task id is not part of `all_cols`, so it is added again for decoding
        to_tmp.items["TaskID"] = self.items.TaskID[:max_n]
        display_df(to_tmp.decode().items)



To ensure that each task can be de-normalized, we make sure that the task ID is always stored.

In [None]:
#export

class ReadTabBatchRenewables(ItemTransform):
    "Transform `TabularPandas` values into a `Tensor` with the ability to decode"
    def __init__(self, to): self.to = to.new_empty()

    def encodes(self, to):
        "Returns a tuple of tensors (categoricals, optionally continuous values, optionally targets) for the batch `to`."
        # remember the task ids of this batch; `decodes` attaches them again
        self.task_ids = to.items[["TaskID"]]
        if not to.with_cont: res = (tensor(to.cats).long(),)
        else: res = (tensor(to.cats).long(),tensor(to.conts).float())
        # targets are only added when every target column is present in the batch
        ys = [n for n in to.y_names if n in to.items.columns]
        if len(ys) == len(to.y_names): res = res + (tensor(to.targ),)
        if to.device is not None: res = to_device(res, to.device)
        return res

    def decodes(self, o):
        "Rebuilds a tabular object from the batch tensors `o` and adds the task ids stored by the last `encodes` call."
        o = [_maybe_expand(o_) for o_ in to_np(o) if o_.size != 0]
        vals = np.concatenate(o, axis=1)
        # fall back to the input columns when the values do not fit all columns
        # (presumably a batch without targets); `Exception` instead of a bare
        # `except` so that e.g. a keyboard interrupt is not swallowed
        try: df = pd.DataFrame(vals, columns=self.to.all_col_names)
        except Exception: df = pd.DataFrame(vals, columns=self.to.x_names)

        to = self.to.new(df)
        to.items["TaskID"]=self.task_ids.values

        return to



In [None]:
# @typedispatch
# def show_batch(x: Tabular, y, its, max_n=10, ctxs=None):
#     x.show()

In [None]:
#export
@delegates()
class TabDataLoaderRenewables(TfmdDL):
    "A transformed `DataLoader` for Tabular data"
    def __init__(self, dataset, bs=16, shuffle=False, after_batch=None, num_workers=0, **kwargs):
        # by default batches are read with `ReadTabBatchRenewables`, which keeps track of the task ids
        if after_batch is None: after_batch = L(TransformBlock().batch_tfms)+ReadTabBatchRenewables(dataset)
        super().__init__(dataset, bs=bs, shuffle=shuffle, after_batch=after_batch, num_workers=num_workers, **kwargs)

    # a batch is a positional slice of the tabular object; the tensors are built by `after_batch`
    def create_batch(self, b): return self.dataset.iloc[b]
    def do_item(self, s):      return 0 if s is None else s

# make `TabularRenewables` use this loader; the cell is exported so that the
# generated module gets the same loader as the notebook
TabularRenewables._dl_type = TabDataLoaderRenewables

In [None]:
#export
class NormalizePerTask(TabularProc):
    """Normalize per TaskId

    Standardizes the continuous columns separately for every value of the column
    `task_id_col`, using per-task means and standard deviations (`ddof=0`, plus a
    small constant). Statistics of task ids that are first seen in `encodes` are
    computed on the fly and stored, so that they can be decoded later.
    """
    order = 1
    def __init__(self, task_id_col="TaskID"):
        self.task_id_col = task_id_col

    def _task_stats(self, to):
        "Per-task mean and standard deviation of the continuous columns, taken from `to.train` when `to` has that attribute."
        grouped = getattr(to, 'train', to)[to.cont_names + self.task_id_col].groupby(self.task_id_col)
        # the small constant keeps the division in `encodes` away from zero
        return grouped.mean(), grouped.std(ddof=0)+1e-7

    def setups(self, to:Tabular):
        self.means, self.stds = self._task_stats(to)

    def encodes(self, to):
        new_stats = None
        for task_id in to.items[self.task_id_col].unique():
            # in case this is a new task, we update the means and stds
            if task_id not in self.means.index:
                if new_stats is None: new_stats = self._task_stats(to)
                # add the missing task only: adding every group again would store
                # known task ids twice and make `.loc[task_id]` return several rows
                self.means = pd.concat([self.means, new_stats[0].loc[[task_id]]])
                self.stds = pd.concat([self.stds, new_stats[1].loc[[task_id]]])

            mask = to.loc[:,self.task_id_col] == task_id

            to.loc[mask, to.cont_names] = ((to.conts[mask] - self.means.loc[task_id]) / self.stds.loc[task_id])

    def decodes(self, to):
        for task_id in to.items[self.task_id_col].unique():
            if task_id not in self.means.index:
                warnings.warn("Missing task id, could not decode.")
                # no statistics are stored for this task: leave its rows as they are
                continue

            mask = to.loc[:,self.task_id_col] == task_id

            to.loc[mask, to.cont_names] = to.conts[mask] * self.stds.loc[task_id] + self.means.loc[task_id]
        return to

In [None]:
def get_test_data_task_normalization(index=None, procs=NormalizePerTask):
    """Builds test data with two tasks of five rows each.

    index: ten timestamps (strings); by default five from 2015 and five from 2016.
    procs: proc or list of procs applied by the `TabularRenewables` object.

    Returns the raw data frame, indexed by the parsed UTC timestamps, and the
    `TabularRenewables` object created from it.
    """
    # only fall back to the default timestamps when the caller gave none
    if index is None:
        index = ['2015-01-01-01', '2015-01-01-02', '2015-01-02-03', '2015-02-01-23', '2015-02-01-13',
            '2016-01-01-01', '2016-01-01-02', '2016-06-02-03', '2016-02-01-23', '2016-02-01-13']
    df = pd.DataFrame(index=range(1,11), columns = ['A', 'B', 'C'] ,
                      data=np.array([list(range(1,11)), list(range(11,21)), list(range(21,31))]).T)
    # the first five rows belong to task 1, the remaining ones to task 2
    df["TaskID"] = L(1 if i <= 5  else 2 for i in range(1,11))
    df["TimeStamps"] = index
    to = TabularRenewables(df, pre_process=CreateTimeStampIndex(col_name="TimeStamps"),
                           procs=listify(procs), cont_names=["A", "B"] , y_names="C",
#                            cat_names=["TaskID"]
                          )
    # give the raw frame the same kind of index, so that it can be compared with decoded data
    df["TimeStamps"] = pd.to_datetime(index, utc=True)
    df.set_index("TimeStamps",inplace=True)
    return df,to



In [None]:
original_df, to = get_test_data_task_normalization()
# decoding has to give back the raw values
test_eq(original_df.astype(float), to.decode().items.astype(float))
# one row of statistics per task, one column per continuous feature
test_eq(np.array([[3,13],[8,18]]), to.normalize_per_task.means.values)
test_close(np.array([[1.41421366, 1.41421366],[1.41421366, 1.41421366]]), to.normalize_per_task.stds.values)

Let's create a dataloader and show a single batch.

In [None]:
dl = to.dataloaders(bs=4)

In [None]:
dl.show_batch()

Unnamed: 0,A,B,C,TaskID
0,7.242641,31.384778,23.0,1
1,10.071068,34.213205,25.0,1
2,17.899496,42.041632,27.0,2
3,16.485282,40.627419,26.0,2


The following example shows how to add a new task that is normalized based on the first year, e.g., when the features are numerical weather predictions. As those are themselves forecasts, we can always extract the past and use the data for standardization.

In [None]:
original_df, to = get_test_data_task_normalization()
original_df["TaskID"] = 3

In [None]:
# switch off pandas' chained-assignment check for the in-place processing below
pd.options.mode.chained_assignment=None
# setups task normalization
# 2016 is dropped by the pre-processing, so only the first year (2015) is left
to_new = to.new(original_df, pre_process=FilterYear(2016, drop=True))
to_new.process()
to_new.items.describe()

Unnamed: 0,A,B,C,TaskID
count,5.0,5.0,5.0,5.0
mean,-4.4408920000000007e-17,-4.4408920000000007e-17,23.0,3.0
std,1.118034,1.118034,1.581139,0.0
min,-1.414213,-1.414213,21.0,3.0
25%,-0.7071067,-0.7071067,22.0,3.0
50%,0.0,0.0,23.0,3.0
75%,0.7071067,0.7071067,24.0,3.0
max,1.414213,1.414213,25.0,3.0


In [None]:
to_new.items

Unnamed: 0_level_0,A,B,C,TaskID
TimeStamps,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2015-01-01 01:00:00+00:00,-1.414213,-1.414213,21,3
2015-01-01 02:00:00+00:00,-0.707107,-0.707107,22,3
2015-01-02 03:00:00+00:00,0.0,0.0,23,3
2015-02-01 23:00:00+00:00,0.707107,0.707107,24,3
2015-02-01 13:00:00+00:00,1.414213,1.414213,25,3


Normalization based on second year

In [None]:
to_new = to.new(original_df, pre_process=FilterYear(2016, drop=False))
to_new.process()

As the data has larger values in the second year, the normalization is quite off.

In [None]:
to_new.items

Unnamed: 0_level_0,A,B,C,TaskID
TimeStamps,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2016-01-01 01:00:00+00:00,2.12132,2.12132,26,3
2016-01-01 02:00:00+00:00,2.828427,2.828427,27,3
2016-06-02 03:00:00+00:00,3.535534,3.535534,28,3
2016-02-01 23:00:00+00:00,4.24264,4.24264,29,3
2016-02-01 13:00:00+00:00,4.949747,4.949747,30,3


This can also be seen in the summary:

In [None]:
to_new.items.describe()

Unnamed: 0,A,B,C,TaskID
count,5.0,5.0,5.0,5.0
mean,3.535534,3.535534,28.0,3.0
std,1.118034,1.118034,1.581139,0.0
min,2.12132,2.12132,26.0,3.0
25%,2.828427,2.828427,27.0,3.0
50%,3.535534,3.535534,28.0,3.0
75%,4.24264,4.24264,29.0,3.0
max,4.949747,4.949747,30.0,3.0


Let's check a dataloader and verify if we can display the data.

In [None]:
to_new.dataloaders(bs=4).show_batch()

Unnamed: 0,A,B,C,TaskID
0,6.0,16.0,26.0,3
1,8.0,18.0,28.0,3
2,9.0,19.0,29.0,3
3,7.0,17.0,27.0,3


In [None]:
original_df

Unnamed: 0_level_0,A,B,C,TaskID
TimeStamps,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2015-01-01 01:00:00+00:00,1,11,21,3
2015-01-01 02:00:00+00:00,2,12,22,3
2015-01-02 03:00:00+00:00,3,13,23,3
2015-02-01 23:00:00+00:00,4,14,24,3
2015-02-01 13:00:00+00:00,5,15,25,3
2016-01-01 01:00:00+00:00,6,16,26,3
2016-01-01 02:00:00+00:00,7,17,27,3
2016-06-02 03:00:00+00:00,8,18,28,3
2016-02-01 23:00:00+00:00,9,19,29,3
2016-02-01 13:00:00+00:00,10,20,30,3


In [None]:
#hide
from nbdev.export import notebook2script
notebook2script()

Converted 00_core.ipynb.
Converted 01_data.ipynb.
Converted 02_model.ipynb.
Converted index.ipynb.
