In [47]:
!pwd

/home/ec2-user


### 3 Data Pipeline

#### Task 1  - Data Ingestion from S3

##### a. Load data S3 -> Pyspark environment

In [4]:
! aws s3 ls

2024-12-07 16:14:26 customer-storage-bucket


In [5]:
! aws s3 ls s3://customer-storage-bucket/

                           PRE processed_data/
                           PRE raw_data/


In [6]:
! aws s3 ls s3://customer-storage-bucket/raw_data/

2024-12-07 16:15:55          0 
2024-12-07 16:19:44   23715344 Online_Retail.xlsx


In [7]:
! aws s3 cp s3://customer-storage-bucket/raw_data/Online_Retail.xlsx  ./

download: s3://customer-storage-bucket/raw_data/Online_Retail.xlsx to ./Online_Retail.xlsx


##### b. data inspection in Pyspark environment

In [8]:
!ls

Online_Retail.xlsx  Project.ipynb  scala-2.13.6.rpm


#### Task 2 - Data Processing with PySpark

In [1]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("MiniProject_Retail") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/07 21:49:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [17]:
spark

In [14]:
!pip3 install pandas

Defaulting to user installation because normal site-packages is not writeable
Collecting pandas
  Downloading pandas-1.3.5-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
[K     |████████████████████████████████| 11.3 MB 38.6 MB/s eta 0:00:01    |██████████████████████▍         | 7.9 MB 38.6 MB/s eta 0:00:01
Collecting numpy>=1.17.3; platform_machine != "aarch64" and platform_machine != "arm64" and python_version < "3.10"
  Downloading numpy-1.21.6-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.7 MB)
