In [None]:
!pip install pyspark



In [None]:
# Mount Google Drive into the Colab runtime; the dataset and the saved model
# used by later cells live under this mount point.
from google.colab import drive

drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session for this notebook with the
# "spark.sql.execution.arrow.pyspark.enabled" option switched on.
spark = (
    SparkSession.builder
    .appName("Big Data Project")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# Read the cleaned review CSV; the first line supplies the column names.
# No schema is given or inferred here, so columns are loaded as strings.
cleaned_csv_path = "/content/drive/My Drive/BigData-Project/data/data_cleaned.csv"
csv_reader = spark.read.option("header", "true")
df_loaded = csv_reader.csv(cleaned_csv_path)
df_loaded.show()

+---+--------------+---------------+----------+-----------+--------------------+--------------------+--------------------+------+---------------+
|age| division_name|department_name|class_name|clothing_id|               title|         review_text|alike_feedback_count|rating|recommend_index|
+---+--------------+---------------+----------+-----------+--------------------+--------------------+--------------------+------+---------------+
| 40|       General|        Bottoms|     Jeans|       1028|amazing fit and wash|like reviewer hes...|                   0|     5|              1|
| 62|General Petite|           Tops|   Blouses|        850|   lovely and unique|true bunch fall c...|                  12|     5|              1|
| 47|General Petite|        Bottoms|    Skirts|        993|                 meh|wanted skirt work...|                   3|     1|              0|
| 45|General Petite|        Bottoms|     Pants|       1068|                 wow|love love hesitan...|                   0|  

In [None]:
# Keep only reviews that recommend the product (recommend_index != 0) and
# whose rating is neither 1 nor 2.
is_recommended = df_loaded["recommend_index"] != 0
rating_is_not_one = df_loaded["rating"] != 1
rating_is_not_two = df_loaded["rating"] != 2

df = df_loaded.where(is_recommended & rating_is_not_one & rating_is_not_two)
df.show()

+---+--------------+---------------+----------+-----------+--------------------+--------------------+--------------------+------+---------------+
|age| division_name|department_name|class_name|clothing_id|               title|         review_text|alike_feedback_count|rating|recommend_index|
+---+--------------+---------------+----------+-----------+--------------------+--------------------+--------------------+------+---------------+
| 40|       General|        Bottoms|     Jeans|       1028|amazing fit and wash|like reviewer hes...|                   0|     5|              1|
| 62|General Petite|           Tops|   Blouses|        850|   lovely and unique|true bunch fall c...|                  12|     5|              1|
| 45|General Petite|        Bottoms|     Pants|       1068|                 wow|love love hesitan...|                   0|     5|              1|
| 37|     Initmates|       Intimate|      Swim|         24|great for bigger ...|absolutely love r...|                   0|  

In [None]:
!pip install nltk
import nltk

nltk.download('vader_lexicon')



[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

In [None]:
from pyspark.sql.functions import udf, col
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.types import StringType, FloatType

In [None]:
# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.appName("SelectColumns").getOrCreate()

# Narrow the filtered reviews down to the columns the later cells work with.
selected_columns = [
    "department_name",
    "class_name",
    "clothing_id",
    "title",
    "review_text",
    "rating",
    "recommend_index",
]
new_data = df.select(*selected_columns)
new_data.show()

+---------------+----------+-----------+--------------------+--------------------+------+---------------+
|department_name|class_name|clothing_id|               title|         review_text|rating|recommend_index|
+---------------+----------+-----------+--------------------+--------------------+------+---------------+
|        Bottoms|     Jeans|       1028|amazing fit and wash|like reviewer hes...|     5|              1|
|           Tops|   Blouses|        850|   lovely and unique|true bunch fall c...|     5|              1|
|        Bottoms|     Pants|       1068|                 wow|love love hesitan...|     5|              1|
|       Intimate|      Swim|         24|great for bigger ...|absolutely love r...|     5|              1|
|           Tops|  Sweaters|        933|love the pattern ...|love sweater im f...|     4|              1|
|           Tops|  Sweaters|        937|beautiful and unique|love sweater soft...|     5|              1|
|           Tops|     Knits|        868|unique

In [None]:
from pyspark.sql.types import IntegerType

# Replace the rating column with an integer-typed version of itself.
rating_as_int = new_data["rating"].cast(IntegerType())
new_data = new_data.withColumn("rating", rating_as_int)

In [None]:
# Obtain the Spark session, requesting the app name "SentimentAnalysis";
# getOrCreate() returns the existing session if one is already running.
spark = SparkSession.builder.appName("SentimentAnalysis").getOrCreate()

## **Khởi tạo mô hình VADER**

In [None]:
# Shared VADER analyzer instance; the scoring functions defined below call
# sia.polarity_scores() on each review.
sia = SentimentIntensityAnalyzer()

In [None]:
def get_polarity_score(review):
    """Return VADER's compound polarity score for a single review.

    Args:
        review: Review text, or None when the row has no review text.

    Returns:
        The 'compound' entry of ``sia.polarity_scores(review)`` as a float,
        or None when ``review`` is None, so the resulting Spark column stays
        null instead of the UDF raising on the missing value.
    """
    # A null review_text reaches the UDF as None; VADER cannot score it.
    if review is None:
        return None
    return float(sia.polarity_scores(review)['compound'])

def get_sentiment_label(review):
    """Classify a single review from its VADER compound score.

    Args:
        review: Review text, or None when the row has no review text.

    Returns:
        "positive" when the compound score is above 0.05, "negative" when it
        is below -0.05, "neutral" otherwise. Returns None when ``review`` is
        None, so the resulting Spark column stays null instead of the UDF
        raising on the missing value.
    """
    # A null review_text reaches the UDF as None; VADER cannot score it.
    if review is None:
        return None
    score = sia.polarity_scores(review)['compound']
    if score > 0.05:
        return "positive"
    if score < -0.05:
        return "negative"
    return "neutral"

# Wrap the two scoring functions as Spark UDFs with explicit return types.
polarity_score_udf = udf(get_polarity_score, FloatType())
sentiment_label_udf = udf(get_sentiment_label, StringType())

# Score every review and attach both the numeric score and its label.
review_column = col("review_text")
new_data = (
    new_data
    .withColumn("polarity_score", polarity_score_udf(review_column))
    .withColumn("Sentiment_Label", sentiment_label_udf(review_column))
)

new_data.show()

+---------------+----------+-----------+--------------------+--------------------+------+---------------+--------------+---------------+
|department_name|class_name|clothing_id|               title|         review_text|rating|recommend_index|polarity_score|Sentiment_Label|
+---------------+----------+-----------+--------------------+--------------------+------+---------------+--------------+---------------+
|        Bottoms|     Jeans|       1028|amazing fit and wash|like reviewer hes...|     5|              1|        0.6908|       positive|
|           Tops|   Blouses|        850|   lovely and unique|true bunch fall c...|     5|              1|        0.8718|       positive|
|        Bottoms|     Pants|       1068|                 wow|love love hesitan...|     5|              1|        0.9767|       positive|
|       Intimate|      Swim|         24|great for bigger ...|absolutely love r...|     5|              1|        0.5563|       positive|
|           Tops|  Sweaters|        933|l

In [None]:
from pyspark.sql import SparkSession
# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.appName("SentimentClustering").getOrCreate()

# Keep only the reviews that were labelled "positive".
is_positive = col("Sentiment_Label") == 'positive'
positive_comments = new_data.where(is_positive)
positive_comments.show()

+---------------+----------+-----------+--------------------+--------------------+------+---------------+--------------+---------------+
|department_name|class_name|clothing_id|               title|         review_text|rating|recommend_index|polarity_score|Sentiment_Label|
+---------------+----------+-----------+--------------------+--------------------+------+---------------+--------------+---------------+
|        Bottoms|     Jeans|       1028|amazing fit and wash|like reviewer hes...|     5|              1|        0.6908|       positive|
|           Tops|   Blouses|        850|   lovely and unique|true bunch fall c...|     5|              1|        0.8718|       positive|
|        Bottoms|     Pants|       1068|                 wow|love love hesitan...|     5|              1|        0.9767|       positive|
|       Intimate|      Swim|         24|great for bigger ...|absolutely love r...|     5|              1|        0.5563|       positive|
|           Tops|  Sweaters|        933|l

In [None]:
from pyspark.sql import functions as F

# Per-product aggregates: descriptive columns take the first value seen in the
# group, all review texts are joined with spaces, numeric columns are averaged.
per_product_aggregates = [
    F.first("department_name").alias("department_name"),
    F.first("class_name").alias("class_name"),
    F.first("title").alias("title"),
    F.concat_ws(" ", F.collect_list("review_text")).alias("combined_review_text"),
    F.avg("rating").alias("avg_rating"),
    F.avg("recommend_index").alias("avg_recomment"),
    F.avg("polarity_score").alias("avg_polarity_score"),
    F.first("Sentiment_Label").alias("Sentiment_Label"),
]
aggregated_comments = positive_comments.groupBy("clothing_id").agg(*per_product_aggregates)

# Binary flag: 1 when more than half of the grouped reviews recommend the item.
majority_recommends = F.when(F.col("avg_recomment") > 0.5, 1).otherwise(0)
aggregated_comments = aggregated_comments.withColumn("final_recomment", majority_recommends)

aggregated_comments.show(truncate=False)

+-----------+---------------+----------+-------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec
from pyspark.sql import functions as F

# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.appName("ReviewSimilarity").getOrCreate()

# Query text that will be compared against each product's combined reviews.
new_review_text = "This is so comfortable and beautiful"
new_review_df = spark.createDataFrame([(new_review_text,)], ["combined_review_text"])

# Training corpus: one space-split token list per aggregated clothing_id row.
tokenized_df = aggregated_comments.select(
    "class_name",
    "combined_review_text",
    F.split(F.col("combined_review_text"), " ").alias("words"),
)

# Lowercase the query before splitting. The review_text displayed earlier in
# this notebook appears to be lowercased, and a capitalised token such as
# "This" would otherwise not match any lowercase vocabulary entry.
tokenized_new_review = new_review_df.select(
    F.split(F.lower(F.col("combined_review_text")), " ").alias("words")
)

# A fixed seed makes the learned embeddings repeatable across runs.
# minCount=0 keeps every token in the vocabulary.
word2Vec = Word2Vec(
    vectorSize=100,
    minCount=0,
    seed=42,
    inputCol="words",
    outputCol="result",
)
model = word2Vec.fit(tokenized_df)

# Embed every aggregated review document with the trained model.
result_df = model.transform(tokenized_df)

# Embed the single query row and keep its vector for the similarity step.
new_result_df = model.transform(tokenized_new_review)
new_vector = new_result_df.first().result

**Save model**

In [None]:
# Persist the trained Word2Vec model to Drive, replacing any earlier copy.
model_dir = "/content/drive/My Drive/BigData-Project/model"
model_writer = model.write().overwrite()
model_writer.save(model_dir)
print("Model đã được lưu thành công!")

Model đã được lưu thành công!


In [None]:
from pyspark.ml.feature import Word2VecModel

# Reload the Word2Vec model that was saved to Drive.
model = Word2VecModel.load("/content/drive/My Drive/BigData-Project/model")

# Embed the query review. It is lowercased before splitting because the
# review_text displayed earlier in this notebook appears to be lowercased, and
# a capitalised token such as "This" would otherwise not match any lowercase
# vocabulary entry.
new_review_text = "This is so comfortable and beautiful"
new_review_df = spark.createDataFrame([(new_review_text,)], ["combined_review_text"])
tokenized_new_review = new_review_df.select(
    F.split(F.lower(F.col("combined_review_text")), " ").alias("words")
)
new_result_df = model.transform(tokenized_new_review)
new_vector = new_result_df.first().result

# Embed every aggregated review document with the same model.
tokenized_df = aggregated_comments.select(
    "class_name",
    "combined_review_text",
    F.split(F.col("combined_review_text"), " ").alias("words"),
)
result_df = model.transform(tokenized_df)

def cosine_similarity(v1, v2):
    """Cosine of the angle between two vectors.

    Both arguments must expose ``norm(p)`` and ``dot(other)``.

    Returns:
        ``v1 . v2 / (|v1| * |v2|)`` as a float, or 0.0 when either vector has
        zero Euclidean length (the ratio would be undefined).
    """
    magnitudes = (float(v1.norm(2)), float(v2.norm(2)))
    if 0 in magnitudes:
        return 0.0
    dot_product = float(v1.dot(v2))
    return dot_product / (magnitudes[0] * magnitudes[1])

# Score every embedded document against the query vector on the driver,
# keeping the fields the next cell turns back into a DataFrame.
similarities = [
    (
        row.words,
        cosine_similarity(new_vector, row.result),
        row.class_name,
        row.combined_review_text,
    )
    for row in result_df.collect()
]


In [None]:
# Turn the scored tuples back into a DataFrame, then keep only the class name
# and its score, sorted from most to least similar.
similarity_columns = ["words", "similarity", "class_name", "combined_review_text"]
similarities_df = spark.createDataFrame(similarities, similarity_columns)
similarities_df = (
    similarities_df
    .select("class_name", "similarity")
    .orderBy(F.desc("similarity"))
)

In [None]:
# Display the highest-scoring rows (show() prints the first 20 by default).
similarities_df.show()

+----------+------------------+
|class_name|        similarity|
+----------+------------------+
|    Lounge|0.8240463035813141|
|      Swim| 0.818630007178348|
|    Lounge|0.8118630790012138|
|    Lounge|0.8013311285498402|
| Intimates|0.7771936148549765|
|    Shorts|0.7768788095691238|
|    Shorts|0.7732644691660442|
|  Layering|0.7681678930943401|
|      Swim|0.7664632296083066|
|    Lounge|0.7630835723484342|
|    Lounge|0.7588442006808428|
|    Lounge|0.7584153439790857|
|    Lounge|0.7571036187746352|
|      Swim|0.7562669667408575|
|     Knits|0.7556320689179882|
|    Lounge|0.7545598471439079|
| Intimates|0.7542137216295989|
|     Sleep|0.7538733866297541|
|    Shorts|0.7505879241667859|
|     Sleep|0.7489705467021892|
+----------+------------------+
only showing top 20 rows



In [None]:
# Take the five best-scoring rows and print the class name of each one.
top_5_similarities = similarities_df.limit(5)
top_rows = top_5_similarities.collect()

for row in top_rows:
    print(f"Recommended Products: '{row.class_name}'")

Recommended Products: 'Lounge'
Recommended Products: 'Swim'
Recommended Products: 'Lounge'
Recommended Products: 'Lounge'
Recommended Products: 'Intimates'
