In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('cleaner').getOrCreate()

In [3]:
customer_df = spark.read.format('csv')\
                .option('inferSchema','true')\
                .option('header', 'true')\
                .load('./dim_customer_dirty.csv')

customer_df.show(10)

+-----------+----------+-----------------+--------------------+-----------------+--------------+---------------------+
|customer_id|first_name|        last_name|             address|          pincode|  phone_number|customer_joining_date|
+-----------+----------+-----------------+--------------------+-----------------+--------------+---------------------+
|     4986.0|   Balveer|        Choudhury|31/30, Devan Naga...|         376853.0|913000656779.0|           2021-04-20|
|     6642.0|    Kalpit|            Divan|15, Zacharia Naga...|         331701.0|911454391537.0|           2022-05-14|
|     7763.0|   Dominic|    Radhakrishnan|834, Rajagopal Ro...|         351147.0|  1346045138.0|           2022-02-28|
|     2420.0|    Raghav|        Ramaswamy|H.No. 24, Memon P...|         668002.0|  6956574158.0|           2022-01-21|
|     8576.0|     Zinal|     Venkataraman|832, Majumdar Roa...|             null|  3399052109.0|           2022-07-20|
|     1637.0|     Anita|           Barman|H.No. 

In [4]:
# Need to remove the records where id/first name/ last name is missing
#  Remove if name is not a string and a number

from pyspark.sql import functions as F

# print(customer_df.count())

customer_df_v2 = (customer_df
.filter(F.col('customer_id').cast('int').isNotNull())
.filter(F.col('first_name').cast('string').isNotNull())
.filter(F.col('last_name').cast('string').isNotNull())
.filter(~F.col('first_name').rlike('[0-9]'))
.filter(~F.col('last_name').rlike('[0-9]'))
)


customer_df_v2.show()

+-----------+----------+-------------+--------------------+-----------------+--------------+---------------------+
|customer_id|first_name|    last_name|             address|          pincode|  phone_number|customer_joining_date|
+-----------+----------+-------------+--------------------+-----------------+--------------+---------------------+
|     4986.0|   Balveer|    Choudhury|31/30, Devan Naga...|         376853.0|913000656779.0|           2021-04-20|
|     6642.0|    Kalpit|        Divan|15, Zacharia Naga...|         331701.0|911454391537.0|           2022-05-14|
|     7763.0|   Dominic|Radhakrishnan|834, Rajagopal Ro...|         351147.0|  1346045138.0|           2022-02-28|
|     2420.0|    Raghav|    Ramaswamy|H.No. 24, Memon P...|         668002.0|  6956574158.0|           2022-01-21|
|     8576.0|     Zinal| Venkataraman|832, Majumdar Roa...|             null|  3399052109.0|           2022-07-20|
|     1637.0|     Anita|       Barman|H.No. 88, Upadhya...|         485277.0|   

In [8]:
# Need to check if both pincode and address are missing drop the row
# if pincode is present, make the address as 'missing address'

print('df2 count: ', customer_df_v2.count())

customer_df_v3 = (customer_df_v2.filter(
    ~(F.col('address').isNull() & F.col('pincode').isNull())))

print('df3 count: ', customer_df_v3.count())

df2 count:  20
df3 count:  20


In [9]:
customer_df_v3.show()
customer_df_v3.count()

+-----------+----------+-------------+--------------------+-----------------+--------------+---------------------+
|customer_id|first_name|    last_name|             address|          pincode|  phone_number|customer_joining_date|
+-----------+----------+-------------+--------------------+-----------------+--------------+---------------------+
|     4986.0|   Balveer|    Choudhury|31/30, Devan Naga...|         376853.0|913000656779.0|           2021-04-20|
|     6642.0|    Kalpit|        Divan|15, Zacharia Naga...|         331701.0|911454391537.0|           2022-05-14|
|     7763.0|   Dominic|Radhakrishnan|834, Rajagopal Ro...|         351147.0|  1346045138.0|           2022-02-28|
|     2420.0|    Raghav|    Ramaswamy|H.No. 24, Memon P...|         668002.0|  6956574158.0|           2022-01-21|
|     8576.0|     Zinal| Venkataraman|832, Majumdar Roa...|             null|  3399052109.0|           2022-07-20|
|     1637.0|     Anita|       Barman|H.No. 88, Upadhya...|         485277.0|   

