# Performs Market Basket Analysis on a dataset obtained through the Snowflake Data Marketplace, using Snowpark and the mlxtend Python machine learning library

In [81]:
%config Completer.use_jedi = False
from snowflake.snowpark.session import Session
from snowflake.snowpark import DataFrame, Window
from snowflake.snowpark.types import *
from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import col

# Snowflake connection info is saved in config.py
from config import snowflake_conn_prop

## Configure Snowpark Session and initialize the database, warehouse, and schema

In [82]:
# Open the Snowpark session from the connection properties in config.py.
session = Session.builder.configs(snowflake_conn_prop).create()

# session.sql() only builds a lazy DataFrame; each USE statement has to be
# collected to actually run on Snowflake. The warehouse statement previously
# lacked .collect() and was therefore never executed.
for context_key in ("role", "database", "schema", "warehouse"):
    session.sql("use {} {}".format(context_key, snowflake_conn_prop[context_key])).collect()

# Confirm the context the session ended up with.
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

[Row(CURRENT_WAREHOUSE()='BI_DEMO_WH', CURRENT_DATABASE()='FMCG_RETAIL_ID', CURRENT_SCHEMA()='PUBLIC')]


## Read a table as Snowpark dataframe

In [83]:
# Reference the RETAIL_POS table as a Snowpark DataFrame; nothing is fetched
# until an action such as show() or collect() is called.
df = session.table("RETAIL_POS")

## Select receipts with >= 2 categories, since with only 1 category there is no basket to talk about
* Queries are translated into SQL and run natively on Snowflake. Data is not moved out of Snowflake for computation.
* Lazy evaluation: queries are only run when an action such as show() or collect() is executed.

In [84]:
# Count the distinct categories on each receipt with a window over RECEIPT_ID,
# then keep only the receipts that contain at least two categories.
window = Window.partition_by("RECEIPT_ID")
df = (
    df.select("RECEIPT_ID", "CATEGORY")
    .distinct()
    .select(
        "RECEIPT_ID",
        "CATEGORY",
        F.count("CATEGORY").over(window).alias("COUNT"),
    )
    .sort(col("COUNT").desc())
    .filter(col("COUNT") >= 2)
)
df.show()

--------------------------------------------------
|"RECEIPT_ID"  |"CATEGORY"             |"COUNT"  |
--------------------------------------------------
|171159987     |Soy Sauce              |17       |
|236460450     |Jelly & Pudding        |17       |
|246035648     |Stock Soup             |17       |
|170911774     |Sausage                |17       |
|423657600     |Dry Noodle&vermicelli  |17       |
|423657600     |Instant Noodles        |17       |
|173984214     |Powdered Beverage      |17       |
|238183176     |Tissue                 |17       |
|164472976     |Jelly Drink            |17       |
|335749496     |Baking Needs           |17       |
--------------------------------------------------



## For market basket analysis, I'm only interested in a binary state: included (1) or not included (0) in the basket of transactions

In [85]:
# Replace the per-receipt category count with a presence flag:
# 0 when COUNT is null, 1 otherwise.
df = df.select(
    "RECEIPT_ID",
    "CATEGORY",
    F.when(col("COUNT").is_null(), 0).otherwise(1).alias("COUNT"),
)
df.sort(col("COUNT").desc()).show()

------------------------------------------------
|"RECEIPT_ID"  |"CATEGORY"           |"COUNT"  |
------------------------------------------------
|380134330     |Stock Soup           |1        |
|186484793     |Liquid Milk          |1        |
|152556930     |Face Care            |1        |
|218900953     |Liquid Soap          |1        |
|262682466     |Insecticides         |1        |
|246919629     |Wet Wipes            |1        |
|353627599     |Feminine Protection  |1        |
|205326800     |Baking Needs         |1        |
|186669916     |Baby Products        |1        |
|166151154     |Soy Sauce            |1        |
------------------------------------------------



## Getting all my product categories into a list

In [86]:
# Collect the distinct categories to the client and keep just their names.
product_category = df.select("CATEGORY").distinct().collect()
product_category_names = [
    category_row.as_dict()['CATEGORY'] for category_row in product_category
]

