<h3>You are given a DataFrame containing customer information collected from multiple branches of a national bank in India. Due to manual entries and system integration, the dataset contains exact duplicates as well as partial duplicates (where some fields like branch_name or email may vary).

1. Remove completely duplicate rows (i.e., rows where all columns match exactly).
2. Then remove partially duplicate records, where a customer is considered the same if their customer_name, dob, and pan_number match, keeping the most recent entry based on the updated_at column.
3. Finally, return the cleaned DataFrame sorted by state, city, and customer_name.</h3>

In [3]:
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('customer-data-cleaning').getOrCreate()



In [6]:
data = [
 ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Connaught Place", "rajesh.k@gmail.com", "2024-01-01"),
 ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Karol Bagh", "rajesh.kumar@outlook.com", "2024-02-01"),
 ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Karol Bagh", "rajesh.kumar@outlook.com", "2024-02-01"), # Exact Duplicate
 ("Priya Sharma", "1992-11-23", "PQRSX5678K", "Maharashtra", "Mumbai", "Andheri", "priya.sharma@gmail.com", "2023-12-15"),
 ("Amit Jain", "1985-03-02", "LMNOP3456Z", "Rajasthan", "Jaipur", "Malviya Nagar", "amit.jain@gmail.com", "2024-03-10"),
 ("Amit Jain", "1985-03-02", "LMNOP3456Z", "Rajasthan", "Jaipur", "Malviya Nagar", "ajain@yahoo.com", "2024-01-01"),
 ("Sneha Reddy", "1993-07-19", "GHJKL7890Y", "Telangana", "Hyderabad", "Banjara Hills", "sneha.reddy@gmail.com", "2024-04-05"),
 ("Sneha Reddy", "1993-07-19", "GHJKL7890Y", "Telangana", "Hyderabad", "Banjara Hills", "sneha.reddy@gmail.com", "2024-04-05"), # Exact Duplicate
]

columns = ["customer_name", "dob", "pan_number", "state", "city", "branch_name", "email", "updated_at"]
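
For a dataset this small, the expected result of the three steps can be worked out without Spark. The following is a minimal plain-Python sketch for cross-checking, assuming the same eight sample rows and column order as above; the names `rows`, `distinct_rows`, `latest`, and `cleaned` are illustrative and not part of the notebook, and this is not how Spark executes the job.

```python
# Plain-Python cross-check of the three cleaning steps (no Spark required).
# Column order: customer_name, dob, pan_number, state, city, branch_name, email, updated_at
rows = [
    ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Connaught Place", "rajesh.k@gmail.com", "2024-01-01"),
    ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Karol Bagh", "rajesh.kumar@outlook.com", "2024-02-01"),
    ("Rajesh Kumar", "1990-05-14", "ABCDE1234F", "Delhi", "New Delhi", "Karol Bagh", "rajesh.kumar@outlook.com", "2024-02-01"),
    ("Priya Sharma", "1992-11-23", "PQRSX5678K", "Maharashtra", "Mumbai", "Andheri", "priya.sharma@gmail.com", "2023-12-15"),
    ("Amit Jain", "1985-03-02", "LMNOP3456Z", "Rajasthan", "Jaipur", "Malviya Nagar", "amit.jain@gmail.com", "2024-03-10"),
    ("Amit Jain", "1985-03-02", "LMNOP3456Z", "Rajasthan", "Jaipur", "Malviya Nagar", "ajain@yahoo.com", "2024-01-01"),
    ("Sneha Reddy", "1993-07-19", "GHJKL7890Y", "Telangana", "Hyderabad", "Banjara Hills", "sneha.reddy@gmail.com", "2024-04-05"),
    ("Sneha Reddy", "1993-07-19", "GHJKL7890Y", "Telangana", "Hyderabad", "Banjara Hills", "sneha.reddy@gmail.com", "2024-04-05"),
]

# Step 1: drop exact duplicates. dict.fromkeys keeps the first occurrence of each tuple.
distinct_rows = list(dict.fromkeys(rows))

# Step 2: keep the most recent row per (customer_name, dob, pan_number).
latest = {}
for row in distinct_rows:
    key = row[0:3]
    # updated_at (index 7) is an ISO yyyy-MM-dd string, so string comparison is chronological.
    if key not in latest or row[7] > latest[key][7]:
        latest[key] = row

# Step 3: sort by state, city, customer_name (indexes 3, 4, 0).
cleaned = sorted(latest.values(), key=lambda r: (r[3], r[4], r[0]))

for r in cleaned:
    print(r[0], "|", r[5], "|", r[7])
```

Six rows survive step 1 and four survive step 2. For Rajesh Kumar the surviving row is the Karol Bagh entry dated 2024-02-01, and for Amit Jain it is the entry dated 2024-03-10.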

In [16]:
customer_df=spark.createDataFrame(data,columns)
customer_df.show()

+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+
|customer_name|       dob|pan_number|      state|     city|    branch_name|               email|updated_at|
+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|Connaught Place|  rajesh.k@gmail.com|2024-01-01|
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|     Karol Bagh|rajesh.kumar@outl...|2024-02-01|
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|     Karol Bagh|rajesh.kumar@outl...|2024-02-01|
| Priya Sharma|1992-11-23|PQRSX5678K|Maharashtra|   Mumbai|        Andheri|priya.sharma@gmai...|2023-12-15|
|    Amit Jain|1985-03-02|LMNOP3456Z|  Rajasthan|   Jaipur|  Malviya Nagar| amit.jain@gmail.com|2024-03-10|
|    Amit Jain|1985-03-02|LMNOP3456Z|  Rajasthan|   Jaipur|  Malviya Nagar|     ajain@yahoo.com|2024-01-01|
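|  Sneha Reddy|1993-07-19|GHJKL7890Y|  Telangana|Hyderabad|  Banjara Hills|sneha.reddy@gmail...|2024-04-05|
|  Sneha Reddy|1993-07-19|GHJKL7890Y|  Telangana|Hyderabad|  Banjara Hills|sneha.reddy@gmail...|2024-04-05|
+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+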

In [12]:
# Step 1: with no subset, dropDuplicates() compares all columns, so only exact duplicates go.
# Row order of the result is not guaranteed and may differ from the output below.
customer_df_distinct = customer_df.dropDuplicates()
customer_df_distinct.show()

+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+
|customer_name|       dob|pan_number|      state|     city|    branch_name|               email|updated_at|
+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+
|    Amit Jain|1985-03-02|LMNOP3456Z|  Rajasthan|   Jaipur|  Malviya Nagar| amit.jain@gmail.com|2024-03-10|
|    Amit Jain|1985-03-02|LMNOP3456Z|  Rajasthan|   Jaipur|  Malviya Nagar|     ajain@yahoo.com|2024-01-01|
| Priya Sharma|1992-11-23|PQRSX5678K|Maharashtra|   Mumbai|        Andheri|priya.sharma@gmai...|2023-12-15|
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|Connaught Place|  rajesh.k@gmail.com|2024-01-01|
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|     Karol Bagh|rajesh.kumar@outl...|2024-02-01|
|  Sneha Reddy|1993-07-19|GHJKL7890Y|  Telangana|Hyderabad|  Banjara Hills|sneha.reddy@gmail...|2024-04-05|
+-------------+----------+----------+-----------+---------+---------------+--------------------+----------+

In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Step 2: rank rows within each customer (name + dob + PAN), newest updated_at first.
window_spec = Window.partitionBy('customer_name', 'dob', 'pan_number').orderBy(col('updated_at').desc())

customer_df_cleaned = (
    customer_df_distinct
    .withColumn('rn', row_number().over(window_spec))
    .where("rn = 1 and customer_name is not null")  # keep only the latest row per customer
    .drop('rn')
    .orderBy('state', 'city', 'customer_name')      # Step 3: final sort
)

customer_df_cleaned.show()

+-------------+----------+----------+-----------+---------+-------------+--------------------+----------+
|customer_name|       dob|pan_number|      state|     city|  branch_name|               email|updated_at|
+-------------+----------+----------+-----------+---------+-------------+--------------------+----------+
| Rajesh Kumar|1990-05-14|ABCDE1234F|      Delhi|New Delhi|   Karol Bagh|rajesh.kumar@outl...|2024-02-01|
| Priya Sharma|1992-11-23|PQRSX5678K|Maharashtra|   Mumbai|      Andheri|priya.sharma@gmai...|2023-12-15|
|    Amit Jain|1985-03-02|LMNOP3456Z|  Rajasthan|   Jaipur|Malviya Nagar| amit.jain@gmail.com|2024-03-10|
|  Sneha Reddy|1993-07-19|GHJKL7890Y|  Telangana|Hyderabad|Banjara Hills|sneha.reddy@gmail...|2024-04-05|
+-------------+----------+----------+-----------+---------+-------------+--------------------+----------+
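
The window orders by updated_at, which is stored as a string here. That is safe only because the values are ISO yyyy-MM-dd, where text order and date order coincide. The plain-Python sketch below illustrates the difference. The day-first strings in `dmy` are hypothetical and do not appear in the dataset. If the real feed used such a format, converting the column first (for example with `to_date` from `pyspark.sql.functions`) would be one way to keep "most recent" correct.

```python
from datetime import date, datetime

# ISO yyyy-MM-dd strings: sorting as text gives the same order as sorting as dates.
iso = ["2024-02-01", "2024-01-01", "2023-12-15"]
iso_as_text = sorted(iso, reverse=True)
iso_as_date = sorted(iso, key=date.fromisoformat, reverse=True)
same_order = iso_as_text == iso_as_date

# Hypothetical day-first strings for the same three dates: text order is no longer chronological.
dmy = ["01-02-2024", "01-01-2024", "15-12-2023"]
dmy_as_text = sorted(dmy, reverse=True)
dmy_as_date = sorted(dmy, key=lambda s: datetime.strptime(s, "%d-%m-%Y"), reverse=True)

print("ISO text order matches date order:", same_order)
print("Latest by text (day-first):", dmy_as_text[0])
print("Latest by date (day-first):", dmy_as_date[0])
```

With the day-first strings, a text sort picks the 2023 entry as the latest, while a date sort picks 1 February 2024.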

