In [None]:
import datetime
import os
import sys

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from tqdm import tqdm

from confluent_kafka import avro, Consumer, KafkaError, KafkaException
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerde


In [None]:
# Load the trained Keras model from the 'multi_lstm' path first, before the
# rest of the notebook starts filling GPU memory with other data.
# NOTE: when running in a notebook environment, make sure no other kernels are
# consuming GPU memory.
saved_model = tf.keras.models.load_model('multi_lstm')

In [None]:
def wind_vector(self, velocity, max_velocity, direction):
    """Add Cartesian wind components to this frame, in place.

    The function is attached to ``pd.DataFrame`` right after its definition,
    so it is normally invoked as ``df.wind_vector(...)``.

    Args:
        velocity: wind speed values, projected onto the x/y axes.
        max_velocity: maximum wind speed values, projected the same way.
        direction: wind direction in degrees.

    Returns:
        None. The columns 'Wx', 'Wy', 'max Wx' and 'max Wy' are written to
        ``self`` (in that order).
    """
    # Degrees -> radians, then evaluate each trig term once and reuse it.
    angle = direction*np.pi / 180
    cos_angle = np.cos(angle)
    sin_angle = np.sin(angle)

    self['Wx'] = velocity*cos_angle
    self['Wy'] = velocity*sin_angle
    self['max Wx'] = max_velocity*cos_angle
    self['max Wy'] = max_velocity*sin_angle


# Expose the helper as a DataFrame method.
pd.DataFrame.wind_vector = wind_vector

In [None]:
def tod_signal(self, date_time):
    """Add daily and yearly periodic signals derived from timestamps, in place.

    Each entry of ``date_time`` is converted to a POSIX timestamp with
    ``datetime.datetime.timestamp`` and then mapped onto sine/cosine waves
    with a period of one day and of one year (365.2425 days).

    NOTE(review): for timezone-naive values this conversion presumably depends
    on the machine's local timezone -- confirm it matches how the training
    data was prepared.

    Args:
        date_time: index-like of datetimes supporting ``.map``; one entry per
            row of ``self``.

    Returns:
        None. The columns 'Day sin', 'Day cos', 'Year sin' and 'Year cos' are
        written to ``self`` (in that order).
    """
    seconds_per_day = 24*60*60
    seconds_per_year = (365.2425)*seconds_per_day

    epoch_seconds = date_time.map(datetime.datetime.timestamp)
    day_phase = epoch_seconds * (2 * np.pi / seconds_per_day)
    year_phase = epoch_seconds * (2 * np.pi / seconds_per_year)

    self['Day sin'] = np.sin(day_phase)
    self['Day cos'] = np.cos(day_phase)
    self['Year sin'] = np.sin(year_phase)
    self['Year cos'] = np.cos(year_phase)


# Expose the helper as a DataFrame method.
pd.DataFrame.tod_signal = tod_signal

In [None]:
# The schema-registry URL is kept in a secrets file; strip the trailing newline.
with open('../tms-secrets/schema_registry_uri') as uri_file:
    registry_url = uri_file.read().rstrip('\n')
schema_registry_config = {'url': registry_url}

# Registry client -> Avro serde -> bound decoder used when consuming records.
schema_registry = CachedSchemaRegistryClient(schema_registry_config)
avro_serde = AvroSerde(schema_registry)
deserialize_avro = avro_serde.decode_message

In [None]:
def create_client():
    """Build a Kafka ``Consumer`` configured for SSL from the tms-secrets files.

    The bootstrap server address is read from
    '../tms-secrets/kafka_service_uri' with trailing newlines stripped; the CA,
    certificate and key paths point into '../tms-secrets/processing/'.

    Returns:
        A new ``Consumer`` built from that configuration. No topic is
        subscribed here.
    """
    with open('../tms-secrets/kafka_service_uri') as uri_file:
        bootstrap_servers = uri_file.read().rstrip('\n')

    return Consumer({
        "group.id": "jhub-mac",
        "max.poll.interval.ms": 20000,
        "session.timeout.ms": 10000,
        "default.topic.config": {"auto.offset.reset": "earliest"},
        "security.protocol": "SSL",
        "ssl.ca.location": "../tms-secrets/processing/ca.pem",
        "ssl.certificate.location": "../tms-secrets/processing/service.cert",
        "ssl.key.location": "../tms-secrets/processing/service.key",
        "bootstrap.servers": bootstrap_servers,
    })

In [None]:
# Decoded Avro records accumulate here; consume_records() appends to this list.
dataset_dict = []


def consume_records(max_polls=400000, poll_timeout=15):
    """Poll the weather topic and append each decoded record to ``dataset_dict``.

    Args:
        max_polls: number of ``poll`` calls to make. Empty polls and
            end-of-partition events count towards this total, so it is an
            upper bound on the number of records, not an exact count.
        poll_timeout: timeout handed to each ``client.poll`` call (seconds,
            per the confluent-kafka ``Consumer.poll`` API).

    Raises:
        KafkaException: for any message error other than ``_PARTITION_EOF``.

    The consumer is closed on every exit path, including when an error is
    raised, so a failed run does not leave a group member behind.
    """
    client = create_client()
    try:
        client.subscribe(["observations.weather.multivariate"])
        for _ in tqdm(range(max_polls)):
            msg = client.poll(poll_timeout)
            if msg is None:
                # Nothing arrived within the timeout; poll again.
                continue

            error = msg.error()
            if not error:
                dataset_dict.append(
                    deserialize_avro(message=msg.value(), is_key=False)
                )
            elif error.code() == KafkaError._PARTITION_EOF:
                # End of partition event: informational only, keep polling.
                sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                 (msg.topic(), msg.partition(), msg.offset()))
            else:
                raise KafkaException(error)
    finally:
        client.close()

