In [3]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (getOrCreate reuses an active one if present).
spark = (
    SparkSession.builder
    .appName("final_assignment")
    .getOrCreate()
)

In [4]:
# Load the raw transaction export: the first row supplies column names and Spark infers
# column types from the data (e.g. TRANSACTION_ID comes out as double in the schema printout).
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('TRANS_SRC_XLSX.csv')
)

In [5]:
# Column names of the raw frame, in file order.
df.schema.names

['TRANSACTION_ID',
 'TRANSACTION_DATE',
 'USER_FULL_NAME',
 'GENDER',
 'EMAIL_CONTACT',
 'PRODUCT_DESCRIPTION',
 'PRODUCT_UNIT_PRICE',
 'QUANTITY',
 'DISCOUNT_AMOUNT',
 'NET_SALE_VALUE',
 'REGION',
 'COUNTRY',
 'CAPITAL_CITY',
 'EVENT_LOG_TIMESTAMP']

In [6]:
# Print the inferred column types; note the date/timestamp columns are read as plain strings.
df.printSchema() 

root
 |-- TRANSACTION_ID: double (nullable = true)
 |-- TRANSACTION_DATE: string (nullable = true)
 |-- USER_FULL_NAME: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- EMAIL_CONTACT: string (nullable = true)
 |-- PRODUCT_DESCRIPTION: string (nullable = true)
 |-- PRODUCT_UNIT_PRICE: integer (nullable = true)
 |-- QUANTITY: integer (nullable = true)
 |-- DISCOUNT_AMOUNT: integer (nullable = true)
 |-- NET_SALE_VALUE: integer (nullable = true)
 |-- REGION: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- CAPITAL_CITY: string (nullable = true)
 |-- EVENT_LOG_TIMESTAMP: string (nullable = true)



<h1 style="background-color:powderblue; text-align:center;">First Task: Remove rows with NULL values and store them in a junk DataFrame</h1>

In [7]:
# Split the raw data into rows with no missing values and rows with at least one.
# dropna(how='any') drops a row if any column is missing (per the PySpark docs, NaN in
# numeric columns also counts as missing).
df_filter1 = df.dropna(how='any')
# exceptAll keeps duplicate rows. subtract() is a set difference (EXCEPT DISTINCT) and would
# collapse identical junk rows into one, silently under-counting the rejected records.
df_junk = df.exceptAll(df_filter1)

<h1 style="background-color:powderblue; text-align:center;">Second Task: Validate email addresses and move invalid rows to the junk DataFrame</h1>

In [8]:
from pyspark.sql.functions import when

In [9]:
# Pattern for a plausible email address:
#   - word characters joined by single '.' or '-' separators;
#   - then '@';
#   - then a domain of the same shape;
#   - then one or more trailing ".xx"/".xxx" segments.
# It is a raw string so '\w' and '\.' reach the regex engine unchanged; in a plain string
# they are invalid Python escape sequences.
# The separator groups are written ([.-]\w+)* (mandatory separator) instead of ([\.-]?\w+)*.
# Both accept exactly the same strings, but the optional-separator form is ambiguous and can
# backtrack exponentially on long non-matching values.
# NOTE(review): \w{2,3} rejects longer TLDs such as ".info"; kept to preserve the original rule.
EMAIL_REGEX = r'^\w+([.-]\w+)*@\w+([.-]\w+)*(\.\w{2,3})+$'

# Flag each row; otherwise(False) guarantees the flag is never null.
df_test = df_filter1.withColumn(
    'matched',
    when(df_filter1.EMAIL_CONTACT.rlike(EMAIL_REGEX), True).otherwise(False),
)

In [10]:
# Rows whose address matched. `matched` is always True/False (the flag ends in
# .otherwise(False)), so the column itself can serve as the predicate.
valid_email_df = df_test.where(df_test['matched'])

In [11]:
# Remove the helper flag so the valid-email frame has the same columns as the input data.
valid_email_df = valid_email_df.drop('matched')

