In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(123)
from statsmodels.tsa.arima.model import ARIMA

# cerate a dummy timeseries
shape=(120,3)
freq='D'
data = pd.DataFrame(np.random.randn(*shape)*np.random.randint(1,1e6,size=(shape[0],1)),
                           index=pd.date_range(
                               end=pd.Timestamp.today().date(),
                               periods=shape[0], freq=freq), 
                           columns=[f'T_{c}' for c in range(shape[1])])
skinny = data.reset_index().melt(id_vars='index')
skin_grpd = skinny.groupby(['variable', pd.Grouper(key='index', freq=freq)]).sum()

groups=list({g[:-1] for g in skin_grpd.index})
ts_dict = {g:skin_grpd.loc[g]['value'].astype('int') for g in groups}
groups

[('T_1',), ('T_2',), ('T_0',)]

## `ARIMA` from statsmodels

In [2]:
class TimeSeriesSplit:
    """To split timeseries into train and validation index
    """
    def __init__(self, ts_len, pred_steps, n_windows, gap=0):
        self.n = ts_len-3
        self.steps = pred_steps
        self.n_w = n_windows
        self.gap = gap
        
    def split(self):
        return[(np.arange(self.n-n_w).tolist(), 
                np.arange(self.n-n_w-self.steps-self.gap).tolist(), 
                np.arange(self.n-n_w-self.steps, self.n-n_w).tolist()) for n_w in range(self.n_w)]
    

def mape(y_true, y_pred, tol=1e-6):
    return np.abs(y_pred-y_true)/(np.abs(y_true)+tol)

# make a test case
# TimeSeriesSplit(10,2,3).split()

from functools import *
from ray import tune
import ray
from ray.tune.schedulers import ASHAScheduler, AsyncHyperBandScheduler
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune.suggest.skopt import SkOptSearch
from ray.tune import JupyterNotebookReporter
from ray.tune import Analysis, ExperimentAnalysis

fit_params = {
    'p': tune.randint(0,3),
    'd': tune.randint(0,3),
    'q': tune.randint(0,3),
    'P': tune.randint(0,3),
    'D': tune.randint(0,3),
    'Q': tune.randint(0,3),
    's': tune.randint(0,12),
    't_1': tune.randint(0,1),
    't_2': tune.randint(0,1),
    't_3': tune.randint(0,1),
    'enforce_stationarity': tune.choice([True, False]),
    'enforce_invertibility': tune.choice([True, False]),
    'concentrate_scale': tune.choice([True, False]),
    }

In [3]:
@ray.remote
class UniVarTs():
    verbose=2
    
    def __init__(self, ts, index, params,
                 pred_steps, n_windows,
                 error_func,trials=3, local_dir='./results'):
        self.ts=ts
        self.index='_'.join(map(str,index))
        self.local_dir=local_dir
        self.pred_steps=pred_steps
        self.n_windows=n_windows
        self.error_func=error_func
        self.params=params
        self.reporter=JupyterNotebookReporter(overwrite=True)
        self.set_algo() # TODO : change based on algo choosen
        self.trials=trials
        
    @staticmethod
    def trainable(config, 
                  ts=None, 
                  pred_steps=None, 
                  n_windows=None, 
                  error_func=None):

        tscv=TimeSeriesSplit(len(ts), pred_steps, n_windows)

        p, d, q = config['p'], config['d'],config['q']
        P, D, Q, s = config['P'],config['D'],config['Q'], config['s']
        t1, t2, t3 = config['t_1'],config['t_2'],config['t_3']

        error = 0
        for _, train_idx, test_idx in tscv.split():
            X_train, X_val = ts[train_idx], ts[test_idx]

            fit_obj = ARIMA(X_train, 
                            order=(p,d,q), 
                            seasonal_order=(P,D,Q,s), 
                            trend=[t1,t2,t3], 
                            enforce_stationarity=config['enforce_stationarity'],
                            enforce_invertibility=config['enforce_invertibility'],
                            concentrate_scale=config['concentrate_scale']).fit()

            forecast = fit_obj.forecast(steps=tscv.steps)
            error += error_func(X_val[-1], forecast[-1])
        tune.report(mape=error)
        
    def set_algo(self, concurent=1):
        algo = SkOptSearch(metric="mape", mode="min")
        self.algo = ConcurrencyLimiter(algo, max_concurrent=concurent)
        
    def tune(self):
        self.analysis = tune.run(
            tune.with_parameters(self.trainable, ts=self.ts, pred_steps=self.pred_steps, 
                                 error_func=self.error_func, n_windows=self.n_windows),
            metric="mape",
            mode="min",
            name=self.index,
            search_alg=self.algo,
            num_samples=self.trials, 
            local_dir=self.local_dir,
            raise_on_failed_trial=False,
            checkpoint_at_end=True,
            config=self.params,
            verbose=verbose,
            progress_reporter=self.reporter,
        )
    
    @ray.method(num_returns=1)       
    def get_analysis(self):
        return self.analysis
    
    @ray.method(num_returns=1) 
    def best_result(self):
        return self.analysis.best_result