20

In [51]:
# # Extracting Pincode from the address

# # Split the 'address' column into a list (split by space or comma)
# customer_df_v3_split = (customer_df_v3
#                         .withColumn('address_split', F.split(F.col('address'), ',| |-'))
#                         .filter(F.col('address_split').getItem(F.size(F.col('address_split'))-1).cast('int').isNotNull())
#                        )
# # customer_df_v3_split.select(F.col('address_split'),customer_df_v3_split.address_split.getItem(F.size(F.col('address_split'))-1)).show(truncate=False)
# # Show the resulting DataFrame
# # customer_df_v3_split.select(F.col('address_split').getItem(0)).show()

# customer_df_v3_split.select('address', F.col('address_split').getItem(F.size(F.col('address_split'))-1).alias('pincode')).show(truncate=False)


+------------------------------------------+-----------------+
|address                                   |pincode          |
+------------------------------------------+-----------------+
|31/30, Devan Nagar, Anand 799736          |799736           |
|15, Zacharia Nagar, Buxar-919980          |919980           |
|834, Rajagopal Road, Amaravati 801199     |801199           |
|H.No. 24, Memon Path, Nagercoil 265985    |265985           |
|832, Majumdar Road, Ongole-286732         |286732           |
|H.No. 88, Upadhyay Chowk, Orai 493235     |493235           |
|H.No. 84, Gera, Bhilwara 569429           |569429           |
|13, Balan Street, Medininagar-357773      |357773           |
|H.No. 52, Nanda Circle, Ahmednagar 518831 |518831           |
|175, Bhargava Ganj, Dhule 885357          |885357           |
|H.No. 854, Gulati Chowk, Tadipatri-177232 |177232           |
|23/159, Parmar Zila, Sri Ganganagar-469379|469379           |
|76/97, Samra Road, Motihari-476980        |476980     

In [17]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# if address is there and pincode is missing, write a UDF for extracting pincode from the address
# it's the last group of numbers in address
# Address must contain alpha-numeric otherwise put 'sample address'

# Alphanumenric check, otherwise put sameple address if pincode is present

# Where address is numeric and pincode is null

customer_df_v4 = customer_df_v3.filter(
    ~(F.col('address').rlike('^[0-9]+(\\.[0-9]+)?$') & F.col('pincode').isNull())
)

# When pincode is valid and address is numeric

customer_df_v4 = customer_df_v4.withColumn(
    'address',
    F.when(
     F.col('pincode').cast('int').isNotNull() & F.col('address').rlike('^[0-9]+(\\.[0-9]+)?$'),
        'sampleaddress1'
    ).otherwise(F.col('address'))
)

# When Pincode is Null or not integer and Address is valid
# Not using UDF due to it's performance
# Spliting Address with , or space( ) or -
# Checking if the Pincode is present at the end

customer_df_v4 = (customer_df_v4
                        .withColumn('address_split', F.split(F.col('address'), ',| |-'))
                        .filter(F.col('address_split').getItem(F.size(F.col('address_split'))-1).cast('int').isNotNull())
                       )

# check if Pincode is null or non-integer then place it from the address
# or when pincode len is not 6 digits

customer_df_v4 = customer_df_v4.withColumn(
    'pincode',
    F.when(
        F.col('pincode').cast('int').isNull()  |  F.col('pincode').isNull() |
        (F.col('pincode').isNotNull() & (F.length(F.col('pincode')) != 6)),
        F.col('address_split').getItem(F.size(F.col('address_split'))-1)
    ).otherwise(F.col('pincode'))
)

customer_df_v4 = customer_df_v4.drop(customer_df_v4.address_split)

customer_df_v4.show(truncate=False)

+-----------+----------+-------------+------------------------------------------+-------+--------------+---------------------+
|customer_id|first_name|last_name    |address                                   |pincode|phone_number  |customer_joining_date|
+-----------+----------+-------------+------------------------------------------+-------+--------------+---------------------+
|4986.0     |Balveer   |Choudhury    |31/30, Devan Nagar, Anand 799736          |799736 |913000656779.0|2021-04-20           |
|6642.0     |Kalpit    |Divan        |15, Zacharia Nagar, Buxar-919980          |919980 |911454391537.0|2022-05-14           |
|7763.0     |Dominic   |Radhakrishnan|834, Rajagopal Road, Amaravati 801199     |801199 |1346045138.0  |2022-02-28           |
|2420.0     |Raghav    |Ramaswamy    |H.No. 24, Memon Path, Nagercoil 265985    |265985 |6956574158.0  |2022-01-21           |
|8576.0     |Zinal     |Venkataraman |832, Majumdar Road, Ongole-286732         |286732 |3399052109.0  |2022-07

