In [1]:
import logging
import multiprocessing as mp
import os
import random as rn
from tempfile import TemporaryDirectory
import warnings

warnings.filterwarnings("ignore")

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import yfinance as yf
from IPython.display import display
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.preprocessing import Normalizer
from zoo import init_spark_on_local
from zoo.automl.config.recipe import BayesRecipe, LSTMGridRandomRecipe
from zoo.automl.regression.time_sequence_predictor import TimeSequencePredictor
from zoo.ray import RayContext

# Single seed shared by every random generator re-seeded in reset_seed();
# sns.set() switches matplotlib to seaborn's default theme for all plots below.
seed = 42
sns.set()


def reset_seed():
    """Re-seed Python's `random`, NumPy's global RNG and TensorFlow from `seed`.

    Call right before a stochastic step so that re-running the notebook
    reproduces the same draws in this process.

    Note: PYTHONHASHSEED is only read when a Python interpreter starts, so
    exporting it here does not change string hashing in the running kernel;
    it only reaches processes started afterwards that inherit this environment.
    """
    os.environ.update(PYTHONHASHSEED=str(seed))
    np.random.seed(seed)
    rn.seed(seed)
    # TF 1.x graph-level seed API (tf.random.set_seed in TF 2.x).
    tf.set_random_seed(seed)

Prepending /projappl/project_2003107/anaconda3/envs/analytics-zoo/lib/python3.6/site-packages/bigdl/share/conf/spark-bigdl.conf to sys.path
Adding /projappl/project_2003107/anaconda3/envs/analytics-zoo/lib/python3.6/site-packages/zoo/share/lib/analytics-zoo-bigdl_0.10.0-spark_2.4.3-0.8.1-jar-with-dependencies.jar to BIGDL_JARS
Prepending /projappl/project_2003107/anaconda3/envs/analytics-zoo/lib/python3.6/site-packages/zoo/share/conf/spark-analytics-zoo.conf to sys.path


In [None]:
# Alternative datasets tried earlier (loader/slice, then the column names used with it):
#   pd.read_csv("AirPassengers.csv")                  target = "#Passengers"
#   pd.read_csv("data/nyc_energy.csv")[:-81][:1000]   dt, target = "timeStamp", "demand"
#   pd.read_csv("data/nyc_taxi.csv")[:2000]           target = "value"
#   pd.read_csv("data/temperature.csv")[1:-792]       target = "Tel Aviv District"
dt, target = "Date", "Adj Close"

# NOTE(review): this downloads the full GOOG history and keeps the latest 1000 rows,
# so the data (and every result below) depends on the day the cell is run.
# auto_adjust=False keeps the "Adj Close" column (recent yfinance versions default to
# auto_adjust=True, which drops it); recent versions may also return (field, ticker)
# MultiIndex columns even for one ticker, so flatten them back to plain field names.
data = yf.download("GOOG", auto_adjust=False)
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.get_level_values(0)
# .copy() so the column assignment below modifies an owned frame, not a slice.
data = data.reset_index().iloc[-1000:].copy()

print(data.columns)
print(data.shape, data[target].isna().sum())

data[dt] = pd.to_datetime(data[dt])
data = data.set_index(dt)
display(data.head())
data = data[[target]]
# Put the series on a regular daily grid: days without a quote (weekends, market
# holidays) become NaN after resampling and are filled by linear interpolation.
data = data.resample("d").mean().interpolate()
print(data.shape, data[target].isna().sum())

fig, ax = plt.subplots(figsize=(12, 5))
sns.lineplot(x=data.index, y=target, data=data, ax=ax)
ax.set(title=f"GOOG {target} (daily grid, interpolated)", xlabel=dt)
fig.tight_layout()
plt.show()

In [None]:
# Chronological 80/20 split: shuffle=False keeps time order, so the test set is
# the final 20% of rows. Targets are used unscaled (standardising with the
# training mean/std was tried earlier and disabled).
(x_train, x_test,
 y_train, y_test) = train_test_split(data.index.values, data[target].values,
                                     test_size=.2, shuffle=False)

# Two-column frames (timestamp, target) in the layout TimeSequencePredictor is given below.
df_train, df_test = (
    pd.DataFrame({dt: stamps, target: values})
    for stamps, values in ((x_train, y_train), (x_test, y_test))
)
display(df_train, df_test)

