

# <center> <font color="red">Data Cleansing using PySpark </font></center>

## Initializing and Checking for Path variables

In [18]:
# Bootstrap cell: findspark is initialised before `import pyspark` on purpose --
# presumably pyspark is not importable until findspark.init() has located the
# local Spark installation (confirm against the environment this runs in).
import findspark
findspark.init()
# The return value of find() is discarded here; since it is not the last
# expression in the cell, nothing is displayed either.
findspark.find()
import pyspark

##  Importing Required Packages

In [2]:
from datetime import datetime
from pyspark.sql.types import DateType,TimestampType,StringType
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql import SQLContext

## Initializing Spark & SQL sessions

In [3]:
# Create (or reuse) the Spark entry points used by the rest of the notebook:
#   conf       - application name and master URL
#   spark      - SparkSession used for reading/writing DataFrames
#   sc         - the SparkContext behind that session
#   sqlContext - legacy SQL entry point, kept for backward compatibility
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('PySparkApp').setMaster('local')

# getOrCreate() hands back the already-running session when this cell is
# executed again. Constructing SparkContext(conf=conf) directly raises
# "Cannot run multiple SparkContexts at once" on the second run.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Bind the legacy SQLContext to the same session instead of letting it
# create its own.
sqlContext = SQLContext(sc, sparkSession=spark)

## Reading Source file 

In [4]:
# Load the raw ride export; the first row holds the column names and the
# column types are inferred from the data.
hp_input_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("hp.csv")
)

## Schema of the source file

In [5]:
# Print the column names, inferred types and nullability of the raw input.
hp_input_df.printSchema()

root
 |-- vendor: integer (nullable = true)
 |-- seqRideID: integer (nullable = true)
 |-- seqRideLegID: integer (nullable = true)
 |-- status: integer (nullable = true)
 |-- rideDate: string (nullable = true)
 |-- primaryRiderExternalMemberID: integer (nullable = true)
 |-- primaryRiderLastName: string (nullable = true)
 |-- primaryRiderFirstName: string (nullable = true)
 |-- primaryRiderMiddleInitial: string (nullable = true)
 |-- cabRideType: integer (nullable = true)
 |-- pickupDate: string (nullable = true)
 |-- appointmentDate: string (nullable = true)
 |-- fromFacilityName: string (nullable = true)
 |-- fromStreet1: string (nullable = true)
 |-- fromStreet2: string (nullable = true)
 |-- fromCity: string (nullable = true)
 |-- fromStateDescription: string (nullable = true)
 |-- fromZip: string (nullable = true)
 |-- fromCounty: string (nullable = true)
 |-- fromPhone: long (nullable = true)
 |-- toFacilityName: string (nullable = true)
 |-- toStreet1: string (nullable = true)
 

## String to date formatting

In [6]:
# Convert the rider's birth date from text to a real date column.
#   DateOfBirth - the raw text with '-' separators replaced by '/'
#   DOB         - DateOfBirth parsed as month/day/year
hp_output_df = hp_input_df.withColumn(
    'DateOfBirth', regexp_replace('primaryRiderDateOfBirth', '-', '/')
)


def _parse_dob(text):
    """Parse a 'MM/DD/YYYY' string into a date.

    Returns None for a missing or blank value so that rows without a birth
    date produce a NULL DOB instead of failing the whole job inside
    strptime. Text that is present but malformed still raises ValueError.
    """
    if text is None or not text.strip():
        return None
    return datetime.strptime(text, '%m/%d/%Y').date()


func = udf(_parse_dob, DateType())
hp_output_df = hp_output_df.withColumn('DOB', func(col('DateOfBirth')))
hp_output_df.show()


+------+---------+------------+------+----------+----------------------------+--------------------+---------------------+-------------------------+-----------+----------------+----------------+--------------------+--------------------+-----------+-----------+--------------------+----------+----------+----------+--------------------+--------------------+---------+-----------+------------------+----------+---------+----------+--------------+----------------+--------------------+--------+--------------------+-------------------+----------------+----------------------------+---------------+------------------+-----------------+------------------+-----------------------+--------------------+-----------+----------+
|vendor|seqRideID|seqRideLegID|status|  rideDate|primaryRiderExternalMemberID|primaryRiderLastName|primaryRiderFirstName|primaryRiderMiddleInitial|cabRideType|      pickupDate| appointmentDate|    fromFacilityName|         fromStreet1|fromStreet2|   fromCity|fromStateDescription|  

##  String to date-time formatting

