In [126]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from statsmodels.base.distributed_estimation import DistributedModel
from statsmodels.tsa.statespace.varmax import VARMAX, VARMAXResults

In [127]:
# Load both tables with the first CSV column parsed as a datetime index.
interpolated_weather_df = pd.read_csv(
    'data/region/vietnam/interpolated_weather.csv',
    index_col=0,
    parse_dates=True,
)
# The 'aqi' column is discarded right after loading.
interpolated_air_df = pd.read_csv(
    'data/region/vietnam/interpolated_air.csv',
    index_col=0,
    parse_dates=True,
).drop(columns='aqi')

In [128]:
# Label-based slices on the index: rows up to and including
# 2023-12-31 23:00:00 are used for training, rows from 2024-01-01 00:00:00
# onwards are held out for testing.
air_train = interpolated_air_df.loc[:'2023-12-31 23:00:00']
air_test = interpolated_air_df.loc['2024-01-01 00:00:00':]
weather_train = interpolated_weather_df.loc[:'2023-12-31 23:00:00']
weather_test = interpolated_weather_df.loc['2024-01-01 00:00:00':]

In [129]:
# Separate scaler instances so each table keeps its own fitted statistics.
air_scaler, weather_scaler = StandardScaler(), StandardScaler()

In [130]:
# Fit each scaler on the training rows only and standardize them in one step.
# The first column of each frame is skipped (presumably the non-numeric
# 'province' label — TODO confirm).
air_normalized = air_scaler.fit_transform(
    air_train.iloc[:, 1:].to_numpy()
)
weather_normalized = weather_scaler.fit_transform(
    weather_train.iloc[:, 1:].to_numpy()
)

In [131]:
# Put the scaled arrays back into DataFrames, restoring the training index and
# the names of the columns that were scaled (all but the first).
air_normalized = pd.DataFrame(
    data=air_normalized,
    index=air_train.index,
    columns=air_train.columns[1:],
)
weather_normalized = pd.DataFrame(
    data=weather_normalized,
    index=weather_train.index,
    columns=weather_train.columns[1:],
)

In [132]:
def _exog_gen(exog, partitions):
    """partitions exog data"""

    n_exog = exog.shape[0]
    n_part = np.ceil(n_exog / partitions)

    ii = 0
    while ii < n_exog:
        jj = int(min(ii + n_part, n_exog))
        yield exog.iloc[ii:jj, :]
        ii += int(n_part)


def _endog_gen(endog, partitions):
    """partitions endog data"""

    n_endog = endog.shape[0]
    n_part = np.ceil(n_endog / partitions)

    ii = 0
    while ii < n_endog:
        jj = int(min(ii + n_part, n_endog))
        yield endog.iloc[ii:jj]
        ii += int(n_part)

In [136]:
# Prepend the 'province' label as the first column of each scaled frame.
# A 'province' column left over from an earlier run of this cell is dropped
# first, so re-running the cell does not stack duplicate label columns (which
# would break the later `iloc[:, 1:]` selections and the sort by province).
# NOTE(review): the weather frame is labelled from air_train rather than
# weather_train; this assumes both tables list the same rows in the same
# order — TODO confirm.
weather_normalized = pd.concat(
    [
        air_train['province'],
        weather_normalized.drop(columns='province', errors='ignore'),
    ],
    axis=1,
)
air_normalized = pd.concat(
    [
        air_train['province'],
        air_normalized.drop(columns='province', errors='ignore'),
    ],
    axis=1,
)

In [None]:
# Reorder rows by province and then by time within each province, keeping the
# timestamps as the index afterwards.
weather_by_province = (
    weather_normalized
    .reset_index()
    .sort_values(by=['province', 'time'])
    .set_index('time')
)
air_by_province = (
    air_normalized
    .reset_index()
    .sort_values(by=['province', 'time'])
    .set_index('time')
)

In [141]:
# Row and column counts of the province-sorted air frame (sanity check).
air_by_province.shape

(1707048, 7)

In [139]:
# Number of data chunks; it must be the same for the model and for both
# generators, so it is defined once here.  The recorded shape of
# air_by_province is 1707048 rows, i.e. 27096 rows per chunk; with the frame
# sorted by province each chunk presumably covers one province — TODO confirm.
N_PARTITIONS = 63

varmax_distributed = DistributedModel(
    partitions=N_PARTITIONS,
    model_class=VARMAX,
    init_kwds={'order': (0, 2), 'trend': 'ct'},
    results_class=VARMAXResults,
)

# Each (endog, exog) pair drops the leading 'province' column and feeds one
# partition.
# NOTE(review): no estimation_method/join_method is given, so DistributedModel
# falls back to its defaults; confirm these are compatible with VARMAX and with
# the fit_kwds below ("alpha" looks like a regularization argument).
# NOTE(review): the recorded run started 63 joblib workers and was interrupted
# before finishing; consider timing this on a smaller subset first.
varmax_fit = varmax_distributed.fit(
    zip(
        _endog_gen(air_by_province.iloc[:, 1:], N_PARTITIONS),
        _exog_gen(weather_by_province.iloc[:, 1:], N_PARTITIONS),
    ),
    fit_kwds={'maxiter': 1, 'method': 'lbfgs', "alpha": 0.2},
    parallel_method='joblib',
)

[Parallel(n_jobs=63)]: Using backend LokyBackend with 63 concurrent workers.


KeyboardInterrupt: 

In [None]:
# Write the fitted result to 'varmax.pickle' (path relative to the working directory).
# NOTE(review): needs `varmax_fit` from the fit cell above; the recorded run of
# that cell was interrupted, so this fails on a kernel where the fit never completed.
varmax_fit.save('varmax.pickle')