In [None]:
# importing required packages
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType

In [None]:
# connecting to data storage and container
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "CLIENT-ID",
"fs.azure.account.oauth2.client.secret": 'SECRETKEY',
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/TENANT-ID/oauth2/token"}


# mounting point
dbutils.fs.mount(
source = "abfss://online-sports-retail-data@onlinesportsretail.dfs.core.windows.net",
mount_point = "/mnt/retaildata",
extra_configs = configs)

In [None]:
%fs
ls "/mnt/retaildata"

path,name,size,modificationTime
dbfs:/mnt/retaildata/raw data/,raw data/,0,1694956456000
dbfs:/mnt/retaildata/transformed data/,transformed data/,0,1694956466000


In [None]:
#reading all csv
brand = spark.read.format("csv").option("header","true").load("/mnt/retaildata/raw data/brand.csv")
finance = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/retaildata/raw data/finance.csv")
productinfo = spark.read.format("csv").option("header","true").load("/mnt/retaildata/raw data/productinfo.csv")
reviews = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/retaildata/raw data/reviews.csv")
traffic = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/mnt/retaildata/raw data/traffic.csv")

In [None]:
# checking the brand data
brand.show(5)

+----------+-------------------+
|product_id|       last_visited|
+----------+-------------------+
|    AH2430|2018-05-19 15:13:00|
|    G27341|2018-11-29 16:16:00|
|    CM0081|2018-02-01 10:27:00|
|    B44832|2018-09-07 20:06:00|
|    D98205|2019-07-18 15:26:00|
+----------+-------------------+
only showing top 5 rows



In [None]:
# checking the finance data
finance.show(5)

+----------+-------------+----------+--------+-------+
|product_id|listing_price|sale_price|discount|revenue|
+----------+-------------+----------+--------+-------+
|    AH2430|         NULL|      NULL|    NULL|   NULL|
|    G27341|        75.99|     37.99|     0.5|1641.17|
|    CM0081|         9.99|      5.99|     0.4| 398.93|
|    B44832|        69.99|     34.99|     0.5|2204.37|
|    D98205|        79.99|     39.99|     0.5| 5182.7|
+----------+-------------+----------+--------+-------+
only showing top 5 rows



In [None]:
#checking data type
finance.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- listing_price: double (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- revenue: double (nullable = true)



In [None]:
# or change while reading the data using "inferSchema","true"
#finance = finance.withColumn("listing_price",col("listing_price").cast(DoubleType()))


In [None]:
# checking the productinfo data
productinfo.show(5)

+--------------------+----------+--------------------+
|        product_name|product_id|         description|
+--------------------+----------+--------------------+
|                NULL|    AH2430|                NULL|
|Women's adidas Or...|    G27341|A modern take on ...|
|Women's adidas Sw...|    CM0081|These adidas Puka...|
|Women's adidas Sp...|    B44832|Inspired by moder...|
|Women's adidas Or...|    D98205|This design is in...|
+--------------------+----------+--------------------+
only showing top 5 rows



In [None]:
# checking review data
reviews.show(5)

+----------+------+-------+
|product_id|rating|reviews|
+----------+------+-------+
|    AH2430|  NULL|   NULL|
|    G27341|   3.3|   24.0|
|    CM0081|   2.6|   37.0|
|    B44832|   4.1|   35.0|
|    D98205|   3.5|   72.0|
+----------+------+-------+
only showing top 5 rows



In [None]:
# checking review datatype
reviews.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- reviews: double (nullable = true)



In [None]:
# checking traffic data
traffic.show(5)

+----------+-------------------+
|product_id|       last_visited|
+----------+-------------------+
|    AH2430|2018-05-19 15:13:00|
|    G27341|2018-11-29 16:16:00|
|    CM0081|2018-02-01 10:27:00|
|    B44832|2018-09-07 20:06:00|
|    D98205|2019-07-18 15:26:00|
+----------+-------------------+
only showing top 5 rows



In [None]:
# checking traffic datatype
traffic.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- last_visited: timestamp (nullable = true)



In [None]:
# Find the top selling product in finance table
#most_reviewed_product = reviews.orderBy("reviews", ascending=False).select("product_id","reviews").show(10)
top_selling_product = finance.orderBy("revenue", ascending=False).select("product_id","revenue").show(10)

+----------+--------+
|product_id| revenue|
+----------+--------+
|310805-137|64203.93|
|    FV7826|37150.45|
|    FV6794|34990.54|
|    EG5185|33838.31|
|    EF9623|31246.88|
|    EF0893|30454.31|
|    EG0761|29986.24|
|    EF2335|29698.65|
|    EG1071| 28834.4|
|    EG5933|28762.31|
+----------+--------+
only showing top 10 rows



In [None]:
# updating transformed file back in data storage
brand.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/retaildata/transformed data/brand")
finance.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/retaildata/transformed data/finance")
productinfo.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/retaildata/transformed data/productinfo")
reviews.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/retaildata/transformed data/reviews")
traffic.repartition(1).write.mode("overwrite").option("header",'true').csv("/mnt/retaildata/transformed data/traffic")