In [None]:
from pyspark.sql import SparkSession

# Spark settings for this job, kept in one mapping so they can be scanned and
# tuned in a single place. Insertion order matches the order they are applied.
spark_settings = {
    "spark.local.dir": "/media/backup",
    "spark.executor.memory": "12g",
    "spark.driver.memory": "6g",
    "spark.executor.cores": "2",
    "spark.default.parallelism": "6",
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC",
    "spark.driver.extraJavaOptions": "-XX:+UseG1GC",
}

# Apply every setting to the builder, then create the session (or reuse the
# one that is already running in this kernel).
session_builder = SparkSession.builder.appName("appliances_similarity")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

# Step 1: Read and Clean Data

In [None]:
# Resolve column names case-sensitively for this session.
# NOTE(review): presumably needed because the raw JSON has keys that differ
# only by case -- confirm against the dataset.
spark.conf.set("spark.sql.caseSensitive", True)

# Fields of the product metadata that the rest of the notebook works with.
metadata_columns = [
    "asin",
    "category",
    "description",
    "title",
    "feature",
    "brand",
    "main_cat",
    "price",
    "imageURLHighRes",
]

# Load the raw metadata and project it down to those fields.
raw_metadata = spark.read.json("/media/backup/datasets/ENSA M2/meta_Appliances.json")
df = raw_metadata.select(*metadata_columns)

In [3]:
from pyspark.sql.functions import col, regexp_extract, lit, when

# Dollar amount with exactly two decimals, with or without thousands
# separators: "$19.99", "$1299.00" and "$1,299.00" all match.
# The earlier pattern (\$\d+\.\d{2}) could not step over a comma, so every
# price written as "$1,299.00" was turned into an empty string.
pattern = r"\$\d{1,3}(?:,\d{3})+\.\d{2}|\$\d+\.\d{2}"

# Pull the first price-looking substring out of the free-text 'price' field.
# regexp_extract yields "" when nothing matches and null when 'price' is null.
df_with_extracted_price = df.withColumn(
    "extracted_price_temp",
    regexp_extract(col("price"), pattern, 0),
)

# Build 'cleaned_price':
#   - null 'price'            -> ""
#   - a price was extracted   -> the extracted text (still includes "$" and any commas)
#   - anything else           -> ""
df_cleaned_price = df_with_extracted_price.withColumn(
    "cleaned_price",
    when(col("price").isNull(), lit(""))
    .when(col("extracted_price_temp") != lit(""), col("extracted_price_temp"))
    .otherwise(lit("")),
)

# The helper column is no longer needed once 'cleaned_price' exists.
df = df_cleaned_price.drop("extracted_price_temp")

In [None]:
from pyspark.sql import functions as F

# Number of rows per ASIN.
asin_counts = df.groupBy("asin").agg(F.count("*").alias("count"))

# Keep only ASINs that occur more than once, most frequent first.
duplicates = asin_counts.where(F.col("count") > 1).sort(F.col("count").desc())

# Report how many distinct ASINs are duplicated (this triggers a Spark job).
total_duplicate_asins = duplicates.count()
print(f"Total duplicate ASINs: {total_duplicate_asins}")

In [5]:
# Keep one row per ASIN; Spark does not specify which duplicate row survives.
df_deduplicated_by_asin = df.drop_duplicates(subset=["asin"])

In [6]:
from pyspark.sql.functions import col, coalesce, lit, concat_ws, lower, regexp_replace, row_number, collect_list

# Turn the text fields into plain, non-null strings and keep only the columns
# the text pipeline needs.
#
# Fixes compared with the previous version:
#   * the trailing select() referenced an undefined name `c`, which raises a
#     NameError on a fresh kernel; the required columns are now listed explicitly;
#   * the input is the ASIN-deduplicated frame, so duplicated products no
#     longer flow into the similarity join.
#
# concat_ws joins the array elements with a single space; coalesce is a
# safety net so a null can never reach the tokenizer.
df_clean = df_deduplicated_by_asin.select(
    "asin",
    coalesce(concat_ws(" ", col("category")), lit("")).alias("category"),        # array -> string
    coalesce(concat_ws(" ", col("description")), lit("")).alias("description"),  # array -> string
    coalesce(col("title"), lit("")).alias("title"),                              # already a string
    coalesce(concat_ws(" ", col("feature")), lit("")).alias("feature"),          # array -> string
)