In [2]:
# Keep references to the TemporaryDirectory objects: reading `.name` off an
# unreferenced TemporaryDirectory lets it be garbage-collected (immediately, in
# CPython), and its finalizer deletes the directory, leaving a dangling path.
# The directories are removed when these objects are cleaned up (.cleanup() or exit).
# os.environ.get: when TMPDIR is unset, dir=None falls back to tempfile's default
# location instead of raising KeyError.
results_dir = TemporaryDirectory(prefix="ray_results_", dir=os.environ.get("TMPDIR"))
tmp_dir = TemporaryDirectory(prefix="ray_tmp_", dir=os.environ.get("TMPDIR"))
results_path = results_dir.name
tmp_path = tmp_dir.name

In [3]:
# FIXME(review): this cell raised the AssertionError captured in its output, and the
# next Spark/Ray cell re-creates `sc`/`ray_ctx` and calls ray_ctx.init() again, so a
# Restart & Run All would try to start Ray twice. Keep a single initialisation cell.
# NOTE(review): the captured log still places the Ray session under /tmp/ray, so the
# "temp-dir" entry in extra_params does not appear to have taken effect; presumably
# extra_params are not Ray start options in local mode — confirm against the
# RayContext docs for this analytics-zoo version.
sc = init_spark_on_local(cores=mp.cpu_count())
ray_ctx = RayContext(sc=sc, extra_params={"local_dir": results_path, "temp-dir": tmp_path})
ray_ctx.init()

Current pyspark location is : /projappl/project_2003107/anaconda3/envs/analytics-zoo/lib/python3.6/site-packages/pyspark/__init__.py
Start to getOrCreate SparkContext


2020-08-20 12:23:30,193	INFO node.py:498 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2020-08-20_12-23-30_193073_152093/logs.


Successfully got a SparkContext


2020-08-20 12:23:30,379	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:11270 to respond...
2020-08-20 12:23:30,633	INFO services.py:409 -- Waiting for redis server at 127.0.0.1:10911 to respond...
2020-08-20 12:23:30,664	INFO services.py:806 -- Starting Redis shard with 10.0 GB max memory.
2020-08-20 12:23:30,799	INFO node.py:512 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2020-08-20_12-23-30_193073_152093/logs.
2020-08-20 12:23:30,806	INFO services.py:1446 -- Starting the Plasma object store with 20.0 GB memory using /dev/shm.


AssertionError: 

In [None]:
# Local Spark context on every available core (Spark log level ERROR, log not
# redirected), then Ray launched on top of it through analytics-zoo's RayContext.
n_cores = mp.cpu_count()
sc = init_spark_on_local(cores=n_cores, spark_log_level="ERROR", redirect_spark_log=False)
ray_ctx = RayContext(sc=sc)
ray_ctx.init()

# future_seq_len equals the test-set length, so one forecast is meant to span the
# whole test period (compared against y_test in the evaluation cell).
tsp = TimeSequencePredictor(
    dt_col=dt,
    target_col=target,
    future_seq_len=len(df_test),
)

In [None]:
# Re-seed this kernel process's RNGs right before the hyper-parameter search.
reset_seed()

# Look-back candidates range from 2 steps up to the length of the test horizon;
# num_samples=100 presumably caps the number of search trials — TODO confirm in zoo docs.
max_look_back = len(df_test)
search_recipe = BayesRecipe(num_samples=100, look_back=(2, max_look_back))
pipeline = tsp.fit(
    df_train,
    resources_per_trial={"cpu": 4},
    distributed=False,
    recipe=search_recipe,
)

In [None]:
# Forecast from the training frame; the last row is taken as the forecast that
# follows the end of the training data. Its first column is skipped (assumed to
# be the timestamp column — TODO confirm against the zoo predict() output layout).
df_pred = pipeline.predict(df_train)
last_forecast = df_pred.iloc[-1, 1:]
y_pred = np.asarray(last_forecast, dtype=float)
df_pred

In [None]:
# The forecast horizon must line up point-for-point with the test set.
assert len(y_pred) == len(y_test), (len(y_pred), len(y_test))

rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"RMSE: {rmse}")
r2 = r2_score(y_test, y_pred)
print(f"R^2: {r2}")

df = pd.DataFrame({dt: df_test[dt].values, "test": y_test, "pred": y_pred})

# Melt to long format (one row per date and series) so seaborn draws "test" and
# "pred" as two hue-coloured lines on the same axes.
fig, ax = plt.subplots(figsize=(12, 5))
sns.lineplot(x=dt, y=target, hue="y", data=df.melt(dt, var_name="y", value_name=target), ax=ax)
ax.set(title=f"Test set vs. {len(y_pred)}-step forecast (RMSE {rmse:.2f}, R^2 {r2:.3f})")
fig.tight_layout()
plt.show()

In [None]:
# Shut down Ray first, then the local SparkContext it was launched on, so the
# notebook does not leave the Spark context running once the analysis is done.
ray_ctx.stop()
sc.stop()