In [1]:
import joblib
import numpy as np
import pandas as pd
from azure.cosmosdb.table.tableservice import TableService
import creds
import datetime

# Load the pre-trained model from disk (the file name suggests a "will it rain
# within 6 hours" label -- unverified). joblib.load unpickles, so only point this
# at trusted files.
model_path = 'model_predlabel_6hwill_rain.pkl'
model = joblib.load(model_path)

# Azure Table Storage client shared by every query helper below; the account
# name and key come from the local `creds` module rather than being inlined.
table_service = TableService(account_name=creds.ACCNAME, account_key=creds.KEY)

def get_df(table):
    """Return the recent entities of an Azure table as a DataFrame.

    "Recent" means RowKey >= the POSIX timestamp of (local now - 2 days 1 hour),
    compared as strings by the Table service.
    NOTE(review): this assumes RowKeys are stringified epoch seconds with the
    same digit count as the cutoff -- the writer is not visible here.

    Args:
        table: Azure table name, e.g. 'WeatherAPICurrent'.

    Returns:
        pd.DataFrame with one row per entity (empty frame if none match).
    """
    # Naive local datetime; .timestamp() interprets it as local time, giving a true epoch.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=2, hours=1)
    row_filter = "RowKey ge '{}'".format(cutoff.timestamp())
    entities = list(table_service.query_entities(table, filter=row_filter))
    return pd.DataFrame(entities)

def get_and_clean_cur_weather():
    """Fetch recent rows of 'WeatherAPICurrent' and normalise them.

    Steps: drop the Azure bookkeeping columns plus ``last_updated`` and
    ``wind_degree``; run ``pd.to_numeric`` over every column except
    ``condition``, ``localtime`` and ``wind_dir``; sort by ``localtime_epoch``;
    cast ``pressure_mb`` and ``uv`` to int; reorder columns by position;
    rebuild ``localtime`` as an Asia/Phnom_Penh tz-aware timestamp from
    ``localtime_epoch``; drop the two epoch columns.

    Returns:
        pd.DataFrame with a fresh RangeIndex, one row per observation, in time order.
    """
    bookkeeping = ['PartitionKey', 'RowKey', 'Timestamp', 'etag', 'last_updated', 'wind_degree']
    weather = get_df('WeatherAPICurrent').drop(bookkeeping, axis=1)

    numeric_cols = weather.columns.drop(['condition', 'localtime', 'wind_dir'])
    weather[numeric_cols] = weather[numeric_cols].apply(pd.to_numeric)

    weather = weather.sort_values('localtime_epoch').reset_index(drop=True)
    for int_col in ('pressure_mb', 'uv'):
        weather[int_col] = weather[int_col].astype(int)

    # Positional reshuffle: columns 8-9, then 6-7, then 0-5, then the rest.
    # The final feature row is handed to the model as a plain array, so this
    # order is part of the model's input contract -- do not "tidy" it.
    names = list(weather.columns)
    weather = weather[names[8:10] + names[6:8] + names[:6] + names[10:]]

    as_utc = pd.to_datetime(weather['localtime_epoch'], unit='s').dt.tz_localize('UTC')
    weather['localtime'] = as_utc.dt.tz_convert('Asia/Phnom_Penh')
    return weather.drop(['localtime_epoch', 'last_updated_epoch'], axis=1)