# Step 2: Text Preprocessing

In [7]:
from pyspark.sql.functions import col, lower, regexp_replace, trim

# Concatenate all text fields of a product into one string.
df_combined = df_clean.withColumn(
    "combined_text",
    concat_ws(" ", col("category"), col("description"), col("title"), col("feature"))
)

# Ordered (regex, replacement) steps applied to the lower-cased text.
# Order matters: entities are decoded first so that escaped markup such as
# "&lt;br&gt;" is also removed by the tag step.
text_cleaning_steps = [
    ("&amp;", "&"),             # decode &amp;
    ("&lt;", "<"),              # decode &lt;
    ("&gt;", ">"),              # decode &gt;
    # Tags become a space (previously ""), otherwise "foo<br>bar" collapsed
    # into the single token "foobar". Extra spaces are squeezed below.
    ("<[^>]+>", " "),
    ("[^a-zA-Z0-9\\s]", ""),    # drop everything that is not a letter, digit or whitespace
    ("\\s+", " "),              # squeeze whitespace runs into one space
]

cleaned_text_expr = lower(col("combined_text"))
for step_regex, step_replacement in text_cleaning_steps:
    cleaned_text_expr = regexp_replace(cleaned_text_expr, step_regex, step_replacement)

# Trim leading/trailing whitespace and keep only what the TF-IDF stage needs.
df_clean_text = df_combined.withColumn(
    "cleaned_text",
    trim(cleaned_text_expr)
).select("asin", "cleaned_text")

# Step 3: TF-IDF Pipeline

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# 1) Tokenize the cleaned text.
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="tokens")
df_tokens = tokenizer.transform(df_clean_text)

# 2) Remove stop words (StopWordsRemover with its default word list).
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
df_filtered = remover.transform(df_tokens)

# 3) Term counts over a bounded vocabulary.
count_vectorizer_params = dict(
    inputCol="filtered_tokens",
    outputCol="raw_features",
    vocabSize=1024,  # keep at most 2**10 terms
    minDF=5,         # ignore terms that appear in fewer than 5 documents
    maxDF=0.9,       # ignore terms that appear in more than 90% of documents
)
cv = CountVectorizer(**count_vectorizer_params)
cv_model = cv.fit(df_filtered)
df_tf = cv_model.transform(df_filtered)

# 4) Re-weight the counts by inverse document frequency.
idf = IDF(inputCol="raw_features", outputCol="tf_idf_features")
idf_model = idf.fit(df_tf)
df_tfidf = idf_model.transform(df_tf)

In [None]:
# Row count before all-zero TF-IDF vectors are removed (runs a Spark job).
rows_before_filter = df_tfidf.count()
print(f"Total rows before removing zero vectors: {rows_before_filter}")

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def _vector_has_entries(vector):
    """Return True when the ML vector contains at least one non-zero value."""
    return vector.numNonzeros() > 0


# Boolean UDF wrapper so the check can be used inside a DataFrame filter.
has_non_zero = udf(_vector_has_entries, BooleanType())

# Drop rows whose TF-IDF vector is entirely zero.
df_tfidf = df_tfidf.where(has_non_zero("tf_idf_features"))

In [None]:
# Row count once the all-zero vectors are gone (runs a Spark job).
rows_after_filter = df_tfidf.count()
print(f"Total rows after filtering: {rows_after_filter}")

# Step 4: Jaccard Similarity

In [12]:
from pyspark.ml.feature import MinHashLSH

