In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder\
        .appName("Data Ingestion")\
        .getOrCreate()

schema = StructType([
    StructField("InvoiceNo", IntegerType(), nullable=False),
    StructField("StockCode", IntegerType(), nullable=False),
    StructField("Description", StringType(), nullable=False),
    StructField("Quantity", DoubleType(), nullable=False),
    StructField("InvoiceDate", StringType(), nullable=False),
    StructField("UnitPrice", DoubleType(), nullable=False),
    StructField("CustomerID", IntegerType(), nullable=False),
    StructField("Country", StringType(), nullable=False)
])

df = spark.read.format("csv")\
    .option("header", "true") \
    .option("inferSchema", "true") \
    .schema(schema)\
    .load("/workspaces/DataEngineering/Csvs/online_retail.csv")

df.show(10)

24/07/31 16:37:08 WARN Utils: Your hostname, codespaces-66f68e resolves to a loopback address: 127.0.0.1; using 10.0.1.77 instead (on interface eth0)
24/07/31 16:37:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/31 16:37:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity| InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+------------+---------+----------+--------------+
|   536365|     NULL|WHITE HANGING HEA...|     6.0|12/1/10 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|     6.0|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|     NULL|CREAM CUPID HEART...|     8.0|12/1/10 8:26|     2.75|     17850|United Kingdom|
|   536365|     NULL|KNITTED UNION FLA...|     6.0|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|     NULL|RED WOOLLY HOTTIE...|     6.0|12/1/10 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|     2.0|12/1/10 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|     6.0|12/1/10 8:26|     4.25|     17850|United

24/07/31 16:37:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [7]:
import shutil
#Delete if OutputCsv found already
file_path = "/workspaces/DataEngineering/OutputCsv/Country.csv"
shutil.rmtree(file_path)
#Create Partition Csv for each country
df.write.partitionBy("Country").csv(f"{file_path}")

                                                                                

In [8]:
#Define Schema
schema1 = StructType([
    StructField("InvoiceNo", IntegerType(), nullable=False),
    StructField("StockCode", IntegerType(), nullable=False),
    StructField("Description", StringType(), nullable=False),
    StructField("Quantity", DoubleType(), nullable=False),
    StructField("InvoiceDate", StringType(), nullable=False),
    StructField("UnitPrice", DoubleType(), nullable=False),
    StructField("CustomerID", IntegerType(), nullable=False),
])

In [9]:
#Get All the data according to country name

import os

country = "United Kingdom"

# Directory containing the files
directory_path = f"{file_path}/Country={country}"

# List all files in the directory that end with .csv
csv_files = [f for f in os.listdir(directory_path) if f.endswith('.csv')]

#Create single dataframe for mentioned country
for file in csv_files:
    tempDf = spark.read.schema(schema1).csv(f"{directory_path}/{file}", inferSchema=True)

tempDf.show()


+---------+---------+--------------------+--------+-------------+---------+----------+
|InvoiceNo|StockCode|         Description|Quantity|  InvoiceDate|UnitPrice|CustomerID|
+---------+---------+--------------------+--------+-------------+---------+----------+
|   563031|    21932|SCANDINAVIAN PAIS...|     2.0|8/11/11 14:38|     1.65|     13263|
|   563031|    23209|LUNCH BAG VINTAGE...|     1.0|8/11/11 14:38|     1.65|     13263|
|   563031|    22384|LUNCH BAG PINK PO...|     2.0|8/11/11 14:38|     1.65|     13263|
|   563031|    20727|LUNCH BAG  BLACK ...|     1.0|8/11/11 14:38|     1.65|     13263|
|   563031|    22383|LUNCH BAG SUKI DE...|     2.0|8/11/11 14:38|     1.65|     13263|
|   563031|    23208|LUNCH BAG VINTAGE...|     1.0|8/11/11 14:38|     1.65|     13263|
|   563031|    20725|LUNCH BAG RED RET...|     2.0|8/11/11 14:38|     1.65|     13263|
|   563031|    20728| LUNCH BAG CARS BLUE|     2.0|8/11/11 14:38|     1.65|     13263|
|   563031|    23207|LUNCH BAG ALPHABE...| 

In [10]:
#Get top 5 buyer in the Country
#Most expensive item in Country
#Least expensive item in the Country
#Average money spent by each buyer(remove null & negative outliers)

Get top 5 buyer in the country

In [11]:
#Drop null values in the dataset
tempDf = tempDf.dropna(subset=["Quantity", "UnitPrice", "CustomerID"])

In [12]:
#Get temp table
tableName = "TempTable"
tempDf.createOrReplaceTempView(tableName)

In [13]:
tempSql = spark.sql(f"SELECT CustomerID, SUM(Quantity*UnitPrice) AS TotalPrice FROM {tableName} GROUP BY CustomerID ORDER BY TotalPrice DESC LIMIT 5")
tempSql.show()



+----------+------------------+
|CustomerID|        TotalPrice|
+----------+------------------+
|     18102|129120.06999999996|
|     17450|119534.68999999994|
|     14096| 57120.91000000003|
|     17511| 35551.64999999998|
|     16684| 32770.05999999999|
+----------+------------------+



                                                                                

Most expensive item in the country

In [14]:
tempSql = spark.sql(f"SELECT Description, UnitPrice FROM {tableName} ORDER BY UnitPrice DESC LIMIT 1")
tempSql.show()

+-----------+---------+
|Description|UnitPrice|
+-----------+---------+
|     Manual|  3155.95|
+-----------+---------+



Least expensive item in the country

In [15]:
tempSql = spark.sql(f"SELECT Description, UnitPrice FROM {tableName} ORDER BY UnitPrice ASC LIMIT 1")
tempSql.show()

+--------------------+---------+
|         Description|UnitPrice|
+--------------------+---------+
|HANGING METAL HEA...|      0.0|
+--------------------+---------+



Average money spent by each buyer(remove null/0 & negative outliers).

In [17]:
#remove negative UnitPrice and Quantity

tempSql = tempDf.filter((tempDf.UnitPrice >= 0) & (tempDf.Quantity >= 0))

In [19]:
tempSql = spark.sql(f"SELECT CustomerID, AVG(Quantity*UnitPrice) AS AvgMoneySpent FROM {tableName} GROUP BY CustomerID")
tempSql.show()

+----------+------------------+
|CustomerID|     AvgMoneySpent|
+----------+------------------+
|     15727| 12.89504424778761|
|     16503|19.914117647058823|
|     17389|246.46129032258062|
|     12940| 8.507669902912621|
|     17679| 66.40366666666668|
|     16574|16.122857142857136|
|     16861|             25.74|
|     17420|23.035555555555554|
|     15957|10.211666666666664|
|     13623| 7.021000000000001|
|     17172|          19.57875|
|     18161|18.700666666666667|
|     14514|19.183529411764706|
|     16500|23.586000000000002|
|     18221|14.841176470588236|
|     15738|27.739649122807016|
|     15296|  18.1955294117647|
|     14837|15.194666666666668|
|     14420|12.862333333333334|
|     14997| 21.42777777777778|
+----------+------------------+
only showing top 20 rows

