In [None]:
# Connection to MongoDB Compass

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Build (or reuse) the Spark session. The MongoDB Spark connector is requested
# through spark.jars.packages and the driver JVM is pointed at log4j.properties.
spark = (
    SparkSession.builder
    .appName("ConnectionMongo")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=log4j.properties")
    .getOrCreate()
)

In [None]:
# Load dataset

df = spark.read.csv('sample.csv', inferSchema=True, header=True)

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
# Modifying schema

from pyspark.sql.types import StringType, StructField, StructType, TimestampType, DoubleType

In [None]:
# Explicit schema for sample.csv: identifiers are kept as strings, event_time is
# parsed as a timestamp and price as a double. Every column is nullable.
schema1 = (
    StructType()
    .add('event_time', TimestampType(), True)
    .add('event_type', StringType(), True)
    .add('product_id', StringType(), True)
    .add('category_id', StringType(), True)
    .add('category_code', StringType(), True)
    .add('brand', StringType(), True)
    .add('price', DoubleType(), True)
    .add('user_id', StringType(), True)
    .add('user_session', StringType(), True)
)

In [None]:
# schema1 is already a StructType, so the copy is built from its list of
# StructFields. Passing schema1 itself would leave final_struct.fields holding a
# StructType object instead of a list, which breaks list operations on it.
final_struct = StructType(fields=schema1.fields)

In [None]:
df = spark.read.csv('sample.csv', schema=schema1, header=True)

In [None]:
df.printSchema()

In [None]:
df.show()

In [None]:
df.columns

In [None]:
# Clean data: discard every row that has a null in at least one column.
df = df.dropna(how='any')

In [None]:
##### EDA

In [None]:
# Summary price

df.describe("price").show()

In [None]:
# Mean price per brand.
# The column is named explicitly: a bare .mean() averages every numeric column,
# so the result would change shape if the CSV were loaded with numeric ids.
# The output column is still called "avg(price)".
mean_brand = df.groupBy("brand").mean("price")

mean_brand.show()

In [None]:
# Number of rows (events) per brand; products are counted once per row, not de-duplicated
products_per_brand = df.groupBy("brand").count()
products_per_brand.show()

In [None]:
# Save the per-brand row counts in the eCommerce.NumUniqueProducts collection.
# NOTE(review): mode is "append", so re-running this cell presumably adds the
# same documents again; confirm that duplicates are acceptable.
(
    products_per_brand.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.NumUniqueProducts")
    .save()
)

In [None]:
# Number of unique products for: product_id, category_code, brand, user_id

In [None]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [None]:
# product_id
numero_prodotti_distinti = df.select(countDistinct("product_id"))
numero_prodotti_distinti.show()

In [None]:
# category_code
numero_categorie_distinte = df.select(countDistinct("category_code"))
numero_categorie_distinte.show()

In [None]:
# brand
numero_brand = df.select(countDistinct("brand"))
numero_brand.show()

In [None]:
# user_id
numero_utenti = df.select(countDistinct("user_id"))
numero_utenti.show()

In [None]:
# Count for each event

In [None]:
count_events = df.groupBy("event_type").count()
count_events.show()

In [None]:
# Save the per-event-type counts in the eCommerce.NumEventTypes collection.
# NOTE(review): "append" mode; a second run presumably duplicates the documents.
(
    count_events.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.NumEventTypes")
    .save()
)

In [None]:
# PLOTS

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Average price per brand. The aggregated column is named explicitly so the
# result always has exactly the columns "brand" and "avg(price)", whatever other
# numeric columns df happens to contain.
avg_brand_price = df.groupBy("brand").mean("price")

In [None]:
avg_brand_price.select('avg(price)').show()

In [None]:
# Brands ordered from the highest to the lowest average price.
price_desc = avg_brand_price['avg(price)'].desc()
sorted_df = avg_brand_price.sort(price_desc)


In [None]:
# Top 10 brands by average price: keep the first ten rows of sorted_df
# with all of its columns.
first_10_values = sorted_df.limit(10)

# Display those ten rows
first_10_values.show()

In [None]:
sorted_df = sorted_df.withColumnRenamed("avg(price)", "prezzo_medio")

In [None]:
sorted_df.show()

In [None]:
# Save the brands with their average price in eCommerce.prezzoMedioBrandOrdinati.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    sorted_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.prezzoMedioBrandOrdinati")
    .save()
)

