In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import plotly.express as px
from IPython.core.display import HTML
# Inject a CSS rule so <pre> elements render with `white-space: pre`
# (no line wrapping), which keeps wide text tables on a single line per row.
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
# Start (or reuse) a local Spark session named "Alkemy"
spark = (
    SparkSession.builder
    .master("local")
    .appName("Alkemy")
    .getOrCreate()
)

# Read the sales CSV and, in one chain:
#   1. derive the quarter from the sale date,
#   2. compute the per-row profit,
#   3. total the quantity sold per (quarter, product_id).
# Note: the aggregation keeps only the grouping keys and the summed quantity,
# so the `profit` column is not present in the resulting `df`.
df = (
    spark.read.csv("sales_data_all.csv", header=True, inferSchema=True)
    .withColumn("quarter", F.quarter(F.col("sale_date")))
    .withColumn(
        "profit",
        (F.col("regular_price") - F.col("purchase_price")) * F.col("quantity"),
    )
    .groupBy("quarter", "product_id")
    .agg(F.sum("quantity").alias("quantity"))
)

22/12/09 17:21:50 WARN Utils: Your hostname, MacBook-Pro-di-Vincenzo.local resolves to a loopback address: 127.0.0.1; using 172.20.25.108 instead (on interface en0)
22/12/09 17:21:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/09 17:21:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/09 17:21:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


                                                                                

In [3]:
# Per-quarter aggregate attached to every row (a real partition-by)
def Fwhen(stringa, funzione):
    """Add a per-quarter aggregate of one column to the global ``df``.

    Parameters
    ----------
    stringa : str
        Name of the column in ``df`` to aggregate (e.g. ``'quantity'``).
    funzione : str
        Name of an aggregate function in ``pyspark.sql.functions``
        (e.g. ``'sum'``, ``'max'``, ``'min'``).

    Returns
    -------
    DataFrame
        The global ``df`` with one extra column named
        ``'<funzione>_<stringa>'`` holding, on every row, the aggregate
        computed over all rows that share that row's ``quarter``.

    Raises
    ------
    AttributeError
        If ``funzione`` is not a name defined in ``pyspark.sql.functions``.

    Notes
    -----
    The aggregate is computed with a window partitioned by ``quarter``
    instead of collecting the per-quarter values and re-attaching them by
    position. This avoids ``exec``, works for any number of distinct quarter
    values (the positional version raised ``IndexError`` with fewer than
    four), and cannot misalign values when ``quarter`` contains nulls.
    """
    # Local import keeps this cell self-contained
    from pyspark.sql import Window

    # Resolve the aggregate by name, e.g. 'sum' -> F.sum
    aggregate = getattr(F, funzione)

    # One partition per quarter; no ordering, so the frame is the whole partition
    per_quarter = Window.partitionBy("quarter")

    return df.withColumn(
        "{}_{}".format(funzione, stringa),
        aggregate(stringa).over(per_quarter),
    )

# Popularity index: a product's share of all units sold in its quarter,
# i.e. pop_index == quantity / sum(quantity) over the quarter
df = Fwhen('quantity', 'sum')
df = df.select(
    'quarter',
    'product_id',
    # Name the ratio explicitly instead of renaming Spark's auto-generated
    # column name: withColumnRenamed silently does nothing if that name differs.
    (F.col('quantity') / F.col('sum_quantity')).alias('pop_index'),
)

# Per-quarter maximum of the popularity index (upper bound for the scaling below)
df = Fwhen('pop_index', 'max')

# Per-quarter minimum of the popularity index (lower bound for the scaling below)
df = Fwhen('pop_index', 'min')

# Min-max scale the popularity index within each quarter, then drop the helper columns
# NOTE(review): the denominator is zero when max == min within a quarter —
# confirm how that case should be handled.
df = df.withColumn('pop_index_perc', (df.pop_index - df.min_pop_index) / (df.max_pop_index - df.min_pop_index))
df = df.drop("max_pop_index", "min_pop_index")

                                                                                