## Create Phone numbers with total length 10

In [21]:
# Fix phone number
customer_df_v4 = customer_df_v4.withColumn(
    'phone_number',
    F.when(
        F.col('phone_number').cast('int').isNull() | 
        (F.col('phone_number').cast('int').isNotNull() & (F.length(F.col('phone_number'))!= 10)),
        '0000000000'
    ).otherwise(F.col('phone_number'))
)

# Fix date
customer_df_v4 = customer_df_v4.withColumn(
    'customer_joining_date',
    F.when(
        F.col('customer_joining_date').cast('date').isNull(),
        '2021-01-01'
    ).otherwise(F.col('customer_joining_date'))
)

customer_df_v4.show()

+-----------+----------+-------------+--------------------+-------+------------+---------------------+
|customer_id|first_name|    last_name|             address|pincode|phone_number|customer_joining_date|
+-----------+----------+-------------+--------------------+-------+------------+---------------------+
|     4986.0|   Balveer|    Choudhury|31/30, Devan Naga...| 799736|  0000000000|           2021-04-20|
|     6642.0|    Kalpit|        Divan|15, Zacharia Naga...| 919980|  0000000000|           2022-05-14|
|     7763.0|   Dominic|Radhakrishnan|834, Rajagopal Ro...| 801199|  0000000000|           2022-02-28|
|     2420.0|    Raghav|    Ramaswamy|H.No. 24, Memon P...| 265985|  0000000000|           2022-01-21|
|     8576.0|     Zinal| Venkataraman|832, Majumdar Roa...| 286732|  0000000000|           2022-07-20|
|     1637.0|     Anita|       Barman|H.No. 88, Upadhya...| 493235|  29562838.0|           2020-05-18|
|     2482.0|    Rushil|        Datta|H.No. 84, Gera, B...| 569429|  0000

In [57]:
customer_df_v4.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- pincode: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- customer_joining_date: string (nullable = true)
 |-- address_split: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [24]:
from pyspark.sql.types import IntegerType, DateType

# Using `select` to cast the columns
cleaned_customer_df = customer_df_v4.select(
    customer_df_v4.customer_id.cast(IntegerType()).alias('customer_id'),
    customer_df_v4.first_name.alias('first_name'),
    customer_df_v4.last_name.alias('last_name'),
    customer_df_v4.address.alias('address'),
    customer_df_v4.pincode.cast(IntegerType()).alias('pincode'),
    customer_df_v4.phone_number.cast(IntegerType()).alias('phone_number'),
    customer_df_v4.customer_joining_date.cast(DateType()).alias('customer_joining_date')
)

cleaned_customer_df.show()

+-----------+----------+-------------+--------------------+-------+------------+---------------------+
|customer_id|first_name|    last_name|             address|pincode|phone_number|customer_joining_date|
+-----------+----------+-------------+--------------------+-------+------------+---------------------+
|       4986|   Balveer|    Choudhury|31/30, Devan Naga...| 799736|           0|           2021-04-20|
|       6642|    Kalpit|        Divan|15, Zacharia Naga...| 919980|           0|           2022-05-14|
|       7763|   Dominic|Radhakrishnan|834, Rajagopal Ro...| 801199|           0|           2022-02-28|
|       2420|    Raghav|    Ramaswamy|H.No. 24, Memon P...| 265985|           0|           2022-01-21|
|       8576|     Zinal| Venkataraman|832, Majumdar Roa...| 286732|           0|           2022-07-20|
|       1637|     Anita|       Barman|H.No. 88, Upadhya...| 493235|    29562838|           2020-05-18|
|       2482|    Rushil|        Datta|H.No. 84, Gera, B...| 569429|      

In [25]:
cleaned_customer_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- pincode: integer (nullable = true)
 |-- phone_number: integer (nullable = true)
 |-- customer_joining_date: date (nullable = true)

