### Clustering-based Collaborative Filtering - Model & Index  
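This notebook demonstrates how to build a cluster-based collaborative filtering model using the Yelp dataset: users are assigned to precomputed clusters, ratings are averaged per cluster and business, and cluster-to-cluster similarities are computed from those averaged ratings. You can adjust the model to add more features or change the hyperparameters to improve the model performance. The index is built and stored in the `./cluster_data/yelp_ClusterUserCF.db` SQLite file.  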

#### Pre-requisites
1. Have the processed Yelp dataset in the `../../data/processed_data/yelp_data` folder.  
2. Have the cluster assignments in `../data_processing/clustered_users.xlsx` (an Excel file with `user_id` and `cluster` columns).  
3. Have the virtual environment set up and used for the notebook.  

#### Move to Production
1. Copy the generated `./cluster_data/yelp_ClusterUserCF.db` file to the `../../data/processed_data` folder (renaming it if the backend expects `yelp_ClusterCF.db`).
2. Update the `ClusterCF.py` file in the `../backend/models` folder if there are changes in the retrieval process.

In [1]:
import sys
sys.path.append('../')  # make the project-level `utilities` module importable
from utilities import *
import pandas as pd
import numpy as np
from scipy.sparse import csr_matrix, diags
from sparse_dot_topn import sp_matmul_topn
from sklearn.model_selection import train_test_split
import os
import sqlite3
import pickle
import pickle

In [2]:
# Load Yelp data: `load_data_from_db` (presumably provided by the `utilities` star import)
# returns a dict-like mapping of table name -> table for the requested tables in `db_folder`.
# NOTE(review): the 'categories' table is loaded but not used anywhere else in this notebook.
db_folder = '../../data/processed_data/yelp_data/'
data_files = ['business', 'categories', 'review']
yelp_data = load_data_from_db(db_folder, data_files)
for table_name, table_df in yelp_data.items():
    print(f"Loaded {len(table_df)} rows from {table_name} table.")

Loaded 78059 rows from business table.
Loaded 360656 rows from categories table.
Loaded 980418 rows from review table.


In [3]:
# Prepare data: `get_user_business` (presumably from `utilities`) derives the id mappings and
# the user-business interaction frame. Later cells rely on `business_mapping` mapping
# business_id -> business_idx and on `user_business` having user_id / business_id /
# stars_review columns.
df_business, df_review = yelp_data["business"], yelp_data["review"]
user_mapping, business_mapping, user_business = get_user_business(df_business, df_review)
print(f"Prepared {len(user_business)} user-business interactions.")

Prepared 985732 user-business interactions.


In [4]:
# Load cluster assignments: keep user_id and expose the spreadsheet's `cluster` column as a
# string `cluster_id` (the SQLite tables below declare cluster_id as TEXT).
cluster_file = '../data_processing/clustered_users.xlsx'
cluster_df = pd.read_excel(cluster_file)
user_to_cluster = (
    cluster_df[['user_id', 'cluster']]
    .rename(columns={'cluster': 'cluster_id'})
    .astype({'cluster_id': str})
)
print(f"Loaded {len(cluster_df)} cluster assignments.")

Loaded 99812 cluster assignments.


In [5]:
# Attach each interaction's cluster. Inner join: interactions of users without a cluster
# assignment are dropped.
user_business_clusters = pd.merge(user_business, user_to_cluster, how='inner', on='user_id')
print(f"Merged dataset contains {len(user_business_clusters)} user-business-cluster records.")

Merged dataset contains 918151 user-business-cluster records.


In [6]:
# Collapse user ratings to one row per (cluster_id, business_id) holding the mean stars_review.
cluster_business = (
    user_business_clusters
    .groupby(['cluster_id', 'business_id'], as_index=False)['stars_review']
    .mean()
)
print(f"Aggregated to {len(cluster_business)} cluster-business interactions.")

Aggregated to 721327 cluster-business interactions.


In [7]:
# Split the aggregated cluster-business rows 80/20 (seeded for reproducibility); only the
# training part is used to build the index below.
# NOTE(review): `cluster_business` is rebound to the training split here, so re-running this
# cell on its own splits the already-reduced frame again - re-run the aggregation cell first.
train_data, test_data = train_test_split(
    cluster_business,
    test_size=0.2,
    random_state=42,
)
cluster_business = train_data.copy()
print(f"Training data: {len(cluster_business)} records, Test data: {len(test_data)} records.")

Training data: 577061 records, Test data: 144266 records.


In [8]:
# Clusters get consecutive indices in order of first appearance in the training split;
# businesses reuse the global `business_mapping`.
cluster_mapping = {
    str(cluster_label): position
    for position, cluster_label in enumerate(cluster_business['cluster_id'].unique())
}
cluster_business = cluster_business.assign(
    cluster_idx=cluster_business['cluster_id'].map(cluster_mapping),
    business_idx=cluster_business['business_id'].map(business_mapping),
)
print(f"Created mappings: {len(cluster_mapping)} clusters, {len(business_mapping)} businesses.")

Created mappings: 2503 clusters, 78059 businesses.


In [9]:
# Sparse cluster x business matrix: row = cluster_idx, column = business_idx,
# value = the cluster's mean stars_review for that business.
cluster_item_sparse = csr_matrix(
    (
        cluster_business['stars_review'],
        (cluster_business['cluster_idx'], cluster_business['business_idx']),
    ),
    shape=(len(cluster_mapping), len(business_mapping)),
)
# Any NaN stored in the matrix is replaced with 0.
cluster_item_sparse.data = np.nan_to_num(cluster_item_sparse.data)
print(f"Created sparse cluster-item matrix with shape {cluster_item_sparse.shape}.")

Created sparse cluster-item matrix with shape (2503, 78059).


In [10]:
# Compute cluster-cluster similarity (cosine similarity)
def sparse_cosine_similarity_topn(A, top_n, threshold=0, n_threads=4):
    """Top-``top_n`` cosine similarities between the rows of a sparse matrix.

    Every row of ``A`` is scaled to unit L2 norm before ``A @ A.T`` is computed with
    ``sp_matmul_topn``, so entry ``(i, j)`` of the result is the cosine between rows
    ``i`` and ``j``. Multiplying the raw rows instead would yield dot products of star
    ratings, which grow with how many businesses a cluster rated, are not bounded by 1,
    and make a small ``threshold`` such as 0.01 meaningless.

    Parameters
    ----------
    A : scipy.sparse matrix, shape (n_rows, n_cols)
        Rows are the entities to compare (here: clusters x businesses mean ratings).
    top_n : int
        Maximum number of entries kept per result row.
    threshold : float, default 0
        Minimum-score cut-off forwarded to ``sp_matmul_topn``.
    n_threads : int, default 4
        Worker threads for ``sp_matmul_topn`` (previously hard-coded to 4).

    Returns
    -------
    Sparse matrix of shape (n_rows, n_rows) as returned by ``sp_matmul_topn``;
    ``sort=True`` asks it to order each row by score.
    """
    A = csr_matrix(A, dtype=np.float64)
    row_norms = np.sqrt(np.asarray(A.multiply(A).sum(axis=1)).ravel())
    # Rows without ratings have norm 0: keep them as all-zero rows instead of dividing by 0.
    inv_norms = np.divide(1.0, row_norms, out=np.zeros_like(row_norms), where=row_norms > 0)
    A_unit = (diags(inv_norms) @ A).tocsr()
    return sp_matmul_topn(
        A_unit, A_unit.T, top_n=top_n, threshold=threshold, n_threads=n_threads, sort=True
    )

In [11]:
# Keep at most 50 entries per cluster row, using 0.01 as the minimum-score threshold.
cluster_similarity_sparse = sparse_cosine_similarity_topn(
    cluster_item_sparse,
    top_n=50,
    threshold=0.01,
)
print(f"Computed cluster-cluster similarity matrix with {cluster_similarity_sparse.nnz} non-zero elements.")

Computed cluster-cluster similarity matrix with 125150 non-zero elements.


In [12]:
# Database optimization and insertion functions
def optimize_db(conn):
    """Configure ``conn`` for fast bulk loading, then commit.

    Trades durability for write speed: ``synchronous = OFF`` skips fsyncs, the rollback
    journal and temporary storage are kept in memory, and the page cache is enlarged
    (SQLite reads a positive ``cache_size`` as a number of pages). The PRAGMAs run via
    ``executescript``, which first commits any pending transaction.
    """
    pragmas = '''
        PRAGMA synchronous = OFF;
        PRAGMA journal_mode = MEMORY;
        PRAGMA temp_store = MEMORY;
        PRAGMA cache_size = 1000000;
    '''
    conn.cursor().executescript(pragmas)
    conn.commit()

def insert_cluster_item(cluster_business, conn, batch_size=50000):
    """Write the cluster-level ratings into the ``cluster_item_index`` table.

    Parameters
    ----------
    cluster_business : pandas.DataFrame
        Must have ``cluster_id``, ``business_id`` and ``stars_review`` columns; any
        other columns are ignored.
    conn : sqlite3.Connection
        Connection to a database that already contains ``cluster_item_index``.
    batch_size : int, default 50000
        Rows per ``executemany`` call. Every fifth batch (starting with the first) is
        followed by a commit and a progress line.

    Rows whose ``(cluster_id, business_id)`` key already exists are overwritten
    (``INSERT OR REPLACE``, like the other insert helpers), so re-running the notebook
    stores the current ratings instead of silently keeping values from an earlier run.

    On ``sqlite3.Error`` the error is printed and the open transaction is rolled back;
    batches committed before the failure remain in the table.
    """
    cursor = conn.cursor()
    cursor.execute('BEGIN TRANSACTION')
    total_records = len(cluster_business)
    # Plain Python row lists for executemany.
    data = cluster_business[['cluster_id', 'business_id', 'stars_review']].values.tolist()
    try:
        for i in range(0, total_records, batch_size):
            batch = data[i:i + batch_size]
            cursor.executemany('''INSERT OR REPLACE INTO cluster_item_index (cluster_id, business_id, stars_review)
                                  VALUES (?, ?, ?)''', batch)
            if i % (batch_size * 5) == 0:
                # Periodic commits keep the pending transaction bounded.
                conn.commit()
                print(f"Inserted {i + len(batch)} / {total_records} cluster-item records.")
        conn.commit()
        print(f"Total {total_records} cluster-item records inserted.")
    except sqlite3.Error as e:
        print(f"Error inserting cluster-item records: {e}")
        conn.rollback()

def insert_cluster_vectors(cluster_similarity_sparse, cluster_mapping, conn, batch_size=5000, progress_interval=50000):
    """Store every row of the cluster similarity matrix in ``cluster_cluster_similarity``.

    Row ``r`` is saved under the cluster id whose ``cluster_mapping`` value is ``r``, as
    ``pickle.dumps((indices, data))`` of that sparse row: ``indices`` are column positions
    (cluster indices, translatable through ``cluster_mapping``) and ``data`` the scores.

    Parameters
    ----------
    cluster_similarity_sparse : scipy.sparse matrix
        Matrix whose row ``r`` belongs to the cluster with index ``r``.
    cluster_mapping : dict
        ``cluster_id -> row index``. The reverse lookup is built by inverting this dict,
        so it does not depend on the dict's insertion order.
    conn : sqlite3.Connection
        Connection to a database that already contains ``cluster_cluster_similarity``.
    batch_size : int, default 5000
        Rows per ``executemany`` call.
    progress_interval : int, default 50000
        A progress line is printed each time the running total reaches the next multiple
        of this value; a non-positive value disables progress output.

    Raises
    ------
    ValueError
        If some matrix row has no cluster id in ``cluster_mapping``. This is checked
        before the transaction is opened.

    Nothing is committed until all rows are written; on ``sqlite3.Error`` the error is
    printed and the whole transaction is rolled back.
    """
    n_rows = cluster_similarity_sparse.shape[0]
    # Invert the mapping explicitly rather than assuming list(cluster_mapping) is ordered
    # by index (that only holds if the dict happened to be built in index order).
    idx_to_cluster = {idx: cluster_id for cluster_id, idx in cluster_mapping.items()}
    missing = [r for r in range(n_rows) if r not in idx_to_cluster]
    if missing:
        raise ValueError(f"cluster_mapping has no cluster id for matrix rows {missing[:10]}")

    insert_sql = '''INSERT OR REPLACE INTO cluster_cluster_similarity (cluster_id, similarity_vector)
                    VALUES (?, ?)'''
    cursor = conn.cursor()
    cursor.execute('BEGIN TRANSACTION')
    total_inserted = 0
    next_report = progress_interval
    batch = []
    try:
        for row_idx in range(n_rows):
            row_vector = cluster_similarity_sparse.getrow(row_idx)
            serialized_row = pickle.dumps((row_vector.indices, row_vector.data))
            batch.append((idx_to_cluster[row_idx], serialized_row))
            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                total_inserted += len(batch)
                batch = []
                # Threshold crossing (not exact divisibility) so progress is still reported
                # when batch_size does not divide progress_interval.
                if progress_interval > 0 and total_inserted >= next_report:
                    print(f"Inserted {total_inserted} cluster vectors...")
                    while next_report <= total_inserted:
                        next_report += progress_interval
        if batch:
            cursor.executemany(insert_sql, batch)
            total_inserted += len(batch)
        conn.commit()
        print(f"Total {total_inserted} cluster vectors inserted.")
    except sqlite3.Error as e:
        print(f"Error inserting cluster vectors: {e}")
        conn.rollback()

def insert_mappings(mapping, conn, table_name, key_col, val_col, batch_size=50000):
    """Upsert every ``key -> value`` pair of ``mapping`` into ``table_name``.

    ``table_name``, ``key_col`` and ``val_col`` are interpolated directly into the SQL, so
    they must be trusted identifiers (here they are literals from the notebook). Rows go
    through ``INSERT OR REPLACE`` in chunks of ``batch_size``; every fifth chunk (starting
    with the first) is committed and reported. On ``sqlite3.Error`` the message is printed
    and the open transaction is rolled back.
    """
    cursor = conn.cursor()
    cursor.execute('BEGIN TRANSACTION')
    rows = [*mapping.items()]
    n_rows = len(rows)
    try:
        for batch_no, start in enumerate(range(0, n_rows, batch_size)):
            chunk = rows[start:start + batch_size]
            cursor.executemany(f'''INSERT OR REPLACE INTO {table_name} ({key_col}, {val_col})
                                   VALUES (?, ?)''', chunk)
            if batch_no % 5 == 0:
                conn.commit()
                print(f"Inserted {start + len(chunk)} / {n_rows} {table_name} records.")
        conn.commit()
        print(f"Total {n_rows} {table_name} records inserted.")
    except sqlite3.Error as err:
        print(f"Error inserting {table_name} records: {err}")
        conn.rollback()

In [13]:
# Set up database.
# The index is rebuilt from scratch on every run: existing tables are dropped first so rows
# from an earlier run (e.g. clusters or businesses no longer present) cannot linger next to
# the freshly computed ones.
db_path = './cluster_data/yelp_ClusterUserCF.db'
# sqlite3.connect does not create missing parent directories ("unable to open database file").
os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
optimize_db(conn)

cursor.executescript('''
    DROP TABLE IF EXISTS cluster_item_index;
    DROP TABLE IF EXISTS cluster_cluster_similarity;
    DROP TABLE IF EXISTS cluster_mapping;
    DROP TABLE IF EXISTS business_mapping;
''')

# No extra CREATE INDEX statements: SQLite backs every non-INTEGER PRIMARY KEY with an
# automatic unique index, which already serves lookups by cluster_id and by
# (cluster_id, business_id).
cursor.execute('''CREATE TABLE IF NOT EXISTS cluster_item_index (
    cluster_id TEXT, business_id TEXT, stars_review REAL, PRIMARY KEY (cluster_id, business_id)
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS cluster_cluster_similarity (
    cluster_id TEXT PRIMARY KEY, similarity_vector BLOB
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS cluster_mapping (
    cluster_id TEXT PRIMARY KEY, cluster_idx INTEGER
)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS business_mapping (
    business_id TEXT PRIMARY KEY, business_idx INTEGER
)''')
conn.commit()

In [14]:
# Write the cluster ratings and the similarity vectors, then both id -> index lookup tables.
insert_cluster_item(cluster_business, conn)
insert_cluster_vectors(cluster_similarity_sparse, cluster_mapping, conn)
insert_mappings(cluster_mapping, conn,
                table_name='cluster_mapping', key_col='cluster_id', val_col='cluster_idx')
insert_mappings(business_mapping, conn,
                table_name='business_mapping', key_col='business_id', val_col='business_idx')

Inserted 50000 / 577061 cluster-item records.
Inserted 300000 / 577061 cluster-item records.
Inserted 550000 / 577061 cluster-item records.
Total 577061 cluster-item records inserted.
Total 2503 cluster vectors inserted.
Inserted 2503 / 2503 cluster_mapping records.
Total 2503 cluster_mapping records inserted.
Inserted 50000 / 78059 business_mapping records.
Total 78059 business_mapping records inserted.


In [15]:
# Release the SQLite handle; each insert helper has already committed or rolled back its work.
conn.close()