In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, regexp_replace, sum, when, sum as spark_sum, avg, rank, desc, explode, split
from pymongo import MongoClient
from pyspark.sql.window import Window
import logging

In [2]:
# Configure logging: basicConfig sets the root logger level to INFO, and the
# module-level `logger` created here is the one the later cells use to report
# errors and per-batch progress.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [3]:
# Build (or reuse) the Spark session for this notebook, requesting 1 GB of
# memory per executor.
spark = (
    SparkSession.builder
    .appName("Product categories")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

In [4]:
# Define the HDFS directories holding the CSV inputs: the product files and
# the interaction files (the interaction data is referred to as "sales" in the
# variable names throughout this notebook).
product_data_dir = "hdfs://namenode:9000/csv_files/products"
sales_data_dir = "hdfs://namenode:9000/csv_files/interactions"



In [5]:

# Batch-read both CSV directories once. The resulting DataFrames provide the
# schemas handed to the streaming readers, and product_static_df is also the
# lookup side of the per-batch join. No inferSchema option is set, so only the
# header row is used to name the columns.
try:
    # Interaction ("sales") files
    sales_static_df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .load(sales_data_dir)
    )
    sales_static_df.printSchema()

    # Product files
    product_static_df = (
        spark.read
        .format("csv")
        .option("header", "true")
        .load(product_data_dir)
    )
    product_static_df.printSchema()
except Exception as read_error:
    # Without the static DataFrames nothing downstream can run: log, release
    # the Spark session and let the error surface in the notebook.
    logger.error(f"Error reading static data: {read_error}")
    spark.stop()
    raise

ERROR:__main__:Error reading static data: [PATH_NOT_FOUND] Path does not exist: hdfs://namenode:9000/sales.


AnalysisException: [PATH_NOT_FOUND] Path does not exist: hdfs://namenode:9000/sales.

In [None]:
# Open CSV file-stream readers over the same HDFS directories, reusing the
# schemas taken from the static DataFrames (streaming reads are given an
# explicit schema here rather than inferring one).
try:
    # Product stream (not referenced by the other cells visible in this notebook)
    product_streaming_df = (
        spark.readStream
        .schema(product_static_df.schema)
        .option("header", "true")
        .csv(product_data_dir)
    )
    # Interaction ("sales") stream, consumed by the foreachBatch query below
    sales_streaming_df = (
        spark.readStream
        .schema(sales_static_df.schema)
        .option("header", "true")
        .csv(sales_data_dir)
    )
except Exception as stream_error:
    # Log, release the Spark session and propagate the failure.
    logger.error(f"Error creating streaming DataFrame: {stream_error}")
    spark.stop()
    raise

In [None]:
# MongoDB connection setup: open the client, verify the server is reachable,
# and bind the two collections written by the per-batch processing function.
try:
    client = MongoClient("mongodb://mongodb:27017")
    # MongoClient connects lazily, so constructing it does not fail when the
    # server is unreachable. Force one round trip here so that a connection
    # problem is reported by this cell (after the server-selection timeout)
    # instead of surfacing later inside the streaming batch function.
    client.admin.command("ping")

    db = client['Products']
    # Per-category interaction counts for every processed batch
    coll_product_categories = db['product_categories']
    # Only the categories whose count exceeds the high-interaction threshold
    coll_product_categories_highst = db['product_categories_highst']

except Exception as e:
    # Log, release the Spark session and propagate the failure.
    logger.error(f"Error connecting to MongoDB: {e}")
    spark.stop()
    raise

In [None]:



def process_high_interaction_categories(sales_batch_df, batch_id, min_interactions=1000):
    """Count interactions per product category for one micro-batch and store them in MongoDB.

    The batch is joined to ``product_static_df`` on the product id, the
    " | "-separated ``Category`` value is split into one row per category, and
    the interactions are counted per category (ordered by count, descending).
    All category counts are inserted into ``coll_product_categories``; the
    categories whose count exceeds ``min_interactions`` are additionally
    inserted into ``coll_product_categories_highst``.

    Args:
        sales_batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; used in the log messages.
        min_interactions: A category is treated as high-interaction when its
            count is strictly greater than this value. Defaults to 1000.

    Returns:
        None. Any exception is logged and not re-raised.
    """
    try:
        # Join sales and product dataframes on the product ID (Unique Id)
        joined_df = sales_batch_df.join(
            product_static_df,
            sales_batch_df["product id"] == product_static_df["Uniqe Id"],
        )

        # Split the Category column and explode into individual rows
        exploded_df = joined_df.withColumn("Category", explode(split(col("Category"), " \\| ")))

        # Group by Category, count interactions, order by count descending.
        # The result is read several times below (collect + two show calls),
        # so persist it to avoid re-running the join and aggregation each time.
        category_interactions = (
            exploded_df.groupBy("Category")
            .agg(count("product id").alias("TotalInteractions"))
            .orderBy(col("TotalInteractions").desc())
            .persist()
        )

        try:
            # Categories above the threshold (kept as a DataFrame for show()).
            high_interaction_categories = category_interactions.filter(
                col("TotalInteractions") > min_interactions
            )

            # Collect once and derive both document lists from the same rows.
            # Each list gets its own dicts because insert_many adds an "_id"
            # key to the documents it is given.
            category_rows = category_interactions.collect()
            category_interactions_list = [row.asDict() for row in category_rows]
            high_interaction_categories_list = [
                row.asDict()
                for row in category_rows
                if row["TotalInteractions"] > min_interactions
            ]

            if category_interactions_list:
                coll_product_categories.insert_many(category_interactions_list)
                category_interactions.show()
                logger.info(f"Batch {batch_id} processed category_interactions and inserted into MongoDB")
            else:
                logger.info(f"Batch {batch_id} processed but no category_interactions found")

            if high_interaction_categories_list:
                coll_product_categories_highst.insert_many(high_interaction_categories_list)
                high_interaction_categories.show()
                logger.info(f"Batch {batch_id} processed high_interaction_categories and inserted into MongoDB")
            else:
                logger.info(f"Batch {batch_id} processed but no high_interaction_categories found")
        finally:
            # Release the cached aggregate whether or not the inserts succeeded.
            category_interactions.unpersist()

    except Exception as e:
        # NOTE(review): the error is only logged, so the caller never sees the
        # failure for this batch — confirm that best-effort handling is intended.
        logger.error(f"Error processing batch {batch_id}: {e}")
              

In [None]:
# Start the streaming query that hands every micro-batch of the interaction
# stream to process_high_interaction_categories, then block until the query
# ends. The Spark session is stopped afterwards whether or not an error occurred.
try:
    query_high_interaction_categories = (
        sales_streaming_df.writeStream
        .outputMode("append")
        .foreachBatch(
            lambda micro_batch_df, micro_batch_id: process_high_interaction_categories(
                micro_batch_df, micro_batch_id
            )
        )
        .start()
    )
    # Await termination
    query_high_interaction_categories.awaitTermination()
except Exception as query_error:
    logger.error(f"Error in streaming query: {query_error}")
finally:
    # Stop Spark session
    spark.stop()
