# Cloud Distributed Time Series Analysis

### Load environment variables

In [None]:
from dotenv import load_dotenv
from pathlib import Path
import os

import warnings

env_path = Path('..') / '.env'
# NOTE: load_dotenv() does not override variables that already exist in the
# process environment (override=False by default). On Windows the OS always
# sets USERNAME, so the USERNAME entry of .env is ignored there unless
# override=True is passed.
load_dotenv(dotenv_path=env_path)

HOST_IP = os.getenv("HOST_IP")
HOST_PORT = os.getenv("HOST_PORT")
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")

# swat reads the CA bundle location from the CAS_CLIENT_SSL_CA_LIST
# environment variable, which load_dotenv() has already exported. Only
# re-assert it when it is actually set: assigning None to os.environ raises
# TypeError and would abort the cell.
ca_list = os.getenv("CAS_CLIENT_SSL_CA_LIST")
if ca_list is not None:
    os.environ['CAS_CLIENT_SSL_CA_LIST'] = ca_list
else:
    warnings.warn("CAS_CLIENT_SSL_CA_LIST is not set; TLS connections to CAS may fail.")

## Import the data

### Connect to SAS Viya

In [None]:
import swat

# Open a CAS session with the host, port and credentials read from .env
s = swat.CAS(
    HOST_IP,
    HOST_PORT,
    USERNAME,
    PASSWORD,
)

### Access the table

In [None]:
# Reference the M5_final table that lives in the Public caslib
tbl = s.CASTable(
    "M5_final",
    caslib='Public',
)

# Preview the first rows of the server-side table
tbl.head()

## Local Analysis

In [None]:
import pandas as pd
import numpy as np

import time
from tqdm import tqdm

### Import the data locally

In [None]:
# Get the data as a pandas DataFrame (the table is downloaded to the client;
# NOTE(review): this can be large for the full M5 data — confirm memory budget)
df = tbl.to_frame()

### Convert to time series

In [None]:
# Parse the date column so Prophet receives real timestamps
df["date"] = pd.to_datetime(df["date"])

# Split the long table into one frame per item_id, renamed to the (ds, y)
# column names Prophet expects and re-indexed from 0.
start_time = time.perf_counter()  # monotonic clock, suited to timing
series = [
    group[["date", "Quantity"]]
    .rename(columns={"date": "ds", "Quantity": "y"})
    .reset_index(drop=True)
    for _, group in df.groupby("item_id")
]

print(f"--- {time.perf_counter() - start_time} seconds ---")

In [None]:
# Number of per-item series built above (one per distinct item_id)
len(series)

### Prophet model

In [None]:
try:
    # The package was renamed from "fbprophet" to "prophet" in release 1.0;
    # prefer the current name and fall back for older installations.
    from prophet import Prophet
except ImportError:
    from fbprophet import Prophet


def run_prophet(timeserie, periods=7):
    """Fit a default Prophet model on one series and forecast past its end.

    Parameters
    ----------
    timeserie : pandas.DataFrame
        History with the ``ds`` (date) and ``y`` (value) columns Prophet expects.
    periods : int, optional
        Number of future periods appended by ``make_future_dataframe``
        (daily with Prophet's default frequency). Defaults to 7.

    Returns
    -------
    pandas.DataFrame
        Prophet's prediction frame covering the history plus the future periods.
    """
    model = Prophet()
    model.fit(timeserie)
    future = model.make_future_dataframe(periods=periods)
    return model.predict(future)

### Run the forecast locally

In [None]:
# Fit and forecast every series sequentially on the client, timing the loop
start_time = time.perf_counter()  # monotonic clock, suited to timing
result = [run_prophet(timeserie) for timeserie in tqdm(series)]
print(f"--- {time.perf_counter() - start_time} seconds ---")

In [None]:
# Repeat the same sequential run (presumably to compare timings between
# runs); it recomputes all forecasts and overwrites `result`.
start_time = time.perf_counter()  # monotonic clock, suited to timing
result = [run_prophet(timeserie) for timeserie in tqdm(series)]
print(f"--- {time.perf_counter() - start_time} seconds ---")

## Distributed Analysis

In [None]:
# Load needed action sets: timedata provides the runtimecode action used for
# the distributed forecast
s.loadactionset(actionset="timedata")

In [None]:
# Remove object-output tables left over from a previous run, because the
# runtimecode objout tables are not created with replace=True.
# quiet=True keeps the first run (when the tables do not exist yet) from
# reporting an error.
for table_name in ("outobj_pylog", "outobj_pyvars"):
    s.dropTable(table_name, quiet=True)

### SAS program that runs the Python forecasting code

