In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [32]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, StringType
from pyspark.sql.functions import col, dayofweek, month, hour, from_unixtime, udf
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import expr
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors, VectorUDT

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline
from transformers import AutoTokenizer, AutoModel
import torch

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AmazonReviews")
         .config("spark.driver.memory", "6g")  
         .config("spark.executor.memory", "6g")  
         .config("spark.executor.instances", "2") 
         .getOrCreate())

# check spark session is correctly initialized
if spark is None:
    raise RuntimeError("SparkSession not initialized")

review_filepath = './data/Gift_Cards.jsonl'
metadata_filepath = './data/meta_Gift_Cards.jsonl'

reviews_df = spark.read.json(review_filepath)
metadata_df = spark.read.json(metadata_filepath)

# filter reviews with rating > 3
reviews_df = reviews_df.filter(col("rating") > 3)

In [33]:
print(reviews_df.select("parent_asin").distinct().count())
print(metadata_df.select("parent_asin").distinct().count())

                                                                                

1053
1137


In [34]:
# reviews_df.printSchema()
# metadata_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)



In [36]:
# fill NaN values with 0
reviews_df = reviews_df.fillna(0)
metadata_df = metadata_df.fillna(0)

# reviews_df.printSchema()
# metadata_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = false)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)

root
 |-- author: string (nullable = true)
 |-- average_rating: double (nullable = false)
 |-- bought_together: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-

In [37]:
reviews_df = reviews_df.repartition(10)
metadata_df = metadata_df.repartition(10)

# cache
reviews_df.cache()
metadata_df.cache()

# # checkpoint to truncate the lineage of df and prevent long lineage issues
# spark.sparkContext.setCheckpointDir("/path/to/checkpoint/dir")
# reviews_df = reviews_df.checkpoint()
# metadata_df = metadata_df.checkpoint()

24/07/24 12:33:59 WARN CacheManager: Asked to cache already cached data.


