# Real-Time Forecast

This notebook monitors a single feature at a time in real time. To switch to a different feature, repeat the [Startup Sequence](#Startup-Sequence) section.

## Initialize Environment

In [None]:
import sys
import os
import datetime
import pandas as pd
import holoviews as hv
from holoviews import opts
from holoviews.streams import Buffer
import hazelcast
from padogrid.bundle.hazelcast.data.PortableFactoryImpl import PortableFactoryImpl

# Disable CPU warning message
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Bokeh extension
hv.extension('bokeh')

# Set up streaming buffers (last 100 rows each) for observed and forecast values
df_observed = pd.DataFrame({'Time': [], 'Observed': []}, columns=['Time', 'Observed'])
dfstream_observed = Buffer(df_observed, length=100, index=False)
df_forecast = pd.DataFrame({'Time': [], 'Forecast': []}, columns=['Time', 'Forecast'])
dfstream_forecast = Buffer(df_forecast, length=100, index=False)
observed_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream_observed])
forecast_dmap = hv.DynamicMap(hv.Curve, streams=[dfstream_forecast])
prev_forecast_value = 0

def forecast_received(event):
    '''
    Receives map events from Hazelcast. The events contain ForecastValue objects that are
    readily plotted.
    '''
    global prev_forecast_value
    global dfstream_observed, dfstream_forecast

    forecast = event.value
    #print("forecast_received(): previous_forecast=%f, observed=%f, diff=%f" %
    #      (prev_forecast_value, forecast.observedValue, forecast.observedValue - prev_forecast_value))
    try:
        observed_date = datetime.datetime(*forecast.observedDate[:6]).date()
        forecast_date = datetime.datetime(*forecast.forecastDate[:6]).date()
    except Exception as ex:
        # Skip this event; the dates used below would otherwise be undefined
        print("forecast_received() - Exception: ", repr(ex))
        return

    df_observed = pd.DataFrame([(observed_date, forecast.observedValue)], columns=['Time', 'Observed'])
    df_forecast = pd.DataFrame([(forecast_date, forecast.forecastValue)], columns=['Time', 'Forecast'])
    dfstream_observed.send(df_observed)
    dfstream_forecast.send(df_forecast)
    prev_forecast_value = forecast.forecastValue
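
The callback above builds its dates with `datetime.datetime(*forecast.observedDate[:6])`. The following is a minimal sketch of that unpacking step on its own, assuming the date fields arrive as a sequence ordered year, month, day, hour, minute, second. The helper name `to_date` and the sample values are hypothetical and are not part of the notebook.

```python
import datetime

def to_date(date_fields):
    """Convert a (year, month, day, hour, minute, second, ...) sequence to a date.

    Only the first six fields are used; anything after them is ignored.
    """
    return datetime.datetime(*date_fields[:6]).date()

# Hypothetical sample; the real field layout of ForecastValue is an assumption
sample = (2023, 4, 17, 9, 30, 0, 123)
print(to_date(sample))  # 2023-04-17
```

Because `.date()` drops the time of day, two events on the same day map to the same `Time` value on the chart.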

## Submit Jobs to Hazelcast

### Terminal: Submit Job

Each simulated dataset requires its own job, so submit one job per feature.

![Terminal](https://raw.githubusercontent.com/wiki/padogrid/padogrid/images/terminal.png) Terminal

```bash
# Submit the default feature, "stock1-jitter"
cd_app ml_lstm
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar
```

#### Complete Job List

```bash
cd_app ml_lstm
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar -feature stock1-jitter
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar -feature stock1-no-jitter
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar -feature stock1-jitter-large
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar -feature stock2-jitter
hz-cli -t ml_jet@localhost:5701 submit target/ml-lstm-1.0.3.jar -feature stock2-no-jitter
```
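
If you prefer to generate the job list rather than type it, the commands above can be assembled in Python. This is only a sketch that prints the command lines; it does not run `hz-cli`. The names `FEATURES` and `submit_command` are hypothetical helpers, and the target and jar path are copied from the commands above.

```python
import shlex

# Feature names taken from the job list above
FEATURES = [
    "stock1-jitter",
    "stock1-no-jitter",
    "stock1-jitter-large",
    "stock2-jitter",
    "stock2-no-jitter",
]

def submit_command(feature,
                   target="ml_jet@localhost:5701",
                   jar="target/ml-lstm-1.0.3.jar"):
    """Build the hz-cli argument list for one feature (does not execute it)."""
    return ["hz-cli", "-t", target, "submit", jar, "-feature", feature]

for feature_name in FEATURES:
    # shlex.join quotes arguments only when the shell would need it
    print(shlex.join(submit_command(feature_name)))
```

The printed lines are meant to be run from the `ml_lstm` app directory, as in the bash block above.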

### Terminal: Stream Data

The following command publishes the datasets for all of the jobs listed above.

![Terminal](https://raw.githubusercontent.com/wiki/padogrid/padogrid/images/terminal.png) Terminal

```bash
cd_app simulator/bin_sh
./simulator -simulator-config ../etc/simulator-hazelcast-journal.yaml
```

## Connect to Hazelcast

In [None]:
# Connect to Hazelcast
cluster_name = "ml_jet"
grid_path = "forecast"
client = hazelcast.HazelcastClient(cluster_name=cluster_name,
                                    cluster_members=[
                                            "localhost:5701",
                                            "localhost:5702"
                                        ],
                                    lifecycle_listeners=[
                                        lambda state: print("Hazelcast Lifecycle: ", state),
                                    ],
                                    portable_factories=PortableFactoryImpl.factories())
listener_id = None
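
The cell above passes a lambda as a lifecycle listener. A named function makes the state changes easier to inspect later. This is a minimal sketch, assuming the client calls each listener with a single state argument, as the lambda suggests. The names `lifecycle_history` and `on_lifecycle` are hypothetical, and the state strings in the simulated calls are placeholders.

```python
lifecycle_history = []

def on_lifecycle(state):
    """Record and print each lifecycle state handed to the listener."""
    lifecycle_history.append(state)
    print("Hazelcast Lifecycle: ", state)

# Simulated calls with placeholder state names, for illustration only
on_lifecycle("STARTING")
on_lifecycle("CONNECTED")
```

To use it, `lifecycle_listeners=[on_lifecycle]` would replace the lambda in the client constructor.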

---

## Startup Sequence

You can monitor a different feature by repeating this section.

### 1. User Inputs

The `feature` variable below must be set to one of the Hazelcast job features submitted above.

In [None]:
feature = "stock1-jitter"
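
Because a mistyped feature name simply produces no events, it can help to validate the value before registering the listener. The following is a sketch only: `KNOWN_FEATURES` and `check_feature` are hypothetical names, and the set assumes that all five jobs from the job list above were submitted.

```python
# Assumed to match the jobs submitted earlier; adjust to what you actually ran
KNOWN_FEATURES = {
    "stock1-jitter",
    "stock1-no-jitter",
    "stock1-jitter-large",
    "stock2-jitter",
    "stock2-no-jitter",
}

def check_feature(name):
    """Return name unchanged if it is a known feature, else raise ValueError."""
    if name not in KNOWN_FEATURES:
        raise ValueError(
            "Unknown feature %r; expected one of %s" % (name, sorted(KNOWN_FEATURES))
        )
    return name

feature = check_feature("stock1-jitter")
```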

### 2. Register Hazelcast Entry Listener

In [None]:
# First, clear dataframe streams
dfstream_observed.clear()
dfstream_forecast.clear()

# Listen on the forecast map for forecasts generated via Jet
forecast_map = client.get_map(grid_path)
# Remove the previous listener, if any, so only one feature is monitored at a time
if listener_id is not None:
    forecast_map.remove_entry_listener(listener_id)
listener_id = forecast_map.add_entry_listener(
    key=feature,
    include_value=True,
    added_func=forecast_received,
    updated_func=forecast_received,
).result()

print("--------------------------------------")
print("      cluster: %s" % (cluster_name))
print("          map: %s" % (forecast_map.name))
print("feature (key): %s" % (feature))
print("--------------------------------------")
print("The streaming chart will not show until the Hazelcast job generates")
print("forecasts into the map, '%s'." % (forecast_map.name))


### 3. Monitor Forecast

In [None]:
title = feature + " " + "LSTM Forecast"
(observed_dmap.relabel('Observed') * forecast_dmap.relabel('Forecast')).opts(
    opts.Curve(width=800, show_grid=True)
).opts(title=title, ylabel='Value', legend_position='top_left')

### 4. Clear DataFrame Streams

The following cell clears both streams, which resets the streaming chart.

In [None]:
# Clear dataframe streams
dfstream_observed.clear()
dfstream_forecast.clear()
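
To see what a bounded stream with `length=100` and `clear()` means for the chart, here is a standard-library analogy. It is not how HoloViews implements `Buffer`; the class `RollingBuffer` is a hypothetical stand-in that only mimics the keep-the-newest-rows behavior.

```python
from collections import deque

class RollingBuffer:
    """Stand-in that keeps only the newest `length` rows."""

    def __init__(self, length=100):
        self.rows = deque(maxlen=length)

    def send(self, row):
        # Appending beyond maxlen silently drops the oldest row
        self.rows.append(row)

    def clear(self):
        self.rows.clear()

buf = RollingBuffer(length=3)
for i in range(5):
    buf.send(("t%d" % i, float(i)))
print(list(buf.rows))  # [('t2', 2.0), ('t3', 3.0), ('t4', 4.0)]

buf.clear()
print(len(buf.rows))  # 0
```

After a clear, the chart starts again from the next event that arrives, which is why the cell is useful when switching features.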

---

## Shutdown Hazelcast Client

The following cell removes the entry listener and shuts down the Hazelcast client instance, so the streaming chart stops receiving data. To create a new instance, repeat the [Connect to Hazelcast](#Connect-to-Hazelcast) section.

In [None]:
# Remove Hazelcast entry listener
if listener_id is not None:
    forecast_map.remove_entry_listener(listener_id)
listener_id = None

# Shutdown Hazelcast client
client.shutdown()