In [20]:
from pyspark.sql.types import DoubleType,StructType, StringType, IntegerType, DateType
import pyspark.sql.functions as F

# Explicit schema for the raw CSV, declared as ordered (column name, Spark type)
# pairs. Every field is added as nullable.
column_specs = [
    ("InvoiceNo", StringType()),
    ("StockCode", StringType()),
    ("Description", StringType()),
    ("Quantity", IntegerType()),
    ("InvoiceDate", DateType()),
    ("UnitPrice", DoubleType()),
    ("CustomerID", IntegerType()),
    ("Country", StringType()),
]

schema = StructType()
for column_name, column_type in column_specs:
    schema = schema.add(column_name, column_type, True)

# Source location of the raw CSV data (abfss:// URI)
input_path = "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/raw"

# Read with the explicit schema rather than inferring types. The first line of
# each file is treated as a header, and date-typed columns are parsed using the
# dateFormat pattern below.
csv_reader = (
    spark.read
    .format("csv")
    .schema(schema)
    .options(header=True, dateFormat="MM/dd/yyyy HH:mm")
)
dataSet = csv_reader.load(input_path)

# Quick sanity check: print the column types and the first two rows
dataSet.printSchema()
dataSet.show(2)

StatementMeta(SparkPool, 4, 1, Finished, Available)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
only showing top 2 rows

In [None]:
# Keep only rows where all of the required columns are populated.
# Each entry is a SQL predicate applied as its own filter, in order.
not_null_rules = (
    "Quantity IS NOT NULL",
    "UnitPrice IS NOT NULL",
    "InvoiceNo IS NOT NULL",
    "Country IS NOT NULL",
)

ecommerceDF = dataSet
for rule in not_null_rules:
    ecommerceDF = ecommerceDF.filter(rule)



StatementMeta(, , , Waiting, )

In [None]:
# Write the filtered data as Parquet, leaving the DataFrame's partitioning
# untouched (no coalesce / repartition before the write).
output_path =  "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/parquet-default-partitions"

# mode="overwrite" replaces whatever already exists at output_path
ecommerceDF.write.parquet(output_path, mode="overwrite")

In [None]:
# Reduce the number of partitions with coalesce before writing
output_path =  "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/parquet-one-partition"

# coalesce(1) merges all data into a single partition; that one partition is
# then written to storage.
single_partition_df = ecommerceDF.coalesce(1)
single_partition_df.write.parquet(output_path, mode="overwrite")

In [11]:
# Show the distinct Country values (up to 100 rows)
distinct_countries = ecommerceDF.select("Country").distinct()
distinct_countries.show(100)

output_path =  "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/parquet-countries"

# Write the DataFrame partitioned by Country: this creates one folder per
# distinct Country value under output_path, and each row is stored in the
# folder of its own country.
ecommerceDF.write.parquet(output_path, mode="overwrite", partitionBy="Country")


StatementMeta(SparkPool, 1, 11, Finished, Available)

+--------------------+
|             Country|
+--------------------+
|              Sweden|
|           Singapore|
|             Germany|
|                 RSA|
|              France|
|              Greece|
|  European Community|
|             Belgium|
|             Finland|
|               Malta|
|         Unspecified|
|               Italy|
|                EIRE|
|           Lithuania|
|              Norway|
|               Spain|
|             Denmark|
|           Hong Kong|
|             Iceland|
|              Israel|
|     Channel Islands|
|                 USA|
|              Cyprus|
|        Saudi Arabia|
|         Switzerland|
|United Arab Emirates|
|              Canada|
|      Czech Republic|
|              Brazil|
|             Lebanon|
|               Japan|
|              Poland|
|            Portugal|
|           Australia|
|             Austria|
|             Bahrain|
|      United Kingdom|
|         Netherlands|
+--------------------+

In [17]:
# Derive calendar columns from InvoiceDate so the data can be laid out in
# date-based folders.
ecommerceWithDatesDF = (
    ecommerceDF
    .withColumn("Year", F.year("InvoiceDate"))
    .withColumn("Month", F.month("InvoiceDate"))
    .withColumn("Day", F.dayofmonth("InvoiceDate"))
)

ecommerceWithDatesDF.printSchema()

# Layout 1: nested folders Year=/Month=/Day=.
# Repartitioning on the same columns that partitionBy uses sends every row of a
# given date to one task, so each date folder is produced by a single task,
# while different dates are still written in parallel. A coalesce(1) here would
# instead funnel the whole dataset (and the upstream read/filter) through a
# single task.
output_path =  "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/parquet-dates"
date_columns = ["Year", "Month", "Day"]

(
    ecommerceWithDatesDF
    .repartition(*date_columns)
    .write
    .partitionBy(*date_columns)
    .mode("overwrite")
    .parquet(output_path)
)

# Layout 2: nested folders Country=/Year=/Month=/Day=.
# Same approach, with Country as the outermost folder level.
output_path =  "abfss://ecommerce@deloittesynapasestorage.dfs.core.windows.net/parquet-country-dates"
country_date_columns = ["Country"] + date_columns

(
    ecommerceWithDatesDF
    .repartition(*country_date_columns)
    .write
    .partitionBy(*country_date_columns)
    .mode("overwrite")
    .parquet(output_path)
)


StatementMeta(SparkPool, 1, 17, Finished, Available)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)