#### Import libraries

In [0]:
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

#### Mount Blob

In [0]:
# Mount the "input" blob container under /mnt/car-demo.
# The mount table is consulted first so that re-running this cell is a no-op,
# while genuine failures (bad credentials, wrong container, network errors)
# surface as exceptions instead of being reported as "already mounted".
mount_point = "/mnt/car-demo"
if any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
  print("Directory already mounted")
else:
  dbutils.fs.mount(
    source = "wasbs://input@placetostorefiles.blob.core.windows.net",
    mount_point = mount_point,
    # Placeholder: extra_configs is a key/value mapping (config key -> secret).
    # Supply the value from a secret scope (dbutils.secrets.get) rather than
    # pasting the storage key into the notebook.
    extra_configs = {"<conf-key>": "<conf-value>"})

#### Import csv

In [0]:
# Load the three raw CSV extracts from the mounted container. The first row of
# each file provides the column names; no schema inference is requested.
df_trasaction = spark.read.option('header', True).csv('/mnt/car-demo/transaction.csv')
df_product = spark.read.option('header', True).csv('/mnt/car-demo/product.csv')
df_store = spark.read.option('header', True).csv('/mnt/car-demo/store.csv')

#### Cleansing and filtering data

In [0]:
# Keep only rows whose `transaction_completed` value is not the string 'false'.
# Rows where the value is null are dropped as well (a null comparison is not true).
# NOTE(review): this runs before the positional rename, so it assumes the raw
# CSV header already contains a `transaction_completed` column - confirm.
not_failed = F.col('transaction_completed') != 'false'
df_trasaction = df_trasaction.filter(not_failed)

#### Rename columns

In [0]:
# Positional rename of the transaction frame: the i-th existing column receives
# the i-th name below. zip() stops at the shorter sequence, so any surplus
# columns keep their original names.
list_of_new_cols = ['date', 'Product_number', 'quantity', 'card_type', 'transaction_completed']
for current_name, target_name in zip(df_trasaction.columns, list_of_new_cols):
    df_trasaction = df_trasaction.withColumnRenamed(current_name, target_name)

In [0]:
# Positional rename of the product frame: the i-th existing column receives the
# i-th name below. zip() stops at the shorter sequence, so any surplus columns
# keep their original names.
list_of_new_cols = ['Product_number', 'Category', 'Subcategory', 'gender', 'Price', 'Year', 'date_p']
for current_name, target_name in zip(df_product.columns, list_of_new_cols):
    df_product = df_product.withColumnRenamed(current_name, target_name)

In [0]:
# Positional rename of the store frame: the i-th existing column receives the
# i-th name below. zip() stops at the shorter sequence, so any surplus columns
# keep their original names.
# NOTE(review): 'ode' looks like a truncated 'Code', and the first store column
# is named 'Product_number' so that it can serve as the join key later - confirm
# both against the actual store.csv layout.
list_of_new_cols = ['Product_number', 'Name', 'Country', 'ode', 'City']
for current_name, target_name in zip(df_store.columns, list_of_new_cols):
    df_store = df_store.withColumnRenamed(current_name, target_name)

#### Replace column

In [0]:
# Parse the string `date` column into a date column named `Date`.
# withColumn's first argument decides the resulting column name (an alias on
# the expression is ignored), so the name is given here directly. It matches
# the `Date` key used by the later grouping cell.
# Under Spark's default case-insensitive resolution this replaces `date` in
# place; with spark.sql.caseSensitive=true it adds `Date` next to it.
df_trasaction = df_trasaction.withColumn('Date', F.to_date(F.col('date')))

#### Join tables

In [0]:
# Inner-join transactions to products and then to stores, both times on
# `Product_number`; rows without a match in either lookup table are dropped.
# NOTE(review): the store frame is matched on `Product_number` too - confirm
# that this key really identifies the store for a transaction.
newdf = (
    df_trasaction
    .join(df_product, on='Product_number', how='inner')
    .join(df_store, on='Product_number', how='inner')
)

#### Add column

In [0]:
# Grouping keys used by the aggregation cell that follows.
cols = [
  'Product_number',
  'Name',
  'Category',
  'Subcategory',
  'Year',
  'Date'
]

# Overwrite `Price` with the line total (unit price x quantity).
# The CSVs are loaded without a schema, so both operands are cast to double
# explicitly instead of relying on implicit string-to-number coercion.
# NOTE: re-running this cell multiplies by quantity again; re-run from the
# join cell to reset `newdf`.
newdf = newdf.withColumn(
  'Price',
  F.col('Price').cast('double') * F.col('quantity').cast('double')
)

#### Groupby operation

In [0]:
# Total `Price` for every distinct combination of the grouping keys in `cols`.
# The aggregate column comes back named `sum(Price)`.
grouped = newdf.groupBy(*cols)
newdf = grouped.sum('Price')

#### Rename column

In [0]:
# Give the aggregate column produced by the group-by its plain name back.
newdf = newdf.withColumnRenamed(existing='sum(Price)', new='Price')

In [0]:
# Render the aggregated frame as a table in the notebook output.
display(newdf)

Product_number,Name,Category,Subcategory,Year,Date,Price
12,Topicshots,Mitsubishi,Lancer,2006,2021-04-05,66023.37
108,Zoombeat,Ford,E-Series,1993,2021-09-05,337388.38
499,Skynoodle,Volkswagen,Jetta III,1994,2021-06-03,140311.9
699,Edgewire,Aston Martin,V8 Vantage,2010,2021-03-21,45489.6
900,Vinder,Infiniti,JX,2013,2021-09-29,270212.39
815,Abatz,Dodge,Intrepid,1994,2021-04-10,190123.22
18,Dynabox,Mercury,Mountaineer,2001,2021-10-29,486009.5
54,Youspan,GMC,Yukon XL 1500,2013,2021-08-04,36369.8
222,Feednation,Volvo,V70,2001,2021-11-14,139088.06
326,Tazzy,Infiniti,G37,2011,2022-01-28,631439.68


#### Convert PySpark DataFrame to pandas and write CSV to Azure Blob Storage

In [0]:
# Collect the aggregated result onto the driver as a pandas DataFrame and write
# it as a single CSV file into the mounted container via the /dbfs local path.
# index=False keeps pandas' synthetic row index out of the file, so the CSV
# holds exactly the columns of `newdf`, with no extra unnamed leading column.
pandasDF = newdf.toPandas()
pandasDF.to_csv('/dbfs/mnt/car-demo/result.csv', index=False)