In [None]:
# Bring the "brand" and "avg(price)" columns of first_10_values to the driver.
# Both columns come from ONE collect so that each brand stays paired with its
# own average: two separate collects run two separate Spark jobs, and nothing
# guarantees they return the rows in the same order.
_top10_rows = first_10_values.select("brand", "avg(price)").collect()
top10_avgprice = [row["avg(price)"] for row in _top10_rows]
top10_expensives_brand = [row["brand"] for row in _top10_rows]

In [None]:
# Bar chart of the ten brands with the highest average price.
plt.figure(figsize=(14, 8))
plt.bar(x=top10_expensives_brand, height=top10_avgprice)
plt.title('Top 10 Most expensive brand and avg price', fontweight="bold")
plt.ylabel('Avg Price')
plt.xlabel('Brand')
plt.grid(axis='y', linewidth=0.3)
plt.xticks(rotation=90)  # vertical brand labels
plt.show()

In [None]:
#####  #####  #####  #####  #####  #####  #####  #####  #####

In [None]:
# Trying to understand which brand has the most purchases

In [None]:
# Keep only the rows whose event_type is 'purchase'.
is_purchase = df.event_type == "purchase"
df_purchase = df.where(is_purchase)

In [None]:
df_purchase.show()

In [None]:
df_purchase.describe().show()

In [None]:
# Save every purchase row in the eCommerce.purchase collection.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_purchase.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.purchase")
    .save()
)

In [None]:
# 'groupBy' on brand, then count

b_pur = df_purchase.groupBy("brand").count()

In [None]:
b_pur.show()

In [None]:
# Bring brands and their purchase counts to the driver with a single collect.
# b_pur is an unordered groupBy result, so collecting the two columns in
# separate jobs could return them in different row orders and mislabel the bars.
_purchase_rows = b_pur.select("brand", "count").collect()
purchased_brands = [row["brand"] for row in _purchase_rows]
number_of_purchases = [row["count"] for row in _purchase_rows]

In [None]:
# Bar chart: number of purchase events for every brand.
plt.figure(figsize=(16, 10))
plt.bar(purchased_brands, number_of_purchases)
plt.title("Number of purchases per brand", fontweight="bold")
plt.ylabel("Number of purchased products")
plt.xlabel("Brand")
plt.grid(axis='y', linewidth=0.3)
plt.xticks(rotation=45)  # slanted labels for the many brand names
plt.show()

In [None]:
# top 25 purchased brand 

In [None]:
# Brands ordered from the most to the least purchased.
sorted_df_pur = b_pur.sort("count", ascending=False)

In [None]:
# Save the purchase count of every brand in eCommerce.numberPurchPerBrand.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    sorted_df_pur.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.numberPurchPerBrand")
    .save()
)

In [None]:
# Keep the first 25 rows of sorted_df_pur (all columns), i.e. the 25 brands
# with the most purchases.
first_25_values = sorted_df_pur.limit(25)

# Display those 25 rows
first_25_values.show()

In [None]:
# Bring the top-25 brands and their purchase counts to the driver in one
# collect, so each count is guaranteed to belong to the brand at the same index
# (separate collects are separate jobs and may disagree, e.g. on tied counts).
_top25_rows = first_25_values.select("brand", "count").collect()
top25_num = [row["count"] for row in _top25_rows]
top25_brand = [row["brand"] for row in _top25_rows]

In [None]:
# Bar chart of the 25 most purchased brands.
plt.figure(figsize=(16, 10))
plt.bar(top25_brand, top25_num)
plt.title("Top 25 Purchased Brand", fontweight="bold")
plt.ylabel("Number of product purchased")
plt.xlabel("Brand")
plt.grid(axis='y', linewidth=0.3)
plt.xticks(rotation=45)
plt.show()

In [None]:
df_purchase.show()

In [None]:
# Counting purchased categories

df_purchased_categories = df_purchase.groupBy("category_code").count()

In [None]:
df_purchased_categories.show()

In [None]:
# Save the purchase count of every category in eCommerce.numberPurchCategories.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_purchased_categories.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.numberPurchCategories")
    .save()
)

In [None]:
# Bring categories and their purchase counts to the driver with one collect.
# The groupBy result is unordered, so two independent collects are not
# guaranteed to line up row by row.
# Note: number_of_purchases is reassigned here (it previously held the per-brand counts).
_category_rows = df_purchased_categories.select("category_code", "count").collect()
purchased_categories = [row["category_code"] for row in _category_rows]
number_of_purchases = [row["count"] for row in _category_rows]

