In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import os
import gc


from matplotlib import pyplot as plt
plt.style.use('seaborn-v0_8')

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
DATA_PATH = "/content/drive/MyDrive/SU Works/CPSC_5305_Intro_to_DS/Rizvans Works/Saved Data/processed_data.parquet"

In [3]:
df = pd.read_parquet(DATA_PATH)

In [4]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 59181090 entries, 0 to 59181089
Data columns (total 31 columns):
 #   Column              Dtype         
---  ------              -----         
 0   date                datetime64[ns]
 1   wm_yr_wk            int16         
 2   weekday             category      
 3   wday                int8          
 4   month               int8          
 5   year                int16         
 6   d                   category      
 7   event_name_1        category      
 8   event_type_1        category      
 9   event_name_2        category      
 10  event_type_2        category      
 11  snap_CA             int8          
 12  snap_TX             int8          
 13  snap_WI             int8          
 14  id                  category      
 15  item_id             category      
 16  dept_id             category      
 17  cat_id              category      
 18  store_id            category      
 19  state_id            category      
 20  

In [5]:
print(df.head().to_markdown())

|    | date                |   wm_yr_wk | weekday   |   wday |   month |   year | d   | event_name_1   | event_type_1   | event_name_2   | event_type_2   |   snap_CA |   snap_TX |   snap_WI | id                            | item_id       | dept_id   | cat_id   | store_id   | state_id   |   sales_count |   sell_price |   day_of_week |   week_of_year |   is_weekend |   sales_lag_28 |   sales_lag_30 |   sales_lag_120 |   sales_lag_365 |   price_change |   price_vs_month_avg |
|---:|:--------------------|-----------:|:----------|-------:|--------:|-------:|:----|:---------------|:---------------|:---------------|:---------------|----------:|----------:|----------:|:------------------------------|:--------------|:----------|:---------|:-----------|:-----------|--------------:|-------------:|--------------:|---------------:|-------------:|---------------:|---------------:|----------------:|----------------:|---------------:|---------------------:|
|  0 | 2011-01-29 00:00:00 |      11101 | Sa

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error
from sklearn.model_selection import TimeSeriesSplit # Not used directly in this version
import gc # For memory management

df.dropna(inplace=True)

# --- Subsample the Data ---
# To make Ridge Regression feasible on a large dataset, let's subsample.
# We will use data from 2014 onwards, similar to the LGBM model.
print("Subsampling data for Ridge Regression model training...")
df_ridge = df[df['date'] >= '2014-01-01'].copy()

# Drop the original date column as it's not a direct feature
df_ridge = df_ridge.drop(columns=['date'])
gc.collect()

print(f"Shape of subsampled data for Ridge Regression: {df_ridge.shape}")


# Define feature columns (X) and target column (y)
target = 'sales_count'
categorical_features = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id',
                        'weekday', 'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2']
numeric_features = ['wday', 'month', 'year',
                    'snap_CA', 'snap_TX', 'snap_WI',
                    'sell_price', 'day_of_week', 'week_of_year', 'is_weekend', 'sales_lag_28',
    'sales_lag_30', 'sales_lag_120', 'sales_lag_365', 'price_change',
    'price_vs_month_avg']

# Combine all features
features = categorical_features + numeric_features

X = df_ridge[features]
y = df_ridge[target]

# Clean up df_ridge to save memory
del df_ridge
gc.collect()


# Create a preprocessor
# OneHotEncoder for categorical features
# 'passthrough' for numeric features
preprocessor = ColumnTransformer(
    transformers=[
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features),
        ('num', 'passthrough', numeric_features)
    ],
    remainder='drop'  # Drop any columns not specified
)

# Create the model pipeline
model = Pipeline(steps=[('preprocessor', preprocessor),
                        ('regressor', Ridge(alpha=1.0, random_state=42))])

# --- Train on the entire (subsampled) dataset ---
print("Training Ridge Regression model on the subsampled dataset...")
print("Fitting pipeline...")
model.fit(X, y)

# Predict on the training data itself
print("Evaluating model on training data...")
train_preds = model.predict(X)

# Ensure predictions are non-negative
train_preds[train_preds < 0] = 0

# Calculate RMSE on the training data
rmse = np.sqrt(mean_squared_error(y, train_preds))
print(f"Training RMSE: {rmse:.4f}")

print("\n--- Training Complete ---")


NameError: name 'df' is not defined

# Task
Refactor the existing Ridge Regression model to utilize Dask-ML for parallel and distributed computation. This involves initializing a Dask client, converting the subsampled pandas DataFrame to a Dask DataFrame, replacing scikit-learn components with their Dask-ML equivalents (`dask_ml.preprocessing.ColumnTransformer` and `dask_ml.linear_model.Ridge`), and then training and evaluating the Dask-ML pipeline, ensuring non-negative predictions.

## Set up Dask

