# Customer Lifetime Value (CLTV) Prediction

### This notebook shows how to build 100 million time-series forecasts in parallel using Snowflake's Partitioned Custom Models.

You have 100 million customers and would like to predict how much each customer will spend per month over the next 12 months, based on the last 24 months of monthly spending data.

### Import all the required packages

In [None]:
# Import python packages
import pandas as pd
import time
from snowflake.ml.model import custom_model
from snowflake.ml.registry import registry
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import to_timestamp_ntz

# Get the session that is already active in this environment; every later cell
# runs its SQL and DataFrame operations through this `session` object.
session = get_active_session()


### Set the database, schema and warehouse

In [None]:
# Point the session at the CLTV data and at the warehouse that will run the queries.
for use_stmt in (
    "use database CUSTOMER_SYNTHETIC_DATA",
    "use schema CLTV",
    "use warehouse CUSTOMER_CLTV",
):
    session.sql(use_stmt).collect()

### Get the input dataset

In [None]:
# Snowpark DataFrame over the source table.
train_dataset = session.table('customer_cltv')

# Cast TS to TIMESTAMP_NTZ so the model code receives a consistent timestamp type.
ts_as_ntz = to_timestamp_ntz(train_dataset["TS"])
train_dataset = train_dataset.with_column("TS", ts_as_ntz)
train_dataset.show()

#### For local testing, select a subset of the data for five random customer IDs

In [None]:
# Select 5 random customer ids.
# collect() returns Row objects; pull out the scalar CUST_ID values so the IN
# filter compares against plain ids rather than Row (tuple) objects.
sampled_rows = train_dataset.select(train_dataset.col("CUST_ID")).distinct().sample(n=5).collect()
random_cust_ids = [row["CUST_ID"] for row in sampled_rows]
train_dataset_dummy = train_dataset.filter(train_dataset.col('CUST_ID').in_(random_cust_ids))

# Pull that small subset client-side as pandas for local testing.
train_dataset_dummy_pd = train_dataset_dummy.to_pandas()

### Initialise the model registry in Snowflake

In [None]:
# Create the model registry in the session's current database and schema.
REGISTRY_DATABASE_NAME = session.get_current_database()
REGISTRY_SCHEMA_NAME = session.get_current_schema()

reg = registry.Registry(
    session=session,
    database_name=REGISTRY_DATABASE_NAME,
    schema_name=REGISTRY_SCHEMA_NAME,
)

### Partitioned Custom Models

#### The code below uses Snowflake's partitioned custom model feature
#### Documentation: https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/partitioned-custom-models

#### Note: Keep the input and output of the predict (custom model) function as pandas DataFrames. Snowflake automatically handles distributing the work for you, and the function can be called with a Snowpark DataFrame.

In [None]:
class ForecastingModel(custom_model.CustomModel):
    """Per-customer CLTV forecaster deployed as a Snowflake partitioned custom model.

    When the registered model is run with ``partition_column="CUST_ID"``, each
    call to ``predict`` receives the rows of one partition as a pandas DataFrame.
    """

    # Use the same decorator as for methods with FUNCTION inference.
    @custom_model.partitioned_inference_api
    def predict(self, df: pd.DataFrame) -> pd.DataFrame:  # Keep input and output here as pandas
        """Fit an additive-trend exponential smoothing model and forecast CLTV.

        Args:
            df: Rows of a single partition with ``CUST_ID``, ``TS`` and ``CLTV``
                columns. The whole frame is treated as ONE series, so it should
                hold a single customer's history. The frame is not modified.

        Returns:
            DataFrame with 12 rows and columns ``TS_FORECAST`` (consecutive
            month-start dates, beginning with the first month start strictly
            after the last observed ``TS``) and ``CLTV_FORECAST``.
        """
        ################## Replace below with your algorithm code ##################
        import warnings

        import pandas as pd
        from statsmodels.tsa.holtwinters import ExponentialSmoothing

        horizon = 12  # Number of future months to forecast.

        # Convert TS on a copy (assign) so the caller's DataFrame is left untouched,
        # then order the history chronologically and index it by TS.
        history = (
            df.assign(TS=pd.to_datetime(df["TS"]))
            .sort_values(by=["CUST_ID", "TS"])
            .set_index("TS")
        )

        # Suppress statsmodels warnings for this fit only, not process-wide.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            model = ExponentialSmoothing(history["CLTV"], trend="add", seasonal=None)
            forecast = model.fit().forecast(horizon)

        # forecast() predicts the periods AFTER the last observation, so the first
        # forecast date is the next month start, not the last observed month itself.
        first_forecast_month = history.index.max() + pd.offsets.MonthBegin(1)
        forecast_dates = pd.date_range(start=first_forecast_month, periods=horizon, freq="MS")

        return pd.DataFrame({
            "TS_FORECAST": forecast_dates,
            "CLTV_FORECAST": forecast.values,
        })