In [None]:
print(len(purchased_categories))

In [None]:
# Bar chart: number of purchase events for every category_code.
plt.figure(figsize=(14, 8))
plt.bar(purchased_categories, number_of_purchases)  # plot the values
plt.title("Number of purchases for each category", fontweight="bold")
plt.ylabel("Number of purchases")
plt.xlabel("Category")
plt.grid(axis='y', linewidth=0.3)
plt.xticks(rotation=90)
plt.show()

In [None]:
##  Let's see what happens when 'event_type == "view"'

In [None]:
# Keep only the rows whose event_type is 'view'.
is_view = df.event_type == "view"
df_views = df.where(is_view)

In [None]:
df_views.describe().show()

In [None]:
df_views.show()

In [None]:
# Save every view row in the eCommerce.views collection.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_views.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.views")
    .save()
)

In [None]:
# count for each brand

views_brand_count = df_views.groupBy("brand").count()

In [None]:
views_brand_count.show()

In [None]:
views_brand_count.describe().show()

In [None]:
# Brands ordered from the most to the least viewed.
sorted_df_views_brand_count = views_brand_count.sort("count", ascending=False)

In [None]:
sorted_df_views_brand_count.show()

In [None]:
# Save the view count of every brand in eCommerce.viewsPerBrandCount.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    sorted_df_views_brand_count.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.viewsPerBrandCount")
    .save()
)

In [None]:
from pyspark.sql.functions import mean, avg

In [None]:
# Average of the 'count' column (views per brand), pulled to the driver:
# the aggregation yields a single row with a single value.
_avg_rows = views_brand_count.agg({'count': 'avg'}).collect()
mean_views_per_brand = _avg_rows[0][0]
print(mean_views_per_brand)

In [None]:
# Brands whose number of views is strictly above the per-brand average.
above_mean = views_brand_count["count"] > mean_views_per_brand
views_greater_mean = views_brand_count.where(above_mean)

In [None]:
views_greater_mean.show()

In [None]:
# Bring the above-average brands and their view counts to the driver with one
# collect. views_greater_mean is unordered, so two separate collects are not
# guaranteed to return rows in the same order and could mislabel the bars.
_view_rows = views_greater_mean.select("brand", "count").collect()
viewed_brands = [row["brand"] for row in _view_rows]
number_of_views = [row["count"] for row in _view_rows]

In [None]:
## Most viewed brands (those with more views than the per-brand average)

plt.figure(figsize=(14, 8))
plt.bar(viewed_brands, number_of_views)
plt.title("Number of views per brand", fontweight="bold")
plt.ylabel("Views")
plt.xlabel("Brand")
plt.grid(axis='y', linewidth=0.3)
plt.xticks(rotation=90)
plt.show()

In [None]:
########################################################################

In [None]:
# Goal: understand at which hours views happen most frequently (x = hour, y = number of views)

In [None]:
# Rows whose event_type is 'view', used for the time-of-day analysis.
df_time_views = df.where(df.event_type == "view")

In [None]:
df_time_views.show()

In [None]:
df_time_views.select(["event_time", 'event_type']).show()

In [None]:
from pyspark.sql.functions import hour, dayofmonth, month, year, col

In [None]:
# Extract the hour from the timestamp, together with the year, month and day,
# each in its own column next to the original event_time.
df_time = df_time_views.select(
    "event_time",
    year("event_time").alias("year"),
    month("event_time").alias("month"),
    dayofmonth("event_time").alias("day"),
    hour("event_time").alias("hour"),
)

In [None]:
df_time.show()

In [None]:
# Start df_time_schema from df_time, with the year column cast to string.
df_time_schema = df_time.withColumn("year", df_time["year"].cast("string"))

In [None]:
# Cast the hour column to string.
df_time_schema = df_time_schema.withColumn("hour", df_time_schema["hour"].cast("string"))

In [None]:
# Cast the month column to string.
df_time_schema = df_time_schema.withColumn("month", df_time_schema["month"].cast("string"))

In [None]:
# Cast the day column to string.
df_time_schema = df_time_schema.withColumn("day", df_time_schema["day"].cast("string"))

In [None]:
df_time_schema.printSchema()