## SkOpt

In [4]:
ray.init(ignore_reinit_error=True, local_mode=True, num_cpus=4)

2021-02-08 15:37:48,349	INFO services.py:1174 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '172.16.93.66',
 'raylet_ip_address': '172.16.93.66',
 'redis_address': '172.16.93.66:6379',
 'object_store_address': '/tmp/ray/session_2021-02-08_15-37-47_894569_24405/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-02-08_15-37-47_894569_24405/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-02-08_15-37-47_894569_24405',
 'metrics_export_port': 58030,
 'node_id': 'bd8816653dce1237faec3815f3eefc7ac7d154ff104b16b7e97475bd'}

In [5]:
# UniVarTs_ray = ray.remote(UniVarTs)
l_ = [UniVarTs.remote(ts,grp,fit_params,3,5,mape,trials=10) for grp,ts in ts_dict.items()]
l_

[Actor(UniVarTs,a67dc375e60ddd1a23bd3bb901000000),
 Actor(UniVarTs,63964fa4841d4a2ecb45751801000000),
 Actor(UniVarTs,69a6825d641b461327313d1c01000000)]

In [None]:
tune_list = [l.tune.remote() for l in l_]
tune_list

Trial name,status,loc,D,P,Q,concentrate_scale,d,enforce_invertibility,enforce_stationarity,p,q,s,t_1,t_2,t_3
_inner_9cd7f93a,RUNNING,,2,3,2,1,1,0,1,1,1,10,1,0,0




Result for _inner_9cd7f93a:
  date: 2021-02-08_15-38-08
  done: false
  experiment_id: 6b639dca08e54af2a3590c7c58601e9e
  hostname: ip-172-16-93-66
  iterations_since_restore: 1
  mape: 14.096666630783071
  node_ip: 172.16.93.66
  pid: 24405
  time_since_restore: 13.347153663635254
  time_this_iter_s: 13.347153663635254
  time_total_s: 13.347153663635254
  timestamp: 1612798688
  timesteps_since_restore: 0
  training_iteration: 1
  trial_id: 9cd7f93a
  
Result for _inner_9cd7f93a:
  date: 2021-02-08_15-38-08
  done: true
  experiment_id: 6b639dca08e54af2a3590c7c58601e9e
  experiment_tag: 1_D=2,P=3,Q=2,concentrate_scale=True,d=1,enforce_invertibility=False,enforce_stationarity=True,p=1,q=1,s=10,t_1=1,t_2=0,t_3=0
  hostname: ip-172-16-93-66
  iterations_since_restore: 1
  mape: 14.096666630783071
  node_ip: 172.16.93.66
  pid: 24405
  time_since_restore: 13.347153663635254
  time_this_iter_s: 13.347153663635254
  time_total_s: 13.347153663635254
  timestamp: 1612798688
  timesteps_since_