def _summarise_forecasts_by_day(df):
    """Summarise every ``D0_``/``D1_``/``D2_`` column separately for each ``D0_date``.

    Args:
        df: cleaned forecast frame with a tz-aware ``D0_date`` column.

    Returns:
        dict mapping each distinct ``D0_date`` (in order of first appearance) to
        an insertion-ordered dict of features. For each matching column ``c``:
        integer/float columns give ``'cur'+c+'_mean'``, ``'cur'+c+'_std'``
        (sample std, NaN for a single row) and ``'cur'+c+'_states'`` (number of
        distinct values); object/string columns give ``'cur'+c+'_consensus'``
        (share of rows holding the most frequent value) and ``'cur'+c+'_states'``;
        any other dtype (e.g. the tz-aware ``D*_date`` columns) is skipped.
        Every day gets the same keys because dtypes are per column.
    """
    summaries = {}
    for day in df['D0_date'].unique():
        day_rows = df[df['D0_date'] == day]
        features = {}
        for prefix in ['D0_', 'D1_', 'D2_']:
            for column in [c for c in df.columns if prefix in c]:
                values = day_rows[column]
                dtype = values.dtype
                # Dtype predicates replace `dtype in ['O', 'string']`, which could
                # raise TypeError for numpy dtypes and needed a try/except fallback.
                if pd.api.types.is_integer_dtype(dtype) or pd.api.types.is_float_dtype(dtype):
                    # Same numbers as describe()[1] / describe()[2], without
                    # positional indexing on a labelled Series.
                    features['cur' + column + '_mean'] = values.mean()
                    features['cur' + column + '_std'] = values.std()
                    features['cur' + column + '_states'] = len(values.unique())
                elif pd.api.types.is_string_dtype(dtype):
                    # value_counts() sorts descending, so iloc[0] is the modal count.
                    features['cur' + column + '_consensus'] = values.value_counts().iloc[0] / values.shape[0]
                    features['cur' + column + '_states'] = len(values.unique())
        summaries[day] = features
    return summaries


def _align_prior_day_forecasts(summaries, days):
    """Build one row per day holding the summaries of the two preceding calendar days.

    Args:
        summaries: output of ``_summarise_forecasts_by_day``.
        days: distinct ``D0_date`` values in order of appearance.

    Returns:
        pd.DataFrame with columns ``D0_date``, then every summary key containing
        'D1' (taken from the previous day), then every key containing 'D2' (taken
        from two days before). The first two entries of ``days`` never get a row,
        and neither does a day whose two preceding days are not both summarised.
    """
    first_day = next(iter(summaries.values()), {})
    yesterday_cols = [k for k in first_day if 'D1' in k]
    daybefore_cols = [k for k in first_day if 'D2' in k]
    per_day = pd.DataFrame.from_dict(summaries, orient='index')

    # Collect plain dicts and build the frame once; Series.append and
    # DataFrame.append were removed in pandas 2.0.
    records = []
    for day in days[2:]:
        yesterday = day - pd.DateOffset(1)
        daybefore = day - pd.DateOffset(2)
        # A gap in the data used to raise KeyError; skip the day instead (the
        # caller's inner merge then drops it, like the first two days).
        if yesterday not in per_day.index or daybefore not in per_day.index:
            continue
        row = {'D0_date': day}
        row.update(per_day.loc[yesterday, yesterday_cols].to_dict())
        row.update(per_day.loc[daybefore, daybefore_cols].to_dict())
        records.append(row)
    return pd.DataFrame(records, columns=['D0_date'] + yesterday_cols + daybefore_cols)


