## Import libraries

In [1]:
import pyspark
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

## Create Spark session

In [3]:
# Build the notebook's SparkSession. getOrCreate() hands back the session that
# is already active, if any, instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("PySpark")
    .getOrCreate()
)

## Load CSV files

In [4]:
# Load the three source tables. header=True takes the column names from the
# first row of each file; no schema is inferred, so every column is a string.
csv_options = {'header': True}
df_date = spark.read.csv('drug_date.csv', **csv_options)
df_product = spark.read.csv('drug_product.csv', **csv_options)
df_client = spark.read.csv('drug_client.csv', **csv_options)

## Add date-difference column

In [5]:
# Number of days from the transaction date to the expiration date
# (datediff(end, start) is end minus start).
days_until_expiry = F.datediff(F.col("expiration_date"), F.col("transaction_date"))
df_date = df_date.withColumn('Date_difference', days_until_expiry)

## Cleaning and filtering data

In [6]:
# Keep only rows whose Date_difference is strictly positive; rows where the
# difference is null do not satisfy the predicate and are dropped as well.
df_date = df_date.filter(F.col('Date_difference') > 0)

In [7]:
# Keep only completed transactions: the flag is compared against the literal
# string 'true'.
df_client = df_client.filter(df_client['transaction_completed'] == 'true')

## Rename columns

In [8]:
# Rename df_date's columns by position. zip() stops at the shorter sequence,
# so any column beyond the first four (e.g. the Date_difference column added
# earlier) keeps its current name.
list_new_cols = [
    'Product_number', 'Expiration_date', 'Created_date', 'Transaction_date',
]
rename_pairs = list(zip(df_date.columns, list_new_cols))
for current_name, target_name in rename_pairs:
    df_date = df_date.withColumnRenamed(current_name, target_name)

In [9]:
# Rename df_client's columns by position: the i-th existing column receives
# the i-th name below. The pairing is fixed before the first rename happens.
list_new_cols = [
    'Product_number',
    'First_name', 'Last_name', 'Email', 'Gender',
    'Country', 'City',
    'Card_type', 'Quantity', 'Transaction_completed',
]
rename_pairs = list(zip(df_client.columns, list_new_cols))
for current_name, target_name in rename_pairs:
    df_client = df_client.withColumnRenamed(current_name, target_name)

In [10]:
# Rename df_product's columns by position: the i-th existing column receives
# the i-th name below. The pairing is fixed before the first rename happens.
list_new_cols = ['Product_number', 'Company', 'Brand', 'Name', 'Price']
rename_pairs = list(zip(df_product.columns, list_new_cols))
for current_name, target_name in rename_pairs:
    df_product = df_product.withColumnRenamed(current_name, target_name)

## Concatenate columns

In [11]:
# Full_name = "<First_name> <Last_name>", joined with a single space.
full_name = F.concat_ws(' ', F.col('First_name'), F.col('Last_name'))
df_client = df_client.withColumn('Full_name', full_name)

## Join tables

In [12]:
# Inner-join products with clients, then with dates, on the shared
# Product_number key (the key appears once in the result).
# NOTE(review): if Product_number repeats in more than one of the tables the
# join multiplies rows - confirm key uniqueness against the source data.
data_frame = (
    df_product
    .join(df_client, 'Product_number', 'inner')
    .join(df_date, 'Product_number', 'inner')
)

## Add total-price and year-month columns

In [13]:
# Price_ is the per-row total: unit Price multiplied by Quantity.
line_total = F.col('Price') * F.col('Quantity')
data_frame = data_frame.withColumn('Price_', line_total)

In [14]:
# Year-month bucket of the transaction, e.g. "2021-10".
# date_format zero-pads the month ("2022-01" rather than "2022-1"), so sorting
# this string column orders the months chronologically; without the padding
# "2022-10" would sort before "2022-2".
# Note: for a date that cannot be parsed, date_format yields null, whereas
# concat_ws over year()/month() produced an empty string.
data_frame = data_frame.withColumn("Date", F.date_format("Transaction_date", "yyyy-MM"))

## Delete columns

In [15]:
# Drop the name parts and the unit Price; Full_name and Price_ were derived
# from them in earlier cells.
colf_to_drop = ('First_name', 'Last_name', 'Price')
data_frame = data_frame.drop(*colf_to_drop)

## Groupby operation

In [16]:
# Sum Price_ per (Date, Company) and sort the result by Date.
# The aggregate column comes out named 'sum(Price_)'.
columns = ['Date', 'Company']
data_frame = (
    data_frame
    .groupBy(*columns)
    .agg(F.sum('Price_'))
    .sort('Date')
)

In [17]:
# Give the aggregate column a plain name: 'sum(Price_)' becomes 'Price'.
data_frame = data_frame.withColumnRenamed(existing='sum(Price_)', new='Price')

## Convert PySpark DataFrame to Pandas

In [18]:
# Collect the aggregated Spark DataFrame into a pandas DataFrame.
# From this cell on, `data_frame` is a pandas object, not a Spark one.
data_frame = data_frame.toPandas()

In [19]:
# Preview the first 10 rows of the pandas result.
data_frame.head(10)

Unnamed: 0,Date,Company,Price
0,2021-10,Colgate-Palmolive Company,44.88
1,2021-10,Coltene Whaledent Inc,24.15
2,2021-10,The Kroger Co.,503.9
3,2021-10,Major Pharmaceuticals,123.88
4,2021-10,7-11,183.44
5,2021-10,Neutrogena Corporation,97.45
6,2021-10,Apotheca Company,416.75
7,2021-10,DIRECT RX,361.88
8,2021-10,"Allergan, Inc.",52.4
9,2021-10,Bristol-Myers Squibb Pharma Company,237.46


## Write CSV

In [20]:
# Write the result to a CSV file (path is relative to the working directory).
# With pandas' default index=True the row index is written as an unnamed
# first column.
output_path = 'drug_pyspark.csv'
data_frame.to_csv(output_path)