# MinHashLSH approximates Jaccard distance (it is not a cosine-similarity
# method). It treats every non-zero vector entry as "term present", so only
# which vocabulary terms occur in a product matters here; the TF-IDF weights
# themselves do not influence the hashes.

# Fixed seed so the hash functions -- and therefore the candidate pairs -- are
# the same on every run. Without it PySpark derives the default seed from a
# per-process string hash, which changes between kernel restarts.
LSH_SEED = 42

mh = MinHashLSH(
    inputCol="tf_idf_features",
    outputCol="hashes",
    numHashTables=15,  # more tables = better recall, at higher cost
    seed=LSH_SEED,
)
model = mh.fit(df_tfidf)

In [None]:
# Chunked approximate similarity join.
#
# The left side of the join is processed in N_CHUNKS random slices so a single
# join never has to hold every candidate pair at once; each slice is joined
# against the full dataset and written to its own Parquet directory.

SIMILARITY_THRESHOLD = 0.6   # maximum Jaccard distance for a pair to be kept
N_CHUNKS = 5                 # 5 slices of ~20% each
MAX_ROWS_PER_CHUNK = 10000   # safety cap on the left side of each join
SPLIT_SEED = 42              # makes the slices identical on every run
CHUNK_OUTPUT_DIR = "/media/backup/datasets/ENSA M2/BigData/output"

# Materialize the cache once so every chunk reuses the same TF-IDF rows.
df_tfidf.cache().count()

# Seeded split: without a seed each run produced a different partitioning, so
# re-running the loop after a partial failure mixed chunk outputs that came
# from different splits (some products processed twice, others never).
chunks = df_tfidf.randomSplit([1.0 / N_CHUNKS] * N_CHUNKS, seed=SPLIT_SEED)

for i, chunk in enumerate(chunks):
    print(f"Processing chunk {i + 1}/{len(chunks)}")

    # NOTE: limit() without an ordering keeps an arbitrary subset, so a chunk
    # larger than MAX_ROWS_PER_CHUNK silently loses rows. Raise N_CHUNKS (or
    # the cap) if the dataset grows beyond N_CHUNKS * MAX_ROWS_PER_CHUNK rows.
    similar_chunk = model.approxSimilarityJoin(
        chunk.limit(MAX_ROWS_PER_CHUNK),
        df_tfidf,
        SIMILARITY_THRESHOLD,
        distCol="jaccardDist"
    ).filter(col("datasetA.asin") != col("datasetB.asin"))  # drop self-matches

    # Overwrite so a re-run replaces the previous result for this chunk.
    similar_chunk.write.mode("overwrite").parquet(f"{CHUNK_OUTPUT_DIR}/chunk_{i}")

In [None]:
from pyspark.sql.functions import (
    col, least, greatest, row_number, collect_list,
    min as spark_min, sort_array, struct,
)
from pyspark.sql.window import Window # Import Window

# Number of nearest neighbours kept per product.
TOP_N_SIMILAR = 20

# --- 1. Read Raw Data ---
# The glob picks up every chunk directory written by the similarity join.
input_parquet_path = "/media/backup/datasets/ENSA M2/BigData/output/*"
similar_items_raw = spark.read.parquet(input_parquet_path)

print("Original schema of the 'similar_items_raw' DataFrame:")
similar_items_raw.printSchema()

# --- 2. Flatten and Alias Columns ---
# Promote the nested join output to top-level 'asin_A', 'asin_B', 'distance'.
items_flattened = similar_items_raw.select(
    col("datasetA.asin").alias("asin_A"),
    col("datasetB.asin").alias("asin_B"),
    col("jaccardDist").alias("distance")
)

print("\nSchema after flattening: 'items_flattened' DataFrame:")
items_flattened.printSchema()

