# Modules import

In [10]:
import numpy as np
import pandas as pd
import scipy.stats as sp

from statsmodels.tsa.seasonal import STL
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from tslearn.metrics import cdist_dtw

from sklearn.linear_model import LogisticRegression
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score, adjusted_rand_score
from sklearn.preprocessing import scale
import ruptures as rpt

import plotly.io as pio
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

import warnings, logging, swifter, ray
from tqdm import tqdm

ray.init(logging_level=logging.ERROR, ignore_reinit_error=True, 
         log_to_driver=False, configure_logging=False)
swifter.set_defaults(progress_bar=False, 
                     logging_level=logging.ERROR)
logging.getLogger("swifter").setLevel(logging.WARNING)
warnings.filterwarnings('ignore')

pd.set_option('mode.use_inf_as_na', True)
pd.options.mode.use_inf_as_na = True
px.defaults.width, px.defaults.height = 1000, 500
margin = dict(l=30, r=30, t=50, b=30)
# pio.renderers.default = 'svg'

seasonal_period, group, values = 7, ['store_nbr', 'family'], 'sales'
min_date, max_date = pd.to_datetime('2015-07-01'), pd.to_datetime('2017-08-31') # 2 full annual cycles

In [2]:
# !pip install -U ipywidgets

# Data preview & processing

In [3]:
# date period from 2013-01-01 to 2017-08-15
# 54 stores
# 33 product family per store
# no negative values

train = pd.read_csv('train.csv', engine='pyarrow', index_col='id', parse_dates=['date'])
train.head()

Unnamed: 0_level_0,date,store_nbr,family,sales,onpromotion
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,2013-01-01,1,AUTOMOTIVE,0.0,0
1,2013-01-01,1,BABY CARE,0.0,0
2,2013-01-01,1,BEAUTY,0.0,0
3,2013-01-01,1,BEVERAGES,0.0,0
4,2013-01-01,1,BOOKS,0.0,0


In [324]:
def changepoint_detection(data: pd.DataFrame, values=values, w=15) -> pd.Timestamp:
    models = {'m1': rpt.KernelCPD(kernel='linear', min_size=w),
              'm2': rpt.Binseg(model='ar', min_size=w)}

    signal, result = data[values].values, []
    for name in models:
        model = models.get(name).fit(signal)
        bkpt = model.predict(n_bkps=1)[0]
        result += [data.iloc[bkpt]['date']]
    else:
        return max(result)