In [None]:
# SAS program passed as `code=` to timedata.runtimecode, which runs it for
# each item_id group. It declares a PYTHON3 object and maps the series
# variables to the names the Python script uses (Quantity -> Y, date -> DS,
# _LENGTH_ -> NFOR, _LEAD_ -> HORIZON). It makes PRED writable
# (READONLY FALSE) so the script can fill it, then pushes and runs the
# script file. The execution log and the variable status are collected into
# the `pylog` / `pyvars` objects that the action writes to output tables.
# NOTE(review): the script path is user-specific and presumably must be
# readable by the CAS server rather than the client — confirm before reuse.
# NOTE(review): the rc* return codes and pyExitCode are never checked here.
cmpcode = """
    declare object py(PYTHON3) ;
    rc1 = py.Initialize() ;
    
    rc2 = py.addVariable(Quantity, 'ALIAS', 'Y') ;
    rc3 = py.addVariable(date, 'ALIAS', 'DS') ;
    rc4 = py.AddVariable(PRED, "READONLY", "FALSE") ;
    rc5 = py.AddVariable(_LENGTH_, 'ALIAS', 'NFOR') ;
    rc6 = py.AddVariable(_LEAD_,'ALIAS','HORIZON') ;
    
    rc7 = py.PushCodeFile('/home/sahbic/python_prophet_code.py');
    
    rc14 = py.Run() ;
    pyExitCode = py.GetExitCode() ;
    pyRuntime = py.GetRunTime() ;

    declare object pylog(OUTEXTLOG) ;
    rc15 = pylog.Collect(py, 'EXECUTION') ;
    declare object pyvars(OUTEXTVARSTATUS) ;
    rc16 = pyvars.collect(py) ;
    
 """

### Time Series Parameters

In [None]:
# Number of future intervals to forecast for every series
forecast_lead = 7
# INTERVAL= value for the Timedata.RunTimecode action
data_interval = "Day"
# SERIES parameters for the Timedata.RunTimecode action: accumulate
# Quantity with SUM inside each interval
series_params = {"accumulate": "SUM", "name": "Quantity"}

### Run Distributed Forecast

In [None]:
def dname(name):
    """Return ``{'name': name}``, the dict form used to reference variables/arrays/tables."""
    return dict(name=name)


# Run the SAS program in `cmpcode` for each item_id group of M5_final.
# The series is accumulated per `data_interval`, extended by `forecast_lead`
# periods, and the PRED array is written to the "outarray" table. The
# pylog/pyvars objects declared in `cmpcode` go to their own output tables.
# NOTE(review): require pkg="extlang" presumably provides the PYTHON3 object
# used by `cmpcode` — confirm against the SAS documentation.
res = s.timedata.runtimecode(
    table={
        'name': "M5_final",
        'caslib': 'Public',
        'groupby': [dname("item_id")],
    },
    series=[series_params],
    interval=data_interval,
    require=dict(pkg="extlang"),
    timeid=dict(name='date'),
    lead=forecast_lead,
    arrayout={
        'arrays': [dname("PRED")],
        'table': dict(name="outarray", replace=True),
    },
    objout=[
        dict(table=dname("outobj_pylog"), objRef="pylog"),
        dict(table=dname("outobj_pyvars"), objRef="pyvars"),
    ],
    code=cmpcode,
)

In [None]:
# Display the OutInfo table from the runtimecode action results
res.outinfo

### Print Python logs

In [None]:
# Read the Python execution log collected by `pylog` and print it only when
# it contains any text.
outlog_tbl = s.CASTable("outobj_pylog")
log_lengths = outlog_tbl["_LOGLEN_"].values
loglen = sum(log_lengths)
if loglen > 0:
    # NOTE(review): rows are fetched without an explicit sort order, so the
    # concatenated text may not follow execution order — confirm.
    text = "".join(outlog_tbl["_LOGTEXT_"].values)
    print(f"LOG:\n{text}\n")

In [None]:
# Variable-status table collected by `pyvars`; count the rows per UPDATED
# value (presumably whether the Python code updated each mapped variable,
# e.g. PRED — confirm)
outvars_tbl = s.CASTable("outobj_pyvars")
outvars_tbl.UPDATED.value_counts()

In [None]:
# Preview the first rows of the variable-status table
outvars_tbl.head()

### Results

In [None]:
# Forecast table written by the runtimecode action (arrayout table "outarray").
# The name is spelled exactly as it was created, for consistency; CAS table
# names are matched case-insensitively, so this is the same table.
outarray_tbl = s.CASTable("outarray")
# NOTE(review): without a sort order, the "last" rows of a distributed table
# are not well defined; sort by item_id/date if a specific tail is wanted.
outarray_tbl.tail(15)

## Close session

In [None]:
# End the CAS session opened by swat.CAS at the start of the notebook
s.terminate()