In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, regexp_extract
from pyspark.sql.functions import *
import glob
from pyspark.sql.types import *

# Start (or reuse) a local Spark session with a larger memory budget.
# NOTE: memory settings are only applied if this call actually creates the
# session/JVM; on an already-running kernel getOrCreate() reuses the old one.
spark = (
    SparkSession.builder
    .appName("Amazon Reviews")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)

# Raw reviews: every *.jsonl file in this folder (external drive must be mounted).
data_path = '/Volumes/One Touch/DMV_reviews/'
df1 = spark.read.json(data_path + '*.jsonl')

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/30 14:41:26 WARN Utils: Your hostname, Reehas-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 10.145.123.96 instead (on interface en0)
25/11/30 14:41:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/30 14:41:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/30 14:41:30 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /Volumes/One Touch/DMV_reviews/*.jsonl.
java.io.FileNotFoundException: File /Volumes/One Touch/DMV_reviews/*.jsonl does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFile

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Volumes/One Touch/DMV_reviews/*.jsonl. SQLSTATE: 42K03

In [None]:
# Column names and types inferred from the review JSON.
print("\nSchema:")
df1.printSchema()

In [None]:
# Spark DataFrames have no .shape: rows need a full count(), columns come from the schema.
print("Total rows: {:,}".format(df1.count()))
print("Total columns: {}".format(len(df1.columns)))

In [None]:
# First five review rows as a list of Row objects (same as head(5)).
df1.take(5)

In [None]:
# Distribution of star ratings (sorted by rating) ...
print("RATING DISTRIBUTION")
(
    df1.groupBy('rating')
    .count()
    .orderBy('rating')
    .show()
)

# ... and of the verified-purchase flag.
print("VERIFIED PURCHASE DISTRIBUTION")
(
    df1.groupBy('verified_purchase')
    .count()
    .show()
)

In [None]:
# Null Values
"""
Every column reports zero nulls. That does not mean every review has a title,
text and images: unless the dataset only contains reviews with all three, some
of those fields must be missing. They show up as 0 here because missing values
are stored as empty strings rather than as nulls, and isNull() does not count
empty strings.
"""
from pyspark.sql.functions import *

# One null-count aggregate per column, shown one column per line.
df1.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in df1.columns)
).show(vertical=True)

In [None]:
df1.groupBy('rating').count().sort('rating').show()  # sort() == orderBy()

In [None]:
print("REVIEWS WITH RATING = 0 (if any):")
# Inspect a sample of 0-star reviews (Amazon's scale starts at 1).
df1.where(col('rating') == 0.0).show(5, truncate=50)

From the output above, the reviews with a rating of 0 are not empty. However, Amazon's lowest possible rating is 1 star, not 0, so a rating of 0 most likely indicates a data error. For example, the actual rating for item "B00JYH2EHC" is 2 stars. I confirmed this by scrolling through that product's reviews on Amazon until I found the one left on August 22, 2018, with the title "No Good" and the text "Disappointed". Only 10 rows have a rating of 0, so it makes more sense to drop them than to try to correct them.

In [None]:
# Keep only ratings on Amazon's 1-5 star scale. This removes the 0-star rows
# inspected above, and between() also rejects any other out-of-range value that
# a plain `!= 0` check would let through. Null ratings are dropped as well,
# because a null comparison never satisfies filter().
df1 = df1.filter(col('rating').between(1, 5))

In [None]:
print("Total rows: {:,}".format(df1.count()))

In [None]:
df1.groupBy('rating').count().sort('rating').show()  # should now contain only 1-5

In [None]:
# One review per (user, asin) pair; Spark gives no guarantee about which of the duplicate rows is kept.
df1_clean = df1.drop_duplicates(subset=['user_id', 'asin'])

In [None]:
print("Total rows: {:,}".format(df1_clean.count()))

In [None]:
import os

spark.sparkContext.setLogLevel("ERROR")

# Product metadata files, named like "meta_Electronics.jsonl".
data_path = '/Users/reehaalthaf/Downloads/DMV_metadata/'
# sorted() makes the load/union order and the per-file log reproducible.
files = sorted(glob.glob(f'{data_path}*.jsonl'))

print(f"Found {len(files)} metadata files\n")

# Define schema manually to avoid reading 'details' field:
# only the fields listed here are parsed from each JSON record.
schema = StructType([
    StructField("parent_asin", StringType(), True),
    StructField("title", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("average_rating", DoubleType(), True),
    StructField("rating_number", IntegerType(), True),
    StructField("main_category", StringType(), True),
    StructField("store", StringType(), True),
])

dfs = []
for file in files:
    # Extract category from filename: "meta_Electronics.jsonl" -> "Electronics".
    filename = os.path.basename(file)
    category_from_file = (
        filename.replace('meta_', '')
        .replace('.jsonl', '')
        .replace('_', ' ')
        .replace('...', ' ')  # Handle truncated names
    )

    try:
        df_temp = (
            spark.read.schema(schema).json(file)
            .withColumn('category_from_file', lit(category_from_file))
        )
        # Reading and counting both happen inside the try, so a file that fails
        # is reported and skipped instead of being added to the union.
        n_products = df_temp.count()
    except Exception as e:
        # Best effort: report the failing file by name and keep loading the rest.
        print(f" {filename}: {e}")
        continue

    dfs.append(df_temp)
    print(f"{filename}: {n_products:,} products → category: '{category_from_file}'")


# Combine all
if dfs:
    df2 = dfs[0]
    for df in dfs[1:]:
        # Match columns by name rather than by position.
        df2 = df2.unionByName(df)

    print(f"\n Total metadata loaded: {df2.count():,} products")

    # Rename title to avoid conflict with the review 'title' column after the join.
    df2 = df2.withColumnRenamed('title', 'product_title')

    print(" Metadata ready!")
    df2.printSchema()

else:
    print("\n No files loaded successfully")

In [None]:
print("Total rows: {:,}".format(df2.count()))
print("Total columns: {}".format(len(df2.columns)))

In [None]:
# Null count per metadata column, one column per line.
df2.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in df2.columns)
).show(vertical=True)

In [None]:
# Products whose listing has no price.
df_missing_price = df2.where(col('price').isNull())

print("Total products with missing price: {:,}".format(df_missing_price.count()))

In [None]:
# How many products are missing BOTH price and average_rating?
rows_with_both_null = (
    df2.where(col('price').isNull() & col('average_rating').isNull())
    .count()
)
print("Rows with BOTH null:  {:,}".format(rows_with_both_null))

In [None]:
# Categories derived from the metadata filenames, alphabetically.
distinct_categories = (
    df2.select('category_from_file')
    .distinct()
    .sort('category_from_file')
)
print("Total distinct categories: {}".format(distinct_categories.count()))
distinct_categories.show(100, truncate=False)

In [None]:
"""
We drop rows where the price values is null which would result in a majority of rows where average_rating are null values.
To eliminate remaining rows where average_rating value is null we will drop those rows as well.
"""
df2_clean = df2.filter(
    col('price').isNotNull() & 
    col('average_rating').isNotNull()
)

print(f"After:  {df2_clean.count():,} rows")
print(f"Dropped: {df2.count() - df2_clean.count():,} rows")

In [None]:
# Re-check nulls after cleaning; price and average_rating should now be 0.
df2_clean.select(
    *(count(when(col(name).isNull(), name)).alias(name) for name in df2_clean.columns)
).show(vertical=True)

In [None]:
df2_clean.where(col('main_category').isNull()).show(50)  # products without a main_category


In [None]:
df2_clean.show(n=5)

In [None]:
# Persist the cleaned product metadata, replacing any previous output.
df2_clean.write.parquet(
    '/Users/reehaalthaf/Downloads/metadata_clean.parquet',
    mode='overwrite',
)

In [None]:
# Persist the de-duplicated reviews, replacing any previous output.
df1_clean.write.parquet(
    '/Users/reehaalthaf/Downloads/reviews_clean.parquet',
    mode='overwrite',
)

In [None]:
from pyspark.sql.functions import *

# Inner join: keep only reviews whose parent_asin has cleaned metadata.
# NOTE(review): if a parent_asin occurs in more than one metadata file, its
# reviews are duplicated by this join — confirm parent_asin is unique in df2_clean.
combined = df1_clean.join(df2_clean, 'parent_asin', 'inner')

print(" Combined dataset created!")
print("Total rows: {:,}".format(combined.count()))
print("Columns: {}".format(len(combined.columns)))

# Show schema
print("\nSchema:")
combined.printSchema()

In [None]:

# Aggregate after joining: one row per product, comparing the average rating in
# verified-purchase reviews with the average in non-verified reviews.
# The grouping keys are product-level only. Also grouping by user_id splits each
# product into per-user groups, which would rarely hold >= 5 verified AND >= 5
# non-verified reviews, so the sample-size filter below would drop almost all of them.
MIN_VERIFIED_REVIEWS = 5
MIN_NONVERIFIED_REVIEWS = 5

is_verified = col('verified_purchase') == True
is_nonverified = col('verified_purchase') == False

df_aggregated = combined.groupBy(
    'parent_asin', 'price', 'main_category', 'category_from_file'
).agg(
    # Verified reviews
    avg(when(is_verified, col('rating'))).alias('verified_avg_rating'),
    count(when(is_verified, 1)).alias('verified_count'),

    # Non-verified reviews
    avg(when(is_nonverified, col('rating'))).alias('nonverified_avg_rating'),
    count(when(is_nonverified, 1)).alias('nonverified_count'),

    # Overall
    avg('rating').alias('overall_avg_rating'),
    count('*').alias('total_reviews')
).withColumn(
    # Positive gap = non-verified reviews rate the product higher (in stars).
    'trust_gap_stars',
    col('nonverified_avg_rating') - col('verified_avg_rating')
).withColumn(
    # Same gap as a percentage of the verified average (null if no verified reviews).
    'trust_gap_pct',
    col('trust_gap_stars') / col('verified_avg_rating') * 100
)

# Keep only products with enough reviews on both sides for the averages to be meaningful.
df_aggregated = df_aggregated.filter(
    (col('verified_count') >= MIN_VERIFIED_REVIEWS) &
    (col('nonverified_count') >= MIN_NONVERIFIED_REVIEWS)
).cache()  # reused by count(), show(), the parquet write and later cells

print(f"Aggregated to {df_aggregated.count():,} products")
df_aggregated.show(10)

df_aggregated.write.mode('overwrite').parquet(
    '/Users/reehaalthaf/Downloads/aggregated_trust_gap.parquet'
)

In [None]:
import os

# Collect the aggregated Spark result to the driver as a pandas DataFrame.
df_final = df_aggregated.toPandas()

# Save as CSV for Power BI
csv_path = '/Users/reehaalthaf/Downloads/trust_gap.csv'
df_final.to_csv(csv_path, index=False)
print(f"Saved {len(df_final):,} rows to CSV")
# Report the real on-disk size instead of assuming ~1 KB per row.
print(f"File size: {os.path.getsize(csv_path) / 1024**2:.1f} MB")

In [None]:
# Price quartiles (25th / 50th / 75th percentile) over all cleaned products.
# Uses df2_clean explicitly: `df` is only the leftover loop variable from the
# metadata-union cell (a single file's DataFrame, or undefined with one file).
# relativeError=0 requests exact quantiles, which can be expensive on large data.
df2_clean.approxQuantile('price', [0.25, 0.5, 0.75], 0)

In [None]:
# Average number of verified / non-verified reviews per aggregated row, plus the row count.
(
    df_aggregated
    .select(
        avg('verified_count').alias('avg_verified_per_product'),
        avg('nonverified_count').alias('avg_nonverified_per_product'),
        count('*').alias('total_products'),
    )
    .show()
)

In [None]:
from pyspark.sql import functions as F

# Rebuild the join so this cell does not depend on the earlier join cell.
combined = df1_clean.join(df2_clean, on='parent_asin', how='inner')

MIN_VERIFIED_REVIEWS = 5
MIN_NONVERIFIED_REVIEWS = 5

is_verified = F.col('verified_purchase') == True
is_nonverified = F.col('verified_purchase') == False

# Aggregate after joining: one row per product.
# 'store' comes from the product metadata, so it is kept as a grouping key.
# Review-level fields (timestamp, helpful_vote) are summarised instead of grouped
# on: grouping by them yields roughly one group per review, and effectively no
# such group can pass the >= 5 verified AND >= 5 non-verified filter below.
df_aggregated = combined.groupBy(
    'parent_asin', 'price', 'main_category', 'category_from_file', 'store'
).agg(
    # Verified reviews
    F.avg(F.when(is_verified, F.col('rating'))).alias('verified_avg_rating'),
    F.count(F.when(is_verified, 1)).alias('verified_count'),

    # Non-verified reviews
    F.avg(F.when(is_nonverified, F.col('rating'))).alias('nonverified_avg_rating'),
    F.count(F.when(is_nonverified, 1)).alias('nonverified_count'),

    # Overall
    F.avg('rating').alias('overall_avg_rating'),
    F.count('*').alias('total_reviews'),

    # Review engagement and time span (review-level fields, summarised per product)
    F.sum('helpful_vote').alias('total_helpful_votes'),
    F.avg('helpful_vote').alias('avg_helpful_votes'),
    F.min('timestamp').alias('first_review_timestamp'),
    F.max('timestamp').alias('last_review_timestamp'),
).withColumn(
    # Positive gap = non-verified reviews rate the product higher (in stars).
    'trust_gap_stars',
    F.col('nonverified_avg_rating') - F.col('verified_avg_rating')
).withColumn(
    # Same gap as a percentage of the verified average (null if no verified reviews).
    'trust_gap_pct',
    F.col('trust_gap_stars') / F.col('verified_avg_rating') * 100
)

# Keep only products with enough reviews on both sides for the averages to be meaningful.
df_aggregated = df_aggregated.filter(
    (F.col('verified_count') >= MIN_VERIFIED_REVIEWS) &
    (F.col('nonverified_count') >= MIN_NONVERIFIED_REVIEWS)
).cache()  # reused by count(), show() and the parquet write below

print(f"Aggregated to {df_aggregated.count():,} products")
df_aggregated.show(10)

df_aggregated.write.mode('overwrite').parquet(
    '/Users/reehaalthaf/Downloads/aggregated_trust_gap.parquet'
)

In [None]:
import builtins
import glob
import os

# Check your data: list the raw review files and their sizes.
data_path = '/Volumes/One Touch/DMV_reviews/'
files = sorted(glob.glob(f'{data_path}*.jsonl'))

print(f"Number of files: {len(files)}")
print("First few files:")
for f in files[:5]:
    size = os.path.getsize(f) / (1024**3)  # Size in GB
    print(f"  {os.path.basename(f)}: {size:.2f} GB")

# Total size. `from pyspark.sql.functions import *` earlier in the notebook
# replaces the built-in sum() with Spark's column aggregate, which rejects a
# generator of ints, so call the Python builtin explicitly.
total_size = builtins.sum(os.path.getsize(f) for f in files) / (1024**3)
print(f"\nTotal data size: {total_size:.2f} GB")