In [1]:
import ray

# Initialise Ray for this session with default settings (no address or resources are given here).
ray.init()

2024-12-27 02:23:26,568	INFO worker.py:1636 -- Connecting to existing Ray cluster at address: 10.128.0.31:6533...
2024-12-27 02:23:26,589	INFO worker.py:1812 -- Connected to Ray cluster. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.12.8
Ray version:,2.40.0
Dashboard:,http://127.0.0.1:8265




In [2]:
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from datetime import datetime
from scipy.sparse import csr_matrix

In [3]:
# Read the whole online_retail table from BigQuery through Ray Data.
df = ray.data.read_bigquery(
    project_id='double-fusion-445405-r6',
    query='SELECT * FROM tridorian_test.online_retail',
)

2024-12-27 02:23:42,579	INFO bigquery_datasource.py:68 -- Created streams: 1
2024-12-27 02:23:42,581	INFO bigquery_datasource.py:70 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.


In [4]:
# Convert the Ray dataset into a Modin DataFrame so the pandas-style API can be used in the cells below.
dfm = df.to_modin()

2024-12-27 02:23:48,100	INFO bigquery_datasource.py:68 -- Created streams: 1
2024-12-27 02:23:48,101	INFO bigquery_datasource.py:70 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.
2024-12-27 02:23:48,107	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-12-27_01-58-19_400424_33643/logs/ray-data
2024-12-27 02:23:48,108	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadBigQuery]


Running 0: 0.00 row [00:00, ? row/s]

2024-12-27 02:23:51,865	INFO bigquery_datasource.py:68 -- Created streams: 1
2024-12-27 02:23:51,868	INFO bigquery_datasource.py:70 -- The number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.


- ReadBigQuery->SplitBlocks(200) 1: 0.00 row [00:00, ? row/s]

In [5]:
# Preview the first rows to check the column names and value formats.
dfm.head()

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,537032,21275,?,-30,2010-12-03 16:50:00,0.0,,United Kingdom
1,538090,20956,?,-723,2010-12-09 14:48:00,0.0,,United Kingdom
2,539494,21479,?,752,2010-12-20 10:36:00,0.0,,United Kingdom
3,540100,22837,?,-106,2011-01-04 16:53:00,0.0,,United Kingdom
4,540558,21258,?,-29,2011-01-10 10:04:00,0.0,,United Kingdom


In [6]:
# Summary statistics, used to spot out-of-range values such as negative quantities or prices.
dfm.describe()

Unnamed: 0,Quantity,InvoiceDate,UnitPrice,CustomerID
count,541909.0,541909,541909.0,406829.0
mean,9.55225,2011-07-04 13:34:57.156386,4.611114,15287.69057
min,-80995.0,2010-12-01 08:26:00,-11062.06,12346.0
25%,1.0,2011-03-28 11:34:00,1.25,13953.0
50%,3.0,2011-07-19 17:17:00,2.08,15152.0
75%,10.0,2011-10-19 11:27:00,4.13,16791.0
max,80995.0,2011-12-09 12:50:00,38970.0,18287.0
std,218.081158,,96.759853,1713.600303


In [7]:
# Share of missing values in each column (null count divided by the number of rows).
dfm.isnull().sum() / len(dfm)

InvoiceNo      0.000000
StockCode      0.000000
Description    0.000000
Quantity       0.000000
InvoiceDate    0.000000
UnitPrice      0.000000
CustomerID     0.249267
Country        0.000000
dtype: float64

Based on a simple descriptive analysis, we can see data anomalies in the *Quantity* and *UnitPrice* columns: both contain negative values, which do not make sense, so we will remove those rows. Moreover, about 25% of the *CustomerID* values are missing, which is very high. Since our objective here is to build a recommendation system, we will only use rows where *CustomerID* is present.

In [8]:
# Fallback recommendations: the ten StockCodes that appear on the most rows.
# Computed on the unfiltered frame, i.e. before invalid rows are removed.
stock_code_counts = dfm['StockCode'].value_counts()
default_rec = list(stock_code_counts.index[:10])
default_rec

the groupby keys will be sorted anyway, although the 'sort=False' was passed. See the following issue for more details: https://github.com/modin-project/modin/issues/3571.


['85123A',
 '22423',
 '85099B',
 '47566',
 '20725',
 '84879',
 '22720',
 '22197',
 '21212',
 '20727']

In [9]:
# Number of rows per StockCode, most frequent first (computed before the row filter below is applied).
dfm.StockCode.value_counts()

StockCode
85123A    2313
22423     2203
85099B    2159
47566     1727
20725     1639
          ... 
84705C       1
84670        1
84664        1
84661b       1
m            1
Name: count, Length: 4070, dtype: int64

In [10]:
# Keep only rows with a positive quantity, a positive unit price and a known customer.
positive_quantity = dfm.Quantity > 0
positive_price = dfm.UnitPrice > 0
known_customer = dfm.CustomerID.notnull()
dfm = dfm[positive_quantity & positive_price & known_customer]

In [11]:
# Total quantity for every (CustomerID, StockCode) pair, flattened back into ordinary columns.
quantity_by_pair = dfm.groupby(['CustomerID', 'StockCode'])['Quantity'].sum()
df_agg = quantity_by_pair.reset_index()
df_agg


Unnamed: 0,CustomerID,StockCode,Quantity
0,12346.0,23166,74215
1,12347.0,16008,24
2,12347.0,17021,36
3,12347.0,20665,6
4,12347.0,20719,40
...,...,...,...
266787,18287.0,84920,4
266788,18287.0,85039A,96
266789,18287.0,85039B,120
266790,18287.0,85040A,48