In [12]:
# Complement of the valid set: rows whose EMAIL_CONTACT failed the pattern check.
invalid_email_df = df_test.where(~df_test['matched'])

In [13]:
# Drop the helper flag so these rows line up with the null-row junk frame before the union.
invalid_email_df_update = invalid_email_df.drop('matched')

In [14]:
# Gather every rejected record in one frame: rows with missing values plus rows with an
# invalid email. Columns are matched by name; both inputs carry the raw df columns, so
# allowMissingColumns only acts as a safeguard here.
junk_updated = df_junk.unionByName(
    invalid_email_df_update,
    allowMissingColumns=True,
)

In [15]:
# Total number of rejected rows (null rows + invalid-email rows).
junk_updated.count()

9317

In [16]:
# Preview a few rejected rows.
# NOTE(review): this output contains email addresses/names — clear outputs before sharing.
junk_updated.show(5)

+--------------+--------------------+--------------+------+--------------------+-------------------+------------------+--------+---------------+--------------+--------------------+---------+------------+--------------------+
|TRANSACTION_ID|    TRANSACTION_DATE|USER_FULL_NAME|GENDER|       EMAIL_CONTACT|PRODUCT_DESCRIPTION|PRODUCT_UNIT_PRICE|QUANTITY|DISCOUNT_AMOUNT|NET_SALE_VALUE|              REGION|  COUNTRY|CAPITAL_CITY| EVENT_LOG_TIMESTAMP|
+--------------+--------------------+--------------+------+--------------------+-------------------+------------------+--------+---------------+--------------+--------------------+---------+------------+--------------------+
|    1.46493E12|2016-Jun-05 10:29:43|          null|  null|         H@gmail.com|         ELECTRICAL|               250|       1|              7|           243|WESTERN EUROPE   ...| Belgium |    Brussels|2016-Jun-03 10:29:43|
|    1.46493E12|2016-Jun-06 10:32:07|          null|  null|w.william.f.ford@...|               ARTS|

<h1 style="background-color:powderblue; text-align:center;">Third Task: Split the full name into first and last name</h1>

In [17]:
from pyspark.sql.functions import split
from pyspark.sql.functions import *

In [18]:
from pyspark.sql.functions import col, element_at, size, split, trim, when

# Break USER_FULL_NAME into whitespace-separated tokens. Trimming and splitting on runs of
# whitespace (\s+) keeps leading, trailing or doubled spaces from producing empty tokens.
name_tokens = split(trim(col("USER_FULL_NAME")), r"\s+")

# "First Name" is the first token.
# "Last Name" is the last token, so a middle name or initial is not taken as the surname.
# Single-token names get a null "Last Name", as before.
# NOTE(review): sample output shows titles glued to the first name (e.g. "Mr.Millard");
# they are not stripped here.
splitted_names = (
    valid_email_df
    .withColumn("First Name", name_tokens.getItem(0))
    .withColumn("Last Name", when(size(name_tokens) > 1, element_at(name_tokens, -1)))
)

In [19]:
# The full name is now represented by "First Name"/"Last Name"; drop the source column.
splitted_names = splitted_names.drop('USER_FULL_NAME')

<h1 style="background-color:powderblue; text-align:center;">Fourth Task: Normalize the gender column to MALE / FEMALE</h1>

In [20]:
from pyspark.sql.functions import col, trim, upper, when

# Normalise GENDER to the canonical labels "MALE" / "FEMALE".
# Comparing the trimmed, upper-cased value covers every spelling previously enumerated
# (M/m/Male/male, F/f/Female/female). It also covers other casings and stray whitespace
# (e.g. "MALE ", "fEMALE") that the literal list let through.
# Unrecognised values are kept unchanged so they stay visible for review.
gender_key = upper(trim(col("GENDER")))
df_gender_modify = splitted_names.withColumn(
    "GENDER",
    when(gender_key.isin("M", "MALE"), "MALE")
    .when(gender_key.isin("F", "FEMALE"), "FEMALE")
    .otherwise(col("GENDER")),
)