In [None]:
# Input dataset from Kafka: fill `dataset_dict`, then flatten the decoded
# records into a DataFrame.
consume_records()
# measuredTime is scaled by 1000 * 1000 before parsing (presumably ms -> ns,
# the unit pd.to_datetime assumes for bare numbers -- TODO confirm).
kafka_df = pd.json_normalize(dataset_dict).assign(
    measuredTime=lambda df: pd.to_datetime(df['measuredTime'] * 1000 * 1000)
)


In [None]:
# Keep only the station id, the timestamp and the measurement columns used as
# features, and index the frame by measurement time.
columns = [
    'roadStationId',
    'measuredTime',
    'measurements.19',
    'measurements.21',
    'measurements.1',
    'measurements.18',
    'measurements.17',
    'measurements.16',
]
kafka_df = kafka_df[columns].set_index('measuredTime')
kafka_df.info()

In [None]:
# Fill gaps for the single station that is fed to the model.
#
# The station is selected *before* resampling so that interpolate() only ever
# sees that station's rows. Interpolating the concatenated multi-station frame
# would fill gaps at the edges of one station with values blended from its
# neighbour in the frame.
station_id = 2052
station_df = kafka_df[kafka_df['roadStationId'] == station_id]

kafka_interpo = (
    station_df
    # Grouping (even with one group) keeps the (roadStationId, measuredTime)
    # MultiIndex that the following cells rely on.
    .groupby('roadStationId')
    .resample('600s')
    .mean()
    # Depending on the pandas version the grouping column may or may not be
    # carried into the result; drop it only if present.
    .drop(columns='roadStationId', errors='ignore')
    .interpolate()
    # Rows still missing a feature (e.g. before a sensor's first reading, or
    # every row if the station lacks a sensor) cannot be used as model input.
    .dropna()
)

In [None]:
# Replace the raw wind columns with wind vectors. The columns are popped in
# the order wind_vector expects: velocity, max velocity, direction.
wind_velocity = kafka_interpo.pop('measurements.16')
wind_max_velocity = kafka_interpo.pop('measurements.17')
wind_direction = kafka_interpo.pop('measurements.18')
kafka_interpo.wind_vector(wind_velocity, wind_max_velocity, wind_direction)

In [None]:
# 2-D histogram of the maximum-wind vector components.
fig, ax = plt.subplots()
counts, x_edges, y_edges, mesh = ax.hist2d(
    kafka_interpo['max Wx'],
    kafka_interpo['max Wy'],
    bins=(50, 50),
    vmax=10,
)
fig.colorbar(mesh, ax=ax)
ax.set_xlabel('Wind X [m/s]')
ax.set_ylabel('Wind Y [m/s]')
ax.axis('tight')

In [None]:
# Derive the time-of-day / time-of-year signal columns from the time level of
# the index.
measured_times = kafka_interpo.index.get_level_values('measuredTime')
kafka_interpo.tod_signal(measured_times)

In [None]:
# Relabel the feature columns with the names stored alongside the training set.
# The assignment is positional: column i takes the i-th stored name.
column_names = pd.read_csv('predict/trainset_columns.csv', index_col=0)
trainset_labels = column_names['0'].to_numpy()
kafka_interpo.columns = trainset_labels

In [None]:
# Normalize with the mean / standard deviation saved from the training set.
# NOTE(review): read_pickle executes pickled code -- only load trusted files.
train_mean = pd.read_pickle('predict/train_mean.pkl')
train_std = pd.read_pickle('predict/train_std.pkl')
kafka_norm = kafka_interpo.sub(train_mean).div(train_std)
kafka_norm.shape

In [None]:
# Preview the first ten normalized rows.
kafka_norm.head(10)

In [None]:
# Model input: the most recent 6*24 rows (24 h at the 600 s resample step).
inpslic = slice(-6*24, None)
inputdf = kafka_norm[inpslic]
data = np.array(inputdf, dtype=np.float32)

# One window spanning the whole slice -> a dataset holding a single sequence.
# Shuffling is meaningless for inference, so it is disabled to keep the input
# order deterministic. (`input_ds` rather than `input`, which would shadow the
# builtin for the rest of the kernel session.)
input_ds = tf.keras.preprocessing.timeseries_dataset_from_array(
    data=data,
    targets=None,
    sequence_length=len(inputdf),
    sequence_stride=1,
    shuffle=False,
    batch_size=32,
)
y = saved_model.predict(input_ds)

# Take the first (only) predicted sequence and undo the normalization.
result = pd.DataFrame(y[0, :], columns=kafka_norm.columns)
result = train_std * result + train_mean

# The model predicts the next 24 h: shift the input timestamps by one day.
result['date'] = inputdf.index.get_level_values('measuredTime') + pd.Timedelta('1 day')
result = result.set_index('date').add_prefix('pred_')

# Predictions next to the observed values, aligned on time.
# NOTE: `all` shadows the builtin; the name is kept because the plotting cell
# reads it.
all = pd.concat([result, kafka_interpo.droplevel('roadStationId')], axis=1)

In [None]:
# Plot, over time, every column whose name matches 'lämpötila' or
# 'Suhteellinen' (both the observed and the 'pred_'-prefixed ones).
mask = all.columns.str.contains('.*lämpötila.*|.*Suhteellinen.*')
selected = all.loc[:, mask].sort_index()
selected.plot()