### Subtask:
Initialize a Dask client to enable parallel and distributed computation.


**Reasoning**:
To initialize a Dask client, I need to first import the `Client` class from `dask.distributed` and then create an instance of it.



In [6]:
from dask.distributed import Client

client = Client() # Creates a local Dask client
print(client)

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:39637
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:34797'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43133'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43951'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:37863'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:45679 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:45679
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:45388
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42557 name: 0
INFO:

<Client: 'tcp://127.0.0.1:39637' processes=4 threads=8, memory=50.99 GiB>


**Reasoning**:
The previous cell resulted in a `NameError` because `df` was not defined. I need to re-load `df`, apply `dropna`, subsample it as `df_ridge`, extract `X` and `y` from this subsampled DataFrame, and then convert `X` and `y` into Dask DataFrames and Dask Array, respectively, as part of preparing the data for Dask-ML.



In [7]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
from sklearn.preprocessing import StandardScaler, OneHotEncoder # Keep sklearn for ColumnTransformer structure, though dask_ml will replace it later
from sklearn.linear_model import LogisticRegression, Ridge
from sklearn.metrics import f1_score, accuracy_score, roc_auc_score, mean_squared_error
import gc # For memory management

# DATA_PATH should be defined from earlier cell
# df should be loaded from earlier cell

# Re-load df and process it as it was lost in the previous execution state
df = pd.read_parquet(DATA_PATH)

df.dropna(inplace=True)

print("Subsampling data for Ridge Regression model training...")
df_ridge = df[df['date'] >= '2014-01-01'].copy()

# Drop the original date column as it's not a direct feature
df_ridge = df_ridge.drop(columns=['date'])
gc.collect()

print(f"Shape of subsampled data for Ridge Regression: {df_ridge.shape}")


# Define feature columns (X) and target column (y)
target = 'sales_count'
categorical_features = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id',
                        'weekday', 'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2']
numeric_features = ['wday', 'month', 'year',
                    'snap_CA', 'snap_TX', 'snap_WI',
                    'sell_price', 'day_of_week', 'week_of_year', 'is_weekend', 'sales_lag_28',
    'sales_lag_30', 'sales_lag_120', 'sales_lag_365', 'price_change',
    'price_vs_month_avg']

# Combine all features
features = categorical_features + numeric_features

# Extract X and y as pandas DataFrames/Series first
X_pandas = df_ridge[features]
y_pandas = df_ridge[target]

# Clean up df_ridge to save memory
del df_ridge
del df
gc.collect()

# Convert pandas DataFrames to Dask DataFrames/Arrays
print("Converting pandas DataFrames to Dask DataFrames...")
X = dd.from_pandas(X_pandas, npartitions=4) # Adjust npartitions based on your cluster and data size
y = da.from_array(y_pandas.values, chunks=y_pandas.shape[0] // 4) # Adjust chunks for Dask Array

print(f"Shape of Dask X: {X.shape}, type: {type(X)}")
print(f"Shape of Dask y: {y.shape}, type: {type(y)}")

Subsampling data for Ridge Regression model training...
Shape of subsampled data for Ridge Regression: (26617770, 30)
Converting pandas DataFrames to Dask DataFrames...
Shape of Dask X: (<dask_expr.expr.Scalar: expr=df.size() // 26, dtype=int64>, 26), type: <class 'dask.dataframe.dask_expr._collection.DataFrame'>
Shape of Dask y: (26617770,), type: <class 'dask.array.core.Array'>


**Reasoning**:
Now that the data is prepared as Dask DataFrames/Arrays, I need to replace the scikit-learn components (`ColumnTransformer`, `OneHotEncoder`, `Ridge`) with their Dask-ML equivalents to build a Dask-compatible preprocessing pipeline and model.



In [8]:
import dask_ml.preprocessing as dm_preproc
import dask_ml.linear_model as dm_linear_model
from dask_ml.compose import ColumnTransformer as DaskColumnTransformer
from dask_ml.model_selection import KFold # Placeholder, not used for training, but good to import if needed for CV later

# Create a Dask-ML preprocessor
# OneHotEncoder for categorical features
# 'passthrough' for numeric features
preprocessor_dask = DaskColumnTransformer(
    transformers=[
        ('cat', dm_preproc.OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features),
        ('num', 'passthrough', numeric_features)
    ],
    remainder='drop'  # Drop any columns not specified
)

# Create the Dask-ML model pipeline
model_dask = Pipeline(steps=[('preprocessor', preprocessor_dask),
                            ('regressor', dm_linear_model.Ridge(alpha=1.0, random_state=42))])

print("Dask-ML pipeline created successfully.")

ModuleNotFoundError: No module named 'dask_ml'

**Reasoning**:
The previous cell failed because the `dask_ml` library was not found. I need to install `dask-ml` using `pip` before attempting to import its modules.