DataFrame[author: string, average_rating: double, bought_together: string, categories: array<string>, description: array<string>, details: struct<Age Range (Description):string,Assembly required:string,Batteries required:string,Best Sellers Rank:struct<For Her:bigint,Gift Card Holders:bigint,Gift Cards:bigint,Kids:bigint,Thank You & Appreciation:bigint>,Brand:string,Brand Name:string,Capacity:string,Color:string,Compatible Devices:string,Cuisine:string,Date First Available:string,Department:string,Domestic Shipping:string,Fabric Type:string,Flavor:string,Form Factor:string,Grade Rating:string,Import:string,Included Components:string,International Shipping:string,Is Autographed:string,Is Discontinued By Manufacturer:string,Item Form:string,Item Weight:string,Item model number:string,Manufacturer:string,Manufacturer Part Number:string,Manufacturer recommended age:string,Material:string,Material Type:string,Material Type Free:string,Memory Storage Capacity:string,Number of Items:string,Nu

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, ArrayType
from pyspark.sql.functions import col, dayofweek, month, from_unixtime, udf
import torch
from transformers import AutoTokenizer, AutoModel

# Define schema for reviews
reviews_schema = StructType([
    StructField("asin", StringType(), True),
    StructField("overall", DoubleType(), True),
    StructField("reviewText", StringType(), True),
    StructField("reviewTime", StringType(), True),
    StructField("reviewerID", StringType(), True),
    StructField("reviewerName", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("unixReviewTime", LongType(), True),
    StructField("rating", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("title", StringType(), True),
    StructField("timestamp", LongType(), True)
])

# Define schema for metadata
metadata_schema = StructType([
    StructField("asin", StringType(), True),
    StructField("title", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("brand", StringType(), True),
    StructField("features", StringType(), True),
    StructField("description", StringType(), True),
    StructField("details", StringType(), True)
])

# Initialize Spark session with increased memory settings
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AmazonReviews")
         .config("spark.driver.memory", "8g")  
         .config("spark.executor.memory", "8g")  
         .config("spark.executor.instances", "2")
         .config("spark.driver.maxResultSize", "4g")
         .config("spark.memory.offHeap.enabled", True)
         .config("spark.memory.offHeap.size", "4g")
         .config("spark.storage.memoryFraction", "0.8")
         .config("spark.shuffle.memoryFraction", "0.4")
         .getOrCreate())

# Check if Spark session is correctly initialized
if spark is None:
    raise RuntimeError("SparkSession not initialized")

# Read JSON files with the defined schema
reviews_df = spark.read.schema(reviews_schema).json(review_filepath)
metadata_df = spark.read.schema(metadata_schema).json(metadata_filepath)

# Filter reviews with rating > 3
reviews_df = reviews_df.filter(col("rating") > 3)

print(reviews_df.select("asin").distinct().count())
print(metadata_df.select("asin").distinct().count())

# Fill NaN values with 0
reviews_df = reviews_df.fillna(0)
metadata_df = metadata_df.fillna(0)

reviews_df.printSchema()
metadata_df.printSchema()

reviews_df = reviews_df.repartition(10)
metadata_df = metadata_df.repartition(10)

# Cache DataFrames
reviews_df.cache()
metadata_df.cache()

# Convert timestamp from ms to s and then to timestamp type
reviews_df = reviews_df.withColumn("timestamp", from_unixtime(col("unixReviewTime") / 1000).cast("timestamp"))

# Extract temporal features
reviews_df = reviews_df.withColumn("day_of_week", dayofweek(col("timestamp")))
reviews_df = reviews_df.withColumn("month", month(col("timestamp")))

# Drop video and image columns
metadata_df = metadata_df.drop('video', 'image')

# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen1.5-0.5B')
model = AutoModel.from_pretrained('Qwen/Qwen1.5-0.5B')

def generate_embeddings(texts):
    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1).numpy().tolist()
    return embeddings

generate_embeddings_udf = udf(lambda x: generate_embeddings(x), ArrayType(DoubleType()))

# Generate embeddings for textual features using mapPartitions
def process_partition(partition):
    for row in partition:
        row["text_embedding"] = generate_embeddings(row["text"])
        row["title_embedding"] = generate_embeddings(row["title"])
        yield row

reviews_rdd = reviews_df.rdd.mapPartitions(process_partition)
reviews_df = spark.createDataFrame(reviews_rdd, schema=reviews_df.schema)

# Apply similar processing for other columns as needed
def process_metadata_partition(partition):
    for row in partition:
        row["features_embedding"] = generate_embeddings(row["features"])
        row["description_embedding"] = generate_embeddings(row["description"])
        row["details_embedding"] = generate_embeddings(row["details"])
        yield row

metadata_rdd = metadata_df.rdd.mapPartitions(process_metadata_partition)
metadata_df = spark.createDataFrame(metadata_rdd, schema=metadata_df.schema)

# Show resulting DataFrames
reviews_df.show()
metadata_df.show()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, ArrayType
from pyspark.sql.functions import col, dayofweek, month, hour, from_unixtime, udf
import torch
from transformers import AutoTokenizer, AutoModel

# Define schema for reviews
reviews_schema = StructType([
    StructField("asin", StringType(), True),
    StructField("overall", DoubleType(), True),
    StructField("reviewText", StringType(), True),
    StructField("reviewTime", StringType(), True),
    StructField("reviewerID", StringType(), True),
    StructField("reviewerName", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("unixReviewTime", LongType(), True),
    StructField("rating", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("title", StringType(), True),
    StructField("timestamp", LongType(), True)
])

# Define schema for metadata
metadata_schema = StructType([
    StructField("asin", StringType(), True),
    StructField("title", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("brand", StringType(), True),
    StructField("features", StringType(), True),
    StructField("description", StringType(), True),
    StructField("details", StringType(), True)
])

# Initialize Spark session with increased memory settings
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AmazonReviews")
         .config("spark.driver.memory", "6g")
         .config("spark.executor.memory", "6g")
         .config("spark.executor.instances", "2")
         .getOrCreate())

# Check if Spark session is correctly initialized
if spark is None:
    raise RuntimeError("SparkSession not initialized")

# Read JSON files with the defined schema
reviews_df = spark.read.schema(reviews_schema).json(review_filepath)
metadata_df = spark.read.schema(metadata_schema).json(metadata_filepath)

# Filter reviews with rating > 3
reviews_df = reviews_df.filter(col("rating") > 3)

# print(reviews_df.select("asin").distinct().count())
# print(metadata_df.select("asin").distinct().count())

# Fill NaN values with 0
reviews_df = reviews_df.fillna(0)
metadata_df = metadata_df.fillna(0)

reviews_df.printSchema()
metadata_df.printSchema()

reviews_df = reviews_df.repartition(10)
metadata_df = metadata_df.repartition(10)

# Cache DataFrames
reviews_df.cache()
metadata_df.cache()

# Convert timestamp from ms to s and then to timestamp type
reviews_df = reviews_df.withColumn("timestamp", from_unixtime(col("unixReviewTime") / 1000).cast("timestamp"))

# Extract temporal features
reviews_df = reviews_df.withColumn("day_of_week", dayofweek(col("timestamp")))
reviews_df = reviews_df.withColumn("month", month(col("timestamp")))

# Drop video and image columns
metadata_df = metadata_df.drop('video', 'image')

# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen1.5-0.5B')
model = AutoModel.from_pretrained('Qwen/Qwen1.5-0.5B')

def generate_embeddings(texts):
    inputs = tokenizer(texts, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1).numpy().tolist()
    return embeddings

generate_embeddings_udf = udf(lambda x: generate_embeddings(x), ArrayType(DoubleType()))

# Generate embeddings for textual features using mapPartitions
def process_partition(partition):
    for row in partition:
        row["text_embedding"] = generate_embeddings(row["text"])
        row["title_embedding"] = generate_embeddings(row["title"])
        yield row

reviews_rdd = reviews_df.rdd.mapPartitions(process_partition)
reviews_df = spark.createDataFrame(reviews_rdd, schema=reviews_df.schema)

# Apply similar processing for other columns as needed
def process_metadata_partition(partition):
    for row in partition:
        row["features_embedding"] = generate_embeddings(row["features"])
        row["description_embedding"] = generate_embeddings(row["description"])
        row["details_embedding"] = generate_embeddings(row["details"])
        yield row

metadata_rdd = metadata_df.rdd.mapPartitions(process_metadata_partition)
metadata_df = spark.createDataFrame(metadata_rdd, schema=metadata_df.schema)

# Show resulting DataFrames
reviews_df.show()
metadata_df.show()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = false)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- rating: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- timestamp: long (nullable = true)

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- price: double (nullable = false)
 |-- brand: string (nullable = true)
 |-- features: string (nullable = true)
 |-- description: string (nullable = true)
 |-- details: string (nullable = true)



24/07/24 13:14:53 WARN CacheManager: Asked to cache already cached data.
24/07/24 13:14:53 WARN CacheManager: Asked to cache already cached data.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [None]:
# encode user_id and asin
indexer_user = StringIndexer(inputCol="user_id", outputCol="user_id_encoded")
indexer_asin = StringIndexer(inputCol="asin", outputCol="asin_encoded")

reviews_df = indexer_user.fit(reviews_df).transform(reviews_df)
reviews_df = indexer_asin.fit(reviews_df).transform(reviews_df)

# merging metadata with reviews based on parent_asin
df = reviews_df.join(metadata_df, on='parent_asin', how='left')

In [10]:
# sentiment analysis
def sentiment_analysis(text):
    # Placeholder function for sentiment analysis
    # Implement actual sentiment analysis using a model or library
    return {'label': 'POSITIVE', 'score': 0.9}  # Example output

sentiment_analysis_udf = udf(lambda text: sentiment_analysis(text)['score'] if sentiment_analysis(text)['label'] == 'POSITIVE' else -sentiment_analysis(text)['score'], FloatType())

df = df.withColumn("sentiment_score", sentiment_analysis_udf(col("text")))

# compute similarity scores
def compute_similarity(embeddings):
    similarity_matrix = cosine_similarity(np.vstack(embeddings))
    return similarity_matrix.mean(axis=1).tolist()

compute_similarity_udf = udf(compute_similarity, ArrayType(FloatType()))

df = df.withColumn("similarity_score", compute_similarity_udf(col("text_embedding")))

# define UDF to convert array to vector
def array_to_vector(array):
    return Vectors.dense(array)

array_to_vector_udf = udf(array_to_vector, VectorUDT())

# convert array columns to vector columns
df = df.withColumn("text_embedding", array_to_vector_udf(col("text_embedding")))
df = df.withColumn("title_embedding", array_to_vector_udf(col("title_embedding")))
df = df.withColumn("features_embedding", array_to_vector_udf(col("features_embedding")))
df = df.withColumn("description_embedding", array_to_vector_udf(col("description_embedding")))
df = df.withColumn("details_embedding", array_to_vector_udf(col("details_embedding")))
df = df.withColumn("similarity_score", array_to_vector_udf(col("similarity_score")))


In [11]:
# combining features into a final feature set
features = ['user_id_encoded', 'asin_encoded', 'rating', 'verified_purchase', 
            'helpful_vote', 'day_of_week', 'month', 'sentiment_score', 'similarity_score']
embeddings = ['text_embedding', 'title_embedding', 'features_embedding', 'description_embedding', 'details_embedding']

# split into features and targets
feature_columns = features + embeddings
target_column = 'rating'

# create features, target df
features_df = df.select(*feature_columns)
target_df = df.select(target_column)

# features_df.show()
# target_df.show()        

In [14]:
data = features_df.withColumn("rating", target_df[target_column])

# assemble all feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# train a Random Forest model
rf = RandomForestClassifier(labelCol="rating", featuresCol="features", numTrees=100)
rf_model = rf.fit(data)

# get feature importances
importances = rf_model.featureImportances

# convert importances to a list of tuples (feature, importance)
feature_importances = [(feature, importance) for feature, importance in zip(feature_columns, importances.toArray())]

# sort the features by importance
feature_importances = sorted(feature_importances, key=lambda x: x[1], reverse=True)

# print the feature importances
for feature, importance in feature_importances:
    print(f"Feature: {feature}, Importance: {importance}")

# select the top N features
N = 10
selected_features = [feature for feature, importance in feature_importances[:N]]

# create a new DataFrame with only the selected features
selected_features_df = df.select(*selected_features)
selected_features_df.show()

# keep the target column
target_df = df.select(target_column)

24/07/24 10:37:12 WARN CacheManager: Asked to cache already cached data.        
24/07/24 10:37:12 WARN CacheManager: Asked to cache already cached data.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
24/07/24 10:37:21 ERROR Executor: Exception in task 3.0 in stage 65.0 (TID 186) 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/1l/fxxxl8p13clcn13ph4b5wcjc0000gn/T/ipykernel_48152/1915397370.py", line 42, in <lambda>
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/sql/udf.py", line 423, in wrapper
    return self(*args)
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 339, in __call__
    sc = get_active_spark_context()
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 2

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/1l/fxxxl8p13clcn13ph4b5wcjc0000gn/T/ipykernel_48152/1915397370.py", line 42, in <lambda>
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/sql/udf.py", line 423, in wrapper
    return self(*args)
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 339, in __call__
    sc = get_active_spark_context()
  File "/Users/vynguyen/anaconda3/envs/ecrawl/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 248, in get_active_spark_context
    raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.