def get_and_clean_for_weather():
    """Fetch recent forecast snapshots and attach the previous days' forecast summaries.

    Steps:
      1. Load 'WeatherAPIForecast' rows via ``get_df``, drop Azure bookkeeping
         columns and every row containing a missing value.
      2. Coerce all non-text, non-date columns with ``pd.to_numeric``, then cast
         them to int where the cast succeeds.
      3. Sort by ``localtime_epoch`` and move the last two columns to the front.
      4. Rebuild ``localtime`` and ``D0_date``..``D2_date`` as Asia/Phnom_Penh
         tz-aware timestamps from their ``*_epoch`` columns; drop the
         ``D*_date_epoch`` columns.
      5. Summarise each forecast column per ``D0_date`` and inner-merge, on
         ``D0_date``, the 'D1' summaries of the previous day and the 'D2'
         summaries of the day before that.

    Returns:
        pd.DataFrame of forecast rows with the aligned summary columns appended.
        Rows whose ``D0_date`` could not be aligned (the first two distinct
        days, or a day after a gap) are dropped by the inner merge.
    """
    df = (get_df('WeatherAPIForecast')
          .drop(['PartitionKey', 'RowKey', 'Timestamp', 'etag'], axis=1)
          .dropna())
    cols = df.columns.drop(['localtime', 'D0_condition', 'D1_condition', 'D2_condition',
                            'D0_date', 'D1_date', 'D2_date'])
    df[cols] = df[cols].apply(pd.to_numeric)
    # NOTE(review): astype(int) truncates fractional floats (28.7 -> 28) rather than
    # raising, so in practice every numeric column becomes integral; only NaN/inf or
    # uncastable values reach the except. Kept as-is since the model was presumably
    # trained on features produced by this same code.
    for col in cols:
        try:
            df[col] = df[col].astype(int)
        except (ValueError, TypeError, OverflowError):
            pass

    df = df.sort_values('localtime_epoch')
    # Positional reorder (last two columns first). The final feature row is fed to
    # the model as a plain array, so this order is part of its input contract.
    order = df.columns.tolist()
    df = df[order[-2:] + order[:-2]].reset_index(drop=True)

    for target, epoch_col in [('localtime', 'localtime_epoch'), ('D0_date', 'D0_date_epoch'),
                              ('D1_date', 'D1_date_epoch'), ('D2_date', 'D2_date_epoch')]:
        df[target] = (pd.to_datetime(df[epoch_col], unit='s')
                      .dt.tz_localize('UTC')
                      .dt.tz_convert('Asia/Phnom_Penh'))
    df = df.drop(['D0_date_epoch', 'D1_date_epoch', 'D2_date_epoch'], axis=1)

    summaries = _summarise_forecasts_by_day(df)
    aligned = _align_prior_day_forecasts(summaries, df['D0_date'].unique())
    # Give the merge key df's tz-aware dtype even when no day could be aligned
    # (an empty frame would otherwise hold an object column).
    aligned['D0_date'] = aligned['D0_date'].astype(df['D0_date'].dtype)
    return pd.merge(df, aligned, on='D0_date')

def get_all_clean_weather():
    """Attach the nearest forecast snapshot to every current-weather observation.

    Both frames come from their own cleaning loaders and are joined with
    ``pd.merge_asof`` on their tz-aware ``localtime`` column.
    ``direction='nearest'`` picks the closest forecast whether it was taken
    before or after the observation, and no ``tolerance`` is set, so the time
    gap is unbounded.

    Why 'nearest' rather than 'backward': forecasts tend to be posted after the
    current weather, so a backward match leaves gaps of 20 minutes or longer;
    with 'nearest' the model can be run a few moments after a forecast arrives.

    Returns:
        pd.DataFrame with one row per current-weather row (left join), in the
        current frame's order, with the forecast columns appended.
    """
    current = get_and_clean_cur_weather()
    forecast = get_and_clean_for_weather()
    # Explicit key: merge_asof used to infer it from the single column name the
    # frames happened to share, which breaks as soon as that overlap changes.
    return pd.merge_asof(current, forecast, on='localtime', direction='nearest')

def get_conditions_set():
    """Return every weather-condition label stored in the current and forecast tables.

    There is no date filter: 'WeatherAPICurrent' (``condition``) and
    'WeatherAPIForecast' (``D0_condition``, ``D1_condition``, ``D2_condition``)
    are scanned in full, so the vocabulary also covers labels absent from the
    recent window. Missing values (e.g. entities lacking the property) are
    skipped, because callers build column names from each label with string
    operations.

    Returns:
        set of str.
    """
    current = pd.DataFrame(list(
        table_service.query_entities('WeatherAPICurrent', select='condition')))
    forecast = pd.DataFrame(list(
        table_service.query_entities('WeatherAPIForecast', select="D0_condition, D1_condition, D2_condition")))

    labels = set(current['condition'].dropna())
    for col in ['D0_condition', 'D1_condition', 'D2_condition']:
        labels.update(forecast[col].dropna())
    return labels