## Pivot the product categories into columns, and let the receipt ids remain as rows
* Now each row has the same amount of product categories
* For the categories that are not found in a receipt, they will be made null in the pivot process
* Convert those null to 0s

In [87]:
# One column per product category, summing the presence flag; categories
# missing from a receipt come out null and are filled with 0. RECEIPT_ID is
# dropped afterwards so only the category indicator columns remain.
df = (
    df.pivot(col("CATEGORY"), product_category_names)
    .sum("COUNT")
    .na.fill(0)
    .drop("RECEIPT_ID")
)

# Persist the encoded baskets, replacing any existing ENCODED table.
df.write.save_as_table("ENCODED", mode="overwrite")
df.show()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Create a stored procedure to perform training directly on Snowflake, i.e., data is not pulled out of Snowflake to the client side for computation
### 1. Write the implementation code of the stored proc

In [97]:
# Implementation code for the stored procedure
def market_basket(session: Session, from_table: str, to_table: str) -> str:
    """Mine association rules from an encoded basket table.

    Reads ``from_table``, takes a 1% sample of its rows, finds frequent
    itemsets with FP-Growth (minimum support 0.01), derives association rules
    with lift >= 1, and overwrites ``to_table`` with the rules sorted by
    confidence and then lift, both descending.

    Args:
        session: Snowpark session used to read and write the tables.
        from_table: Name of the table to read. Every column is treated as an
            item indicator (0/1 or boolean), so it should hold nothing else.
        to_table: Name of the table that is overwritten with the rules.

    Returns:
        The string ``"SUCCESS"`` once the rules table has been written.
    """
    # Imported inside the function so the imports travel with the function
    # body when it is registered as a stored procedure.
    import pandas as pd
    from mlxtend.frequent_patterns import association_rules
    from mlxtend.frequent_patterns import fpgrowth

    rule_columns = [
        'antecedents', 'consequents', 'antecedent support',
        'consequent support', 'support', 'confidence', 'lift', 'leverage',
        'conviction',
    ]

    # Only a 1% sample of the rows is converted to pandas.
    pdf = session.table(from_table).sample(0.01).to_pandas()

    # mlxtend expects boolean indicator columns; integer 0/1 input is accepted
    # but discouraged by its documentation, so convert explicitly.
    pdf = pdf.astype(bool)

    # The fpgrowth algorithm uses trees, and is faster
    frequent_itemsets = fpgrowth(pdf, min_support=0.01, use_colnames=True)

    rules = association_rules(frequent_itemsets, metric="lift", min_threshold=1)
    rules = rules.sort_values(['confidence', 'lift'], ascending=[False, False])

    # Itemsets come back as frozensets; flatten each to a comma-separated
    # string so it can be stored in a plain text column.
    for side in ('antecedents', 'consequents'):
        rules[side] = rules[side].apply(', '.join)

    # Keep only the rule metrics listed above, in that column order.
    pdf_rules = pd.DataFrame(rules, columns=rule_columns)
    sdf = session.create_dataframe(pdf_rules)
    sdf.write.save_as_table(to_table, mode="overwrite")

    return "SUCCESS"

### 2. Register the stored proc in Snowflake

In [101]:
from snowflake.snowpark.functions import sproc

# Reset the session's package list, then declare the packages needed by the
# Python environment in Snowflake.
session.clear_packages()
session.add_packages(
    "snowflake-snowpark-python",
    "pandas",
    "joblib",
    "numpy",
    "scikit-learn",
    "scipy",
    "setuptools",
)
# mlxtend is attached as an import rather than through add_packages.
session.add_import("mlxtend")

# The stored proc is hosted on a stage; create it if it is missing.
_ = session.sql("CREATE STAGE IF NOT EXISTS MODELS").collect()

# Register (or replace) the permanent stored proc on that stage.
session.sproc.register(
    market_basket,
    name="market_basket",
    is_permanent=True,
    stage_location="@models",
    replace=True,
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fbd38468c70>

### 3. Finally, call the stored proc to perform market basket analysis

In [99]:
# Run the registered stored proc: read the "Encoded" table and write the
# resulting rules to "Market_Basket_Results".
session.call("market_basket", "Encoded", "Market_Basket_Results")

'SUCCESS'

In [None]:
# Bare expression: in a notebook this displays the session object.
session