+-------+----------+--------------------+--------------------+
|quarter|product_id|           pop_index|      pop_index_perc|
+-------+----------+--------------------+--------------------+
|      1|    127592|2.574333891105676...|0.017964071856287428|
|      1|    128705|0.001544600334663...| 0.11776447105788423|
|      1|    139926|2.059467112884541...|0.013972055888223553|
|      1|    156390|1.287166945552838...|0.007984031936127746|
|      1|    151974|2.059467112884541...|0.013972055888223553|
|      1|    159384|2.831767280216244...|0.019960079840319365|
|      1|    142551|1.029733556442270...|0.005988023952095808|
|      1|    109720|2.574333891105676...|                 0.0|
|      1|    140759|7.723001673317029E-5|0.003992015968063872|
|      1|    108936|2.316900501995109E-4| 0.01596806387225549|
|      1|    154248|4.118934225769082...|0.029940119760479042|
|      1|    157276|4.118934225769082...|0.029940119760479042|
|      1|    144694|1.287166945552838...|0.007984031936

In [4]:
# Standard deviation of the scaled popularity index per product, across quarters.
# The result column is aliased directly rather than renamed afterwards, so this
# does not depend on the column name Spark generates for the aggregate.
df_sd = df.groupBy("product_id").agg(F.stddev("pop_index_perc").alias("sd_pop_index"))

# Largest standard deviation first
df_sd = df_sd.sort(F.col("sd_pop_index").desc())

+----------+-------------------+
|product_id|       sd_pop_index|
+----------+-------------------+
|    112582| 0.5539892756386005|
|    110853|0.47662561703373113|
|    110675| 0.3799399603595993|
|    126707|  0.345275892846712|
|    160952|0.34415898467118106|
|    160649| 0.3045034473687332|
|    150667|0.29558476805015993|
|    156485|0.29336230550590225|
|    106100|0.29217946230333314|
|    131910| 0.2921512393217598|
+----------+-------------------+
only showing top 10 rows



In [5]:
# Find the top 10 products whose popularity index varies the most across quarters
## THEY'RE CANDIDATE SEASONAL PRODUCTS ##
# df_sd is already sorted by descending standard deviation; limit in Spark so
# only 10 rows are brought to the driver instead of every product id.
prod_rank = [row["product_id"] for row in df_sd.limit(10).collect()]

In [6]:
# Keep only the rows of the shortlisted products, preview them, and report the row count
df_top = df.where(df["product_id"].isin(prod_rank))
df_top.show()
print("Numbers of rows: ", df_top.count())

+-------+----------+--------------------+--------------------+
|quarter|product_id|           pop_index|      pop_index_perc|
+-------+----------+--------------------+--------------------+
|      4|    131910|2.809665248454684E-4| 0.03757225433526012|
|      1|    126707|1.029733556442270...|0.005988023952095808|
|      2|    106100|0.004430963998417513|  0.6208178438661711|
|      2|    156485|1.318739285243307...| 0.01486988847583643|
|      4|    110675|0.005498916271975596|  0.7890173410404625|
|      3|    110675|2.321681929575648...|0.022038567493112945|
|      4|    110853|4.013807497792406E-5|0.002890173410404624|
|      2|    126707|5.274957140973229...|0.003717472118959...|
|      3|    150667|2.321681929575648...|0.022038567493112945|
|      4|    126707|0.004856707072328811|  0.6965317919075146|
|      1|    106100|1.544600334663405...|0.009980039920159679|
|      2|    150667|0.003745219570090993|  0.5241635687732341|
|      4|    112582|3.010355623344304...| 0.04046242774

In [7]:
# Convert the filtered Spark DataFrame into a pandas DataFrame on the driver
df_plot = df_top.toPandas()

In [9]:
# px.line connects points in row order, and the rows coming from Spark are in
# no particular quarter order. Sort by product and quarter so each product's
# line runs left to right instead of zig-zagging.
plot = px.line(df_plot.sort_values(["product_id", "quarter"]),
               x="quarter",
               y="pop_index_perc",
               color='product_id',
               title='Top 10 products with high standard deviation in popularity index between quarters')
plot