In [None]:
# Save the view timestamps with their numeric year/month/day/hour parts in
# eCommerce.TimeViews.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_time.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.TimeViews")
    .save()
)

In [None]:
# Save the string-typed variant of the view time parts in eCommerce.TimeViewsSchema.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_time_schema.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.TimeViewsSchema")
    .save()
)

In [None]:
df_time_schema.show()

In [None]:
df_time_schema_count = df_time_schema.groupBy('day').count()

In [None]:
df_time_schema_count.orderBy(df_time_schema_count['count'].desc()).show()

In [None]:
df_time.printSchema()

In [None]:
hours_count = df_time.groupBy('hour').count()

In [None]:
hours_count.describe().show()

In [None]:
# View counts ordered by hour of day, ascending.
hours_ordered_df = hours_count.sort("hour")

In [None]:
hours_ordered_df.show()

In [None]:
# Save the number of views per hour in eCommerce.OrderedTimeViewsTotal.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    hours_ordered_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.OrderedTimeViewsTotal")
    .save()
)

In [None]:
# Bring the hours and their view counts to the driver with a single collect so
# that every count is paired with its own hour. Two separate collects are two
# separate jobs, and their row order is not guaranteed to match.
_view_hour_rows = hours_ordered_df.select("hour", "count").collect()
hours = [row["hour"] for row in _view_hour_rows]
number_of_views_per_hour = [row["count"] for row in _view_hour_rows]

In [None]:
# Line chart: number of views for each hour of the day.
plt.figure(figsize=(14, 8))
plt.plot(hours, number_of_views_per_hour, linestyle='-')  # plot the values
plt.title("Numero di views per ora", fontweight="bold")
plt.ylabel("Views")
plt.xlabel("Ora")
plt.xticks(hours)  # one tick per hour present in the data
plt.grid(linewidth=0.3)
plt.show()

In [None]:
# dataset 'df_purchase'
df_purchase.show()

In [None]:
from pyspark.sql.functions import hour, dayofmonth, month, year, col

In [None]:
# For every purchase, keep event_time and derive year, month, day and hour
# from it, each as a separate column.
df_time_pur = df_purchase.select(
    "event_time",
    year("event_time").alias("year"),
    month("event_time").alias("month"),
    dayofmonth("event_time").alias("day"),
    hour("event_time").alias("hour"),
)

In [None]:
# Save the purchase timestamps and their year/month/day/hour parts in
# eCommerce.TimePurchase.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_time_pur.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.TimePurchase")
    .save()
)

In [None]:
df_time_pur.show()

In [None]:
hours_count_pur = df_time_pur.groupBy('hour').count()

In [None]:
hours_count_pur.show()

In [None]:
# Purchase counts ordered by hour of day, ascending.
hours_ordered_df_pur = hours_count_pur.sort("hour")

In [None]:
# Save the number of purchases per hour in eCommerce.OrderedTimePurchaseTotal.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    hours_ordered_df_pur.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.OrderedTimePurchaseTotal")
    .save()
)

In [None]:
hours_ordered_df_pur.show()

In [None]:
# Bring the hours and their purchase counts to the driver with a single collect
# so that every count is paired with its own hour (two separate collects are
# two separate jobs, and their row order is not guaranteed to match).
# Despite its name, number_of_views_per_hour_pur holds PURCHASE counts per hour.
_purchase_hour_rows = hours_ordered_df_pur.select("hour", "count").collect()
hours_pur = [row["hour"] for row in _purchase_hour_rows]
number_of_views_per_hour_pur = [row["count"] for row in _purchase_hour_rows]

In [None]:
# Line chart: number of purchases for each hour of the day.
plt.figure(figsize=(14, 8))
plt.plot(hours_pur, number_of_views_per_hour_pur, linestyle='-')
# Ticks come from the purchase hours being plotted, not from `hours` (the view
# hours), which may cover a different set of hours.
plt.xticks(hours_pur)
plt.title("Number of purchases per hour", fontweight= "bold")
plt.xlabel("Hour")
plt.ylabel("Purchases")
plt.grid(linewidth= 0.3)
plt.show()

In [None]:


# Purchases (red) and views (blue) per hour of the day on the same axes.
plt.figure(figsize=(14, 8))
plt.plot(hours_pur, number_of_views_per_hour_pur, color='red', linestyle='-', label="Acquisti")  # plot the values
plt.plot(hours, number_of_views_per_hour, color='blue', linestyle='-', label='Views')
plt.title("Number of purchases and view per hour", fontweight="bold")
plt.ylabel("Number")
plt.xlabel("Hour")
plt.xticks(hours)
plt.grid(linewidth=0.3)
plt.legend()
plt.show()