In [None]:
# We first test on a small pandas subset locally before running on the entire dataset.
# Snowflake calls predict once per CUST_ID partition, so mimic that here: passing all
# sampled customers in one call would fit a single model over their interleaved rows.
cltv_forecasting_model = ForecastingModel()
local_predictions = pd.concat(
    [
        cltv_forecasting_model.predict(cust_df).assign(CUST_ID=cust_id)
        for cust_id, cust_df in train_dataset_dummy_pd.groupby("CUST_ID")
    ],
    ignore_index=True,
)
print(local_predictions)

#### Logging the models into the model registry 
Documentation: https://docs.snowflake.com/developer-guide/snowflake-ml/model-registry/overview#registering-models-and-versions

In [None]:
# Register PREDICT as a table function (it is invoked with partition_column in mv.run).
options = {"function_type": "TABLE_FUNCTION"}

# Packages the model code imports when it runs inside Snowflake.
CONDA_DEPENDENCIES = [
    'pandas',
    'statsmodels==0.13.5',
    'snowflake-snowpark-python',
]

mv = reg.log_model(
    cltv_forecasting_model,
    model_name="cltv_forecast",
    conda_dependencies=CONDA_DEPENDENCIES,
    options=options,
    sample_input_data=train_dataset_dummy,
)

#### For performance, upsize the warehouse before running on the data for 100 million customers

In [None]:
# If you don't have permission to alter the size of the warehouse, either switch to
# another, bigger warehouse or continue with the current one.
# get_current_warehouse() returns an already-quoted identifier, so use it as-is:
# stripping the quotes would break warehouse names that are not all upper-case.
current_warehouse = session.get_current_warehouse()
if current_warehouse is None:
    raise RuntimeError("No warehouse is selected for the current session.")
session.sql(
    f"ALTER WAREHOUSE {current_warehouse} SET WAREHOUSE_SIZE='6X-Large';"
).collect()

# Train, predict the CLTV values, and save the results to a Snowflake table.

In [None]:
# perf_counter() is monotonic, so the measured duration is immune to wall-clock changes.
start_time = time.perf_counter()

# Train and predict CLTV values for 100 million customers, partitioned by CUST_ID.
results = mv.run(
    train_dataset,  # Can be a pandas df or Snowpark df
    function_name="PREDICT",
    partition_column="CUST_ID",
)
results.write.save_as_table('Prediction_results', mode='overwrite')
end_time = time.perf_counter()

# Calculate elapsed time in minutes
elapsed_time_minutes = (end_time - start_time) / 60
print(f"Execution time: {elapsed_time_minutes:.2f} minutes")


In [None]:
# Resize back to a small warehouse.
# get_current_warehouse() returns an already-quoted identifier, so use it as-is:
# stripping the quotes would break warehouse names that are not all upper-case.
current_warehouse = session.get_current_warehouse()
if current_warehouse is None:
    raise RuntimeError("No warehouse is selected for the current session.")
session.sql(
    f"ALTER WAREHOUSE {current_warehouse} SET WAREHOUSE_SIZE='SMALL';"
).collect()

### Print sample prediction results

In [None]:
# Read back the saved predictions, report the row count, and show a sample.
df = session.table('Prediction_results')
row_count = df.count()
print("Number of rows in prediction = ", row_count)

print("Sample prediction data")
sample_columns = ["TS_FORECAST", "CUST_ID", "CLTV_FORECAST"]
df.select(*sample_columns).show(12)