[K     |████████████████████████████████| 15.7 MB 46.9 MB/s eta 0:00:01
Installing collected packages: numpy, pandas
Successfully installed numpy-1.21.6 pandas-1.3.5


In [16]:
!pip3 install openpyxl

Defaulting to user installation because normal site-packages is not writeable
Collecting openpyxl
  Downloading openpyxl-3.1.3-py2.py3-none-any.whl (251 kB)
[K     |████████████████████████████████| 251 kB 20.5 MB/s eta 0:00:01
[?25hCollecting et-xmlfile
  Downloading et_xmlfile-1.1.0-py3-none-any.whl (4.7 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.1.3


In [3]:
## As our raw file is in xlsx format - lets convert to csv - 
import pandas as pd

# Convert Excel to CSV using pandas
excel_file = "Online_Retail.xlsx"
csv_file = "Online_Retail.csv"
pd.read_excel(excel_file).to_csv(csv_file, index=False)


In [4]:
#### Load data set
df = spark.read.csv(csv_file, header=True, inferSchema=True)
df.show()

                                                                                

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [5]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [23]:
print(f"Total Records: {df.count()}")

Total Records: 541909


In [24]:
### Checking null values if any (as per source there are no null values)
from pyspark.sql.functions import col, sum

# Check if any column has null values
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()


[Stage 9:>                                                          (0 + 1) / 1]

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



                                                                                

In [6]:
### droped null values
df = df.dropna(subset=["CustomerID", "Description"])

In [26]:
df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

[Stage 12:>                                                         (0 + 1) / 1]

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



                                                                                

In [27]:
print(f"Total Records: {df.count()}")

[Stage 15:>                                                         (0 + 1) / 1]

Total Records: 406829


                                                                                

##### 1. Data Transformations

In [7]:
from pyspark.sql.functions import col, to_timestamp, year, month, concat_ws
# Convert InvoiceDate to timestamp type
df = df.withColumn("InvoiceDate", to_timestamp(col("InvoiceDate")))

In [8]:
# Add TotalPrice column
df = df.withColumn("Revenue", col("Quantity") * col("UnitPrice"))

In [9]:
# Add InvoiceYearMonth column
df = df.withColumn("InvoiceYearMonth", concat_ws("-", year(col("InvoiceDate")), month(col("InvoiceDate"))))


In [33]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|           Revenue|InvoiceYearMonth|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|15.299999999999999|         2010-12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|         2010-12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|              22.0|         2010-12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|         2

##### 2. Data Aggregation

In [10]:
df.select("Country").distinct().show()

[Stage 3:>                                                          (0 + 1) / 1]

+------------------+
|           Country|
+------------------+
|            Sweden|
|         Singapore|
|           Germany|
|               RSA|
|            France|
|            Greece|
|European Community|
|           Belgium|
|           Finland|
|             Malta|
|       Unspecified|
|             Italy|
|              EIRE|
|         Lithuania|
|            Norway|
|             Spain|
|           Denmark|
|           Iceland|
|            Israel|
|   Channel Islands|
+------------------+
only showing top 20 rows



                                                                                

In [11]:
from pyspark.sql import functions as F
# Compute Total Revenue by Country
df = df.withColumn("Revenue", F.col("Quantity") * F.col("UnitPrice"))  # Create a column for revenue

# Total Revenue by Country
totalRevenueByCountry = df.groupBy("Country").agg(F.sum("Revenue").alias("Total_Revenue")).orderBy(F.desc("Total_Revenue"))

totalRevenueByCountry.show(10)

[Stage 6:>                                                          (0 + 1) / 1]

+--------------+------------------+
|       Country|     Total_Revenue|
+--------------+------------------+
|United Kingdom| 6767873.394002574|
|   Netherlands|284661.54000000015|
|          EIRE|250285.21999999872|
|       Germany|221698.20999999862|
|        France| 196712.8399999999|
|     Australia|137077.26999999973|
|   Switzerland| 55739.40000000004|
|         Spain| 54774.57999999997|
|       Belgium| 40910.95999999998|
|        Sweden|          36595.91|
+--------------+------------------+
only showing top 10 rows



                                                                                

In [12]:
customerTransactionValue = df.groupBy("CustomerID").agg(
    F.sum("Revenue").alias("Total_Transaction_Value")
).orderBy(F.desc("Total_Transaction_Value"))

customerTransactionValue.show(10)

[Stage 9:>                                                          (0 + 1) / 1]

+----------+-----------------------+
|CustomerID|Total_Transaction_Value|
+----------+-----------------------+
|   14646.0|      279489.0199999999|
|   18102.0|     256438.49000000005|
|   17450.0|     187482.17000000013|
|   14911.0|      132572.6199999998|
|   12415.0|     123725.44999999987|
|   14156.0|     113384.13999999985|
|   17511.0|      88125.37999999996|
|   16684.0|      65892.07999999999|
|   13694.0|      62653.10000000003|
|   15311.0|      59419.34000000011|
+----------+-----------------------+
only showing top 10 rows



                                                                                

In [13]:
# Compute Total Quantity Sold by Region**
totalQuantityCountry = df.groupBy("Country").agg(
    F.sum("Quantity").alias("Total_Quantity_Sold")
).orderBy(F.desc("Total_Quantity_Sold"))

totalQuantityCountry.show(10)

[Stage 12:>                                                         (0 + 1) / 1]

+--------------+-------------------+
|       Country|Total_Quantity_Sold|
+--------------+-------------------+
|United Kingdom|            4008533|
|   Netherlands|             200128|
|          EIRE|             136329|
|       Germany|             117448|
|        France|             109848|
|     Australia|              83653|
|        Sweden|              35637|
|   Switzerland|              29778|
|         Spain|              26824|
|         Japan|              25218|
+--------------+-------------------+
only showing top 10 rows



                                                                                

In [14]:
# Monthly Spending Trends**
monthly_spending_trends = df.groupBy("InvoiceYearMonth").agg(
    F.sum("Revenue").alias("Total_Revenue")
).orderBy("InvoiceYearMonth")

monthly_spending_trends.show(12)

[Stage 15:>                                                         (0 + 1) / 1]

+----------------+------------------+
|InvoiceYearMonth|     Total_Revenue|
+----------------+------------------+
|         2010-12|  554604.020000018|
|          2011-1|475074.38000001636|
|         2011-10| 974603.5899999909|
|         2011-11|1132407.7399999578|
|         2011-12| 342506.3800000034|
|          2011-2| 436546.1500000147|
|          2011-3| 579964.6100000151|
|          2011-4| 426047.8510000125|
|          2011-5|  648251.080000003|
|          2011-6| 608013.1600000106|
|          2011-7|  574238.481000012|
|          2011-8| 616368.0000000092|
+----------------+------------------+
only showing top 12 rows



                                                                                

In [15]:
# Compute Average Transaction Value per Customer
averageTransactionPerCustomer = df.groupBy("CustomerID").agg(
    F.avg("Revenue").alias("Average_Transaction_Value")
).orderBy(F.desc("Average_Transaction_Value"))

averageTransactionPerCustomer.show(10)

[Stage 18:>                                                         (0 + 1) / 1]

+----------+-------------------------+
|CustomerID|Average_Transaction_Value|
+----------+-------------------------+
|   15195.0|                   3861.0|
|   13135.0|                   3096.0|
|   17846.0|                   2033.1|
|   16532.0|                   1687.2|
|   15749.0|       1435.7266666666667|
|   16000.0|       1377.0777777777778|
|   16754.0|                   1001.2|
|   12798.0|        872.1299999999999|
|   17553.0|                    743.8|
|   17949.0|        667.7321518987343|
+----------+-------------------------+
only showing top 10 rows



                                                                                

#### Task 3: Store Processed Data Back to S3

##### a.Step 1 Export data in CSV

In [16]:
! aws s3 ls

2024-12-07 16:14:26 customer-storage-bucket
2024-12-07 21:03:09 sagemaker-studio-724772065279-gf2z5vlral
2024-12-07 21:03:11 sagemaker-us-east-1-724772065279


In [19]:
!mkdir processed

In [20]:
## lets save to local first
df.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/processed")  # Save in csv format


                                                                                

In [21]:
### all the aggregated data  # Save in csv format

totalRevenueByCountry.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/totalRevenueByCountry") 
customerTransactionValue.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/customerTransactionValue") 
totalQuantityCountry.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/totalQuantityCountry") 
monthly_spending_trends.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/monthly_spending_trends") 
averageTransactionPerCustomer.write.mode("overwrite").option("header", "true").csv("/home/ec2-user/averageTransactionPerCustomer") 

                                                                                

##### b.	Step 2  Upload the processed data to a designated S3 location

In [2]:
### Now lets copy all of them to S3 using AWS CLI
!aws s3 ls

2024-12-07 16:14:26 customer-storage-bucket


In [22]:
! aws s3 cp /home/ec2-user/processed/ s3://customer-storage-bucket/processed_data --recursive


upload: processed/._SUCCESS.crc to s3://customer-storage-bucket/processed_data/._SUCCESS.crc
upload: processed/_SUCCESS to s3://customer-storage-bucket/processed_data/_SUCCESS
upload: processed/.part-00000-f046cc7f-87ee-4b39-b6cb-d2aedf34dde9-c000.csv.crc to s3://customer-storage-bucket/processed_data/.part-00000-f046cc7f-87ee-4b39-b6cb-d2aedf34dde9-c000.csv.crc
upload: processed/part-00000-f046cc7f-87ee-4b39-b6cb-d2aedf34dde9-c000.csv to s3://customer-storage-bucket/processed_data/part-00000-f046cc7f-87ee-4b39-b6cb-d2aedf34dde9-c000.csv


In [23]:
# creating folders in aws s3
!aws s3 cp /dev/null  s3://customer-storage-bucket/totalRevenueByCountry
!aws s3 cp /dev/null  s3://customer-storage-bucket/customerTransactionValue
!aws s3 cp /dev/null  s3://customer-storage-bucket/totalQuantityCountry
!aws s3 cp /dev/null  s3://customer-storage-bucket/monthly_spending_trends
!aws s3 cp /dev/null  s3://customer-storage-bucket/averageTransactionPerCustomer

                                                              
                                                              
                                                              
                                                              
                                                              


In [24]:
#copying
! aws s3 cp /home/ec2-user/totalRevenueByCountry/ s3://customer-storage-bucket/totalRevenueByCountry --recursive
! aws s3 cp /home/ec2-user/customerTransactionValue/ s3://customer-storage-bucket/customerTransactionValue --recursive
! aws s3 cp /home/ec2-user/totalQuantityCountry/ s3://customer-storage-bucket/totalQuantityCountry --recursive
! aws s3 cp /home/ec2-user/monthly_spending_trends/ s3://customer-storage-bucket/monthly_spending_trends --recursive
! aws s3 cp /home/ec2-user/averageTransactionPerCustomer/ s3://customer-storage-bucket/averageTransactionPerCustomer --recursive

upload: totalRevenueByCountry/_SUCCESS to s3://customer-storage-bucket/totalRevenueByCountry/_SUCCESS
upload: totalRevenueByCountry/._SUCCESS.crc to s3://customer-storage-bucket/totalRevenueByCountry/._SUCCESS.crc
upload: totalRevenueByCountry/.part-00000-e67377ec-eb59-40ab-85d4-47b2c1b61305-c000.csv.crc to s3://customer-storage-bucket/totalRevenueByCountry/.part-00000-e67377ec-eb59-40ab-85d4-47b2c1b61305-c000.csv.crc
upload: totalRevenueByCountry/part-00000-e67377ec-eb59-40ab-85d4-47b2c1b61305-c000.csv to s3://customer-storage-bucket/totalRevenueByCountry/part-00000-e67377ec-eb59-40ab-85d4-47b2c1b61305-c000.csv
upload: customerTransactionValue/_SUCCESS to s3://customer-storage-bucket/customerTransactionValue/_SUCCESS
upload: customerTransactionValue/._SUCCESS.crc to s3://customer-storage-bucket/customerTransactionValue/._SUCCESS.crc
upload: customerTransactionValue/part-00000-97f63484-a1d3-42da-b0e0-0dd69bc314eb-c000.csv to s3://customer-storage-bucket/customerTransactionValue/part-00

#### Task 4 - Data Analysis Using Spark SQL 

In [41]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|           Revenue|InvoiceYearMonth|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+------------------+----------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|15.299999999999999|         2010-12|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|         2010-12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|              22.0|         2010-12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|             20.34|         2

In [42]:
df.createOrReplaceTempView("Retail_data")

In [45]:
# Total Revenue by Country
spark.sql("""
SELECT 
    Country,
    SUM(Revenue) AS TotalRevenue
FROM Retail_data
GROUP BY Country
ORDER BY TotalRevenue DESC
""").show()

[Stage 41:>                                                         (0 + 1) / 1]

+---------------+------------------+
|        Country|      TotalRevenue|
+---------------+------------------+
| United Kingdom| 6767873.394002574|
|    Netherlands|284661.54000000015|
|           EIRE|250285.21999999872|
|        Germany|221698.20999999862|
|         France| 196712.8399999999|
|      Australia|137077.26999999973|
|    Switzerland| 55739.40000000004|
|          Spain| 54774.57999999997|
|        Belgium| 40910.95999999998|
|         Sweden|          36595.91|
|          Japan|          35340.62|
|         Norway| 35163.46000000004|
|       Portugal|          29059.81|
|        Finland| 22326.73999999997|
|Channel Islands|20086.289999999957|
|        Denmark|18768.140000000003|
|          Italy|16890.510000000002|
|         Cyprus|12946.289999999999|
|        Austria|10154.319999999996|
|      Singapore|           9120.39|
+---------------+------------------+
only showing top 20 rows



                                                                                

In [47]:
# Monthly Spending Trends

spark.sql("""
SELECT 
    InvoiceYearMonth, 
    SUM(Revenue) AS MonthlyRevenue
FROM Retail_data
GROUP BY InvoiceYearMonth
ORDER BY InvoiceYearMonth
""").show()

[Stage 44:>                                                         (0 + 1) / 1]

+----------------+------------------+
|InvoiceYearMonth|    MonthlyRevenue|
+----------------+------------------+
|         2010-12|  554604.020000018|
|          2011-1|475074.38000001636|
|         2011-10| 974603.5899999909|
|         2011-11|1132407.7399999578|
|         2011-12| 342506.3800000034|
|          2011-2| 436546.1500000147|
|          2011-3| 579964.6100000151|
|          2011-4| 426047.8510000125|
|          2011-5|  648251.080000003|
|          2011-6| 608013.1600000106|
|          2011-7|  574238.481000012|
|          2011-8| 616368.0000000092|
|          2011-9| 931440.3719999959|
+----------------+------------------+



                                                                                

In [48]:
#Top 10 Customers by Revenue
spark.sql("""
SELECT 
    CustomerID,
    SUM(Revenue) AS TotalCustomerRevenue
FROM Retail_data
GROUP BY CustomerID
ORDER BY TotalCustomerRevenue DESC
LIMIT 10
""").show()

[Stage 47:>                                                         (0 + 1) / 1]

+----------+--------------------+
|CustomerID|TotalCustomerRevenue|
+----------+--------------------+
|   14646.0|   279489.0199999999|
|   18102.0|  256438.49000000005|
|   17450.0|  187482.17000000013|
|   14911.0|   132572.6199999998|
|   12415.0|  123725.44999999987|
|   14156.0|  113384.13999999985|
|   17511.0|   88125.37999999996|
|   16684.0|   65892.07999999999|
|   13694.0|   62653.10000000003|
|   15311.0|   59419.34000000011|
+----------+--------------------+



                                                                                

In [49]:
# Count of Unique Transactions
spark.sql("""
SELECT 
    COUNT(DISTINCT InvoiceNo) AS UniqueTransactionCount
FROM Retail_data
""").show()

[Stage 50:>                                                         (0 + 1) / 1]

+----------------------+
|UniqueTransactionCount|
+----------------------+
|                 22190|
+----------------------+



                                                                                

In [50]:
## Average Revenue Per Transaction
spark.sql("""
SELECT 
    AVG(Revenue) AS AvgRevenuePerTransaction
FROM Retail_data
""").show()

[Stage 56:>                                                         (0 + 1) / 1]

+------------------------+
|AvgRevenuePerTransaction|
+------------------------+
|      20.401853884564613|
+------------------------+



                                                                                