In [323]:
def series_plot(data: pd.DataFrame, series: int | list[list[int | str]], 
                title: str, group=None, values=values, intermittent=None, 
                seed=42, cpd=False, w=15, line_width=1.5) -> None:
    
    if intermittent is not None:
        data = data[data['intermittent'].eq(intermittent)]

    if group is not None:
        if isinstance(series, int):
            np.random.seed(seed)
            items = data[group].drop_duplicates().values
            idx = np.random.randint(low=0, high=items.shape[0] - 1, size=series)
            items, n = items[idx].tolist(), series
        else:
            items, n = series, len(series)
    else:
        items, n = [None], 1
    
    plt = make_subplots(rows=n//2 if n > 1 else 1, cols=2 if n > 1 else 1, 
                        subplot_titles=[str(x) for x in items],
                        horizontal_spacing=.05, vertical_spacing=.075)

    for i, item in enumerate(items):
        if item is None:
            temp = data
        elif isinstance(item, list):
            store, product = item
            temp = data[data[group[0]].eq(store) & data[group[1]].eq(product)]
        else:
            temp = data[data[group].eq(item)]
            
        plt.add_trace(go.Scatter(x=temp['date'], y=temp[values], 
                                 line={'color': "#1f77b4", 'width': line_width}), 
                      row=i//2+1, col=i%2+1)
        
        if cpd:
            plt.add_shape(
                type="rect",
                xref="x",
                x0=changepoint_detection(temp, values=values, w=w),
                x1=temp['date'].max(),
                y0=0,
                y1=temp[values].max(),
                fillcolor='#3C9B1F',
                opacity=.2,
                line_width=.1,
                row=i//2+1, 
                col=i%2+1
            )

    else:
        plt.update_layout(height=230 * n // 2 if n > 2 else 500,
                          showlegend=False,
                          title={'text': title, 'x': .5},
                          margin=margin)
        plt.show()

In [5]:
series_plot(data=train, group=group, series=[[44, 'BOOKS']], title='Product sales in Equador store')

In [6]:
train = train[train['date'].ge(min_date)]

In [7]:
temp = (train.swifter.progress_bar(False)
             .groupby(group)['sales']
             .apply(lambda x: x.loc[x.gt(0).idxmax():])
             .reset_index()
             ['id'])
train = train.loc[temp, :]

In [8]:
temp = (train.swifter.progress_bar(False)
        .groupby(group)['sales']
        .apply(lambda x: x.loc[x[::-1].gt(0).idxmax():].size - 1)
        .reset_index())
train = train.join((temp[temp['sales'].lt(2 * seasonal_period)]
                    .set_index(group)
                    .drop(columns='sales')), 
                    on=group,
                    how='inner')

In [9]:
def useless_series(data: pd.DataFrame, values=values) -> pd.Series:
    dates = pd.date_range(start=data['date'].max().to_period('M').to_timestamp(),
                          end=data['date'].min().to_period('M').to_timestamp(), 
                          freq=-pd.DateOffset(years=1))

    data['date'] = data['date'].dt.to_period('M').dt.to_timestamp()
    temp = (data[data['date'].isin(dates)]
            .groupby('date')[values]
            .median()
            .reset_index())
    
    if not temp[values].eq(0).all():
        return pd.Series(data=[np.nan] * data[values].size, 
                         index=data.index,
                         name=values)
    else:
        return data[values]


temp = (train.swifter.progress_bar(False)
             .groupby(group)
             .apply(useless_series)
             .reset_index())
train.drop(index=temp[~temp['sales'].isna()]['id'], inplace=True)

In [10]:
# show relationship between history length and zeros rate

temp = (train.assign(no_sales_flg=train['sales'].eq(0))
        .groupby(group)['no_sales_flg']
        .agg(['count', 'mean'])
        .reset_index()
        .rename(columns={'count': 'length', 
                         'mean': 'zeros_rate'}))

(px.scatter(data_frame=temp, x='length', y='zeros_rate')
 .update_layout(margin=margin))

In [11]:
temp = (temp.merge((train.groupby(group)['sales'].mean()
                    .reset_index()), 
                    on=group, how='left')
        .rename(columns={'sales': 'avg_sales'}))

(px.scatter(data_frame=temp[temp['zeros_rate'].gt(.25)], x='zeros_rate', y='avg_sales')
 .update_layout(margin=margin))

In [12]:
train = train.join((temp.assign(intermittent=temp['zeros_rate'].gt(.51))
                    .set_index(group)
                    ['intermittent']),
                    on=group)

In [13]:
def zero_gaps(data: pd.DataFrame, values=values) -> pd.Series:
    data['mask'] = data[values].eq(0)
    data['shift_mask'] = data['mask'].shift()
    data['gap'] = data['mask'].ne(data['shift_mask']).cumsum()
    temp = (data[data['mask']]
            .join((data.groupby('gap')[values]
                   .count()
                   .rename('len')),
                   on='gap'))
    temp = temp[temp['len'].ge(2 * seasonal_period)]

    if temp.empty:
        return pd.Series(data=[np.nan] * data[values].size, 
                         index=data.index,
                         name=values)
    else:
        return temp[values]


temp = (train[train['intermittent'].eq(False)]
        .swifter.progress_bar(False)
        .groupby(group)
        .apply(zero_gaps)
        .reset_index())
train.drop(index=temp[~temp['sales'].isna()]['id'], inplace=True)

In [14]:
series_plot(data=train, group=group, series=10, intermittent=False, title='Product sales in Equador stores', cpd=True, seed=228)

[2026-02-11 12:52:53,019 E 19964 18409803] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


In [15]:
temp = (train[train['intermittent'].eq(False)]
        .swifter.progress_bar(False)
        .groupby(group)
        .apply(lambda x: x[x['date'].lt(changepoint_detection(x))]['sales'])
        .reset_index()
        ['id'])
train.drop(index=temp, inplace=True)

In [16]:
temp = (train.groupby(group)
        .apply(lambda x: pd.Series({'length': x.shape[0],
                                    'intermittent': x['intermittent'].max()}))
        .reset_index())

plt = go.Figure()
(plt.add_trace(go.Histogram(x=temp[temp['intermittent'].eq(False)]['length'], 
                            nbinsx=100, name='smooth'))
 .add_trace(go.Histogram(x=temp[temp['intermittent'].eq(True)]['length'], 
                         nbinsx=100, name='intermittent'))
 .update_layout(barmode='overlay', margin=margin))

In [17]:
train = train.assign(burst=0)
holidays = pd.read_csv('holidays_events.csv', engine='pyarrow', parse_dates=['date'])
stores = pd.read_csv('stores.csv').assign(country='Ecuador')

In [18]:
holidays['type'] = holidays['type'].apply(lambda x: 'Holiday' if x in 
                                          ('Transfer', 'Bridge', 'Additional') else x)
holidays = holidays[(holidays['date'].ge(min_date)) &
                    (holidays['date'].le(max_date)) &
                     holidays['type'].isin(('Holiday', 'Event'))]
holidays = (holidays.drop(index=holidays[holidays['transferred']].index,
                          columns=['transferred', 'description'])
                    .drop_duplicates())

data = pd.DataFrame({'date': [pd.to_datetime('2016-01-02'), 
                              pd.to_datetime('2017-01-01')],
                     'type': ['Holiday'] * 2,
                     'locale': ['National'] * 2,
                     'locale_name': ['Ecuador'] * 2})
holidays = pd.concat([holidays, data]).sort_values('date')

wage_dates = pd.date_range(min_date, max_date, freq='sme')
wages = pd.DataFrame({'date': wage_dates,
                      'type': ['Wage'] * len(wage_dates),
                      'locale': ['National'] * len(wage_dates),
                      'locale_name': ['Ecuador'] * len(wage_dates)})
holidays = pd.concat([holidays, wages]).sort_values('date')

holidays.head()

Unnamed: 0,date,type,locale,locale_name
179,2015-07-03,Holiday,Local,El Carmen
180,2015-07-03,Holiday,Local,Santo Domingo
0,2015-07-15,Wage,National,Ecuador
181,2015-07-23,Holiday,Local,Cayambe
182,2015-07-24,Holiday,Local,Guayaquil


In [19]:
multiindex = ['cluster', 'store_nbr', 'type', 'date', 'city', 'state', 'country']

data = (stores.merge(holidays[holidays['locale'].eq('Local')]
                     .drop(columns='type'), 
                     left_on='city', right_on='locale_name')
                     .assign(flag=1)
                     [multiindex + ['locale', 'flag']])

data = pd.concat([data, 
                  (stores.merge(holidays[holidays['locale'].eq('Regional')]
                        .drop(columns='type'), 
                        left_on='state', right_on='locale_name')
                        .assign(flag=1)
                        [multiindex + ['locale', 'flag']])])

data = pd.concat([data,
                  (stores.merge(holidays[holidays['locale'].eq('National') & 
                                         holidays['type'].eq('Holiday')]
                        .drop(columns='type'), 
                        left_on='country', right_on='locale_name')
                        .assign(flag=1)
                        [multiindex + ['locale', 'flag']])])

data = pd.concat([data,
                  (stores.merge(holidays[holidays['locale'].eq('National') & 
                                         holidays['type'].eq('Event')]
                        .drop(columns='type'), 
                        left_on='country', right_on='locale_name')
                        .assign(locale='Event')
                        .assign(flag=1)
                        [multiindex + ['locale', 'flag']])])

data = pd.concat([data,
                  (stores.merge(holidays[holidays['locale'].eq('National') & 
                                         holidays['type'].eq('Wage')]
                        .drop(columns='type'), 
                        left_on='country', right_on='locale_name')
                        .assign(locale='Wage')
                        .assign(flag=1)
                        [multiindex + ['locale', 'flag']])])

holidays = (data.pivot_table(index=multiindex, columns='locale', values=['flag'])
            .reset_index()
            .droplevel(0, axis=1))
holidays.columns = multiindex + [x.lower() for x in holidays.columns[7:]]
holidays.drop(columns=['cluster', 'city', 'state', 'country'], inplace=True)

holidays.head()

Unnamed: 0,store_nbr,type,date,event,local,national,regional,wage
0,24,D,2015-07-15,,,,,1.0
1,24,D,2015-07-24,,1.0,,,
2,24,D,2015-07-25,,1.0,,,
3,24,D,2015-07-31,,,,,1.0
4,24,D,2015-08-10,,,1.0,,


In [20]:
def smooth_series_scaling(series: pd.Series, return_seasonality=False, w=None) -> np.array:
    stl = STL(np.log(series[0 if not w else -w:] + 1), 
              period=seasonal_period, robust=True).fit()
    return scale(stl.seasonal + stl.resid if return_seasonality else stl.resid)


def smooth_series_bursts_processing(data: pd.DataFrame, holidays: pd.DataFrame, values=values) -> pd.DataFrame:
    temp = data.copy()

    data['z_score'] = smooth_series_scaling(data[values])
    data['mask'] = data['z_score'].apply(lambda x: abs(x) > 3)
    if data[data['mask']].empty:
        return temp[[values, 'burst']]
    data['shift_mask'] = data['mask'].shift()
    data['group'] = data['mask'].ne(data['shift_mask']).cumsum()
    
    cols = ['store_nbr', 'date', 'event', 'local', 'national', 'regional']
    data = (data[data['mask']]
            .join((data.groupby('group')[values]
                   .size()
                   .rename('len')), 
                   on='group')
            .join(holidays.set_index(cols[:2]), on=cols[:2], how='left')
            [cols + [values, 'len']]
            .fillna(0))
    
    t = data[data['len'].eq(1)]
    data.loc[t.index, values] = t.apply(lambda x: x[values] if x['event'] + 
                                         x['local'] + x['national'] + x['regional']
                                         else np.nan, axis=1)
    
    t = data[~data[cols[2:]].sum(axis=1).astype(bool)]
    data.loc[t.index, 'burst'] = 1
    temp.loc[data.index, [values, 'burst']] = data[[values, 'burst']]

    return temp[[values, 'burst']]


def intermittent_series_scaling(series: pd.Series) -> pd.Series:
    series = series.apply(np.log)
    median, mad = series.median(), sp.median_abs_deviation(series)
    return (series - median) / mad


def intermittent_series_bursts_processing(data: pd.DataFrame, holidays: pd.DataFrame, values=values) -> pd.DataFrame:
    temp = data.copy()

    t = data[data[values].gt(0)]
    t['z_score'] = intermittent_series_scaling(t[values])
    data = data.join(t['z_score'])
    data['mask'] = data['z_score'].apply(lambda x: abs(x) > 5)
    if data[data['mask']].empty:
        return temp[[values, 'burst']]
    
    cols = ['store_nbr', 'date', 'event', 'local', 'national', 'regional']
    data = (data[data['mask']]
            .join(holidays.set_index(cols[:2]), on=cols[:2], how='left')
            [cols + [values, 'burst']]
            .fillna(0))
    data = (data.assign(f_shift_days=(data['date'].shift(-1) - data['date']).dt.days)
            .assign(b_shift_days=(data['date'] - data['date'].shift()).dt.days)
            .fillna(float('inf')))

    condition = ~data[cols[2:]].sum(axis=1).astype(bool)
    t = data[condition & data['f_shift_days'].ge(seasonal_period) &
             data['b_shift_days'].ge(seasonal_period)]
    data.loc[t.index, values] = np.nan
    t = data[condition]
    data.loc[t.index, 'burst'] = 1

    temp.loc[data.index, [values, 'burst']] = data[[values, 'burst']]
    return temp[[values, 'burst']]


temp = (train[train['intermittent'].eq(False)].swifter.progress_bar(False)
        .groupby(group)
        .apply(smooth_series_bursts_processing, holidays=holidays)
        .reset_index())
train.loc[temp['id'], ['sales', 'burst']] = temp[['sales', 'burst']].values
train.drop(index=train[train['sales'].isna()].index, inplace=True)


temp = (train[train['intermittent'].eq(True)].swifter.progress_bar(False)
        .groupby(group)
        .apply(intermittent_series_bursts_processing, holidays=holidays)
        .reset_index())
train.loc[temp['id'], ['sales', 'burst']] = temp[['sales', 'burst']].values
train.drop(index=train[train['sales'].isna()].index, inplace=True)

In [21]:
train[group + ['intermittent']].drop_duplicates().groupby('intermittent').size()

intermittent
False    1524
True       52
dtype: int64

# Clustering

## Sales

In [22]:
train = train.join(stores.set_index(group[0])[['type', 'cluster']], on=group[0])

In [None]:
def intermittent_series_features(data: pd.DataFrame, values=values, bootstrap=False) -> list[float]:
    if bootstrap:
        t = data[data[values].gt(0)][values].value_counts(normalize=True)
        data[values] = data[values].apply(lambda x: np.random.choice(a=t.index, p=t.values)
                                            if x else x)
    
    metrics = []
    metrics += [(p := data[values].gt(0).mean()), 1 / p]

    data['mask'] = data[values].eq(0)
    data['shift_mask'] = data['mask'].shift()
    data['gap'] = data['mask'].ne(data['shift_mask']).cumsum()
    gaps = data[data['mask']].groupby('gap').size()
    m, s = gaps.mean(), gaps.std(ddof=1)
    metrics += [(s - m) / (s + m)]

    pos_vals = data[~data['mask']][values]
    metrics += [pos_vals.mean(), pos_vals.median(), 
                sp.variation(pos_vals, ddof=1) ** 2]

    series = data[values]
    metrics += [series.loc[series[::-1].gt(0).idxmax():].size - 1, pos_vals.size]
    metrics += [~data['mask'].iloc[-90:].sum() / ~data['mask'].sum()]

    metrics += [sp.kendalltau(pos_vals.index, pos_vals.values)[0]]

    X, y, lr = data.index.to_numpy().reshape(-1, 1), ~data['mask'], LogisticRegression()
    lr.fit(X, y)
    metrics += [lr.coef_[0][0]]

    metrics += [max(pos_vals) / sum(pos_vals)]

    return metrics


def intermittent_series_clustering(data: pd.DataFrame, group=group, values=values) -> pd.DataFrame:

    # no features permutation
    result = (data.swifter.progress_bar(False)
              .groupby(group)
              .apply(intermittent_series_features)
              .rename(values)
              .reset_index())
    result = (result[group]
              .join(pd.DataFrame(result[values].tolist())))
    result[[1, 3, 5, 7]] = result[[1, 3, 5, 7]].apply(intermittent_series_scaling)

    n, metrics = result.shape[0], pd.DataFrame([])
    metrics['n_clusters'] = [*range(int(n ** .5 / 2), int(n ** .5))]
    silh_score, X = [], result.loc[:, 0:11]

    for i in metrics['n_clusters']:
        result[f'labels_{i}'] = AgglomerativeClustering(n_clusters=i).fit_predict(X)
        silh_score += [silhouette_score(X, result[f'labels_{i}'])]
    else:
        metrics['s_score'] = silh_score

    # series permutation - permute demand separately
    avg_ari_perm = []
    for _ in range(50):
        ari_perm = []
        tmp_result = (data.swifter.progress_bar(False)
                      .groupby(group)
                      .apply(intermittent_series_features, bootstrap=True)
                      .rename(values)
                      .reset_index())
        X = pd.DataFrame(tmp_result[values].tolist())
        X[[1, 3, 5, 7]] = X[[1, 3, 5, 7]].apply(intermittent_series_scaling)

        for i in metrics['n_clusters']:
            y = AgglomerativeClustering(n_clusters=i).fit_predict(X)
            ari_perm += [adjusted_rand_score(result[f'labels_{i}'], y)]
        else:
            avg_ari_perm += [ari_perm.copy()]
    else:
        metrics['avg_ari_perm'] = np.array(avg_ari_perm).mean(axis=0)
        metrics['std_ari_perm'] = np.array(avg_ari_perm).std(axis=0, ddof=1)
        avg_ari_perm.clear()

    n = (metrics.round(2)
         .sort_values(['avg_ari_perm', 'std_ari_perm', 's_score'], 
                      ascending=[False, True, False])
         .iloc[0]
         .at['n_clusters']
         .astype(int))

    return (result[group + [f'labels_{n}']]
            .rename(columns={f'labels_{n}': 'label'}))


temp = train[train['intermittent'].eq(True)]
result = intermittent_series_clustering(temp)
labels = (train[group].join(result.set_index(group), 
                            on=group, how='inner')
          ['label'])
train.loc[labels.index, 'label'] = labels

In [None]:
def smooth_series_features(data: pd.DataFrame, w=90, values=values, bootstrap=False) -> list[float] | None:
    if data.shape[0] < w:
        return None
    else:
        metrics, series = [np.nan], data.iloc[-w:][values]
    
    while np.isnan(metrics).any():
        metrics = []

        if bootstrap:
            stl = STL(series, period=seasonal_period, robust=True).fit()
            series = stl.trend + stl.seasonal + np.random.choice(stl.resid, size=series.size, replace=True)
            series = series.apply(lambda x: 0 if x < 0 else x)

        model = (ExponentialSmoothing(endog=series + 1, 
                                      trend='add', 
                                      seasonal='add',
                                      damped_trend=True,
                                      use_boxcox=True,
                                      seasonal_periods=seasonal_period)
                .fit(optimized=True))
        
        params = ['smoothing_level', 'smoothing_trend', 'smoothing_seasonal', 'damping_trend']
        metrics += [model.params.get(x) for x in params]
        level = model.level.mean()

        metrics += [model.trend.mean() / level]
        metrics += [model.season.var(ddof=1) / (model.season + model.resid).var(ddof=1)]
        metrics += [(model.season.max() - model.season.min()) / level]
        metrics += [model.resid.std(ddof=1) / level]
        metrics += [acf(model.resid.dropna(), nlags=1, missing='conservative')[-1]]
    else:
        return metrics


def smooth_series_clustering(data: pd.DataFrame, group=group, values=values, window=90) -> pd.DataFrame:
    if isinstance(group, str):
        group = [group]
    
    temp = data[group].drop_duplicates()
    
    # no features permutation
    result = (data.swifter.progress_bar(False)
              .groupby(group)
              .apply(smooth_series_features, w=window, values=values)
              .rename(values)
              .reset_index()
              .dropna())

    data = data.join(result[group].set_index(group), on=group, how='inner')

    n, metrics = result.shape[0], pd.DataFrame([])
    metrics['n_clusters'] = [*range(int(n ** .5 / 2), int(n ** .5))]

    silh_score, X = [], scale(result[values].tolist())

    for i in metrics['n_clusters']:
        result[f'labels_{i}'] = AgglomerativeClustering(n_clusters=i).fit_predict(X)
        silh_score += [silhouette_score(X, result[f'labels_{i}'])]
    else:
        metrics['s_score'] = silh_score

    # residual permutation - bootstrap residuals and recompute all features
    avg_ari_perm = []
    for i in range(50):
        ari_perm = []
        tmp_result = (data.swifter.progress_bar(False)
                      .groupby(group)
                      .apply(smooth_series_features,
                             values=values,
                             w=window,
                             bootstrap=True)
                      .rename(values)
                      .reset_index()
                      .dropna())
        
        t = tmp_result[values].apply(lambda x: bool([y for y in x if pd.isna(y)]))
        if any(t):
            print(i, group, values)
            return tmp_result[t]

        X = scale(tmp_result[values].tolist())
        for i in metrics['n_clusters']:
            y = AgglomerativeClustering(n_clusters=i).fit_predict(X)
            ari_perm += [adjusted_rand_score(result[f'labels_{i}'], y)]
        else:
            avg_ari_perm += [ari_perm.copy()]
    else:
        metrics['avg_ari_perm'] = np.array(avg_ari_perm).mean(axis=0)
        metrics['std_ari_perm'] = np.array(avg_ari_perm).std(axis=0, ddof=1)
        avg_ari_perm.clear()

    # clustering with DTW similarities
    tmp_result = (data.swifter.progress_bar(False)
                  .groupby(group)[values]
                  .apply(smooth_series_scaling, return_seasonality=True, w=window)
                  .rename(values)
                  .reset_index())

    X, ari_dtw = cdist_dtw(tmp_result[values], n_jobs=-1), []
    for i in metrics['n_clusters']:
        y = AgglomerativeClustering(n_clusters=i).fit_predict(X)
        ari_dtw += [adjusted_rand_score(result[f'labels_{i}'], y)]
    else:
        metrics['dtw_ari'] = ari_dtw.copy()

    n = (metrics.round(2)
         .sort_values(['avg_ari_perm', 'std_ari_perm', 'dtw_ari'], 
                      ascending=[False, True, False])
         .iloc[0]
         .at['n_clusters']
         .astype(int))

    t = result[f'labels_{n}'].max() + 1
    temp = (temp.merge((result[group + [f'labels_{n}']]
                        .rename(columns={f'labels_{n}': 'label'})),
                        on=group, how='left'))
    
    for i in range(temp.shape[0]):
        if pd.isnull(temp.loc[i, 'label']):
            temp.loc[i, 'label'] = t
            t += 1
    else:
        return temp
    

for cluster in tqdm(train['cluster'].unique()):
    temp = train[train['intermittent'].eq(False) & train['cluster'].eq(cluster)]
    result = smooth_series_clustering(temp)
    labels = (train[group].join(result.set_index(group), 
                                on=group, how='inner')
              ['label'])
    train.loc[labels.index, 'label'] = labels

In [None]:
# train.to_csv('labeled_train.csv')

## Transactions

In [321]:
transactions = pd.read_csv('transactions.csv', engine='pyarrow', parse_dates=['date'])
transactions = (transactions[transactions['date'].ge(min_date)])
transactions.index.name = 'id'
transactions.head()

Unnamed: 0_level_0,date,store_nbr,transactions
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
42868,2015-07-01,1,1947
42869,2015-07-01,2,2112
42870,2015-07-01,3,3486
42871,2015-07-01,4,1600
42872,2015-07-01,5,1517


In [325]:
series_plot(data=transactions, group=group[0], values='transactions', series=10,
            cpd=True, seed=11, w=40, title='Store transactions in Equador stores')

In [326]:
temp = (transactions.swifter.progress_bar(False)
        .groupby(group[0])
        .apply(lambda x: x[x['date'].lt(changepoint_detection(x, values='transactions', w=40))]
               ['transactions'])
        .reset_index()
        ['id'])
transactions.drop(index=temp, inplace=True)

In [330]:
result = smooth_series_clustering(transactions, group=group[0], values='transactions')
labels = (transactions[group[0]]
          .to_frame()
          .join(result.set_index(group[0]), 
                on=group[0], how='inner')
          ['label'])
transactions.loc[labels.index, 'label'] = labels

In [338]:
transactions.to_csv('labeled_transactions.csv')

# Label-shared modeling

In [3]:
tmp = train[(train['store_nbr'] == 31) & (train['family'] == 'LAWN AND GARDEN')]
(px.line(data_frame=tmp, x='date', y='sales')
.update_layout(margin=margin))

[2026-02-10 14:15:15,853 E 6376 17577759] core_worker_process.cc:842: Failed to establish connection to the metrics exporter agent. Metrics will not be exported. Exporter agent status: RpcError: Running out of retries to initialize the metrics agent. rpc_code: 14


In [18]:
result = STL(t['sales'], period=7, robust=True).fit()
t[['trend', 'season', 'resid']] = np.vstack([result.trend, result.seasonal, result.resid]).T

plot = make_subplots(3)
plot.add_trace(go.Scatter(x=t['date'], y=t['trend'], mode='lines', name='trend'), row=1, col=1)
plot.add_trace(go.Scatter(x=t['date'], y=t['season'], mode='lines', name='seasonal'), row=2, col=1)
plot.add_trace(go.Scatter(x=t['date'], y=t['resid'], mode='lines', name='resid'), row=3, col=1)

In [None]:
n = 7
start_date = temp['date'].max() - pd.Timedelta(90, 'd')
label = f'labels_{n}'
t = train[train['date'] > start_date][group + ['date', 'sales']].merge(result[0][group + [label]], on=group)
t[group + [label]].drop_duplicates()[label].value_counts()



In [None]:
# t = train[group + ['date', 'sales']].merge(result[group + ['label']], on=group)

# t['product'] = [f'{y} in {x}' for x, y in zip(t['store_nbr'], t['family'])]
# px.line(data_frame=t[t['label'] == 3], x='date', y='sales', color='product')

# n_clusters = 2
# start_date = temp['date'].max() - pd.Timedelta(90, 'd')

# t = train[train['date'] >= start_date].merge(result[0][['store_nbr', 'family', f'labels_{n_clusters}', f'labels_dtw_{n_clusters}']], 
#                                     on=['store_nbr', 'family'])
# t.head()

# result[0][f'labels_{n_clusters}'].value_counts()

# t['product'] = [f'{y} in {x}' for x, y in zip(t['store_nbr'], t['family'])]
# px.line(data_frame=t[t[f'labels_{n_clusters}'] == 0], x='date', y='sales', color='product')

# result[0][f'labels_dtw_{n_clusters}'].value_counts()

# t['product'] = [f'{y} in {x}' for x, y in zip(t['store_nbr'], t['family'])]
# px.line(data_frame=t[t[f'labels_dtw_{n_clusters}'] == 0], x='date', y='sales', color='product')

In [339]:
swifter.set_defaults(npartitions=1)
ray.shutdown()