# Data Generator

Generate the data an add a timestamp

In [1]:
import os
import logging

import pandas as pd
import numpy as np

import h5py
import threading
from datetime import datetime, timedelta

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score

DEP_FEATURE_NAME = 'surf_temp_sqerror'
HDF_LOCK = threading.Lock()
DATE_PATTERN = 'date%Y%m%d'
TIME_PATTERN = 'time%H%M'
KEY_PATTERN = '/{}/{}'.format(DATE_PATTERN, TIME_PATTERN)
STORE_NAME = 'metro_error_data_pro_swe2018.h5'

# Reduce log level
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='3'

### Environment Variables

In [2]:
# from horizon_handler
HORIZON_FEATURES = ['radiation', 'shade']

# from SMHI
FCST_FEATURES = ['AirTemp', 'DewPoint', 'RainPrecip', 'SnowPrecip', 'SnowPrecipAccumulated', 'WindSpeed',
                 'AirPressure', 'OctalCloudCoverage', 'PrecipAmount', 'station_id']
# from METRo
EBM_FEATURES = ['air_temp', 'dew_point', 'surf_temp', 'water_amount', 'snow_amount', 'sub_temp']


# setting the independent and dependent features
INDEPENDENT_FEATURES = ['forecast_length'] + HORIZON_FEATURES + FCST_FEATURES + EBM_FEATURES
DEPENDENT_FEATURES = ['stn_{}'.format(f) for f in ['dew_point', 'surf_temp']]
ERROR_FEATURES = ['surf_temp_sqerror']

### Data Readers

The functions used for extracting and generating the data

In [319]:

start_time = datetime(2018, 9, 29, 5)
end_time = datetime(2018, 10, 29, 6)
count = 0
# end_time = datetime(2019, 3, 14, 14)

def define_SSE(df, dep_feature_name):
    if dep_feature_name == 'surf_temp_sqerror':
        df[dep_feature_name] = (df['surf_temp'] - df['stn_surf_temp']) ** 2

def store_path(store_name):
    cwd = os.getcwd()
    return os.path.join(cwd, '{}.h5'.format(store_name))

def read(key, store_name=STORE_NAME):
    key = key.strftime(KEY_PATTERN)
    path = store_name
    with HDF_LOCK:
        with pd.HDFStore(path, 'r') as store:
            if key in store:
                try:
                    df = store.get(key)
                    for dep_feature_name in ERROR_FEATURES:
                        # ['surf_temp_sqerror']:
                        define_SSE(df, dep_feature_name=dep_feature_name)
                except Exception as e:
                    print(e)
                    df = None
                if df is None:
                    return pd.Series([])
                else:
                    return df
            else:
                return pd.Series([])

def gather_training_data(start_time, end_time):
    print('gathering training data from {} until {}'.format(start_time, end_time))
    ref_time = start_time
    m = ref_time.month
    times = []
    dfs = []
    while ref_time <= end_time:
        date = ref_time.strftime("%m/%d/%Y %H:%M:%S")
        val = read(ref_time)
        
        if (val.empty == False):
            times.append(date)
            data = pd.DataFrame(val, columns=val.columns)
            dfs.append(data)
        ref_time += timedelta(days=1)

        if (ref_time.month != m):
            m = ref_time.month
            print(m)
    #print(pd.Series(pd.to_datetime(times)))
    #dfs = pd.concat([pd.Series(pd.to_datetime(times)), pd.Series(dfs)], axis=1)
    #x = pd.DataFrame(dfs, columns=val.columns)
    dfs = pd.concat(dfs, keys=pd.to_datetime(times), names=['date'])
#    print(df.memory_usage(index=False).sum())
    return dfs

df = gather_training_data(start_time, end_time)

gathering training data from 2018-09-29 05:00:00 until 2018-10-29 06:00:00
10


In [327]:
df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,AirTemp,DewPoint,RainPrecip,SnowPrecip,SnowPrecipAccumulated,WindSpeed,AirPressure,OctalCloudCoverage,PrecipAmount,station_id,...,air_temp,dew_point,water_amount,snow_sum,forecast_length,stn_surf_temp,stn_dew_point,radiation,shade,surf_temp_sqerror
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
2018-09-29 05:00:00,0,10.9,8.824256,0.0,-0.0,0.0,4.8,102630.0,0.0,0.0,1202,...,8.1,7.7,0.02,0.0,1,7.8,7.96,257.708954,0.0,0.0784
2018-09-29 05:00:00,1,10.8,7.854865,0.0,-0.0,0.0,6.0,102650.0,0.0,0.0,1202,...,8.6,6.9,0.014,0.0,2,10.5,10.26,572.579224,0.0,1.690001
2018-09-29 05:00:00,2,11.4,6.937371,0.0,-0.0,0.0,6.4,102670.0,1.0,0.0,1202,...,9.7,6.2,0.002,0.0,3,13.7,10.2,713.93512,0.0,3.385601
2018-09-29 05:00:00,3,12.0,6.071365,0.0,-0.0,0.0,6.8,102670.0,3.0,0.0,1202,...,10.7,5.5,0.0,0.0,4,16.4,8.8,783.778503,0.0,4.368097
2018-09-29 05:00:00,4,12.3,6.139616,0.0,-0.0,0.0,7.1,102659.0,3.0,0.0,1202,...,11.3,5.7,0.0,0.0,5,18.9,8.12,817.249634,0.0,7.452898


