<a href="https://colab.research.google.com/github/shubacca/pyspark-retail/blob/main/retail_exploration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Install necessary packages
!pip install faker pyspark

Collecting faker
  Downloading Faker-26.2.0-py3-none-any.whl.metadata (15 kB)
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading Faker-26.2.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=31e742df5f474472184e49b4610ec3c89aa1802989c10107541349d7119fb8f7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark, faker
Successfully installed faker-26.2.0 pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType, DateType
from faker import Faker
import random
from datetime import datetime, timedelta

In [4]:
spark = SparkSession.builder.master("local[*]").appName("RetailDataset").getOrCreate()

In [10]:
fake = Faker()

def generate_customers(n):
    customers = []
    for i in range(n):
        customer = {
            "customer_id": i,
            "first_name": fake.first_name(),
            "last_name": fake.last_name(),
            "email": fake.email(),
            "is_active": fake.boolean(),
            "is_loyalty_member": fake.boolean()
        }
        customers.append(customer)
    return customers

def generate_products(n):
    products = []
    for i in range(n):
        product = {
            "product_id": i,
            "product_name": fake.word(),
            "category": fake.random_element(elements=("Electronics", "Clothing", "Food", "Books")),
            "price": round(random.uniform(10, 1000), 2)
        }
        products.append(product)
    return products

def generate_sales(n, num_customers, num_products):
    sales = []
    for i in range(n):
        sale = {
            "sale_id": i,
            "customer_id": random.randint(0, num_customers - 1),
            "product_id": random.randint(0, num_products - 1),
            "quantity": random.randint(1, 10),
            "total_amount": round(random.uniform(20, 2000), 2),
            "sale_date": fake.date_between(start_date='-1y', end_date='today')
        }
        sales.append(sale)
    return sales


In [14]:
# Generate data
num_customers = 500
num_products = 50
num_sales = 10000

customers_data = generate_customers(num_customers)
products_data = generate_products(num_products)
sales_data = generate_sales(num_sales, num_customers, num_products)

# Define schemas
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("is_active", BooleanType(), True),
    StructField("is_loyalty_member", BooleanType(), True)
])

product_schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("price", FloatType(), True)
])

