In [3]:
from datetime import datetime
from pyspark.sql import SparkSession

# Connect to the standalone Spark master; the app name carries the current
# timestamp so that each notebook session is distinguishable.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName(f"pyspark-rdd-demo-{datetime.today()}")
    .getOrCreate()
)
# spark.sparkContext.getConf().getAll()

In [4]:
# Gold-layer Tiki tables, read straight from the S3A warehouse as Spark DataFrames.
products = spark.read.parquet("s3a://warehouse/gold/tiki/products.parquet")
users = spark.read.parquet("s3a://warehouse/gold/tiki/users.parquet")
reviews = spark.read.parquet("s3a://warehouse/gold/tiki/reviews.parquet")

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from underthesea import word_tokenize
from pyspark.sql.functions import concat, lit

import re

@udf(StringType())
def process_text(document):
    """Normalise a raw text and word-segment it with underthesea.

    The text is lower-cased, URL-like tokens are removed, line breaks, '/'
    and ',' become spaces, remaining punctuation is dropped, repeated
    whitespace is collapsed, and the result is passed to ``word_tokenize``
    with ``format="text"``.

    Parameters
    ----------
    document : str or None
        Raw text of one row. Spark passes SQL NULL as ``None``.

    Returns
    -------
    str
        The cleaned, tokenised text; an empty string for NULL input.
    """
    # A NULL cell (e.g. `concat` over a NULL column) arrives as None; without
    # this guard `.lower()` raises AttributeError and fails the whole Spark job.
    if document is None:
        return ''

    # Change to lowercase
    document = document.lower()

    # Remove HTTP links (using regular expression).
    # NOTE: the protocol group is optional, so any "word.word" run with 2-6
    # ASCII letters after the dot is removed as well.
    document = re.sub(r'((http|https)\:\/\/)?[a-zA-Z0-9\.\/\?\:@\-_=#]+\.([a-zA-Z]){2,6}([a-zA-Z0-9\.\&\/\?\:@\-_=#])*', '', document)

    # Remove line breaks (replace with space)
    document = re.sub(r'[\r\n]+', ' ', document)

    # Replace '/' and ',' with space
    document = document.replace('/', ' ').replace(',', ' ')

    # Remove punctuations using regular expression
    document = re.sub(r'[^\w\s]', '', document)

    # Remove extra spaces (replace multiple spaces with a single space)
    document = re.sub(r'[\s]{2,}', ' ', document)

    # Tokenize text using word_tokenize from underthesea
    document = word_tokenize(document, format="text")

    return document



# Create the 'info' column by joining product_name, description, and
# specifications with single spaces. concat_ws skips NULL inputs, whereas
# concat returns NULL for the whole row as soon as one column is NULL; for
# rows without NULLs both produce the same string.
from pyspark.sql.functions import concat_ws

products = products.withColumn(
    'info',
    concat_ws(' ', products['product_name'], products['description'], products['specifications'])
)
# Apply the UDF to the 'info' column
products = products.withColumn('processed_info', process_text(products['info']))

# # Show the results
# products.select('processed_info').show()

In [6]:
from gensim import corpora, models, similarities

def load_stopword(STOP_WORDS):
    """Read a stop-word list from a text file, one entry per line.

    Parameters
    ----------
    STOP_WORDS : str or path-like
        Path of the stop-word file (UTF-8, with or without a BOM).

    Returns
    -------
    list of str
        The stop words in file order, stripped of surrounding whitespace.
        Blank lines (including the one produced by a trailing newline) are
        skipped, so the list never contains empty strings.
    """
    # 'utf-8-sig' reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise stay glued to the first stop word.
    with open(STOP_WORDS, 'r', encoding='utf-8-sig') as file:
        stripped = (line.strip() for line in file)
        return [word for word in stripped if word]
    
