In [1]:
import random
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, count
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder \
    .appName("GroceryRecommendationSystem") \
    .getOrCreate()


In [2]:
# Load the dataset
file_path = "grocery-1.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# List of item columns
items_columns = ["Item 1", "Item 2", "Item 3", "Item 4", "Item 5"]

# Explode the items columns to create a user-item interaction dataset
exploded_df = None
for col_name in items_columns:
    temp_df = df.select(col_name).withColumnRenamed(col_name, "item").withColumn("transactionId", lit(col_name))
    if exploded_df is None:
        exploded_df = temp_df
    else:
        exploded_df = exploded_df.union(temp_df)

In [3]:
# Remove null items
exploded_df = exploded_df.filter(col("item").isNotNull())

# Indexing string columns to numeric for user and item IDs
indexer_item = StringIndexer(inputCol="item", outputCol="itemIndex")
indexer_transaction = StringIndexer(inputCol="transactionId", outputCol="transactionIndex")
df_indexed = indexer_item.fit(exploded_df).transform(exploded_df)
df_indexed = indexer_transaction.fit(df_indexed).transform(df_indexed)

# Create a column for implicit feedback (binary interaction)
df_indexed = df_indexed.withColumn("rating", lit(1))

# Prepare training and test data
(training, test) = df_indexed.randomSplit([0.8, 0.2])


In [4]:
# Build the ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="transactionIndex",
    itemCol="itemIndex",
    ratingCol="rating",
    coldStartStrategy="drop"
)
model = als.fit(training)

# Evaluate the model
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.099131669854929


In [6]:
# Generate top N item recommendations for a given item name
def get_recommendations(item_name, top_n=5):
    item_index = df_indexed.filter(col("item") == item_name).select("itemIndex").distinct().collect()
    if not item_index:
        print(f"No data found for item: {item_name}")
        return
    item_index = item_index[0][0]

    # Get user IDs that interacted with the given item
    user_ids = df_indexed.filter(col("itemIndex") == item_index).select("transactionIndex").distinct()

    # Generate recommendations for these users
    user_recommendations = model.recommendForUserSubset(user_ids, top_n)

    # Collect recommendations for the given item
    rec_item_ids = set()
    for user_rec in user_recommendations.collect():
        recs = user_rec.recommendations
        for rec in recs:
            rec_item_ids.add(rec.itemIndex)

    # Convert item indices back to item names
    rec_item_names = [row['item'] for row in df_indexed.filter(col("itemIndex").isin(rec_item_ids)).select("item").distinct().collect()]

    if rec_item_names:
        random_recommendations = random.sample(rec_item_names, min(len(rec_item_names), top_n))
        print(f"Recommendations for {item_name}: {random_recommendations}")
    else:
        print(f"No recommendations found for item: {item_name}")


In [7]:
from pyspark.sql.functions import col, count, desc, sum # type: ignore

def get_most_bought_items(top_n=15):
    most_bought_items = None
    for col_name in items_columns:
        if most_bought_items is None:
            most_bought_items = df.groupBy(col(col_name).alias("item")).agg(count("*").alias("count"))
        else:
            most_bought_items = most_bought_items.union(df.groupBy(col(col_name).alias("item")).agg(count("*").alias("count")))
    
    # Filter out rows where item is None
    most_bought_items = most_bought_items.filter(col("item").isNotNull())
    
    # Convert count column to integer
    most_bought_items = most_bought_items.withColumn("count", col("count").cast("int"))
    
    # Aggregate counts and remove duplicates
    most_bought_items = most_bought_items.groupBy("item").agg(sum("count").alias("total_count"))
    
    # Order by total count in descending order
    most_bought_items = most_bought_items.orderBy(desc("total_count")).limit(top_n).collect()
    
    # Print the top N most bought items
    print(f"Top {top_n} Most bought items:")
    for idx, item in enumerate(most_bought_items, start=1):
        print(f"{idx}. {item['item']} - {item['total_count']} transactions")

In [8]:
# Example usage
item_name = input('Enter the item name:')  # Replace with your input item name
get_recommendations(item_name, top_n=5)

Recommendations for sugar: ['brandy', 'decalcifier', 'female sanitary products', 'salad dressing', 'rice']


In [9]:
# Call the function to print the top 15 most bought items
get_most_bought_items(top_n=15)

Top 15 Most bought items:
1. whole milk - 2341 transactions
2. other vegetables - 1799 transactions
3. rolls/buns - 1421 transactions
4. soda - 1185 transactions
5. yogurt - 1143 transactions
6. root vegetables - 1039 transactions
7. tropical fruit - 1027 transactions
8. sausage - 924 transactions
9. citrus fruit - 812 transactions
10. pip fruit - 737 transactions
11. bottled water - 720 transactions
12. canned beer - 636 transactions
13. pastry - 608 transactions
14. frankfurter - 580 transactions
15. pork - 567 transactions