In [616]:
included = np.array(df.loc[pd.IndexSlice[:], INDEPENDENT_FEATURES + ['surf_temp_sqerror']].notnull().all(axis=1))

rsi = {
    "data": df.loc[included, INDEPENDENT_FEATURES].values.astype(np.float), 
    "target": df.loc[included, 'surf_temp_sqerror'].values.astype(np.float)
}

rsi["data"].shape

(381610, 19)

In [617]:
def shuffle_split_data(X, y):
    arr_rand = np.random.rand(X.shape[0])
    split = arr_rand < np.percentile(arr_rand, 70)

    X_train = X[split]
    y_train = y[split]
    X_test =  X[~split]
    y_test = y[~split]

    print(len(X_train), len(y_train), len(X_test), len(y_test))
    return X_train, X_test, y_train, y_test

X_train, X_test, y_train, y_test = shuffle_split_data(rsi["data"], rsi["target"])

267127 267127 114483 114483


In [618]:
mean = X_train.mean(axis=0)    
X_train -= mean
std = X_train.std(axis=0)
X_train /= std

X_test -= mean
X_test /= std

In [704]:
def generator(data, lookback, delay, min_index, max_index,
              shuffle=False, batch_size=128, step=6):
    if max_index is None:
        max_index = len(df) - delay - 1
    i = min_index + lookback
    if shuffle:
        rows = np.random.randint(
            min_index + lookback, max_index, size=batch_size)
    else:
        if i + batch_size >= max_index:
            i = min_index + lookback
        rows = np.arange(i, min(i + batch_size, max_index))
        i += len(rows)

    samples = np.zeros((len(rows),
                       lookback // step,
                       data.shape[-1]))
    targets = np.zeros((len(rows), lookback // step,))
    for j, row in enumerate(rows):
        indices = range(rows[j] - lookback, rows[j], step)
        t = [y_train[indices]]
        samples[j] = data[indices]
        targets[j] = y_train[indices]
        
    return samples, targets

In [710]:
lookback = mx.max()
step = 1
delay = mx.max()
batch_size = len(mx.keys())
max_index = None
min_index = 0
shuffle = True

train_gen = generator(X_train,
                      lookback=lookback,
                      delay=delay,
                      min_index=0,
                      max_index=100000,
                      shuffle=True,
                      step=step, 
                      batch_size=batch_size)

samples, targets = train_gen

In [711]:
val_gen = generator(float_data,
                    lookback=lookback,
                    delay=delay,
                    min_index=100001,
                    max_index=200000,
                    step=step,
                    batch_size=batch_size)

In [712]:
test_gen = generator(float_data,
                     lookback=lookback,
                     delay=delay,
                     min_index=200001,
                     max_index=None,
                     step=step,
                     batch_size=batch_size)

In [713]:
# This is how many steps to draw from `val_gen`
# in order to see the whole validation set:
val_steps = (300000 - 200001 - lookback) // batch_size

# This is how many steps to draw from `test_gen`
# in order to see the whole test set:
test_steps = (len(float_data) - 300001 - lookback) // batch_size

print(val_steps, test_steps)

3177 2496


In [720]:
from keras.models import Sequential
from keras import layers
from keras.optimizers import RMSprop
print(targets.shape[-1])

model = Sequential()
model.add(layers.GRU(32, input_shape=(None, samples.shape[-1])))
model.add(layers.Dense(1))

model.compile(optimizer='rmsprop', loss='mse')
history = model.fit_generator(train_gen,
                              steps_per_epoch=500,
                              epochs=20,
                              validation_data=val_gen,
                              validation_steps=val_steps,
                              verbose=1)

14212


ValueError: Error when checking target: expected dense_13 to have shape (1,) but got array with shape (14212,)

In [721]:
# Evaluate on test data
result = model.evaluate_generator(test_gen)
print('MSE on test data: {}'.format(result[1]))
test_gen

ValueError: `steps=None` is only valid for a generator based on the `keras.utils.Sequence` class. Please specify `steps` or use the `keras.utils.Sequence` class.