In [1]:
import math
import time
import json
import pandas as pd
import os
import numpy as np
import sqlite3
import boto3
import tensorflow as tf

In [35]:
from logfuncts import logger

FDIR = os.path.abspath(".")
DATABASE_PATH = os.path.join(FDIR, '../data.db')
OUTPUT_PATH = os.path.join(FDIR, '../html/dart.json')
RIVER_NAME = "dart"

NUM_STEPS = 80
NUM_LEVEL_UPDATES = 40
MIMIMUM_THRESHOLD = 0.7
MAXIMUM_THRESHOLD = 1.5

In [12]:
def load_dataframe_from_sql(river, limit=-1):
    """Load data from the database and return a pandas dataframe. 
    Limit param specifies number of rows returned. Default is to return all"""
    if limit > 0:
        logger.debug("loading df for river {river} from sql with row limit of {limit}".format(river=river, limit=limit))
    else:
        logger.debug("loading entire df for river {river} from sql".format(river=river))
    con = sqlite3.connect(DATABASE_PATH)
    cur = con.cursor()
    query = """
            SELECT timestamp, rain, level, forecast 
                from {river}
            ORDER BY timestamp DESC
            LIMIT {limit}
        """
    cur.execute(query.format(river=river, limit=limit))
    result = cur.fetchall()

    df = pd.DataFrame(result, columns=['timestamp', 'cum_rain', 'level', 'forecast'])
    # # Set index to timestamp column as object
    df.timestamp = pd.to_datetime(df.timestamp)
    df = df.set_index('timestamp')
    df = df.sort_index()

    return df

In [103]:

testing_mode = False

if testing_mode:
    current_time = pd.to_datetime("2019-04-05 11:30:00")
    df = load_dataframe_from_sql(river=RIVER_NAME, limit=-1)
    df = df[df.index > current_time - pd.Timedelta('2days')]
    df = df[df.index < current_time + pd.Timedelta('2days')]
    df.loc[(df.index > current_time - pd.Timedelta('1days')), "level"] = None
    df.loc[(df.index > current_time - pd.Timedelta('12hours')), "cum_rain"] = None
    
else:
    current_time = time.time()
    current_time = pd.to_datetime(current_time - (current_time % (15*60)), unit='s')
    df = load_dataframe_from_sql(river=RIVER_NAME, limit=130)


latest_level_update_time = max(df[df.level.notnull()].index)
latest_rain_time = max(df.index[df.cum_rain.notnull()])
latest_forecast_rain_time = max(df.index[df.forecast.notnull()])

# Fill in missing timestamps by reindexing
min_time = min(df.index)
max_time = max(df.index)
rng = pd.date_range(min_time, max_time + pd.Timedelta('2.5hours'), freq='15Min')
df = df.reindex(rng)

num_level_updates = df[df.index <= latest_level_update_time].shape[0]
num_rain_updates = df[df.index <= latest_rain_time].shape[0]
num_forecast_rain_updates = df[df.index <= latest_forecast_rain_time].shape[0]

print "latest_level_update_time:", latest_level_update_time
print "latest_rain_time:", latest_rain_time
print "latest_forecast_rain_time:", latest_forecast_rain_time

print "num_level_updates:", num_level_updates
print "num_rain_updates:", num_rain_updates
print "num_forecast_rain_updates:", num_forecast_rain_updates


2019-04-12 19:36:58,411 root         DEBUG    loading df for river dart from sql with row limit of 130


latest_level_update_time: 2019-04-12 16:00:00
latest_rain_time: 2019-04-12 17:00:00
latest_forecast_rain_time: 2019-04-13 21:00:00
num_level_updates: 118
num_rain_updates: 122
num_forecast_rain_updates: 234


In [104]:

# Remove rows after latest cum_rain value (no longer using forecast data) 
df = df[df.index <= latest_rain_time]

# Convert cumulative rain to actual rain
df['rain'] = df['cum_rain'].diff(periods=2)