In [None]:
############################################

In [None]:
# SEQUENTIAL PATTERN MINING

In [None]:
df.describe().show()

In [None]:
################################################

In [None]:
from pyspark.sql.functions import collect_list, struct

# For every (product_id, user_id) pair, gather all of its events as
# (event_type, event_time) structs.
grouped_df = df.groupby('product_id', 'user_id').agg(
    collect_list(struct('event_type', 'event_time')).alias('event_info')
)

# Result: one single-key dict per pair, shaped as
#   {product_id: [{user_id: [{'event_type': ..., 'event_time': ...}, ...]}]}
associations = []

# The grouped result is pulled to the driver and reshaped in plain Python.
for row in grouped_df.collect():
    product_id = row['product_id']
    user_id = row['user_id']

    # collect_list gives no ordering guarantee after the shuffle, so the events
    # are sorted chronologically here; otherwise the per-user "sequence" mined
    # later would be in arbitrary order. sorted() is stable, so events sharing a
    # timestamp keep their collected order.
    # Assumes event_time is never null (rows with nulls were dropped from df
    # earlier in the notebook) - TODO confirm if that cleaning step changes.
    event_info = sorted(row['event_info'], key=lambda event: event['event_time'])

    # Plain-dict copy of every event of this user on this product
    user_events = [
        {'event_type': event['event_type'], 'event_time': event['event_time']}
        for event in event_info
    ]

    # Wrap as {user_id: events} and then {product_id: [that]}; keys are strings
    user_dict = {f'{user_id}': user_events}
    product_dict = {f'{product_id}': [user_dict]}

    associations.append(product_dict)

# Print the resulting list of dictionaries
# for item in associations:
    # print(item)



In [None]:
import pandas as pd
from datetime import datetime


# Flatten the nested `associations` structure into one record per
# (product, user) pair. 'event_time' is the list of that pair's timestamps and
# 'event_type' is a list of single-item lists, one per event.
data = [
    {
        'product_id': product_key,
        'user_id': user_key,
        'event_time': [event['event_time'] for event in events],
        'event_type': [[event['event_type']] for event in events],
    }
    for assoc in associations
    for product_key, user_dicts in assoc.items()
    for user_dict in user_dicts
    for user_key, events in user_dict.items()
]

associations_df = pd.DataFrame(data)
print(associations_df.head())

In [None]:
# Convert the pandas DataFrame into a Spark DataFrame so it can be used with PySpark

df_associations = spark.createDataFrame(associations_df)

In [None]:
df_associations.show()

In [None]:
df_associations.printSchema()

In [None]:
# Save the per-(product, user) event lists in eCommerce.ProdAssocTempiUserSequence.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    df_associations.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.ProdAssocTempiUserSequence")
    .save()
)

In [None]:
###############

In [None]:
# Rename the column 'event_type' to 'sequence'
# (presumably so PrefixSpan finds it under its default sequence column name - confirm)
df_associations = df_associations.withColumnRenamed("event_type", "sequence")

In [None]:
pr = df_associations.groupBy("sequence").count()

In [None]:
# Distinct sequences ordered from the most to the least frequent.
orderedCountSeq = pr.sort("count", ascending=False)

In [None]:
### Possible sequences

orderedCountSeq.show(truncate=False) 

In [None]:
# Save the sequence frequencies in eCommerce.orderedCountSeq.
# NOTE(review): "append" mode; re-running presumably duplicates the documents.
(
    orderedCountSeq.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://mongodb:27017/eCommerce.orderedCountSeq")
    .save()
)

In [None]:
# Using findFrequentSequentialPatterns

from pyspark.ml.fpm import PrefixSpan

prefixSpan = PrefixSpan()

In [None]:
prefixSpan.getSequenceCol()

In [None]:
# Mine the frequent sequential patterns with PrefixSpan (minSupport 0.1,
# patterns of at most 10 items) and order the result by pattern.
prefix_span = PrefixSpan(maxPatternLength=10, minSupport=0.1)
result = (
    prefix_span
    .findFrequentSequentialPatterns(df_associations)
    .orderBy('sequence')
)

In [None]:
# Frequent patterns
result.show(truncate=False)