# --- 3. Symmetrize and Deduplicate Pairs ---
# Jaccard distance is symmetric, so (a, b) and (b, a) describe the same match.
# The previous version collapsed each unordered pair to ONE arbitrary row and
# then ranked by 'asin_A'; that removed the pair from one of the two products'
# neighbour lists. Here every pair is kept in BOTH directions so each product
# sees all of its neighbours, and exact duplicates of a directed pair are
# merged, keeping the smallest distance.
items_reversed = items_flattened.select(
    col("asin_B").alias("asin_A"),
    col("asin_A").alias("asin_B"),
    col("distance"),
)
unique_similar_items = (
    items_flattened.union(items_reversed)  # same column order on both sides
    .groupBy("asin_A", "asin_B")
    .agg(spark_min("distance").alias("distance"))
)

print("\nSchema after deduplication: 'unique_similar_items' DataFrame:")
unique_similar_items.printSchema()

# --- 4. Select and Rename for Ranking ---
ranked_similar = unique_similar_items.select(
    col("asin_A").alias("asin"),               # product being described
    col("asin_B").alias("similar_asin"),       # its neighbour
    col("distance").alias("jaccard_distance")
)

print("\nSchema for ranking: 'ranked_similar' DataFrame:")
ranked_similar.printSchema()

# --- 5. Define Window and Add Rank ---
# Closest neighbours first; 'similar_asin' breaks ties so the ranking is
# deterministic when several neighbours share the same distance.
window = Window.partitionBy("asin").orderBy(
    col("jaccard_distance").asc(),
    col("similar_asin").asc(),
)

# Keep the TOP_N_SIMILAR closest neighbours of every product.
ranked_similar = ranked_similar.withColumn("rank", row_number().over(window)) \
    .filter(col("rank") <= TOP_N_SIMILAR)

# --- 6. Aggregate Results ---
# collect_list gives no ordering guarantee after a groupBy, so collect
# (rank, similar_asin) structs, sort them by rank, then keep only the ASINs.
final_output = (
    ranked_similar.groupBy("asin")
    .agg(sort_array(collect_list(struct("rank", "similar_asin"))).alias("ranked_items"))
    .select("asin", col("ranked_items.similar_asin").alias("similar_items"))
)

print("\nSchema of the final output DataFrame:")
final_output.printSchema()

print("\nSample of final output (first 20 rows):")
final_output.show(20, truncate=False)

# --- 7. Visualize Matches for Sample Product (Optional) ---
# Show a small sample of ranked pairs as a rich table when IPython is
# available; on any failure fall back to Spark's plain-text show().
preview_columns = ['asin', 'similar_asin', 'jaccard_distance', 'rank']

try:
    sample_asins_df = ranked_similar.limit(25).toPandas()
    print("\nTop Matches Example (for ranking validation):")
    # Imported here so a missing IPython is handled by the fallback below.
    from IPython.display import display
    display(sample_asins_df[preview_columns])
except Exception as exc:
    print(f"\nCould not display sample: {exc}. If not in a notebook, use .show() or .toPandas().")
    ranked_similar.show(25, truncate=False)


print("\nFinal DataFrame 'final_output' has the structure {'asin': string, 'similar_items': list<string>}")

In [None]:
# Persist the per-product neighbour lists as Parquet, replacing any previous
# export at the same location.
output_path_parquet = "/media/backup/datasets/ENSA M2/BigData/project/similar_products_parquets"

final_output.write.parquet(output_path_parquet, mode="overwrite")

print(f"DataFrame successfully exported to Parquet at: {output_path_parquet}")

In [None]:
# Save the product metadata to Parquet.
#
# The ASIN-deduplicated frame is written (previously the raw `df` was written
# even though the message says "Filtered"), so a lookup by ASIN in the
# exported metadata returns exactly one row per product.
output_path_parquet = "/media/backup/datasets/ENSA M2/BigData/project/products_metadata_parquets" # Or "s3a://bucket/path/..."
df_deduplicated_by_asin.write.mode("overwrite").parquet(output_path_parquet)
print(f"Filtered DataFrame exported to Parquet at: {output_path_parquet}")