2021-02-08 15:38:08,916	ERROR function_runner.py:254 -- Runner Thread raised error.
Traceback (most recent call last):
  File "/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/ray/tune/function_runner.py", line 248, in run
    self._entrypoint()
  File "/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/ray/tune/function_runner.py", line 316, in entrypoint
    self._status_reporter.get_checkpoint())
  File "/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/ray/tune/function_runner.py", line 576, in _trainable_func
    output = fn()
  File "/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/ray/tune/function_runner.py", line 651, in _inner
    inner(config, checkpoint_dir=None)
  File "/home/ec2-user/anaconda3/envs/pytorch_p36/lib/python3.6/site-packages/ray/tune/function_runner.py", line 645, in inner
    fn(config, **fn_kwargs)
  File "<ipython-input-3-506cee2b51e9>", line 42, in trainable
    concentrate_s

Result for _inner_a4d591a6:
  {}
  




Result for _inner_a5385962:
  date: 2021-02-08_15-38-12
  done: false
  experiment_id: f02aaa3b81174a91bb9b6b5c7c2f5810
  hostname: ip-172-16-93-66
  iterations_since_restore: 1
  mape: 15.869465361519547
  node_ip: 172.16.93.66
  pid: 24405
  time_since_restore: 3.6487085819244385
  time_this_iter_s: 3.6487085819244385
  time_total_s: 3.6487085819244385
  timestamp: 1612798692
  timesteps_since_restore: 0
  training_iteration: 1
  trial_id: a5385962
  
Result for _inner_a5385962:
  date: 2021-02-08_15-38-12
  done: true
  experiment_id: f02aaa3b81174a91bb9b6b5c7c2f5810
  experiment_tag: 3_D=1,P=2,Q=0,concentrate_scale=True,d=2,enforce_invertibility=True,enforce_stationarity=True,p=3,q=0,s=7,t_1=1,t_2=1,t_3=0
  hostname: ip-172-16-93-66
  iterations_since_restore: 1
  mape: 15.869465361519547
  node_ip: 172.16.93.66
  pid: 24405
  time_since_restore: 3.6487085819244385
  time_this_iter_s: 3.6487085819244385
  time_total_s: 3.6487085819244385
  timestamp: 1612798692
  timesteps_since_re

  warn('Non-invertible starting MA parameters found.'
  warn('Non-invertible starting seasonal moving average'
2021-02-08 15:53:59,594	ERROR import_thread.py:89 -- ImportThread: Connection closed by server.


In [34]:
ray.wait(tune_list)

([ObjectRef(e11fe2800445c79a27c0dca5954f99af427c13c20100000001000000)],
 [ObjectRef(34c9c2094e42fdbfcc0320e00aa0584cd481d0b90100000001000000),
  ObjectRef(747754f46b61f47d42867781e3b6e074ed2613070100000001000000)])

In [None]:
analysis0 = Analysis("./results/T_2")

In [None]:
analysis0.dataframe().sort_values('mape').iloc[0,:]   #get_best_results(metric="mape", mode="min",)

In [None]:
eanalysis0 = ExperimentAnalysis("./results/T_0/experiment_state-2021-02-03_14-32-16.json")

In [None]:
eanalysis0.dataframe()

In [None]:
analysis_obj = ray.get(l_[2].get_analysis.remote())

In [None]:
analysis_obj

In [None]:
grp =groups[0]
t_s = UniVarTs.remote(ts_dict[grp],grp,fit_params,3,5,mape,trials=10)

In [None]:
t_s.tune.remote()

In [None]:
ts.tune()

In [None]:
# analysis_skOpt.results_df.sort_values('mean_accuracy', ascending=False).iloc[0,:]

In [None]:
ray.shutdown()