sales_schema = StructType([
    StructField("sale_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("total_amount", FloatType(), True),
    StructField("sale_date", DateType(), True)
])

# Create Spark DataFrames
customers_df = spark.createDataFrame(customers_data, schema=customer_schema)
products_df = spark.createDataFrame(products_data, schema=product_schema)
sales_df = spark.createDataFrame(sales_data, schema=sales_schema)


In [15]:
sales_df.show()

+-------+-----------+----------+--------+------------+----------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|
+-------+-----------+----------+--------+------------+----------+
|      0|        450|        38|       9|     1285.13|2023-09-15|
|      1|         17|        45|       4|     1953.44|2024-07-08|
|      2|        104|        38|       4|     1185.28|2024-03-20|
|      3|        481|         4|       8|      318.61|2024-02-18|
|      4|         83|        35|       1|      238.84|2024-03-08|
|      5|        478|        46|       6|      527.09|2023-12-25|
|      6|        403|        27|       2|      893.08|2024-01-03|
|      7|        198|        24|       5|      264.86|2024-05-24|
|      8|         62|        22|       5|      565.63|2024-02-06|
|      9|         20|        39|       3|      965.33|2023-10-13|
|     10|        347|        20|       2|     1464.17|2023-09-23|
|     11|        462|        10|      10|       515.0|2023-08-27|
|     12| 

In [16]:
# How would you select the first_name and last_name columns from the customers_df DataFrame using the col() function?
from pyspark.sql.functions import col

names = customers_df.select(col("first_name"), col('last_name'))
names.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|    Martha|  Ramirez|
|   Tiffany|   Harris|
|   Tristan|   Benson|
| Elizabeth|   Herman|
|     Lance|     Wade|
|   Rebecca|  Jackson|
|      Dana|   Nguyen|
|      Tara|  Watkins|
|   Vanessa|   Garcia|
|  Jonathan|    White|
|    Thomas|  Collins|
|   Spencer| Thompson|
|    Daniel|Hernandez|
|    Deanna| Johnston|
|    Isabel|    Clark|
|    Dennis|    Casey|
|   Michael|     Wood|
|   Michael|  Sanchez|
|     Barry|    Perez|
|      Paul|     Cole|
+----------+---------+
only showing top 20 rows



In [20]:
# Using the expr() function, how would you create a new column in the sales_df DataFrame that calculates the total price as quantity * total_amount?

sales_df = sales_df.withColumn('total_price', expr('quantity * total_amount'))
sales_df.show()

+-------+-----------+----------+--------+------------+----------+-----------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|
+-------+-----------+----------+--------+------------+----------+-----------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|
|      7|        198|        24|       5|      264.86|2024-05-24|  1324.2999|
|      8|         62|        22|       5|      565.63|2024-02-06|    2828.15|
|      9|         20|        39|       3|      965.33|2023-10-13

In [22]:
#  Using the when() and otherwise() functions, how would you create a new column in the products_df DataFrame that labels products as "Expensive" if the price is greater than $500, and "Affordable" otherwise?
from pyspark.sql.functions import when

products_df = products_df.withColumn('product_tag', when(col('price')>500, 'Expensive').otherwise('Affordable'))
products_df.show()

+----------+------------+-----------+------+-----------+
|product_id|product_name|   category| price|product_tag|
+----------+------------+-----------+------+-----------+
|         0|       under|   Clothing|391.76| Affordable|
|         1|          at|       Food|920.41|  Expensive|
|         2|    language|   Clothing|852.81|  Expensive|
|         3|       party|       Food|728.56|  Expensive|
|         4|         few|Electronics|463.27| Affordable|
|         5|       trial|Electronics|924.68|  Expensive|
|         6|     between|Electronics| 526.8|  Expensive|
|         7|    daughter|       Food|434.68| Affordable|
|         8|      choose|      Books|769.56|  Expensive|
|         9|         tax|      Books|277.17| Affordable|
|        10|         her|   Clothing|495.27| Affordable|
|        11|        side|       Food|807.75|  Expensive|
|        12|     history|      Books|724.07|  Expensive|
|        13|       month|      Books|116.21| Affordable|
|        14|          if|      

In [31]:
# calculate the average total_amount for all sales in the sales_df DataFrame?
from pyspark.sql.functions import avg, count

sales_df.groupBy(col('product_id')).agg(avg(col('total_amount')).alias('avg_sales'), count(col('total_amount'))).orderBy(col('product_id')).show()

+----------+------------------+-------------------+
|product_id|         avg_sales|count(total_amount)|
+----------+------------------+-------------------+
|         0|1046.4491488075257|                200|
|         1|1023.1973235679395|                198|
|         2|1031.7267689798393|                204|
|         3|1014.8981739372335|                186|
|         4|1016.7443236346221|                199|
|         5| 1081.031818132944|                193|
|         6| 961.0983561235873|                201|
|         7|1074.5456086819765|                189|
|         8|1040.1062793639546|                207|
|         9|1013.7342090228997|                202|
|        10|1006.7672905699114|                214|
|        11| 965.6692328161123|                196|
|        12|1104.1070320582626|                202|
|        13| 988.2838834357385|                193|
|        14|1052.7073091569837|                182|
|        15| 1004.337927426693|                207|
|        16|

In [32]:
# how would you determine the number of active customers in the customers_df DataFrame?
customers_df.groupBy(col('is_active')).agg(count(col('customer_id'))).show()

+---------+------------------+
|is_active|count(customer_id)|
+---------+------------------+
|     true|               245|
|    false|               255|
+---------+------------------+



In [35]:
# create a new column in the customers_df DataFrame that combines first_name and last_name into a single full_name column?

from pyspark.sql.functions import concat, lit
customers_df.withColumn('full_name', concat(col('first_name'), lit(' '), col('last_name'))).show()

+-----------+----------+---------+--------------------+---------+-----------------+----------------+
|customer_id|first_name|last_name|               email|is_active|is_loyalty_member|       full_name|
+-----------+----------+---------+--------------------+---------+-----------------+----------------+
|          0|    Martha|  Ramirez| bryan66@example.com|     true|             true|  Martha Ramirez|
|          1|   Tiffany|   Harris|annachan@example.com|     true|             true|  Tiffany Harris|
|          2|   Tristan|   Benson|travis09@example.com|     true|            false|  Tristan Benson|
|          3| Elizabeth|   Herman|nathanielanderson...|     true|            false|Elizabeth Herman|
|          4|     Lance|     Wade|rjohnson@example.org|     true|             true|      Lance Wade|
|          5|   Rebecca|  Jackson|brocknatasha@exam...|    false|             true| Rebecca Jackson|
|          6|      Dana|   Nguyen|hholloway@example...|     true|            false|     Dan

In [38]:
# How would you extract the first three characters from the product_name column in the products_df DataFrame
from pyspark.sql.functions import substring, upper
products_df.withColumn('name_code', upper(substring(col('product_name'), 1, 3))).show()

+----------+------------+-----------+------+-----------+---------+
|product_id|product_name|   category| price|product_tag|name_code|
+----------+------------+-----------+------+-----------+---------+
|         0|       under|   Clothing|391.76| Affordable|      UND|
|         1|          at|       Food|920.41|  Expensive|       AT|
|         2|    language|   Clothing|852.81|  Expensive|      LAN|
|         3|       party|       Food|728.56|  Expensive|      PAR|
|         4|         few|Electronics|463.27| Affordable|      FEW|
|         5|       trial|Electronics|924.68|  Expensive|      TRI|
|         6|     between|Electronics| 526.8|  Expensive|      BET|
|         7|    daughter|       Food|434.68| Affordable|      DAU|
|         8|      choose|      Books|769.56|  Expensive|      CHO|
|         9|         tax|      Books|277.17| Affordable|      TAX|
|        10|         her|   Clothing|495.27| Affordable|      HER|
|        11|        side|       Food|807.75|  Expensive|      

In [46]:
# convert the sale_date column in the sales_df DataFrame to a format like "MM-dd-yyyy"?
from pyspark.sql.functions import date_format

sales_df.withColumn('sale_date_reformat', date_format(col('sale_date'), "MM-dd-yyyy")).show()

+-------+-----------+----------+--------+------------+----------+-----------+------------------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|sale_date_reformat|
+-------+-----------+----------+--------+------------+----------+-----------+------------------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|        09-15-2023|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|        07-08-2024|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|        03-20-2024|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|        02-18-2024|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|        03-08-2024|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|        12-25-2023|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|        01-03-2024|
|      7|        198|        2

In [48]:
# how would you calculate a new column in the sales_df DataFrame that adds 6 months to each sale_date?
from pyspark.sql.functions import add_months

sales_df.withColumn('six_mth_from_sale_date', add_months(col('sale_date'), 6)).show()

+-------+-----------+----------+--------+------------+----------+-----------+----------------------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|six_mth_from_sale_date|
+-------+-----------+----------+--------+------------+----------+-----------+----------------------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|            2024-03-15|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|            2025-01-08|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|            2024-09-20|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|            2024-08-18|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|            2024-09-08|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|            2024-06-25|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|            20

In [50]:
# How would you create a new column in the sales_df DataFrame that calculates the square root of the total_amount for each sale?
from pyspark.sql.functions import sqrt, round

sales_df.withColumn('sqrt_total_amount', round(sqrt(col('total_amount')), 2)).show()

+-------+-----------+----------+--------+------------+----------+-----------+-----------------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|sqrt_total_amount|
+-------+-----------+----------+--------+------------+----------+-----------+-----------------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|            35.85|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|             44.2|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|            34.43|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|            17.85|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|            15.45|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|            22.96|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|            29.88|
|      7|        198|        24|       5

In [52]:
# In the sales_df DataFrame, how would you compute a new column that represents the absolute difference between total_amount and a fixed value, say $100?
from pyspark.sql.functions import abs

sales_df.withColumn('total_amount_shift', abs(100 - col('total_amount'))).show()


+-------+-----------+----------+--------+------------+----------+-----------+------------------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|total_amount_shift|
+-------+-----------+----------+--------+------------+----------+-----------+------------------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|           1185.13|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|           1853.44|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|           1085.28|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|         218.60999|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|            138.84|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|         427.09003|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|            793.08|
|      7|        198|        2

In [53]:
# In the customers_df DataFrame, how would you check which rows have a null value in the email column?
customers_df.filter(col('email').isNull()).show()


+-----------+----------+---------+-----+---------+-----------------+
|customer_id|first_name|last_name|email|is_active|is_loyalty_member|
+-----------+----------+---------+-----+---------+-----------------+
+-----------+----------+---------+-----+---------+-----------------+



In [55]:
# How would you filter out all rows from the sales_df DataFrame where the total_amount column contains NaN values?
sales_df.filter(col('total_amount').isNull()).show()

+-------+-----------+----------+--------+------------+---------+-----------+
|sale_id|customer_id|product_id|quantity|total_amount|sale_date|total_price|
+-------+-----------+----------+--------+------------+---------+-----------+
+-------+-----------+----------+--------+------------+---------+-----------+



In [56]:
# How would you convert the price column in the products_df DataFrame from a float to an integer?
products_df.withColumn('price', col('price').cast('integer')).show()

+----------+------------+-----------+-----+-----------+
|product_id|product_name|   category|price|product_tag|
+----------+------------+-----------+-----+-----------+
|         0|       under|   Clothing|  391| Affordable|
|         1|          at|       Food|  920|  Expensive|
|         2|    language|   Clothing|  852|  Expensive|
|         3|       party|       Food|  728|  Expensive|
|         4|         few|Electronics|  463| Affordable|
|         5|       trial|Electronics|  924|  Expensive|
|         6|     between|Electronics|  526|  Expensive|
|         7|    daughter|       Food|  434| Affordable|
|         8|      choose|      Books|  769|  Expensive|
|         9|         tax|      Books|  277| Affordable|
|        10|         her|   Clothing|  495| Affordable|
|        11|        side|       Food|  807|  Expensive|
|        12|     history|      Books|  724|  Expensive|
|        13|       month|      Books|  116| Affordable|
|        14|          if|       Food|  489| Affo

In [57]:
# In the sales_df DataFrame, how would you change the sale_date column from a date type to a string type?
sales_df.withColumn('sale_date', col('sale_date').cast('string')).show()

+-------+-----------+----------+--------+------------+----------+-----------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|
+-------+-----------+----------+--------+------------+----------+-----------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|
|      7|        198|        24|       5|      264.86|2024-05-24|  1324.2999|
|      8|         62|        22|       5|      565.63|2024-02-06|    2828.15|
|      9|         20|        39|       3|      965.33|2023-10-13

In [63]:
# How would you create a new column in the products_df DataFrame that stores the product_name and category as an array?
from pyspark.sql.functions import array, explode

products_df.withColumn('product_info', array(col('product_name'), col('category'))).show()

# how would i explode this and show all columns?
products_df.withColumn('product_info', array(col('product_name'), col('category'))).select('*', explode(col('product_info'))).show()

+----------+------------+-----------+------+-----------+--------------------+
|product_id|product_name|   category| price|product_tag|        product_info|
+----------+------------+-----------+------+-----------+--------------------+
|         0|       under|   Clothing|391.76| Affordable|   [under, Clothing]|
|         1|          at|       Food|920.41|  Expensive|          [at, Food]|
|         2|    language|   Clothing|852.81|  Expensive|[language, Clothing]|
|         3|       party|       Food|728.56|  Expensive|       [party, Food]|
|         4|         few|Electronics|463.27| Affordable|  [few, Electronics]|
|         5|       trial|Electronics|924.68|  Expensive|[trial, Electronics]|
|         6|     between|Electronics| 526.8|  Expensive|[between, Electro...|
|         7|    daughter|       Food|434.68| Affordable|    [daughter, Food]|
|         8|      choose|      Books|769.56|  Expensive|     [choose, Books]|
|         9|         tax|      Books|277.17| Affordable|        

In [64]:
# In the sales_df DataFrame, how would you construct a map column where the keys are product_id and the values are quantity?
from pyspark.sql.functions import create_map

sales_df.withColumn('product_quantity', create_map(col('product_id'), col('quantity'))).show()

+-------+-----------+----------+--------+------------+----------+-----------+----------------+
|sale_id|customer_id|product_id|quantity|total_amount| sale_date|total_price|product_quantity|
+-------+-----------+----------+--------+------------+----------+-----------+----------------+
|      0|        450|        38|       9|     1285.13|2023-09-15|   11566.17|       {38 -> 9}|
|      1|         17|        45|       4|     1953.44|2024-07-08|    7813.76|       {45 -> 4}|
|      2|        104|        38|       4|     1185.28|2024-03-20|    4741.12|       {38 -> 4}|
|      3|        481|         4|       8|      318.61|2024-02-18|    2548.88|        {4 -> 8}|
|      4|         83|        35|       1|      238.84|2024-03-08|     238.84|       {35 -> 1}|
|      5|        478|        46|       6|      527.09|2023-12-25|    3162.54|       {46 -> 6}|
|      6|        403|        27|       2|      893.08|2024-01-03|    1786.16|       {27 -> 2}|
|      7|        198|        24|       5|      264

### Analysis questions

In [75]:
# How would you find the top 5 customers who have spent the most money on products in the "Electronics" category? i need only top 5 results
from pyspark.sql.functions import desc

# Join the DataFrames on the product_id column
# Filter for "Electronics" category
# Select the required columns
# Order by total_price in descending order
# Show the top 5 customers

sales_df.join(products_df, products_df.product_id == sales_df.product_id, how='left').where(col('category') == 'Electronics').select('customer_id', 'total_price').orderBy(desc(col('total_price'))).show(5)

+-----------+-----------+
|customer_id|total_price|
+-----------+-----------+
|        191|    19923.7|
|        308|    19676.3|
|        368|    19484.1|
|        345|    19382.7|
|        415|    19278.9|
+-----------+-----------+
only showing top 5 rows