def get_wind_dir_set():
    """Return every wind-direction label stored in 'WeatherAPICurrent'.

    Scans the whole table (no date filter), selecting only ``wind_dir``.
    Missing values are skipped, because callers concatenate each label into a
    column name.

    Returns:
        set of str.
    """
    current = pd.DataFrame(list(
        table_service.query_entities('WeatherAPICurrent', select='wind_dir')))
    return set(current['wind_dir'].dropna())

def cleaned_data():
    """Build the model-ready feature frame from the joined weather data.

    Steps: keep rows that matched a forecast (non-null ``localtime_epoch``);
    add a 0.0/1.0 ``is_raining`` flag (``precip_mm > 0``); drop timestamp
    columns; one-hot encode ``condition``/``D0_condition``..``D2_condition``
    over the full condition vocabulary and ``wind_dir`` over the full
    wind-direction vocabulary (one-hot columns appended after the remaining
    columns); finally drop any row that still has a NaN. Prints the frame
    shape before and after that final drop.

    Returns:
        pd.DataFrame of numeric features whose column order is deterministic.
    """
    df = get_all_clean_weather().dropna(subset=['localtime_epoch']).reset_index(drop=True)

    # 1.0 when any precipitation was recorded for the observation, else 0.0.
    df['is_raining'] = (df['precip_mm'] > 0).astype(float)

    df = df.drop(['localtime', 'localtime_epoch', 'D0_date', 'D1_date', 'D2_date'], axis=1)

    # Fetch each vocabulary once (every call scans a whole Azure table) and
    # iterate it sorted: a set of strings iterates in hash order, which changes
    # between Python processes (PYTHONHASHSEED), while the model consumes these
    # columns by position.
    # NOTE(review): a model trained on the old hash-ordered columns must be
    # retrained so its expected feature order matches this one.
    conditions = sorted(get_conditions_set())
    wind_dirs = sorted(get_wind_dir_set())

    condition_cols = ['condition', 'D0_condition', 'D1_condition', 'D2_condition']
    one_hot = {}
    for col in condition_cols:
        for condition in conditions:
            # To satisfy the Table Service requirement that column names do not have spaces
            one_hot[col + '_' + condition.replace(' ', '_')] = (df[col] == condition).astype(float)
    for wind in wind_dirs:
        one_hot['wind_dir_' + wind] = (df['wind_dir'] == wind).astype(float)

    # Build all indicator columns at once instead of inserting them one by one.
    remaining = df.drop(condition_cols + ['wind_dir'], axis=1)
    df = pd.concat([remaining, pd.DataFrame(one_hot, index=df.index)], axis=1)

    print(df.shape)
    df = df.dropna()
    print(df.shape)
    return df



In [2]:
# Predict from the last row of the cleaned feature frame (rows follow the
# time-sorted current-weather observations). The model receives a plain
# 1 x n_features array, so it relies purely on column order.
data = cleaned_data().iloc[-1]
npdata = data.to_numpy().reshape(1, -1)
predictions = model.predict(npdata)
print(predictions)

(98, 208)
(98, 208)
[0.]


In [3]:
# Store the predicted label alongside the features under the 'pred6hrain' key.
data.loc['pred6hrain'] = predictions[0]

In [4]:
# Persist the feature row plus prediction as one Azure Table entity. Every value
# is stringified; RowKey is today's UTC date.
# NOTE(review): with a date RowKey, a second run on the same day presumably
# conflicts with the existing entity -- consider insert_or_replace_entity.
data_dict = {key: str(value) for key, value in dict(data).items()}
data_dict['PartitionKey'] = 'Predictions'
data_dict['RowKey'] = datetime.datetime.utcnow().strftime('%Y-%m-%d')
table_service.insert_entity('Predictions', data_dict)