In [7]:
# Convert pickupDate from text to a timestamp column named 'Pickup Time'.
#   PT1 - raw text with '-' separators replaced by '/'
#   PT2 - PT1 with doubled spaces collapsed and missing values filled with
#         the placeholder '01/01/1971 00:01'
hp_output_df = hp_output_df.withColumn('PT1', regexp_replace('pickupDate', '-', '/'))
hp_output_df = hp_output_df.withColumn('PT2', regexp_replace('PT1', '  ', ' '))
func1 = udf(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M'), TimestampType())
# Fill missing values from PT2, not PT1: rebuilding PT2 from PT1 here would
# throw away the space normalisation done on the previous line.
# The placeholder keeps strptime from receiving None; the "Cleaning data"
# section blanks it out again after parsing.
new_column_1 = coalesce(col('PT2'), lit('01/01/1971 00:01'))
hp_output_df = hp_output_df.withColumn('PT2', new_column_1)
hp_output_df = hp_output_df.withColumn('Pickup Time', func1(col('PT2')))
hp_output_df.show()


+------+---------+------------+------+----------+----------------------------+--------------------+---------------------+-------------------------+-----------+----------------+----------------+--------------------+--------------------+-----------+-----------+--------------------+----------+----------+----------+--------------------+--------------------+---------+-----------+------------------+----------+---------+----------+--------------+----------------+--------------------+--------+--------------------+-------------------+----------------+----------------------------+---------------+------------------+-----------------+------------------+-----------------------+--------------------+-----------+----------+----------------+----------------+-------------------+
|vendor|seqRideID|seqRideLegID|status|  rideDate|primaryRiderExternalMemberID|primaryRiderLastName|primaryRiderFirstName|primaryRiderMiddleInitial|cabRideType|      pickupDate| appointmentDate|    fromFacilityName|         fromS

In [8]:
# Convert appointmentDate from text to a timestamp column named
# 'Appointment Date'.
#   AT1 - raw text with '-' separators replaced by '/'
#   AT2 - AT1 with doubled spaces collapsed and missing values filled with
#         the placeholder '01/01/1971 00:01'
hp_output_df = hp_output_df.withColumn('AT1', regexp_replace('appointmentDate', '-', '/'))
hp_output_df = hp_output_df.withColumn('AT2', regexp_replace('AT1', '  ', ' '))
func1 = udf(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M'), TimestampType())
# Fill missing values from AT2, not AT1: rebuilding AT2 from AT1 here would
# throw away the space normalisation done on the previous line.
# The placeholder keeps strptime from receiving None; the "Cleaning data"
# section blanks it out again after parsing.
new_column_1 = coalesce(col('AT2'), lit('01/01/1971 00:01'))
hp_output_df = hp_output_df.withColumn('AT2', new_column_1)
hp_output_df = hp_output_df.withColumn('Appointment Date', func1(col('AT2')))
hp_output_df.show()


+------+---------+------------+------+----------+----------------------------+--------------------+---------------------+-------------------------+-----------+----------------+----------------+--------------------+--------------------+-----------+-----------+--------------------+----------+----------+----------+--------------------+--------------------+---------+-----------+------------------+----------+---------+----------+--------------+----------------+--------------------+--------+--------------------+-------------------+----------------+----------------------------+---------------+------------------+-----------------+------------------+-----------------------+--------------------+-----------+----------+----------------+----------------+-------------------+----------------+----------------+-------------------+
|vendor|seqRideID|seqRideLegID|status|  rideDate|primaryRiderExternalMemberID|primaryRiderLastName|primaryRiderFirstName|primaryRiderMiddleInitial|cabRideType|      pickupDat

## Mapping Source to target required schema

In [9]:
# Add the combined client name and rename source columns to the target schema.
#
# concat_ws skips NULL parts, so a rider without a middle initial still gets
# "LAST FIRST"; plain concat() returns NULL for the whole name as soon as any
# part is NULL. withColumn (rather than select("*", ...)) replaces ClientName
# when this cell is re-run instead of adding a second column with that name.
hp_output_df = hp_output_df.withColumn(
    "ClientName",
    concat_ws(
        " ",
        col("primaryRiderLastName"),
        col("primaryRiderFirstName"),
        col("primaryRiderMiddleInitial"),
    ),
)

# Source column name -> target column name.
column_renames = {
    "primaryRiderExternalMemberID": "InsuranceNumber",
    "fromStreet1": "Pickup",
    "fromCity": "PickupCity",
    "fromStateDescription": "PickupState",
    "fromZip": "PickupZip",
    "toStreet1": "Dropoff",
    "toCity": "DropoffCity",
    "toStateDescription": "DropoffState",
    "toZip": "DropoffZip",
    "primaryRiderPhone": "PhoneNumber",
}
for source_name, target_name in column_renames.items():
    hp_output_df = hp_output_df.withColumnRenamed(source_name, target_name)

## Cleaning data

In [10]:
# Replace the 1971 placeholder (inserted for rows with no pickup date) with
# an empty string in 'Pickup Time'.
hp_output_df = hp_output_df.withColumn(
    'Pickup Time',
    regexp_replace(col('Pickup Time'), '1971-01-01 00:01:00', ''),
)
hp_output_df.show()

+------+---------+------------+------+----------+---------------+--------------------+---------------------+-------------------------+-----------+----------------+----------------+--------------------+--------------------+-----------+-----------+-----------+----------+----------+----------+--------------------+--------------------+---------+-----------+------------+----------+---------+----------+--------------+----------------+--------------------+--------+--------------------+-------------------+----------------+----------------------------+---------------+------------------+-----------+------------------+-----------------------+--------------------+-----------+----------+----------------+----------------+-------------------+----------------+----------------+-------------------+-------------------+
|vendor|seqRideID|seqRideLegID|status|  rideDate|InsuranceNumber|primaryRiderLastName|primaryRiderFirstName|primaryRiderMiddleInitial|cabRideType|      pickupDate| appointmentDate|    from

In [11]:
# Replace the 1971 placeholder (inserted for rows with no appointment date)
# with an empty string in 'Appointment Date'.
hp_output_df = hp_output_df.withColumn(
    'Appointment Date',
    regexp_replace(col('Appointment Date'), '1971-01-01 00:01:00', ''),
)
hp_output_df.show()

+------+---------+------------+------+----------+---------------+--------------------+---------------------+-------------------------+-----------+----------------+----------------+--------------------+--------------------+-----------+-----------+-----------+----------+----------+----------+--------------------+--------------------+---------+-----------+------------+----------+---------+----------+--------------+----------------+--------------------+--------+--------------------+-------------------+----------------+----------------------------+---------------+------------------+-----------+------------------+-----------------------+--------------------+-----------+----------+----------------+----------------+-------------------+----------------+----------------+-------------------+-------------------+
|vendor|seqRideID|seqRideLegID|status|  rideDate|InsuranceNumber|primaryRiderLastName|primaryRiderFirstName|primaryRiderMiddleInitial|cabRideType|      pickupDate| appointmentDate|    from

## Accessing Required Columns from the DataFrame

In [12]:
# Columns of the target layout, in output order.
col_names = [
    'InsuranceNumber',
    'ClientName',
    'Pickup Time',
    'Appointment Date',
    'Pickup',
    'PickupCity',
    'PickupState',
    'PickupZip',
    'Dropoff',
    'DropoffCity',
    'DropoffState',
    'DropoffZip',
    'PhoneNumber',
    'DOB',
]
# Keep only the target columns.
final_op_df = hp_output_df.select(*col_names)

## Printing Target output 

In [13]:
# Preview the first 25 rows of the target layout.
final_op_df.show(n=25)

+---------------+-------------------+-------------------+-------------------+--------------------+-----------+-----------+----------+--------------------+-----------+------------+----------+-----------+----------+
|InsuranceNumber|         ClientName|        Pickup Time|   Appointment Date|              Pickup| PickupCity|PickupState| PickupZip|             Dropoff|DropoffCity|DropoffState|DropoffZip|PhoneNumber|       DOB|
+---------------+-------------------+-------------------+-------------------+--------------------+-----------+-----------+----------+--------------------+-----------+------------+----------+-----------+----------+
|       30773714|      NISKA ELLEN M|2020-02-07 07:50:00|2020-02-07 08:20:00|3 14TH AVE NE APT...|SAINT CLOUD|         MN|56304-4536|1900 CentraCare C...|Saint Cloud|          MN|56303-5000| 3202913707|1967-03-05|
|       30773714|      NISKA ELLEN M|                   |                   |1900 CentraCare C...|Saint Cloud|         MN|56303-5000|3 14TH AVE 

## Creating formatted file 

In [15]:
# Write the target layout as CSV with a header row.
# coalesce(1) collapses the data to a single partition before writing.
# Note that Spark treats the path as an output directory, not a single file.
# mode('overwrite') lets the notebook be re-run from the top; with the
# default save mode the write fails once the output path already exists.
(
    final_op_df
    .coalesce(1)
    .write
    .mode('overwrite')
    .csv("Output/hp_formatted_file.csv", header=True)
)