# negative values from diff are when the rain value resets so we set equal to the cumulative value
df.loc[df['rain'] < 0, 'rain'] = df.loc[df['rain'] < 0, 'cum_rain']

df['model_rain'] = df["rain"]

# Interpolate model_rain

df['model_rain'] = df['model_rain'].interpolate()

input_df = pd.concat((
    df[df.index <=latest_level_update_timestamp].tail(NUM_LEVEL_UPDATES),
    df[df.index >latest_level_update_timestamp].tail(NUM_LEVEL_UPDATES)
))

x = input_df.model_rain.values         
y = input_df.level.fillna(0).values
timestamps = input_df.index.values

# need to padd out input arrays with zeros to get the correct shape
num_padding_steps = NUM_STEPS - x.shape[0] 
x = np.concatenate((x, np.zeros(num_padding_steps)))
y = np.concatenate((y, np.zeros(num_padding_steps)))   

update_vector = np.zeros(x.shape)
update_vector[0:NUM_LEVEL_UPDATES] = 1

x = np.column_stack([x, update_vector, update_vector*y])
y = np.column_stack([y])

model_name = "production_rnn"
path_to_model = os.path.join(FDIR, model_name)
predict_fn = tf.contrib.predictor.from_saved_model(path_to_model)
predict = predict_fn({"x":[x]})["predictions"]

# remove excess padding
predict = predict[:-num_padding_steps]
level = y[:-num_padding_steps,0]
rain = x[:-num_padding_steps,0]

# set levels after latest update and predicts before to None
level[num_level_updates:] = None
predict[:num_level_updates-1] = None


INFO:tensorflow:Restoring parameters from /Users/jconn/isthedartrunning/python/production_rnn/variables/variables


2019-04-12 19:36:59,091 tensorflow   INFO     Restoring parameters from /Users/jconn/isthedartrunning/python/production_rnn/variables/variables


In [105]:
predict_fn({"x":[x]})["predictions"].shape

(80,)

In [106]:
level

array([0.317, 0.317, 0.317, 0.317, 0.317, 0.317, 0.317, 0.317, 0.317,
       0.317, 0.317, 0.317, 0.317, 0.317, 0.317, 0.317, 0.318, 0.318,
       0.318, 0.318, 0.317, 0.318, 0.318, 0.317, 0.318, 0.317, 0.318,
       0.317, 0.317, 0.317, 0.317, 0.317, 0.316, 0.317, 0.317, 0.317,
       0.   , 0.   , 0.   , 0.   ])

In [107]:
rain

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0.])

In [109]:

# create output json
output_df = pd.DataFrame({"timestamp":timestamps, "rain":rain, "level":level, "predict": predict})

output_df = output_df.round({'level': 3, 'predict': 3, 'model_rain' : 1})
output_df = pd.DataFrame(output_df).replace({np.nan:None})


if latest_level_update_timestamp == current_time:
    current_level = output_df[output_df.timestamp == current_time]["level"].values[0]
else:
    try:
        current_level = output_df[output_df.timestamp == current_time]["predict"].values[0]
    except:
        current_level = None

logger.info('currenct level: ' + str(current_level))

if current_level is None:
    text = "?"
elif current_level > MAXIMUM_THRESHOLD:
    text = "THE DART IS MASSIVE"
elif current_level > MIMIMUM_THRESHOLD:
    text = 'YES'
elif output_df[output_df.timestamp > current_time]["predict"].max() > minimum_threshold:
    text = "THE DART WILL BE UP SHORTLY"
else:
    text = 'NO'    

logger.info("OUTPUT TEXT: " + text)

output_df.timestamp = [timestamp.value / 1000 for timestamp in output_df.timestamp.tolist()]
values = output_df.T.to_dict().values()

output = {}       
output['current_time'] = current_time.value / 1000
output['current_level'] = current_level
output['text'] = text
output['values'] = values



2019-04-12 19:37:35,776 root         INFO     currenct level: None
2019-04-12 19:37:35,778 root         INFO     OUTPUT TEXT: ?


In [15]:
with open("dart.json", 'w') as f:
    json.dump(output, f, indent=4)