'W/"datetime\'2020-07-17T01%3A51%3A11.89686Z\'"'

In [None]:
def dump(data, path='predictions.txt'):
    """Write a CREATE TABLE statement for a ``predictions`` table to ``path``.

    One ``REAL`` column is declared per entry of ``data.index``, followed by a
    ``TEXT`` ``prediction_date`` column used as the primary key and a fixed
    block of table options (``CLUSTERED INTO`` / ``WITH (...)`` -- these look
    like CrateDB settings; confirm against the target database). Prints a
    reminder to hand-edit integer columns afterwards.

    Args:
        data: pandas Series whose index supplies the column names. It is NOT
            modified (the previous version added a 'prediction_date' entry to
            the caller's Series as a side effect).
        path: output file; defaults to 'predictions.txt' in the working directory.
    """
    # prediction_date holds a 'YYYY-MM-DD' string, so it must not be declared
    # REAL like the feature columns; skip it if present and add it as TEXT.
    lines = ['CREATE TABLE IF NOT EXISTS predictions (']
    lines += [' "{0}" REAL,'.format(column) for column in data.index if column != 'prediction_date']
    lines.append(' "prediction_date" TEXT,')
    lines.append(' PRIMARY KEY ("prediction_date")')
    lines.append(')')
    with open(path, 'w') as f:
        f.writelines(line + '\n' for line in lines)
        f.write("""CLUSTERED INTO 4 SHARDS
WITH (
    "allocation.max_retries" = 5,
    "blocks.metadata" = false,
    "blocks.read" = false,
    "blocks.read_only" = false,
    "blocks.read_only_allow_delete" = false,
    "blocks.write" = false,
    column_policy = 'strict',
    "mapping.total_fields.limit" = 1000,
    max_ngram_diff = 1,
    max_shingle_diff = 3,
    number_of_replicas = '0-1',
    refresh_interval = 1000,
    "routing.allocation.enable" = 'all',
    "routing.allocation.total_shards_per_node" = -1,
    "translog.durability" = 'REQUEST',
    "translog.flush_threshold_size" = 536870912,
    "translog.sync_interval" = 5000,
    "unassigned.node_left.delayed_timeout" = 60000,
    "warmer.enabled" = true,
    "write.wait_for_active_shards" = 'ALL'
)
""")
    print('Remember to change integers to INT in the text file!')

In [None]:
# Write predictions.txt: a CREATE TABLE statement with one column per entry of `data`
dump(data)

In [None]:
# Read back the Predictions entities whose RowKey is today's UTC date (or sorts
# after it) and display them.
datefilter = "RowKey ge '{}'".format(datetime.datetime.utcnow().strftime('%Y-%m-%d'))
df = pd.DataFrame(list(table_service.query_entities('Predictions', filter=datefilter)))
df

In [None]:
# JSON view of the last Predictions entity returned by the read-back query
df.iloc[-1].to_json()

In [None]:
# Explainability component: pair each model input with its feature importance,
# most important first.
# `data` can hold entries that are not model inputs: 'pred6hrain' (attached
# after predicting) and 'prediction_date' (which dump() may have added). Left in,
# they make `data` longer than model.feature_importances_ and the column
# assignment below raises a length-mismatch ValueError.

features = model.feature_importances_
temp = data.drop(['pred6hrain', 'prediction_date'], errors='ignore').reset_index()
temp['feature_importances'] = features
temp = temp.sort_values(by='feature_importances', ascending=False).reset_index(drop=True)
temp.rename(columns={temp.columns[0]: 'feature', temp.columns[1]: 'value'})

In [None]:
# Scratch check: today's UTC date in the 'YYYY-MM-DD' form used as the Predictions RowKey
datetime.datetime.utcnow().date().strftime('%Y-%m-%d')