In [21]:
# Spot-check the normalised GENDER and split name columns.
# NOTE(review): output contains names/emails — clear before sharing.
df_gender_modify.show(3)

+--------------+--------------------+------+--------------------+-------------------+------------------+--------+---------------+--------------+--------------------+-------------+------------+--------------------+-----------+---------+
|TRANSACTION_ID|    TRANSACTION_DATE|GENDER|       EMAIL_CONTACT|PRODUCT_DESCRIPTION|PRODUCT_UNIT_PRICE|QUANTITY|DISCOUNT_AMOUNT|NET_SALE_VALUE|              REGION|      COUNTRY|CAPITAL_CITY| EVENT_LOG_TIMESTAMP| First Name|Last Name|
+--------------+--------------------+------+--------------------+-------------------+------------------+--------+---------------+--------------+--------------------+-------------+------------+--------------------+-----------+---------+
|    1.46493E12|2016-Jun-09 10:19:19|  MALE|m.millard.b.bucha...|               ARTS|                35|      10|              9|           341|C.W. OF IND. STATES |Turkmenistan |    Ashgabat|2016-Jun-03 10:19:19| Mr.Millard| Buchanan|
|    1.46493E12|2016-Jun-09 10:19:19|FEMALE|       m@hot

<h1 style="background-color:powderblue; text-align:center;">Fifth Task: Extract date parts (year, month, day, week, quarter)</h1>

In [22]:
from pyspark.sql.functions import year, month
from pyspark.sql.functions import to_date

In [23]:
# Parse TRANSACTION_DATE strings such as "2016-Jun-05 10:29:43" and keep only the date part.
# 'MMM' is the pattern letter for an abbreviated month name inside a full date. Spark's
# datetime-pattern docs reserve the stand-alone form 'LLL' for values that contain only a
# month, so 'LLL' is the wrong letter here.
# NOTE(review): strings not matching the pattern yield null (or an error, depending on the
# ANSI / time-parser-policy settings) — consider routing such rows to the junk frame.
true_df = df_gender_modify.withColumn(
    "TRANSACTION_DATE",
    to_date(col("TRANSACTION_DATE"), "yyyy-MMM-dd HH:mm:ss"),
)

In [24]:
# Calendar year of the parsed transaction date.
year_df = true_df.withColumn('Year', year(true_df['TRANSACTION_DATE']))

In [25]:
# Month number (1-12) of the parsed transaction date.
month_df = year_df.withColumn('Month', month(year_df['TRANSACTION_DATE']))

In [26]:
from pyspark.sql.functions import dayofmonth

# Day of the month of the parsed transaction date.
day_df = month_df.withColumn('Day', dayofmonth(month_df['TRANSACTION_DATE']))

In [27]:
from pyspark.sql.functions import weekofyear

# Week number of the transaction date.
# NOTE(review): per the PySpark docs this is the ISO-8601 week (weeks start on Monday), so
# early-January dates may fall in week 52/53.
week_df = day_df.withColumn('Week', weekofyear(day_df['TRANSACTION_DATE']))

In [28]:
from pyspark.sql.functions import quarter

# Quarter (1-4) of the transaction date; this is the final cleaned frame.
Final = week_df.withColumn('Quarter', quarter(week_df['TRANSACTION_DATE']))


In [29]:
import pandas as pd

In [30]:
# Collect the cleaned Spark frame into pandas (used for the CSV export below).
# NOTE(review): toPandas() pulls every row onto the driver; for large inputs prefer a Spark writer.
FinalDF = Final.toPandas()

In [31]:
# FinalDF.to_csv('final_clean.csv',index=False)

In [32]:
# Collect all rejected rows into pandas (used for the junk CSV export below).
# NOTE(review): toPandas() pulls every row onto the driver; for large inputs prefer a Spark writer.
Junk = junk_updated.toPandas()

In [33]:
# Junk.to_csv('junk.csv', index = False)