def gensim_rcm(gold_products, stop_words):
    """Build and persist a TF-IDF similarity index over the product texts.

    The product name, description and specifications are joined into one
    text per row, cleaned with ``process_text``, collected to the driver and
    turned into a gensim dictionary, TF-IDF model and sparse similarity index.

    Parameters
    ----------
    gold_products : pyspark.sql.DataFrame
        Must provide the columns ``product_name``, ``description`` and
        ``specifications``.
    stop_words : iterable of str or None
        Tokens to remove from the dictionary. When ``None`` the list is read
        from ``vietnamese-stopwords.txt``.

    Returns
    -------
    tuple
        ``(dictionary, tfidf, index)``. The same three objects are also saved
        to ``dictionary.gensim``, ``tfidf_model.gensim`` and
        ``similarity_index.gensim`` in the current working directory.
    """
    from pyspark.sql.functions import concat_ws

    # concat_ws skips NULL columns; concat would turn the whole text into NULL.
    gold_products = gold_products.withColumn(
        'content',
        concat_ws(' ', gold_products['product_name'], gold_products['description'], gold_products['specifications'])
    )

    gold_products = gold_products.withColumn('processed_info', process_text(gold_products['content']))

    # One token list per row, in collected row order; a row's position in this
    # list is the document id used by the similarity index. NULL text becomes
    # an empty document instead of crashing on `.split()`.
    info = gold_products.select("processed_info").rdd.map(lambda row: (row[0] or '').split()).collect()

    dictionary = corpora.Dictionary(info)

    # Honour the caller's stop words; previously the argument was discarded
    # and the list was always re-read from the hard-coded file.
    if stop_words is None:
        stop_words = load_stopword("vietnamese-stopwords.txt")
    # NOTE(review): presumably process_text joins the syllables of a word with
    # '_' — stop words written with spaces would then never match; confirm
    # against the stop-word file.
    stop_ids = [dictionary.token2id[stopword] for stopword in stop_words if stopword in dictionary.token2id]
    # Tokens that occur in exactly one document are dropped as well.
    once_ids = [tokenid for tokenid, docfreq in dictionary.dfs.items() if docfreq == 1]
    dictionary.filter_tokens(stop_ids + once_ids)
    dictionary.compactify()

    corpus = [dictionary.doc2bow(text) for text in info]

    tfidf = models.TfidfModel(corpus)

    feature_cnt = len(dictionary.token2id)
    index = similarities.SparseMatrixSimilarity(tfidf[corpus], num_features=feature_cnt)

    # Persist the dictionary, TF-IDF model, and similarity index for later sessions
    dictionary.save('dictionary.gensim')
    tfidf.save('tfidf_model.gensim')
    index.save('similarity_index.gensim')
    return dictionary, tfidf, index

import os

stop_words = load_stopword("vietnamese-stopwords.txt")

# The three artifacts are written by gensim_rcm(). Load them when they are all
# present; otherwise rebuild them, so that this cell also works from a clean
# working directory instead of depending on an earlier, manually triggered run.
if all(os.path.exists(artifact) for artifact in
       ('dictionary.gensim', 'tfidf_model.gensim', 'similarity_index.gensim')):
    tfidf = models.TfidfModel.load('tfidf_model.gensim')
    index = similarities.SparseMatrixSimilarity.load('similarity_index.gensim')
    dictionary = corpora.Dictionary.load('dictionary.gensim')
else:
    dictionary, tfidf, index = gensim_rcm(products, stop_words)

In [8]:
# dictionary.save("s3a://warehouse/gold/tiki/dict.gensim")

In [7]:
import pandas as pd
def get_id_rcm(n, product_id, seller_id, dictionary, tfidf,index, df):
    """Return the ``n`` products most similar to the one being viewed.

    Parameters
    ----------
    n : int
        Maximum number of recommendations; values below 0 are treated as 0.
    product_id, seller_id
        Together identify the viewed product's row in ``df``.
    dictionary, tfidf, index
        Objects used as ``index[tfidf[dictionary.doc2bow(tokens)]]`` to obtain
        one similarity score per indexed document.
    df : pandas.DataFrame
        Needs the columns ``product_id``, ``seller_id``, ``product_name`` and
        ``processed_info``. Document ids from ``index`` are looked up as index
        labels of ``df``.

    Returns
    -------
    pandas.DataFrame
        Columns ``product_id``, ``seller_id``, ``product_name``, ``id``,
        ``score``; at most ``n`` rows, highest score first, never containing
        the viewed product itself.

    Raises
    ------
    IndexError
        If no row of ``df`` matches ``product_id`` and ``seller_id``.
    """
    n = max(n, 0)

    viewed = df[(df['product_id'] == product_id) & (df['seller_id'] == seller_id)]
    if viewed.empty:
        raise IndexError(
            "no product with product_id={!r} and seller_id={!r} in df".format(product_id, seller_id)
        )
    tokens = viewed['processed_info'].values[0].split()

    sim = index[tfidf[dictionary.doc2bow(tokens)]]

    scores = pd.DataFrame({'id': range(len(sim)), 'score': sim})

    # Take n + 1 candidates because the viewed product is normally among its
    # own best matches and is filtered out below.
    top_scores = scores.sort_values(by='score', ascending=False).head(n + 1)

    # Attach the product columns by looking each document id up in df's index.
    result = top_scores.join(df[['product_id', 'seller_id', 'product_name']], on='id', how='left')
    result = result[['product_id', 'seller_id', 'product_name', 'id', 'score']]

    result = result[(result['product_id'] != product_id) | (result['seller_id'] != seller_id)]

    # Rank the output and cap it at n: if the viewed product was not among the
    # candidates, n + 1 rows would otherwise be returned.
    return result.sort_values(by='score', ascending=False).head(n)

