## Import library

In [0]:
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

## Mount Blob

In [0]:
try:
    dbutils.fs.mount(
      source = "wasbs://input@projectdrug.blob.core.windows.net",
      mount_point = "/mnt/drug_project",
      extra_configs = {"my-key""})
except:
    print("Directory already mounted")
    

## Import csv

In [0]:
%fs ls /mnt/drug_project

path,name,size
dbfs:/mnt/drug_project/drug_client.csv,drug_client.csv,83126
dbfs:/mnt/drug_project/drug_date.csv,drug_date.csv,63942
dbfs:/mnt/drug_project/drug_product.csv,drug_product.csv,84221
dbfs:/mnt/drug_project/drug_project.csv,drug_project.csv,25700


In [0]:
df_client = spark.read.csv('/mnt/drug_project/drug_client.csv', header=True)
df_date = spark.read.csv('/mnt/drug_project/drug_date.csv', header=True)
df_product = spark.read.csv('/mnt/drug_project/drug_product.csv', header=True)


## Add column

In [0]:
df_date = df_date.withColumn('Date_difference',
      datediff("expiration_date", "transaction_date"))

## Cleansing and filtering date

In [0]:
df_date = df_date.where(F.col('date_difference') > 0)

In [0]:
df_client = df_client.where(F.col('transaction_completed')!= 'false')

## Rename columns

In [0]:
list_new_cols = [
    'Expiration_date',
    'Created_date',
    'Transaction_date'
]

list_old_cols = [
    'expiration_date',
    'created_date',
    'transaction_date'
]

for old_col, new_col in zip(list_old_cols, list_new_cols):
    df_date = df_date.withColumn(new_col, F.to_date(old_col))

In [0]:
list_new_cols = [
    'Product_number',
    'First_name',
    'Last_name',
    'Email',
    'Gender',
    'Country',
    'City',
    'Card_type',
    'Quantity',
    'Transaction_completed'
]
for old_col, new_col in zip(df_client.columns, list_new_cols):
    df_client = df_client.withColumnRenamed(old_col, new_col)

In [0]:
list_new_cols = [
    'Product_number',
    'Company',
    'Brand',
    'Name',
    'Price'
]
for old_col, new_col in zip(df_product.columns, list_new_cols):
    df_product = df_product.withColumnRenamed(old_col, new_col)

## Concatenate Columns

In [0]:
df_client = df_client.withColumn('Full_name', F.concat_ws(' ', df_client.Last_name, df_client.First_name ))

## Join tables

In [0]:
df = df_product.join(df_date, df_date.id==df_product.Product_number, how='inner')\
               .join(df_client, on='Product_number', how='inner')

## Add column

In [0]:
df = df.withColumn('Price', F.col('Price')*F.col('Quantity'))

## Groupby operation

In [0]:
columns = [
    'Product_number',
    'Company',
    'Name',
    'Full_name',
    'Transaction_date',
    'Quantity'
]
df= df.groupby(columns).sum('Price').orderBy('Product_number')

## Rename column

In [0]:
df = df.withColumnRenamed('sum(Price)', 'Price')

In [0]:
df.display()

Product_number,Company,Name,Full_name,Transaction_date,Quantity,Price
1,Jafra cosmetics International,hair minimizing antiperspirant deodorant roll-on,Dulwitch Eyde,2021-11-12,1,72.46
102,"Bath & Body Works, Inc.",Anti-Bacterial Deep Cleansing Hand,Pengilly Rycca,2022-03-16,3,139.35000000000002
105,"Breckenridge Pharmaceutical, Inc.",RISPERIDONE,Hembry Aleece,2021-12-04,4,408.96
106,GlaxoSmithKline LLC,ZOFRAN,Gredden Alec,2021-11-25,3,27.18
107,Kmart Corporation,smart sense allergy and congestion relief,D'eath Frances,2021-07-21,1,146.0
109,"Ventura Corporation (San Juan, P.R)",ESIKA,Heisman Creight,2022-03-12,3,476.49
11,Geritrex Corp.,Asper-Flex,Bailess Mel,2021-04-08,1,78.53
112,"Blenheim Pharmacal, Inc.",Phentermine Hydrochloride,Fenlon Trip,2021-07-23,3,366.3
114,7-11,7-11 Instant Hand Sanitizer With Aloe And Vitamin E,Onraet Cynde,2021-10-31,4,183.44
116,DIRECT RX,LORATADINE,Giraudat Elyn,2021-10-22,2,361.88


In [0]:
pandasDF = df.toPandas()
pandasDF.to_csv('/dbfs/mnt/drug_project/drug_project.csv')