In [12]:
# Create a pivot table with items (StockCode) as rows and customers (CustomerID) as columns.
# Each cell holds the summed Quantity for that pair; pairs with no purchase are filled with 0.
pivot_table = df_agg.pivot(index='StockCode', columns='CustomerID', values='Quantity').fillna(0)
pivot_table

CustomerID,12346.0,12347.0,12348.0,12349.0,12350.0,12352.0,12353.0,12354.0,12355.0,12356.0,...,18273.0,18274.0,18276.0,18277.0,18278.0,18280.0,18281.0,18282.0,18283.0,18287.0
StockCode,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
10002,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10080,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10120,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10123C,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10124A,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
C2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
DOT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
M,0.0,0.0,0.0,0.0,0.0,3.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0
PADS,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [13]:
# Store the pivot values as a SciPy CSR sparse matrix; row order follows pivot_table.index (StockCode).
matrix = csr_matrix(pivot_table.values)
matrix

<Compressed Sparse Row sparse matrix of dtype 'float64'
	with 266792 stored elements and shape (3665, 4338)>

In [14]:
# Fit a brute-force nearest-neighbour index using cosine distance.
# Each row of `matrix` is one StockCode, so the neighbours returned later are items, not customers.
model_knn = NearestNeighbors(metric='cosine', algorithm='brute')
model_knn.fit(matrix)


In [16]:
#import modin.pandas as pd 
#dfm['invoicedate'] = pd.to_datetime(dfm['invoicedate'], format = '%m/%d/%Y %H:%M')
#default_rec.to_list()

In [17]:
import modin.pandas as pd 

In [None]:
# def rec(items, pivot_table, model_knn, default_recommendation ,n_recommendations=10, ):
#     if items in pivot_table.index :
#         distances,indices=model_knn.kneighbors(pivot_table.loc[items].values.reshape(1,-1),n_neighbors=10+1)
#         ids = []
#         # Get recommendations from similar users
#         for i in range(1, len(distances.flatten())):
#             similar_user_id = pivot_table.index[indices.flatten()[i]]   
#             ids.append(similar_user_id)
#         return ids
#     else:
#         return default_recommendation

# class item_recommendation:
#     # Take the message to return as an argument to the constructor.
#     def __init__(self):
#         self.pivot_table = pd.read_csv('pivot.csv')
#         self.model = joblib.load('model_knn.pkl')
#         self.default_rec = ['85123A', '22423','85099B', '84879', '47566', '20725', '22720', '20727', 'POST', '23203']
        
#     async def __call__(self, starlette_request: Request) -> Dict:
#         payload = await starlette_request.json()
#         stock_code = payload['stock_code']
#         rec_items = recommend_items(stock_code, self.pivot_table, self.model, self.default_rec)
#         return {"result": rec_items}

In [None]:
#item_recommendation().get()

In [18]:
def recommend_items(items, n_recommendations=10, default_recommendation = default_rec):
    """Return the StockCodes most similar to a given StockCode.

    The query vector is the item's row of the module-level ``pivot_table`` and
    neighbours are looked up with the module-level ``model_knn``.

    Args:
        items: A single StockCode label to look up in ``pivot_table.index``.
        n_recommendations: Maximum number of StockCodes to return.
        default_recommendation: Fallback StockCodes used when ``items`` is not
            present in ``pivot_table.index``. The default is bound to the
            value of ``default_rec`` at the time this function is defined.

    Returns:
        A list of at most ``n_recommendations`` StockCodes. For a known item
        these are its nearest neighbours, closest first, excluding the item
        itself; for an unknown item they are the leading entries of
        ``default_recommendation``. An empty list is returned when
        ``n_recommendations`` is not positive.
    """
    if n_recommendations <= 0:
        return []

    if items not in pivot_table.index:
        # Return a copy so callers cannot mutate the shared fallback list.
        return list(default_recommendation)[:n_recommendations]

    # Request one extra neighbour because the query item normally matches
    # itself, but never more neighbours than there are fitted items.
    n_neighbors = min(n_recommendations + 1, len(pivot_table.index))
    query_vector = pivot_table.loc[items].values.reshape(1, -1)
    neighbor_positions = model_knn.kneighbors(
        query_vector, n_neighbors=n_neighbors, return_distance=False
    )[0]

    # Remove the query item by label rather than by position: when several
    # items tie at distance 0 the query is not guaranteed to come back first.
    similar_items = [
        stock_code
        for stock_code in (pivot_table.index[pos] for pos in neighbor_positions)
        if stock_code != items
    ]
    return similar_items[:n_recommendations]

In [19]:
recommend_items('11')

['85123A',
 '22423',
 '85099B',
 '47566',
 '20725',
 '84879',
 '22720',
 '22197',
 '21212',
 '20727']

In [20]:
recommend_items('10120')

['21839',
 '84201B',
 '84206A',
 '22263',
 '85028L',
 '20838',
 '85039C',
 '62074B',
 '20773',
 '85040B']

In [24]:
import pickle

# Persist the item-by-customer pivot table and the fitted KNN model, one pickle file each.
for path, artifact in (('pivot.pkl', pivot_table), ('model_knn.pkl', model_knn)):
    with open(path, "wb") as f:
        pickle.dump(artifact, f)


# pivot_table.to_csv('pivot.csv')
# joblib.dump(model_knn, "model_knn.pkl") 