# Ten content-based recommendations for product 11708848 sold by seller 14626.
get_id_rcm(
    n=10,
    product_id=11708848,
    seller_id=14626,
    dictionary=dictionary,
    tfidf=tfidf,
    index=index,
    df=products.toPandas(),
)

KeyError: 'processed_info'

## Collaborative filtering with ALS (Alternating Least Squares)

In [38]:
from pyspark.sql.functions import explode, col, split, concat, lit, expr
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

def rcm_collaborative_filtering(gold_reviews, n = 10, seed = None):
    """Fit an ALS model on the reviews and recommend ``n`` items per user.

    An item is a (product_id, seller_id) pair. The pair is encoded as the
    string ``"<product_id>_<seller_id>"``, indexed with ``StringIndexer`` for
    ALS, and split back into its two parts after the recommendations have
    been generated.

    Parameters
    ----------
    gold_reviews : pyspark.sql.DataFrame
        Must provide the columns ``customer_id``, ``product_id``,
        ``seller_id`` and ``rating``.
    n : int, default 10
        Number of recommendations requested per user.
    seed : int or None, default None
        Seed passed to ALS so that repeated runs can be reproduced. When
        ``None`` no seed is passed and ALS uses its own default.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``customer_id``, ``product_id``, ``seller_id`` (both cast to
        int) and ``rating`` (the score from the recommendation struct). The
        first rows are also printed with ``show()``.
    """
    gold_reviews = gold_reviews.withColumn(
        'item_id',
        concat(gold_reviews['product_id'], lit('_'), gold_reviews['seller_id'])
    )

    # Keep a copy of the string id to map item indices back after ALS.
    gold_reviews = gold_reviews.withColumn('original_item_id', gold_reviews['item_id'])

    indexer = StringIndexer(inputCol="item_id", outputCol="item_index")
    gold_reviews = indexer.fit(gold_reviews).transform(gold_reviews)

    als_params = dict(
        userCol="customer_id",
        itemCol="item_index",
        ratingCol="rating",
        coldStartStrategy="drop",
    )
    # Only forward the seed when the caller supplied one, so that the default
    # call behaves exactly as before.
    if seed is not None:
        als_params["seed"] = seed
    als = ALS(**als_params)
    model = als.fit(gold_reviews)

    user_recs = model.recommendForAllUsers(n)

    # One row per (user, recommended item) instead of one array per user.
    exploded_recs = user_recs.withColumn("recommendation", explode(col("recommendations")))
    exploded_recs = exploded_recs.withColumn("item_index", col("recommendation.item_index"))
    exploded_recs = exploded_recs.withColumn("rating", col("recommendation.rating"))

    distinct_items = gold_reviews.select("original_item_id", "item_index").dropDuplicates(["item_index"])
    processed_user_recs = exploded_recs.join(
        distinct_items,
        on="item_index"
    )

    # Undo the "<product_id>_<seller_id>" encoding.
    processed_user_recs = processed_user_recs.withColumn("product_id", split(col("original_item_id"), "_")[0].cast("int"))
    processed_user_recs = processed_user_recs.withColumn("seller_id", split(col("original_item_id"), "_")[1].cast("int"))

    processed_user_recs = processed_user_recs.select(
        "customer_id",
        "product_id",
        "seller_id",
        "rating"
    )

    processed_user_recs.show()

    return processed_user_recs

# Top-20 ALS recommendations for every customer found in the reviews table.
obj = rcm_collaborative_filtering(gold_reviews=reviews, n=20)


+-----------+----------+---------+---------+
|customer_id|product_id|seller_id|   rating|
+-----------+----------+---------+---------+
|         27|    650250|        1| 4.941955|
|         27| 118130919|   117842| 4.604446|
|         27| 186364165|    96252|  4.22681|
|         27| 146773059|    47984| 4.189372|
|         27| 117927305|     2322|4.1005893|
|         27| 211946959|    90053|4.0803447|
|         27| 172283470|    18138| 4.012916|
|         27| 272030678|   313025|3.9958537|
|         27| 275302988|   261473|3.9737282|
|         27|  13356120|    20142|3.9696078|
|         27|  54439144|        1|  3.96342|
|         27| 103209852|   191019|3.9271016|
|         27| 174619837|     3778|3.9253488|
|         27| 165700192|        1|3.9191248|
|         27|  68785171|     9612|3.8994894|
|         27| 192927412|   206733|3.8965356|
|         27| 161459742|   249806|3.8853023|
|         27| 163576573|     1130|3.8748899|
|         27|  67827661|     4106|  3.82903|
|         

In [39]:
# Persist the recommendations, replacing any previous export at this path.
obj.write.parquet("s3a://warehouse/recommendation/tiki/als.parquet", mode="overwrite")

