In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,mean,lower,trim,to_date,when

In [2]:
spark = SparkSession.builder \
    .appName('Transformation')\
    .master('local[*]')\
    .getOrCreate()
print('spark session was initialized successfully')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/01 09:13:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


spark session was initialized successfully


In [3]:
raw_data = spark.read.csv('/home/sinatavakoli2022/Retail-Data-Pipeline/Airflow/data/retail_raw_data.csv',header=True)
df = pd.read_csv('/home/sinatavakoli2022/Retail-Data-Pipeline/Airflow/data/retail_raw_data.csv')

In [4]:
raw_data.printSchema()

root
 |-- Transaction_ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Income: string (nullable = true)
 |-- Customer_Segment: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Total_Purchases: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Total_Amount: string (nullable = true)
 |-- Product_Category: string (nullable = true)
 |-- Product_Brand: string (nullable = true)
 |-- Product_Type: string (nullable = true)
 |-- Feedback: string (nullable = tr

In [5]:
raw_data.show(5,truncate=False)

25/04/01 09:13:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------+-----------+-------------------+-------------------+------------+---------------------------+----------+---------------+-------+---------+----+------+------+----------------+----------+------+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+-----------------+
|Transaction_ID|Customer_ID|Name               |Email              |Phone       |Address                    |City      |State          |Zipcode|Country  |Age |Gender|Income|Customer_Segment|Date      |Year  |Month    |Time    |Total_Purchases|Amount     |Total_Amount|Product_Category|Product_Brand|Product_Type|Feedback |Shipping_Method|Payment_Method|Order_Status|Ratings|products         |
+--------------+-----------+-------------------+-------------------+------------+---------------------------+----------+---------------+-------+---------+----+------+------+----------------+----------+------+------

In [6]:
casted_data = raw_data.select(

    col('Transaction_ID').cast('long'),
    col('Customer_ID').cast('long'),
    col('Name'),
    col('Email'),
    col('Phone').cast('string'),
    col('Address'),
    col('City'),
    col('State'),
    col('Zipcode'),
    col('Country'),
    col('Age').cast('integer'),
    col('Gender'),
    col('Income'),
    col('Customer_Segment'),
    to_date(col('Date'), 'M/d/yyyy').alias('Date'),
    col('Year').cast('integer'),
    col('Month'),
    col('Time'),
    col('Total_Purchases').cast('integer'),
    col('Amount').cast('double'),
    col('Total_Amount').cast('double'),
    col('Product_Category'),
    col('Product_Brand'),
    col('Product_Type'),
    col('Feedback'),
    col('Shipping_Method'),
    col('Payment_Method'),
    col('Order_Status'),
    col('Ratings').cast('double'),
    col('products')
)

In [7]:
casted_data.show(5)

+--------------+-----------+-------------------+-------------------+------------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------+---------------+--------------+------------+-------+-----------------+
|Transaction_ID|Customer_ID|               Name|              Email|       Phone|             Address|      City|          State|Zipcode|  Country|Age|Gender|Income|Customer_Segment|      Date|Year|    Month|    Time|Total_Purchases|     Amount|Total_Amount|Product_Category|Product_Brand|Product_Type| Feedback|Shipping_Method|Payment_Method|Order_Status|Ratings|         products|
+--------------+-----------+-------------------+-------------------+------------+--------------------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-

In [8]:
df.isnull().sum()

Transaction_ID      333
Customer_ID         308
Name                382
Email               347
Phone               362
Address             315
City                248
State               281
Zipcode             340
Country             271
Age                 173
Gender              317
Income              290
Customer_Segment    215
Date                359
Year                350
Month               273
Time                350
Total_Purchases     361
Amount              357
Total_Amount        350
Product_Category    283
Product_Brand       281
Product_Type          0
Feedback            184
Shipping_Method     337
Payment_Method      297
Order_Status        235
Ratings             184
products              0
dtype: int64

In [9]:
raw_data =raw_data.dropDuplicates()

In [10]:
age_average = int(df['Age'].mean())
rating_average = int(df['Ratings'].mean())

In [11]:
clean_data = casted_data.dropna(subset=['Transaction_ID','Customer_ID','Amount','Total_Amount'])
clean_data = clean_data.fillna({'Ratings':rating_average,'City':'Unknown','State':'Unknown','Country':'Unknown','Gender':'Unknown','Income':'Unknown',
                               'Customer_Segment':'Unknown','Product_Category':'Unknown','Product_Brand':'Unknown','Shipping_Method':'Unknown','Payment_Method':'Unknown','Age':age_average})

In [12]:
clean_data = clean_data.drop('Name','Email','Phone','Address','Feedback','Order_Status')

In [13]:
clean_data.show(5)

+--------------+-----------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------------+--------------+-------+-----------------+
|Transaction_ID|Customer_ID|      City|          State|Zipcode|  Country|Age|Gender|Income|Customer_Segment|      Date|Year|    Month|    Time|Total_Purchases|     Amount|Total_Amount|Product_Category|Product_Brand|Product_Type|Shipping_Method|Payment_Method|Ratings|         products|
+--------------+-----------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------------+--------------+-------+-----------------+
|       8691788|      37249|  Dortmund|         Berlin|77985.0|  Germany| 21|  Male|   Low|         Regular|2023-09-18|2023|September|22:03:55

In [14]:
clean_data = clean_data.filter(col('Amount') >= 0)
clean_data = clean_data.filter(col('Total_Amount') >=0 )
clean_data = clean_data.filter((col('Age') >=0 )&(col('Age') <= 100))
clean_data = clean_data.filter((col('Ratings') >=1 )&(col('Ratings') <= 5))

In [15]:
clean_data = clean_data.withColumn('City',lower(trim(col('City'))))
clean_data = clean_data.withColumn('State',lower(trim(col('State'))))
clean_data = clean_data.withColumn('Country',lower(trim(col('Country'))))
clean_data = clean_data.withColumn('Gender',lower(trim(col('Gender'))))
clean_data = clean_data.withColumn('Income',lower(trim(col('Income'))))
clean_data = clean_data.withColumn('Customer_Segment',lower(trim(col('Customer_Segment'))))
clean_data = clean_data.withColumn('Product_Category',lower(trim(col('Product_Category'))))
clean_data = clean_data.withColumn('Product_Brand',lower(trim(col('Product_Brand'))))
clean_data = clean_data.withColumn('Product_Type',lower(trim(col('Product_Type'))))
clean_data = clean_data.withColumn('Shipping_Method',lower(trim(col('Shipping_Method'))))
clean_data = clean_data.withColumn('Payment_Method',lower(trim(col('Payment_Method'))))
clean_data = clean_data.withColumn('products',lower(trim(col('products'))))

In [16]:
def to_snake_case(column_name):
    return column_name.lower().replace(' ','_').replace('-','_')

final_clean_data = clean_data.select(
    [col(column).alias(to_snake_case(column)) for column in clean_data.columns]
)

In [17]:
final_clean_data.show(5)

+--------------+-----------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------------+--------------+-------+-----------------+
|transaction_id|customer_id|      city|          state|zipcode|  country|age|gender|income|customer_segment|      date|year|    month|    time|total_purchases|     amount|total_amount|product_category|product_brand|product_type|shipping_method|payment_method|ratings|         products|
+--------------+-----------+----------+---------------+-------+---------+---+------+------+----------------+----------+----+---------+--------+---------------+-----------+------------+----------------+-------------+------------+---------------+--------------+-------+-----------------+
|       8691788|      37249|  dortmund|         berlin|77985.0|  germany| 21|  male|   low|         regular|2023-09-18|2023|September|22:03:55

In [18]:
final_clean_data.printSchema()

root
 |-- transaction_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- city: string (nullable = false)
 |-- state: string (nullable = false)
 |-- zipcode: string (nullable = true)
 |-- country: string (nullable = false)
 |-- age: integer (nullable = false)
 |-- gender: string (nullable = false)
 |-- income: string (nullable = false)
 |-- customer_segment: string (nullable = false)
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- time: string (nullable = true)
 |-- total_purchases: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- product_category: string (nullable = false)
 |-- product_brand: string (nullable = false)
 |-- product_type: string (nullable = true)
 |-- shipping_method: string (nullable = false)
 |-- payment_method: string (nullable = false)
 |-- ratings: double (nullable = false)
 |-- products: string (nullable = true)



In [19]:
final_clean_data.repartition(1).write.options(header='true',delimiter=',').csv('/home/sinatavakoli2022/Retail-Data-Pipeline/Airflow/data/final_clean_data.csv',mode='overwrite')

                                                                                