Big Data

# **Individual Assignment:**
### **PepsiCo Dataset Exploration Exercise**
Professor: Daniel Tapiador

Assignment done by: 
João André Pinho

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **Assignment Description**

**TASK 0:** What is PepsiCo’s total volume and revenue in 2011 and 2012 for the category? What are the top 3 regions?

**TASK 1:** What is PepsiCo's market share (for revenue)?

**TASK 2:** Focusing on New York, identify the top customers (retailers).

**TASK 3:** Focusing on New York, list the top products (including PepsiCo & competitors).

**TASK 4:** Focusing on New York, for the 1-3 top-seller items, how strong is the promotional activity? 

**TASK 5:** Focusing on New York, calculate the Volume Sold on Deal (VSoD) for PepsiCo products.

**TASK 6:** Create a time series chart:

1. Configurable for a particular item;

2. For the total New York area;

3. At weekly level;

4. Show revenue, units sold, average price, average time with price reduction (PR), average marketing support (D).


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **Resolution**

### **• Libraries and Module Imports:**

Importing the necessary libraries and modules to manipulate and interact with the datasets.

In [53]:
# Imports:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window
# Note: sum and max below shadow the Python built-ins of the same name.
from pyspark.sql.functions import (
    avg, col, concat, count, countDistinct, format_string,
    lit, max, substring, sum, when, year,
)

### **• Load the Datasets as Pandas Dataframes:**

Loading the Delivery Stores, Product Attributes and Sell Out datasets as Pandas Dataframes.

In [117]:
# Read the Delivery Stores file.
#delivery_stores_df = pd.read_fwf("Saltsnck_US_1of2/Delivery_Stores")

In [118]:
# Read the Product Attributes file.
#product_attributes_df = pd.read_fwf("Saltsnck_US_1of2/saltsnck_prod_attr")

In [119]:
# Read the Sell Out 1 file.
#sellout1_df = pd.read_fwf("Saltsnck_US_1of2/saltsnck_groc_1687_1739")

In [120]:
# Read the Sell Out 2 file.
#sellout2_df = pd.read_fwf("Saltsnck_US_2of2/saltsnck_groc_1635_1686")

In [121]:
# Read the Week Translation file.
#week_translation_df = pd.read_excel("Saltsnck_US_1of2/IRI week translation_2008_2017.xls")

In [122]:
# Read the Product Salt Snacks file.
#product_saltsnacks_df = pd.read_excel("Saltsnck_US_1of2/prod_saltsnck.xls")

### **• Transform the Pandas Datasets into CSVs:**

Transforming the pandas datasets into CSVs in order to make them readable and manipulable by PySpark.

In [123]:
# Save the modified delivery stores data as CSV.
#delivery_stores_df.to_csv("CSVs/delivery_stores.csv", index = False)

In [124]:
# Save the modified product attributes data as CSV.
#product_attributes_df.to_csv("CSVs/product_attributes.csv", index = False)

In [125]:
# Save the modified sell out 1 data as CSV.
#sellout1_df.to_csv("CSVs/sellout1.csv", index = False)

In [126]:
# Save the modified sell out 2 data as CSV.
#sellout2_df.to_csv("CSVs/sellout2.csv", index = False)

In [127]:
# Save the week translation data as CSV.
#week_translation_df.to_csv("CSVs/week_translation.csv", index = False)

In [128]:
# Save the modified product salt snacks data as CSV.
#product_saltsnacks_df.to_csv("CSVs/product_saltsnacks.csv", index = False)

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **Data Preparation**

### **• Create a SparkSession:**

Create a SparkSession to provide a single point of entry to interact with the different Spark DataFrames.

In [54]:
# Create a Spark Session.
spark = SparkSession.builder.getOrCreate()

### **• Create PySpark Dataframes:**

Transform the CSVs into PySpark Dataframes.

In [55]:
# Create a delivery stores PySpark DataFrame.
delivery_stores_df = spark.read.csv("CSVs/delivery_stores.csv", header=True, inferSchema=True)

In [56]:
# Create a product attributes PySpark DataFrame.
product_attributes_df = spark.read.csv("CSVs/product_attributes.csv", header=True, inferSchema=True)

In [57]:
# Create a sell out 1 PySpark DataFrame.
sellout1_df = spark.read.csv("CSVs/sellout1.csv", header=True, inferSchema=True)

In [58]:
# Create a sell out 2 PySpark DataFrame.
sellout2_df = spark.read.csv("CSVs/sellout2.csv", header=True, inferSchema=True)

In [59]:
# Create a week translation PySpark DataFrame.
week_translation_df = spark.read.csv("CSVs/week_translation.csv", header=True, inferSchema=True)

In [60]:
# Create a product salt snacks PySpark DataFrame.
product_saltsnacks_df = spark.read.csv("CSVs/product_saltsnacks.csv", header=True, inferSchema=True)

### **• Check the PySpark Dataframes Schema:**

Preview the first rows of each PySpark DataFrame to assess which transformations are needed.

In [61]:
# Check the delivery stores PySpark DataFrame structure.
delivery_stores_df.show(5)

+-------+---+--------+--------------+----+----+--------+
|IRI_KEY| OU| EST_ACV|   Market_Name|Open|Clsd|MskdName|
+-------+---+--------+--------------+----+----+--------+
| 200032| GR|25.28099|      NEW YORK|1539|1743|  Chain5|
| 200156| GR|16.52399|       DETROIT|1689|1799| Chain44|
| 200272| GR|  11.092|   LOS ANGELES| 873|9998|Chain107|
| 200341| GR|25.69398|     SAN DIEGO|1197|9998|Chain107|
| 200379| GR|29.52299|SOUTH CAROLINA|1348|1719| Chain55|
+-------+---+--------+--------------+----+----+--------+
only showing top 5 rows



In [62]:
# Check the product attributes PySpark DataFrame structure.
product_attributes_df.show(10)

[Output truncated]

In [63]:
# Check the sell out 1 PySpark DataFrame structure.
sellout1_df.show(5)

+-------+----+---+---+-----+-----+-----+-------+----+---+---+
|IRI_KEY|WEEK| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|
+-------+----+---+---+-----+-----+-----+-------+----+---+---+
| 234212|1687|  0|  1|84114|11336|    5|   12.5|NONE|  1|  0|
| 234212|1687|  0|  1|84114|11391|    5|   12.5|NONE|  0|  0|
| 234212|1687|  0|  1|84114|10990|    9|   22.5|NONE|  1|  0|
| 234212|1687|  0|  2|84114|10813|    5|  12.75|NONE|  1|  1|
| 234212|1687|  0|  1|84114|  999|    2|    5.0|NONE|  0|  0|
+-------+----+---+---+-----+-----+-----+-------+----+---+---+
only showing top 5 rows



In [64]:
# Check the sell out 2 PySpark DataFrame structure.
sellout2_df.show(5)

+-------+----+---+---+-----+-----+-----+-------+----+---+---+
|IRI_KEY|WEEK| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|
+-------+----+---+---+-----+-----+-----+-------+----+---+---+
| 234212|1635|  0|  3|78271| 2060|    6|   12.0|NONE|  0|  1|
| 234212|1635|  0|  3|78271| 2061|    4|   8.79|NONE|  0|  1|
| 234212|1635|  0|  1|84114|11336|    5|   15.0|NONE|  0|  0|
| 234212|1635|  0|  1|84114|11391|    4|   12.0|NONE|  0|  0|
| 234212|1635|  0|  1|84114|10990|    8|  21.96|NONE|  0|  1|
+-------+----+---+---+-----+-----+-----+-------+----+---+---+
only showing top 5 rows



In [65]:
# Check the week translation PySpark DataFrame structure.
week_translation_df.show(5)

+--------+-------------------------+-----------------------+----------+-------------+----------+
|IRI Week|Calendar week starting on|Calendar week ending on|Unnamed: 3|Calendar date|IRI Week.1|
+--------+-------------------------+-----------------------+----------+-------------+----------+
|    1479|               2007-12-31|             2008-01-06|      null|   1979-09-03|       1.0|
|    1480|               2008-01-07|             2008-01-13|      null|         null|      null|
|    1481|               2008-01-14|             2008-01-20|      null|         null|      null|
|    1482|               2008-01-21|             2008-01-27|      null|         null|      null|
|    1483|               2008-01-28|             2008-02-03|      null|         null|      null|
+--------+-------------------------+-----------------------+----------+-------------+----------+
only showing top 5 rows



In [66]:
# Check the product salt snacks PySpark DataFrame structure.
product_saltsnacks_df.show(5)

+--------------------+------------+------------+------------+-------+--------------------+-----+-----------------+---+---+-----+----+--------------------------------------------------------------------------------+------+------------+-----------+------------+-----------+--------------+-------------------+-----------+
|                  L1|          L2|          L3|          L4|     L5|                  L9|Level|              UPC| SY| GE| VEND|ITEM|*STUBSPEC 1431RC                                                         00004  |VOL_EQ|PRODUCT TYPE|    PACKAGE|FLAVOR/SCENT|FAT CONTENT|COOKING METHOD|SALT/SODIUM CONTENT|TYPE OF CUT|
+--------------------+------------+------------+------------+-------+--------------------+-----+-----------------+---+---+-----+----+--------------------------------------------------------------------------------+------+------------+-----------+------------+-----------+--------------+-------------------+-----------+
|CATEGORY - SALTY ...|POTATO CHIPS|ACTON CO

In [67]:
# Confirm if PEPSICO appears on column L3 - Parent Company.
value_exists = product_saltsnacks_df.filter(product_saltsnacks_df["L3"].like("PEPSICO%"))

# Check if the value exists.
if value_exists.count() > 0:
    print("Value exists in the column")
else:
    print("Value does not exist in the column")

# Check: The value exists in column L3.

Value exists in the column


### **• Concatenate the two Sell Out PySpark Dataframes:**

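Since both sell out PySpark DataFrames have the same schema (i.e., the same columns, in the same order, with the same data types), they are combined into a new DataFrame using the union() method. Note that union() matches columns by position, not by name, so the identical column order matters.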

In [68]:
# Concatenate both sell out pyspark dataframes to make them easier to manage.
sellout_concat_df = sellout2_df.union(sellout1_df)

In [69]:
# Check the combined number of rows of the two sell out dataframes.
sellout1_df.count() + sellout2_df.count()

40414203

In [70]:
# Check the total number of rows of the new concatenated sellout dataframe.
sellout_concat_df.count()

# Check: The number of rows is still the same.

40414203

### **• Join the IRI Week Translation Column:**

To interpret the IRI Week code, the "Calendar week starting on" column is added from the week translation dataframe using the join() method.

In [71]:
# Change the name of the join column in the sell out dataframe before adding in the new column.
sellout_concat_df = sellout_concat_df.withColumnRenamed("Week", "IRI Week")

In [72]:
# Perform the join on the IRI Week column.
sellout_concat_df = sellout_concat_df.join(week_translation_df.select("IRI Week", "Calendar week starting on"), on="IRI Week", how= "left")

In [73]:
# Check the new schema of the sell out dataframe.
sellout_concat_df.show(5)

# Check: The new column was successfully added.

+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+
|IRI Week|IRI_KEY| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|Calendar week starting on|
+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+
|    1635| 234212|  0|  3|78271| 2060|    6|   12.0|NONE|  0|  1|               2010-12-27|
|    1635| 234212|  0|  3|78271| 2061|    4|   8.79|NONE|  0|  1|               2010-12-27|
|    1635| 234212|  0|  1|84114|11336|    5|   15.0|NONE|  0|  0|               2010-12-27|
|    1635| 234212|  0|  1|84114|11391|    4|   12.0|NONE|  0|  0|               2010-12-27|
|    1635| 234212|  0|  1|84114|10990|    8|  21.96|NONE|  0|  1|               2010-12-27|
+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+
only showing top 5 rows



In [74]:
# Check the total number of rows of the transformed dataframe.
sellout_concat_df.count()

# Check: The number of rows is the same.

40414203
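
As a sanity check on the joined dates, the mapping can be approximated arithmetically. The sketch below is not part of the original notebook: it assumes that IRI weeks are consecutive seven-day periods and anchors on the first row of the week translation table shown earlier (IRI week 1479 starting on 2007-12-31). The function name `iri_week_start` is hypothetical.

```python
from datetime import date, timedelta

# Anchor taken from the first row of the week translation table.
ANCHOR_WEEK = 1479
ANCHOR_START = date(2007, 12, 31)

def iri_week_start(iri_week: int) -> date:
    """Return the calendar start date of an IRI week.

    Assumption: IRI weeks are consecutive 7-day periods with no gaps.
    """
    return ANCHOR_START + timedelta(weeks=iri_week - ANCHOR_WEEK)

print(iri_week_start(1635))  # 2010-12-27
```

The result for week 1635 agrees with the "Calendar week starting on" value produced by the join, which suggests the left join attached the expected dates.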

### **• Build the UPC - Universal Product Code Column:**

The UPC column will be derived using the available columns in the sell out dataframe, appropriate padding and concatenation, allowing both the parent company (L3) and product type columns to be later retrieved from the product salt snacks dataframe.
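
The padding and concatenation logic can be illustrated in plain Python before applying it with PySpark. This is only a sketch of the intended format (two digits for SY and GE, five digits for VEND and ITEM, joined with hyphens); the helper name `build_upc` is hypothetical, and the sample values come from the first row of the sell out 2 preview above.

```python
def build_upc(sy: int, ge: int, vend: int, item: int) -> str:
    """Zero-pad each component and join with hyphens: SY(2)-GE(2)-VEND(5)-ITEM(5)."""
    return f"{sy:02d}-{ge:02d}-{vend:05d}-{item:05d}"

# Sample row from the sell out 2 preview: SY=0, GE=3, VEND=78271, ITEM=2060.
print(build_upc(0, 3, 78271, 2060))  # 00-03-78271-02060
```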

In [75]:
# Create a padded SY - System column.
sellout_concat_df = sellout_concat_df.withColumn("SY_padded", format_string("%02d", sellout_concat_df["SY"]))

In [76]:
# Create a padded GE - Generation column.
sellout_concat_df = sellout_concat_df.withColumn("GE_padded", format_string("%02d", sellout_concat_df["GE"]))

In [77]:
# Create a padded VEND - Vendor column.
sellout_concat_df = sellout_concat_df.withColumn("VEND_padded", format_string("%05d", sellout_concat_df["VEND"]))

In [78]:
# Create a padded ITEM - Item column.
sellout_concat_df = sellout_concat_df.withColumn("ITEM_padded", format_string("%05d", sellout_concat_df["ITEM"]))

In [79]:
# Check the new schema of the sell out dataframe.
sellout_concat_df.show(5)

# Check: The new columns were successfully added.

+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+
|IRI Week|IRI_KEY| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|Calendar week starting on|SY_padded|GE_padded|VEND_padded|ITEM_padded|
+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+
|    1635| 234212|  0|  3|78271| 2060|    6|   12.0|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02060|
|    1635| 234212|  0|  3|78271| 2061|    4|   8.79|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02061|
|    1635| 234212|  0|  1|84114|11336|    5|   15.0|NONE|  0|  0|               2010-12-27|       00|       01|      84114|      11336|
|    1635| 234212|  0|  1|84114|11391|    4|   12.0|NONE|  0|  0|               2010-12-27|       00|       01|      84114|      11391|
|    1635| 234212|  0|  1|84114|10990|    8|  21

In [80]:
# Build the UPC column using the recently added padded columns and concatenation.
sellout_concat_df = sellout_concat_df.withColumn("UPC", concat(sellout_concat_df["SY_padded"],lit("-"),sellout_concat_df["GE_padded"],lit("-"),sellout_concat_df["VEND_padded"],lit("-"),sellout_concat_df["ITEM_padded"]))

In [81]:
# Check the new schema of the sell out dataframe.
sellout_concat_df.show(5)

# Check: The new UPC column was successfully created.

+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+-----------------+
|IRI Week|IRI_KEY| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|Calendar week starting on|SY_padded|GE_padded|VEND_padded|ITEM_padded|              UPC|
+--------+-------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+-----------------+
|    1635| 234212|  0|  3|78271| 2060|    6|   12.0|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02060|00-03-78271-02060|
|    1635| 234212|  0|  3|78271| 2061|    4|   8.79|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02061|00-03-78271-02061|
|    1635| 234212|  0|  1|84114|11336|    5|   15.0|NONE|  0|  0|               2010-12-27|       00|       01|      84114|      11336|00-01-84114-11336|
|    1635| 234212|  0|  1|84114|11391|    4|   12.0|NONE|  0|  0|           

In [82]:
# Check the total number of rows of the sell out dataframe.
sellout_concat_df.count()

# Check: The number of rows is the same.

40414203

### **• Join the Masked and Market Name Columns:**

In order to identify the top selling regions, the "MskdName" and "Market_Name" columns will be added using the join() method.

In [83]:
# Perform a join on the IRI_KEY column to add the MskdName column.
sellout_concat_df = sellout_concat_df.join(delivery_stores_df.select("IRI_KEY", "MskdName"), on="IRI_KEY", how="left")

In [84]:
# Perform a join on the IRI_KEY column to add the Market_Name column.
sellout_concat_df = sellout_concat_df.join(delivery_stores_df.select("IRI_KEY", "Market_Name"), on="IRI_KEY", how="left")

In [85]:
# Check the new schema of the sell out dataframe.
sellout_concat_df.show(5)

# Check: The new columns were successfully created.

+-------+--------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+-----------------+--------+-----------+
|IRI_KEY|IRI Week| SY| GE| VEND| ITEM|UNITS|DOLLARS|   F|  D| PR|Calendar week starting on|SY_padded|GE_padded|VEND_padded|ITEM_padded|              UPC|MskdName|Market_Name|
+-------+--------+---+---+-----+-----+-----+-------+----+---+---+-------------------------+---------+---------+-----------+-----------+-----------------+--------+-----------+
| 234212|    1635|  0|  3|78271| 2060|    6|   12.0|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02060|00-03-78271-02060| Chain42|    CHICAGO|
| 234212|    1635|  0|  3|78271| 2061|    4|   8.79|NONE|  0|  1|               2010-12-27|       00|       03|      78271|      02061|00-03-78271-02061| Chain42|    CHICAGO|
| 234212|    1635|  0|  1|84114|11336|    5|   15.0|NONE|  0|  0|               2010-12-27|       00|       01|      84114|  

In [86]:
# Check the total number of rows of the sell out dataframe.
sellout_concat_df.count()

# Check: The number of rows is the same.

40414203

### **• Drop the Unnecessary Columns:**

To keep later transformations on the sell out dataframe lightweight, the unnecessary columns IRI_KEY, IRI WEEK, SY, GE, F, SY_padded, GE_padded, VEND_padded and ITEM_padded will be removed.

In [87]:
# Drop the unnecessary columns of the sellout dataframe.
columns_to_drop = ["IRI_KEY", "IRI WEEK", "SY", "GE", "F", "SY_padded", "GE_padded", "VEND_padded", "ITEM_padded"]

sellout_concat_trimmed_df = sellout_concat_df.drop(*columns_to_drop)

In [88]:
# Check the new schema of the sell out dataframe.
sellout_concat_trimmed_df.show(5)

# Check: The unnecessary columns were successfully deleted.

+-----+-----+-----+-------+---+---+-------------------------+-----------------+--------+-----------+
| VEND| ITEM|UNITS|DOLLARS|  D| PR|Calendar week starting on|              UPC|MskdName|Market_Name|
+-----+-----+-----+-------+---+---+-------------------------+-----------------+--------+-----------+
|78271| 2060|    6|   12.0|  0|  1|               2010-12-27|00-03-78271-02060| Chain42|    CHICAGO|
|78271| 2061|    4|   8.79|  0|  1|               2010-12-27|00-03-78271-02061| Chain42|    CHICAGO|
|84114|11336|    5|   15.0|  0|  0|               2010-12-27|00-01-84114-11336| Chain42|    CHICAGO|
|84114|11391|    4|   12.0|  0|  0|               2010-12-27|00-01-84114-11391| Chain42|    CHICAGO|
|84114|10990|    8|  21.96|  0|  1|               2010-12-27|00-01-84114-10990| Chain42|    CHICAGO|
+-----+-----+-----+-------+---+---+-------------------------+-----------------+--------+-----------+
only showing top 5 rows



In [89]:
# Check the total number of rows of the sell out dataframe.
sellout_concat_trimmed_df.count()

# Check: The number of rows is the same.

40414203

### **• Join the L3 - Parent Company and Product Type Columns:**

To identify the PepsiCo data entries and the different product types, the L3 and Product Type columns will be added to the sell out dataframe using the join() method.

In [90]:
# Perform a join on the UPC column to add the L3 - Parent Company column.
sellout_concat_trimmed_df = sellout_concat_trimmed_df.join(product_saltsnacks_df.select("UPC", "L3"), on="UPC", how="left")

In [91]:
# Perform a join on the UPC column to add the Product Type column.
sellout_concat_trimmed_df = sellout_concat_trimmed_df.join(product_saltsnacks_df.select("UPC", "Product Type"), on="UPC", how="left")

In [92]:
# Check the new schema of the sell out dataframe.
sellout_concat_trimmed_df.show(5)

# Check: The new columns were successfully created.

+-----------------+-----+-----+-----+-------+---+---+-------------------------+--------+-----------+----------------+------------+
|              UPC| VEND| ITEM|UNITS|DOLLARS|  D| PR|Calendar week starting on|MskdName|Market_Name|              L3|Product Type|
+-----------------+-----+-----+-----+-------+---+---+-------------------------+--------+-----------+----------------+------------+
|00-03-78271-02060|78271| 2060|    6|   12.0|  0|  1|               2010-12-27| Chain42|    CHICAGO|            null|        null|
|00-03-78271-02061|78271| 2061|    4|   8.79|  0|  1|               2010-12-27| Chain42|    CHICAGO|            null|        null|
|00-01-84114-11336|84114|11336|    5|   15.0|  0|  0|               2010-12-27| Chain42|    CHICAGO|            null|        null|
|00-01-84114-11391|84114|11391|    4|   12.0|  0|  0|               2010-12-27| Chain42|    CHICAGO|            null|        null|
|00-01-84114-10990|84114|10990|    8|  21.96|  0|  1|               2010-12-27| Cha

In [93]:
# Check the total number of rows of the sell out dataframe.
sellout_concat_trimmed_df.count()

# Check: The number of rows is the same.

40414203

### **• Find the Different Vendor Codes for PepsiCo:**

Since some rows have a null L3 value, filtering on L3 alone would miss PepsiCo sales. To include all PEPSICO INC entries, every vendor (VEND) code that corresponds to this company needs to be identified.

In [94]:
# Identify the distinct PepsiCo VEND values.
distinct_vend_pepsico_values = sellout_concat_trimmed_df.filter(col("L3").like("PEPS%")).select('VEND').distinct()
distinct_vend_pepsico_values.show()

# Check: The following vendor codes: 28400, 30000, 71461 are PepsiCo related.

+-----+
| VEND|
+-----+
|30000|
|28400|
|71461|
+-----+



In [95]:
# Confirm that these VEND values only contain PEPSICO INC L3 values.
distinct_vend_values = sellout_concat_trimmed_df.filter(col('VEND').isin([30000, 28400, 71461])).select('L3').distinct()
distinct_vend_values.show()

# Check: Apart from null L3 values, these vendor codes are only attributed to PepsiCo data entries.

+-----------+
|         L3|
+-----------+
|       null|
|PEPSICO INC|
+-----------+



In [96]:
# Confirm that PEPSICO INC is always written in the same way.
distinct_pepsico_values = sellout_concat_trimmed_df.filter(col("L3").like("PEP%")).select("L3").distinct()
distinct_pepsico_values.show()

# Check: PepsiCo is always written in the same way.

+-----------+
|         L3|
+-----------+
|PEPSICO INC|
+-----------+
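
The reasoning above amounts to classifying a row by its vendor code rather than by L3, so that rows whose L3 is null are still counted. A minimal plain-Python sketch of that rule follows; the helper `is_pepsico` is hypothetical and only mirrors the `isin` filter used in the tasks below.

```python
# Vendor codes identified above as belonging to PEPSICO INC.
PEPSICO_VEND_CODES = {28400, 30000, 71461}

def is_pepsico(vend: int) -> bool:
    """Classify a row by vendor code, ignoring L3 (which may be null)."""
    return vend in PEPSICO_VEND_CODES

# 78271 and 84114 are vendor codes that appear in the sell out previews above.
for vend in (28400, 78271, 84114):
    print(vend, is_pepsico(vend))
```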



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 0: Total Volume, Revenue and Top 3 Regions in 2011 and 2012**

### **• Total Volume Sold in 2011**

The Total Volume Sold in 2011 by PepsiCo was 126,315,847 units.

In [97]:
# Define the PEPSICO INC VEND codes.
pepsico_vend_codes = ["28400", "30000", "71461"]

# Computing the total volume sold in 2011 by PepsiCo.
total_volume_pepsico_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(col("VEND").isin(pepsico_vend_codes))).agg(sum("UNITS").alias("Total Volume in 2011"))

# Show the total volume sold in 2011 by PepsiCo.
total_volume_pepsico_2011.show()

+--------------------+
|Total Volume in 2011|
+--------------------+
|           126315847|
+--------------------+



In [98]:
# Computing the total volume sold in 2011 by the other companies.
total_volume_other_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(~col("VEND").isin(pepsico_vend_codes))).agg(sum("UNITS").alias("Total Volume in 2011 by the Other Companies"))

# Show the total volume sold in 2011 by the other companies.
total_volume_other_2011.show()

+-------------------------------------------+
|Total Volume in 2011 by the Other Companies|
+-------------------------------------------+
|                                  103842971|
+-------------------------------------------+



### **• Total Revenue Generated in 2011**

The Total Revenue generated in 2011 by PepsiCo was approximately $332,054,362.

In [99]:
# Computing the total revenue generated in 2011 by PepsiCo.
total_revenue_pepsico_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(col("VEND").isin(pepsico_vend_codes))).agg(sum("DOLLARS").alias("Total Revenue in 2011"))

# Show the total revenue generated in 2011 by PepsiCo.
total_revenue_pepsico_2011.show()

+---------------------+
|Total Revenue in 2011|
+---------------------+
|  3.320543621900403E8|
+---------------------+



In [100]:
# Computing the total revenue generated in 2011 by the other companies.
total_revenue_others_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(~col("VEND").isin(pepsico_vend_codes))).agg(sum("DOLLARS").alias("Total Revenue in 2011 by the Other Companies"))

# Show the total revenue generated in 2011 by the other companies.
total_revenue_others_2011.show()

+--------------------------------------------+
|Total Revenue in 2011 by the Other Companies|
+--------------------------------------------+
|                        2.2948795935035855E8|
+--------------------------------------------+



### **• Top 3 Regions in 2011**

The Top 3 Regions in terms of PepsiCo Revenues in 2011 were:
1. **Los Angeles** - $26,212,674;
2. **New York** - $21,566,451;
3. **Chicago** - $14,315,358.

In [101]:
# Identifying the top regions in 2011 for PepsiCo (the top 5 are kept for context).
top_3_regions_pepsico_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(col("VEND").isin(pepsico_vend_codes))).groupBy("Market_Name").agg(sum("DOLLARS").alias("Total Revenue")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top regions in terms of revenues in 2011 for PepsiCo; the first 3 rows are the top 3.
top_3_regions_pepsico_2011.show()

+-----------+--------------------+
|Market_Name|       Total Revenue|
+-----------+--------------------+
|LOS ANGELES|2.6212674080004413E7|
|   NEW YORK|2.1566451109998524E7|
|    CHICAGO|1.4315358299999902E7|
|     BOSTON|1.3723807859999282E7|
|PHOENIX, AZ| 1.133937723999909E7|
+-----------+--------------------+
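
Spark prints the revenue totals in scientific notation, which is hard to read. The short sketch below shows one way to convert them into the rounded dollar figures quoted in the text. The values are copied from the output table above; the helper name `as_dollars` is hypothetical.

```python
# Revenue totals copied from the 2011 output table (top 3 rows).
top_regions_2011 = {
    "LOS ANGELES": 2.6212674080004413e7,
    "NEW YORK": 2.1566451109998524e7,
    "CHICAGO": 1.4315358299999902e7,
}

def as_dollars(amount: float) -> str:
    """Format a float as whole dollars with thousands separators."""
    return f"${amount:,.0f}"

for market, revenue in top_regions_2011.items():
    print(f"{market}: {as_dollars(revenue)}")
```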



### **• Total Volume Sold in 2012**

The Total Volume Sold in 2012 by PepsiCo was 126,998,718 units.

In [102]:
# Computing the total volume sold in 2012 by PepsiCo.
total_volume_pepsico_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(col("VEND").isin(pepsico_vend_codes))).agg(sum("UNITS").alias("Total Volume in 2012"))

# Show the total volume sold in 2012 by PepsiCo.
total_volume_pepsico_2012.show()

+--------------------+
|Total Volume in 2012|
+--------------------+
|           126998718|
+--------------------+



In [103]:
# Computing the total volume sold in 2012 by the other companies.
total_volume_other_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(~col("VEND").isin(pepsico_vend_codes))).agg(sum("UNITS").alias("Total Volume in 2012 by the Other Companies"))

# Show the total volume sold in 2012 by the other companies.
total_volume_other_2012.show()

+-------------------------------------------+
|Total Volume in 2012 by the Other Companies|
+-------------------------------------------+
|                                  105995285|
+-------------------------------------------+



### **• Total Revenue Generated in 2012**

The Total Revenue generated in 2012 by PepsiCo was approximately $344,826,848.

In [104]:
# Computing the total revenue generated in 2012 by PepsiCo.
total_revenue_pepsico_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(col("VEND").isin(pepsico_vend_codes))).agg(sum("DOLLARS").alias("Total Revenue in 2012"))

# Show the total revenue generated in 2012 by PepsiCo.
total_revenue_pepsico_2012.show()

+---------------------+
|Total Revenue in 2012|
+---------------------+
|  3.448268479199427E8|
+---------------------+



In [105]:
# Computing the total revenue generated in 2012 by the other companies.
total_revenue_others_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(~col("VEND").isin(pepsico_vend_codes))).agg(sum("DOLLARS").alias("Total Revenue in 2012 by the Other Companies"))

# Show the total revenue generated in 2012 by the other companies.
total_revenue_others_2012.show()

+--------------------------------------------+
|Total Revenue in 2012 by the Other Companies|
+--------------------------------------------+
|                         2.412535131802956E8|
+--------------------------------------------+



### **• Top 3 Regions in 2012**

The Top 3 Regions in terms of PepsiCo Revenues in 2012 were:
1. **Los Angeles** - $27,206,073;
2. **New York** - $22,033,104;
3. **Chicago** - $14,271,019.

In [106]:
# Identifying the top regions in 2012 for PepsiCo (the top 5 are kept for context).
top_3_regions_pepsico_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(col("VEND").isin(pepsico_vend_codes))).groupBy("Market_Name").agg(sum("DOLLARS").alias("Total Revenue")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top regions in terms of revenues in 2012 for PepsiCo; the first 3 rows are the top 3.
top_3_regions_pepsico_2012.show()

+-----------+--------------------+
|Market_Name|       Total Revenue|
+-----------+--------------------+
|LOS ANGELES|2.7206073270004123E7|
|   NEW YORK| 2.203310397999847E7|
|    CHICAGO|1.4271018740000326E7|
|     BOSTON|1.3426455239999227E7|
|PHOENIX, AZ|1.1554964549999136E7|
+-----------+--------------------+



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 1: PepsiCo Market Share in 2011 and 2012**

### **• PepsiCo Market Share in 2011**

PepsiCo's market share was 59.13% in 2011.

In [107]:
# Computing the total market revenue generated in 2011.
total_revenue_market_2011 = sellout_concat_trimmed_df.filter(year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011).agg(sum("DOLLARS").alias("Total Revenue in 2011"))

# Computing PepsiCo's market share in 2011 = (PepsiCo's Revenues in 2011 / Total Market Revenues in 2011)
market_share_pepsico_2011 = total_revenue_pepsico_2011.collect()[0]["Total Revenue in 2011"] / total_revenue_market_2011.collect()[0]["Total Revenue in 2011"]

# Printing PepsiCo's market share in 2011.
print("PepsiCo's Market Share in 2011: {:.2%}".format(market_share_pepsico_2011))

PepsiCo's Market Share in 2011: 59.13%


### **• PepsiCo Market Share in 2012**

PepsiCo's market share was 58.84% in 2012.

In [108]:
# Computing the total market revenue generated in 2012.
total_revenue_market_2012 = sellout_concat_trimmed_df.filter(year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012).agg(sum("DOLLARS").alias("Total Revenue in 2012"))

# Computing PepsiCo's market share in 2012 = (PepsiCo's Revenues in 2012 / Total Market Revenues in 2012).
market_share_pepsico_2012 = total_revenue_pepsico_2012.collect()[0]["Total Revenue in 2012"] / total_revenue_market_2012.collect()[0]["Total Revenue in 2012"]

# Printing PepsiCo's market share in 2012.
print("PepsiCo's Market Share in 2012: {:.2%}".format(market_share_pepsico_2012))

PepsiCo's Market Share in 2012: 58.84%
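
Both market share cells reduce to a single ratio: PepsiCo's revenue divided by the revenue of the whole category (PepsiCo plus all other vendors). A minimal plain-Python sketch of that ratio, using a hypothetical helper `market_share` and made-up revenue figures rather than values from the dataset:

```python
def market_share(company_revenue, total_market_revenue):
    """Return the company's revenue as a fraction of total market revenue."""
    if total_market_revenue <= 0:
        raise ValueError("total market revenue must be positive")
    return company_revenue / total_market_revenue


# Illustrative figures only, not taken from the PepsiCo dataset.
company_revenue = 600.0
other_vendors_revenue = 400.0

share = market_share(company_revenue, company_revenue + other_vendors_revenue)
print("Market share: {:.2%}".format(share))
```

The `{:.2%}` format multiplies the fraction by 100 and appends the percent sign, which is the same formatting the cells above use.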


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 2: Top Customers (New York Retailers) in 2011 and 2012**

### **• Top PepsiCo New York Customers in 2011**

The top PepsiCo New York Customers in 2011 were:
1. **Chain 98** - $4,815,087;
2. **Chain 112** - $4,725,130;
3. **Chain 5** - $4,574,273;
4. **Chain 73** - $2,979,815;
5. **Chain 110** - $1,111,449.

In [109]:
# Identifying the top 5 PepsiCo New York customers in 2011.
top_5_retailers_ny_2011_rev_units = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011) & (col("VEND").isin(pepsico_vend_codes)) & (col("Market_Name").like("NEW YORK"))).groupBy("MskdName").agg(sum("DOLLARS").alias("Total Revenue"), sum("UNITS").alias("Total Units Sold")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top 5 PepsiCo New York customers in 2011 with their total revenue and total units sold.
top_5_retailers_ny_2011_rev_units.show()

+--------+------------------+----------------+
|MskdName|     Total Revenue|Total Units Sold|
+--------+------------------+----------------+
| Chain98| 4815087.619999763|         1535050|
|Chain112| 4725129.679999712|         1713067|
|  Chain5| 4574273.169999719|         1665812|
| Chain73| 2979815.079999773|         1126291|
|Chain110|1111448.5100000002|          487371|
+--------+------------------+----------------+



### **• Top PepsiCo New York Customers in 2012**

The top PepsiCo New York Customers in 2012 were:
1. **Chain 112** - $4,835,976;
2. **Chain 98** - $4,791,504;
3. **Chain 5** - $4,757,187;
4. **Chain 73** - $3,017,525;
5. **Chain 110** - $1,141,277.

In [110]:
# Identifying the top 5 PepsiCo New York customers in 2012.
top_5_retailers_ny_2012_rev_units = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012) & (col("VEND").isin(pepsico_vend_codes)) & (col("Market_Name").like("NEW YORK"))).groupBy("MskdName").agg(sum("DOLLARS").alias("Total Revenue"), sum("UNITS").alias("Total Units Sold")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top 5 PepsiCo New York customers in 2012 with their total revenue and total units sold.
top_5_retailers_ny_2012_rev_units.show()

+--------+------------------+----------------+
|MskdName|     Total Revenue|Total Units Sold|
+--------+------------------+----------------+
|Chain112| 4835975.539999782|         1710526|
| Chain98|4791504.0399998175|         1457287|
|  Chain5| 4757187.459999753|         1724960|
| Chain73|3017525.3799998136|         1088952|
|Chain110|1141277.0799999998|          431213|
+--------+------------------+----------------+



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 3: Top Products (PepsiCo and Competitors) Sold in New York in 2011 and 2012**

### **• Top Products (PepsiCo and Competitors) Sold in New York in 2011**

The top 5 products by revenue in New York in 2011 were:

1. **UPC: 00-03-28400-03345** - Generated around $1,022,581;
2. **UPC: 00-01-28400-03875** - Generated around $1,015,517;
3. **UPC: 00-02-28400-06408** - Generated around $975,795;
4. **UPC: 00-02-28400-06399** - Generated around $830,449;
5. **UPC: 00-01-28400-03346** - Generated around $608,710.

In [111]:
# Identifying the top 5 products sold in New York in 2011.
top_5_products_ny_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011)&(col("Market_Name").like("NEW YORK"))).groupBy("UPC").agg(sum("DOLLARS").alias("Total Revenue")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top 5 products sold in New York in 2011.
top_5_products_ny_2011.show()

+-----------------+------------------+
|              UPC|     Total Revenue|
+-----------------+------------------+
|00-03-28400-03345|1022580.7299999997|
|00-01-28400-03875|1015516.7399999999|
|00-02-28400-06408| 975795.2800000001|
|00-02-28400-06399| 830449.3699999999|
|00-01-28400-03346| 608710.1100000002|
+-----------------+------------------+



### **• Top Products (PepsiCo and Competitors) Sold in New York in 2012**

The top 5 products by revenue in New York in 2012 were:

1. **UPC: 00-01-28400-03875** - Generated around $1,399,523;
2. **UPC: 00-03-28400-03345** - Generated around $1,382,586;
3. **UPC: 00-03-28400-06408** - Generated around $1,112,857;
4. **UPC: 00-01-28400-03854** - Generated around $753,409;
5. **UPC: 00-02-28400-06399** - Generated around $741,246.

In [112]:
# Identifying the top 5 Products sold in New York in 2012.
top_5_products_ny_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)&(col("Market_Name").like("NEW YORK"))).groupBy("UPC").agg(sum("DOLLARS").alias("Total Revenue")).orderBy("Total Revenue", ascending=False).limit(5)

# Show the top 5 Products sold in New York in 2012.
top_5_products_ny_2012.show()

+-----------------+------------------+
|              UPC|     Total Revenue|
+-----------------+------------------+
|00-01-28400-03875|1399523.4900000002|
|00-03-28400-03345|1382585.5200000003|
|00-03-28400-06408|1112857.1400000001|
|00-01-28400-03854| 753409.0200000004|
|00-02-28400-06399| 741246.2600000001|
+-----------------+------------------+



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 4: Promotional Activity of the Top 3 Selling Products in New York in 2011 and 2012**

### **Method 1: Calculation of Weeks in Promotion for Top-Selling Items (Over all Year Weeks)**

This method involves computing the percentage across all 52 weeks in a year, accounting for both the weeks in which the product was sold and those in which it may not have been sold.
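
To make the denominator choice concrete, here is a minimal plain-Python sketch of this idea under simplifying assumptions: each record is a hypothetical `(week, upc, pr)` tuple, and a week counts as promotional for an item when at least one of its records that week has a positive PR flag. The helper name and sample records are illustrative only.

```python
def promo_week_share_all_weeks(rows, upc):
    """Share of ALL calendar weeks present in rows in which upc was promoted."""
    # Denominator: every distinct week in the data, whatever the product.
    all_weeks = {week for week, _, _ in rows}
    # Numerator: distinct weeks where the chosen item has a promoted record.
    promo_weeks = {week for week, row_upc, pr in rows if row_upc == upc and pr > 0}
    return len(promo_weeks) / len(all_weeks) if all_weeks else 0.0


# Illustrative records only: (week start, item code, PR flag).
sample_rows = [
    ("2011-01-03", "A", 1),
    ("2011-01-03", "B", 0),
    ("2011-01-10", "A", 0),
    ("2011-01-17", "B", 1),
    ("2011-01-24", "A", 1),
]

share_a = promo_week_share_all_weeks(sample_rows, "A")
print("Item A promoted in {:.2%} of all weeks".format(share_a))
```

Item A is promoted in two of the four weeks in the sample, including a week in which it was not sold at all, which is why this method can understate promotional intensity for items with gaps in their sales.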

### **• Promotional Activity of the Top 3 Selling Products in New York in 2011**

The Promotional Activity of the Top 3 selling products in New York in 2011 was:

1. **UPC 00-03-28400-03345** - This product was in promotion 73.08% of the weeks in 2011.
2. **UPC 00-01-28400-03875** - This product was in promotion 80.77% of the weeks in 2011.
3. **UPC 00-02-28400-06408** - This product was in promotion 90.38% of the weeks in 2011.

In [113]:
# Define a function to compute the % of weeks in promotion for a top selling item.
def compute_weeks_in_promotion(df, upcs, target_year):
    
    for upc in upcs:
        # Filter the DataFrame for the target year and the specified item in promotion.
        upc_weeks_count = df.filter(
            (year(col("Calendar week starting on")) == target_year) &
            (col("PR") > 0.1) &
            (col("UPC") == upc)
        ).agg(countDistinct("Calendar week starting on").alias("Weeks in Promotion")).collect()[0]["Weeks in Promotion"]

        # Filter the DataFrame for the target year (all items) to count the total number of weeks.
        total_weeks_count = df.filter(
            year(col("Calendar week starting on")) == target_year
        ).agg(countDistinct("Calendar week starting on").alias("Total Weeks")).collect()[0]["Total Weeks"]

        # Compute the percentage of weeks the specified item was in promotion in the target year.
        if total_weeks_count > 0:
            upc_weeks_percentage = (upc_weeks_count / total_weeks_count) * 100
        else:
            upc_weeks_percentage = 0

        # Print the item weeks percentage.
        print("Percentage of weeks UPC {} was in promotion in {}: {:.2f}% ({}/{} weeks)".format(upc, target_year, upc_weeks_percentage, upc_weeks_count, total_weeks_count))

In [114]:
# Declare the list of top selling items in 2011.
upcs_list_2011 = ["00-03-28400-03345", "00-01-28400-03875", "00-02-28400-06408"]

# Declare the year we want to check.
target_year = 2011

# Call the function to compute and print the % of weeks in promotion for the top 3 selling items in 2011.
compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2011, target_year)

Percentage of weeks UPC 00-03-28400-03345 was in promotion in 2011: 73.08% (38/52 weeks)
Percentage of weeks UPC 00-01-28400-03875 was in promotion in 2011: 80.77% (42/52 weeks)
Percentage of weeks UPC 00-02-28400-06408 was in promotion in 2011: 90.38% (47/52 weeks)


In [197]:
# # Declare the list of top selling items in 2011.
# upcs_list_2011 = ["00-03-28400-03345", "00-01-28400-03875", "00-02-28400-06408","00-02-28400-06399", "00-01-28400-03346","00-01-28400-03854"]

# # Declare the year we want to check.
# target_year = 2011

# # Call the function to compute and print the % of weeks in promotion for the top 5 selling items in 2011.
# compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2011, target_year)

### **• Promotional Activity of the Top 3 Selling Products in New York in 2012**

The Promotional Activity of the Top 3 selling products in New York in 2012 was:

1. **UPC 00-01-28400-03875** - This product was in promotion 100.00% of the weeks in 2012.
2. **UPC 00-03-28400-03345** - This product was in promotion 100.00% of the weeks in 2012.
3. **UPC 00-03-28400-06408** - This product was in promotion 100.00% of the weeks in 2012.

In [115]:
# Declare the list of top selling items in 2012.
upcs_list_2012 = ["00-01-28400-03875", "00-03-28400-03345", "00-03-28400-06408"]

# Declare the year we want to check.
target_year = 2012

# Call the function to compute and print the % of weeks in promotion for the top 3 selling items in 2012.
compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2012, target_year)

Percentage of weeks UPC 00-01-28400-03875 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-03-28400-03345 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-03-28400-06408 was in promotion in 2012: 100.00% (52/52 weeks)


In [199]:
# # Declare the list of top selling items in 2012.
# upcs_list_2012 = ["00-01-28400-03875", "00-03-28400-03345", "00-03-28400-06408","00-01-28400-03854","00-02-28400-06399", "00-01-28400-03346"]

# # Declare the year we want to check.
# target_year = 2012

# # Call the function to compute and print the % of weeks in promotion for the top 3 selling items in 2012.
# compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2012, target_year)

### **Method 2: Mean Price Reduction Flag for Top-Selling Items**

This method is less granular than the previous one: it does not count weeks, but takes a high-level average of the PR column values across all of the item's records in the year.
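
Because PR is a 0/1 flag, its mean over an item's records equals the fraction of records that carry a price reduction. A minimal plain-Python sketch of that calculation, assuming hypothetical `(upc, pr)` records and an illustrative helper name:

```python
def mean_promo_flag(rows, upc):
    """Average of the 0/1 PR flag over all records of the given item."""
    flags = [pr for row_upc, pr in rows if row_upc == upc]
    # With no records for the item there is nothing to average.
    return sum(flags) / len(flags) if flags else 0.0


# Illustrative records only: (item code, PR flag).
sample_rows = [
    ("A", 1),
    ("A", 0),
    ("A", 1),
    ("A", 1),
    ("B", 0),
]

promo_fraction = mean_promo_flag(sample_rows, "A")
print("Item A was promoted in {:.2%} of its records".format(promo_fraction))
```

Each record weighs the same here, so a week with many store-level records influences the result more than a week with few.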

### **• Promotional Activity of the Top 3 Selling Products in New York in 2011**

The Promotional Activity of the Top 3 selling products in New York in 2011 was:

1. **UPC 00-03-28400-03345** - This product was in promotion 58.50% of the time in 2011.
2. **UPC 00-01-28400-03875** - This product was in promotion 52.84% of the time in 2011.
3. **UPC 00-02-28400-06408** - This product was in promotion 48.72% of the time in 2011.

In [116]:
# Define a function to compute the % of time a top selling item was in promotion.
def calculate_mean_promo_flag(df, upcs, target_year):
    for upc in upcs:
        # Filter the DataFrame for the specified item and target year.
        filtered_df = df.filter((df["UPC"] == upc) & (F.year(df["Calendar week starting on"]) == target_year))
        
        # Calculate the average promotion flag for the specified item.
        upc_promo_avg = filtered_df.agg(F.avg("PR")).collect()[0][0]
        
        # Compute the percentage of time the specified item was in promotion.
        upc_promo_percentage = upc_promo_avg * 100
        
        # Print the item promotion percentage.
        print("UPC {} was in promotion {:.2f}% of the time in {}.".format(upc, upc_promo_percentage, target_year))

In [117]:
# Declare the list of top selling items in 2011.
upcs_list_2011 = ["00-03-28400-03345", "00-01-28400-03875", "00-02-28400-06408"]

# Declare the year we want to check.
target_year = 2011

# Call the function to compute and print the % of time in promotion for the top 3 selling items in 2011.
calculate_mean_promo_flag(sellout_concat_trimmed_df, upcs_list_2011, target_year)

UPC 00-03-28400-03345 was in promotion 58.50% of the time in 2011.
UPC 00-01-28400-03875 was in promotion 52.84% of the time in 2011.
UPC 00-02-28400-06408 was in promotion 48.72% of the time in 2011.


### **• Promotional Activity of the Top 3 Selling Products in New York in 2012**

The Promotional Activity of the Top 3 selling products in New York in 2012 was:

1. **UPC 00-01-28400-03875** - This product was in promotion 59.64% of the time in 2012.
2. **UPC 00-03-28400-03345** - This product was in promotion 62.44% of the time in 2012.
3. **UPC 00-03-28400-06408** - This product was in promotion 46.15% of the time in 2012.

In [119]:
# Declare the list of top selling items in 2012.
upcs_list_2012 = ["00-01-28400-03875", "00-03-28400-03345", "00-03-28400-06408"]

# Declare the year we want to check.
target_year = 2012

# Call the function to compute and print the % time in promotion for the top 3 selling items in 2012.
calculate_mean_promo_flag(sellout_concat_trimmed_df, upcs_list_2012, target_year)

UPC 00-01-28400-03875 was in promotion 59.64% of the time in 2012.
UPC 00-03-28400-03345 was in promotion 62.44% of the time in 2012.
UPC 00-03-28400-06408 was in promotion 46.15% of the time in 2012.


In [120]:
distinct_pr_values = sellout_concat_trimmed_df.filter((col("UPC") == "00-03-28400-06408")&(year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012)).select("PR").distinct()

# Show the distinct values of PR column for UPC = 00-03-28400-06408
distinct_pr_values.show()

+---+
| PR|
+---+
|  1|
|  0|
+---+



### **Method 3: Calculation of % of Weeks in Promotion for Top-Selling Items over the Total Number Weeks in which these were Sold**

This computation method assumes that if a product was sold on promotion (PR > 0.1) at least once during a given week, the entire week counts as a promotional week for that product. The percentage is then computed over the weeks in which the product was actually sold, rather than over all weeks of the year. Other ways of measuring promotional activity exist, but this one is common practice in the sector and rests on the premise that promotions typically lift sales for the rest of the week, sometimes even after the promotion ends.
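
The week-level rule can be sketched in plain Python as follows. The record layout `(week, upc, pr)`, the helper name, and the sample data are assumptions made for illustration; the point is that a week's flag is the maximum PR value among the item's records that week, and only weeks with sales enter the denominator.

```python
def promo_week_share_sold_weeks(rows, upc):
    """Share of the weeks in which upc was sold that had at least one promoted record."""
    week_flag = {}
    for week, row_upc, pr in rows:
        if row_upc == upc:
            # A single promoted record marks the whole week as promotional.
            week_flag[week] = max(week_flag.get(week, 0), pr)
    if not week_flag:
        return 0.0
    promo_weeks = sum(1 for flag in week_flag.values() if flag == 1)
    return promo_weeks / len(week_flag)


# Illustrative records only: (week start, item code, PR flag).
sample_rows = [
    ("2011-01-03", "A", 0),
    ("2011-01-03", "A", 1),
    ("2011-01-10", "A", 0),
    ("2011-01-17", "B", 1),
]

share_a = promo_week_share_sold_weeks(sample_rows, "A")
print("Item A promoted in {:.2%} of the weeks it was sold".format(share_a))
```

Item A is sold in two weeks and promoted in one of them, giving 50%. Under Method 1 the same data would be divided by all three calendar weeks instead.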

### **• Promotional Activity of the Top 3 Selling Products in New York in 2011**

The Promotional Activity of the Top 3 selling products in New York in 2011 was:

1. **UPC 00-03-28400-03345** - This product was in promotion 97.44% of the weeks in 2011.
2. **UPC 00-01-28400-03875** - This product was in promotion 100.00% of the weeks in 2011.
3. **UPC 00-02-28400-06408** - This product was in promotion 100.00% of the weeks in 2011.

In [121]:
# Importing Window, which the per-week aggregation below relies on.
from pyspark.sql import Window

# Defining a function to compute the % of weeks in promotion for a top selling item.
def compute_weeks_in_promotion(df, upcs, target_year):
    
    for upc in upcs:
        # Filtering the DataFrame for the target year and the specified item.
        df_upc = df.filter(
            (year(col("Calendar week starting on")) == target_year) &
            (col("UPC") == upc)
        )
        
        # Creating a new column 'promo_week' holding the highest PR value of the week (1 if any record that week was on promotion, 0 otherwise).
        df_upc = df_upc.withColumn("promo_week", max(col("PR")).over(Window.partitionBy("Calendar week starting on")))

        # Computing weeks in promotion.
        upc_weeks_count = df_upc.filter(
            col("promo_week") == 1
        ).agg(countDistinct("Calendar week starting on").alias("Weeks in Promotion")).collect()[0]["Weeks in Promotion"]

        # Computing total weeks.
        total_weeks_count = df_upc.agg(countDistinct("Calendar week starting on").alias("Total Weeks")).collect()[0]["Total Weeks"]

        # Computing the percentage of weeks the specified item was in promotion in the target year.
        if total_weeks_count > 0:
            upc_weeks_percentage = (upc_weeks_count / total_weeks_count) * 100
        else:
            upc_weeks_percentage = 0

        # Printing the item weeks percentage.
        print("Percentage of weeks UPC {} was in promotion in {}: {:.2f}% ({}/{} weeks)".format(upc, target_year, upc_weeks_percentage, upc_weeks_count, total_weeks_count))

In [122]:
# Declaring the list of top selling items in 2011.
upcs_list_2011 = ["00-03-28400-03345", "00-01-28400-03875", "00-02-28400-06408","00-02-28400-06399", "00-01-28400-03346","00-01-28400-03854"]

# Declaring the year we want to check.
target_year = 2011

# Calling the function to compute and print the % of weeks in promotion for the six top selling items listed above in 2011.
compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2011, target_year)

Percentage of weeks UPC 00-03-28400-03345 was in promotion in 2011: 97.44% (38/39 weeks)
Percentage of weeks UPC 00-01-28400-03875 was in promotion in 2011: 100.00% (42/42 weeks)
Percentage of weeks UPC 00-02-28400-06408 was in promotion in 2011: 100.00% (47/47 weeks)
Percentage of weeks UPC 00-02-28400-06399 was in promotion in 2011: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-01-28400-03346 was in promotion in 2011: 100.00% (47/47 weeks)
Percentage of weeks UPC 00-01-28400-03854 was in promotion in 2011: 100.00% (41/41 weeks)


### **• Promotional Activity of the Top 3 Selling Products in New York in 2012**

The Promotional Activity of the Top 3 selling products in New York in 2012 was:

1. **UPC 00-01-28400-03875** - This product was in promotion 100.00% of the weeks in 2012.
2. **UPC 00-03-28400-03345** - This product was in promotion 100.00% of the weeks in 2012.
3. **UPC 00-03-28400-06408** - This product was in promotion 100.00% of the weeks in 2012.

In [123]:
# Declaring the list of top selling items in 2012.
upcs_list_2012 =  ["00-01-28400-03875", "00-03-28400-03345", "00-03-28400-06408","00-01-28400-03854","00-02-28400-06399", "00-01-28400-03346"]

# Declaring the year we want to check.
target_year = 2012

# Calling the function to compute and print the % of weeks in promotion for the six top selling items listed above in 2012.
compute_weeks_in_promotion(sellout_concat_trimmed_df, upcs_list_2012, target_year)

Percentage of weeks UPC 00-01-28400-03875 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-03-28400-03345 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-03-28400-06408 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-01-28400-03854 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-02-28400-06399 was in promotion in 2012: 100.00% (52/52 weeks)
Percentage of weeks UPC 00-01-28400-03346 was in promotion in 2012: 100.00% (52/52 weeks)


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 5: Volume Sold on Deal (VSoD) for PepsiCo products in New York in 2011 and 2012**

### **• Volume Sold on Deal (VSoD) for PepsiCo Products Sold in New York in 2011**

The Volume Sold on Deal (VSoD) for PepsiCo products in New York in 2011 was 52.01%.

In [124]:
# Filter the dataset to get the number of PepsiCo product units that were sold on deal in 2011 in New York.
units_sold_on_deal_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011) & (col("Market_Name").like("NEW YORK")) & (col("PR") > 0.1) &
    (col("VEND").isin(pepsico_vend_codes))).agg(sum("Units").alias("Units Sold on Deal")).collect()[0]["Units Sold on Deal"]

# Show the number of PepsiCo product units that were sold on deal in 2011 in New York.
print("In 2011, in New York, {} units of PepsiCo products were sold on deal.".format(units_sold_on_deal_2011))

In 2011, in New York, 3956230 units of PepsiCo products were sold on deal.


In [125]:
# Filter the dataset to get the total number of PepsiCo product units sold in New York in 2011.
total_units_sold_2011 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2011) & (col("Market_Name").like("NEW YORK")) &
    (col("VEND").isin(pepsico_vend_codes))).agg(sum("Units").alias("Total Units Sold")).collect()[0]["Total Units Sold"]

# Show the total number of PepsiCo product units sold in New York.
print("In 2011, in New York, {} units of PepsiCo products were sold.".format(total_units_sold_2011))

In 2011, in New York, 7606676 units of PepsiCo products were sold.


In [126]:
# Calculate the Volume Sold on Deal (VSoD) for PepsiCo products in 2011 in New York.
vsod_2011 = (units_sold_on_deal_2011 / total_units_sold_2011) * 100

# Print the Volume Sold on Deal (VSoD) for PepsiCo products in 2011 in New York.
print("Volume Sold on Deal (VSoD) for PepsiCo products in 2011 in New York: {:.2f}%".format(vsod_2011))

Volume Sold on Deal (VSoD) for PepsiCo products in 2011 in New York: 52.01%


### **• Volume Sold on Deal (VSoD) for PepsiCo Products Sold in New York in 2012**

The Volume Sold on Deal (VSoD) for PepsiCo products in New York in 2012 was 56.53%.

In [127]:
# Filter the dataset to get the number of PepsiCo product units that were sold on deal in 2012 in New York.
units_sold_on_deal_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012) & (col("Market_Name").like("NEW YORK")) & (col("PR") > 0.1) &
    (col("VEND").isin(pepsico_vend_codes))).agg(sum("Units").alias("Units Sold on Deal")).collect()[0]["Units Sold on Deal"]

# Show the number of PepsiCo product units that were sold on deal in 2012 in New York.
print("In 2012, in New York, {} units of PepsiCo products were sold on deal.".format(units_sold_on_deal_2012))

In 2012, in New York, 4255249 units of PepsiCo products were sold on deal.


In [128]:
# Filter the dataset to get the total number of PepsiCo product units sold in New York in 2012.
total_units_sold_2012 = sellout_concat_trimmed_df.filter((year(sellout_concat_trimmed_df["Calendar week starting on"]) == 2012) & (col("Market_Name").like("NEW YORK")) &
    (col("VEND").isin(pepsico_vend_codes))).agg(sum("Units").alias("Total Units Sold")).collect()[0]["Total Units Sold"]

# Show the total number of PepsiCo product units sold in New York.
print("In 2012, in New York, {} units of PepsiCo products were sold.".format(total_units_sold_2012))

In 2012, in New York, 7527159 units of PepsiCo products were sold.


In [129]:
# Calculate the Volume Sold on Deal (VSoD) for PepsiCo products in 2012 in New York.
vsod_2012 = (units_sold_on_deal_2012 / total_units_sold_2012) * 100

# Print the Volume Sold on Deal (VSoD) for PepsiCo products in 2012 in New York.
print("Volume Sold on Deal (VSoD) for PepsiCo products in 2012 in New York: {:.2f}%".format(vsod_2012))

Volume Sold on Deal (VSoD) for PepsiCo products in 2012 in New York: 56.53%
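
As a quick cross-check, the VSoD ratio (units sold on deal divided by total units sold, times 100) can be recomputed in plain Python from the unit counts printed by the cells above. The helper name `volume_sold_on_deal` is hypothetical; the unit counts are copied from the outputs of this section.

```python
def volume_sold_on_deal(units_on_deal, total_units):
    """Return the percentage of units that were sold while on deal."""
    if total_units <= 0:
        raise ValueError("total units must be positive")
    return units_on_deal / total_units * 100


# (units sold on deal, total units sold) as printed by the cells above.
unit_counts = {
    2011: (3956230, 7606676),
    2012: (4255249, 7527159),
}

vsod_by_year = {
    target_year: volume_sold_on_deal(on_deal, total)
    for target_year, (on_deal, total) in unit_counts.items()
}

for target_year in sorted(vsod_by_year):
    print("VSoD {}: {:.2f}%".format(target_year, vsod_by_year[target_year]))
```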


--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## **TASK 6: Time Series Chart**

### **• Data for Building the Chart**

The necessary columns were retrieved in order to satisfy the following chart criteria:

1. Configurable for a particular item;

2. For the total New York area;

3. At weekly level;

4. Show revenue, units sold, average price, average time with price reduction (PR), average marketing support (D).

In [130]:
# Filtering the sellout dataset for all the transactions in New York, from PepsiCo.
chart_filtered_data_df = sellout_concat_trimmed_df.filter((col("Market_Name").like("NEW YORK")) & (col("VEND").isin(pepsico_vend_codes)) & ((year(col("Calendar week starting on")) == 2011) | (year(col("Calendar week starting on")) == 2012))).select(*sellout_concat_trimmed_df.columns)

In [131]:
# Calculating the sum of dollars and units and the average marketing support (D) and price reduction (PR) for each combination of UPC, week, and MskdName.
grouped_df = chart_filtered_data_df.groupBy("UPC","Calendar week starting on", "MskdName").agg(
    F.sum("DOLLARS").alias("Sum_Dollars"),
    F.sum("UNITS").alias("Sum_Units"),
    F.avg("D").alias("Average_D"),
    F.avg("PR").alias("Average_PR")
)

# Calculating the average price (AVG_Price) column.
grouped_df = grouped_df.withColumn("AVG_Price", F.col("Sum_Dollars") / F.col("Sum_Units"))

# Adding a Year column.
grouped_df = grouped_df.withColumn('Year', year(col('Calendar week starting on')))
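
The grouped data above is kept at chain level, with the average price derived as total dollars divided by total units. One possible way to roll such rows up into a single weekly series for a chosen item across the whole area is sketched below in plain Python. The field names, the helper `weekly_series`, and the sample rows are assumptions for illustration and are not taken from the dataset.

```python
from collections import defaultdict


def weekly_series(rows, upc):
    """Aggregate chain-level rows into one record per week for the given item."""
    totals = defaultdict(lambda: {"dollars": 0.0, "units": 0})
    for row in rows:
        if row["upc"] == upc:
            bucket = totals[row["week"]]
            bucket["dollars"] += row["dollars"]
            bucket["units"] += row["units"]

    series = []
    # ISO-formatted dates sort chronologically as plain strings.
    for week in sorted(totals):
        dollars = totals[week]["dollars"]
        units = totals[week]["units"]
        # Average price is revenue per unit; undefined when nothing was sold.
        avg_price = dollars / units if units else None
        series.append({"week": week, "dollars": dollars, "units": units, "avg_price": avg_price})
    return series


# Illustrative chain-level rows only.
sample_rows = [
    {"upc": "X", "week": "2011-01-10", "chain": "ChainA", "dollars": 30.0, "units": 10},
    {"upc": "X", "week": "2011-01-10", "chain": "ChainB", "dollars": 50.0, "units": 30},
    {"upc": "X", "week": "2011-01-17", "chain": "ChainA", "dollars": 45.0, "units": 15},
    {"upc": "Y", "week": "2011-01-10", "chain": "ChainA", "dollars": 99.0, "units": 9},
]

series_x = weekly_series(sample_rows, "X")
for point in series_x:
    print(point)
```

Dividing summed dollars by summed units gives a unit-weighted average price, which differs from averaging each chain's price when chains sell different volumes.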

In [132]:
# Defining the desired column order.
column_order = ['UPC', 'Calendar week starting on', 'Year', 'MskdName', 'Sum_Dollars', 'Sum_Units', 'Average_D', 'Average_PR', 'AVG_Price']

# Reorganizing the order of columns
grouped_df = grouped_df.select(column_order)

In [133]:
# Show the grouped chart dataframe.
grouped_df.show()

+-----------------+-------------------------+----+--------+------------------+---------+------------------+-------------------+------------------+
|              UPC|Calendar week starting on|Year|MskdName|       Sum_Dollars|Sum_Units|         Average_D|         Average_PR|         AVG_Price|
+-----------------+-------------------------+----+--------+------------------+---------+------------------+-------------------+------------------+
|00-01-28400-08391|               2011-01-10|2011|  Chain4|            457.15|       65|               1.2|                0.6| 7.033076923076923|
|00-03-28400-01234|               2011-02-07|2011|  Chain4|             39.26|       15|               0.0|                0.8|2.6173333333333333|
|00-03-28400-01371|               2011-02-21|2011|  Chain4|              40.0|       20|               0.2|                0.0|               2.0|
|00-01-28400-04768|               2011-03-07|2011|  Chain4|            121.59|       41|               0.4|           