In [40]:
import pandas as pd
# Read the persisted ALS recommendations back and pull them into pandas.
als_data = spark.read.parquet("s3a://warehouse/recommendation/tiki/als.parquet").toPandas()

In [41]:
# Print the column dtypes, row count and memory footprint of the pandas copy.
als_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2678180 entries, 0 to 2678179
Data columns (total 4 columns):
 #   Column       Dtype  
---  ------       -----  
 0   customer_id  int32  
 1   product_id   int32  
 2   seller_id    int32  
 3   rating       float32
dtypes: float32(1), int32(3)
memory usage: 40.9 MB


In [42]:
def als_recommendation(customer_id, data_als, data):
    """Return one customer's ALS recommendations with product names attached.

    Parameters
    ----------
    customer_id
        Value to match in ``data_als["customer_id"]``.
    data_als : pandas.DataFrame
        Recommendations with the columns ``customer_id``, ``product_id`` and
        ``seller_id``; a ``rating`` column, when present, is used for ranking.
    data : pandas.DataFrame
        Product catalogue with the columns ``product_id``, ``seller_id`` and
        ``product_name``.

    Returns
    -------
    pandas.DataFrame
        Columns ``customer_id``, ``product_id``, ``seller_id``,
        ``product_name``; one row per recommendation, highest rating first.
        ``product_name`` is NaN for items missing from ``data``. Neither
        input frame is modified.
    """
    keys = ["product_id", "seller_id"]

    recs = data_als[data_als["customer_id"] == customer_id]
    if "rating" in recs.columns:
        # Best predicted rating first; the stable sort keeps ties in input order.
        recs = recs.sort_values("rating", ascending=False, kind="stable")

    # One catalogue row per (product, seller) so the left join cannot
    # duplicate a recommendation.
    catalog = data[keys + ["product_name"]].drop_duplicates(subset=keys)

    # pandas refuses to merge an object column with a numeric one, so bring
    # mismatching key columns to a numeric dtype on both sides first.
    for key in keys:
        if recs[key].dtype != catalog[key].dtype:
            recs = recs.assign(**{key: pd.to_numeric(recs[key])})
            catalog = catalog.assign(**{key: pd.to_numeric(catalog[key])})

    result = pd.merge(recs, catalog, on=keys, how="left")
    return result[["customer_id", "product_id", "seller_id", "product_name"]]

In [44]:
# Materialise the Spark products table as a pandas DataFrame on the driver.
data = products.toPandas()

In [22]:
# Cast only the two join keys to int and keep every other column. Selecting
# just the key columns here would drop 'product_name', which
# als_recommendation() reads from `data`, so a top-to-bottom run would fail.
data = data.astype({"product_id": int, "seller_id": int})
als_result = als_data[als_data.customer_id == 8097347]


In [25]:
# Print the column dtypes, row count and memory footprint of als_data.
# NOTE(review): the output recorded below (2,812,089 rows, object-typed
# product_id/seller_id) differs from the In [41] output — presumably left over
# from an earlier run; re-run the notebook top to bottom to refresh it.
als_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2812089 entries, 0 to 2812088
Data columns (total 4 columns):
 #   Column       Dtype  
---  ------       -----  
 0   customer_id  int32  
 1   product_id   object 
 2   seller_id    object 
 3   rating       float32
dtypes: float32(1), int32(1), object(2)
memory usage: 64.4+ MB


In [45]:
# Named recommendations for customer 8097347.
als_recommendation(customer_id=8097347, data_als=als_data, data=data)

Unnamed: 0,customer_id,product_id,seller_id,product_name
0,8097347,1695753,138278,Máy Sấy Tóc Philips HP8108/00 - Hàng Chính Hãng
1,8097347,273402112,1,Sữa rửa mặt giúp làm sạch sâu dành cho da dầu ...
2,8097347,213912311,306444,Chân đế máy giặt Electrolux nâng máy lên cao t...
3,8097347,32505959,100773,Remote điều khiển đa năng - Dùng được cho tất ...
4,8097347,1687625,1,Kem chống muỗi Rohto Metholatum Remos Hương Sả...
5,8097347,2752195,103815,Đồng hồ Nam thể thao SKMEI 1155B - DHA473
6,8097347,155925409,165363,Đồng Hồ Nam Dây Nhựa Casio Standard AE-1500WH-...
7,8097347,106390917,195148,Tặng Kèm Tovit Thay Dây đồng hồ Hublot cao su ...
8,8097347,273250656,1,CHÌ KẺ CHÂN MÀY INNISFREE AUTO EYEBROW PENCIL ...
9,8097347,212213013,1,Từ điển tiếng việt dành cho học sinh - khổ to ...
