In [0]:
%pip install openpyxl

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Restart the Python process so the package installed by %pip above can be imported.
dbutils.library.restartPython()

In [0]:
import pandas as pd


### Data Cleaning and Preprocessing:
1. Identify and handle missing data: PAN numbers may have missing values. These missing values need to be handled appropriately, either by removing rows or imputing values (depending on the context).
2. Check for duplicates: Ensure there are no duplicate PAN numbers. If duplicates exist, remove them.
3. Handle leading/trailing spaces: PAN numbers may have extra spaces before or after the actual number. Remove any such spaces.
4. Correct letter case: Ensure that the PAN numbers are in uppercase letters (if any lowercase letters are present).

In [0]:
# Read the Excel workbook with pandas, then hand the result to Spark.
pan_raw_df = spark.createDataFrame(
    pd.read_excel('PAN Number Validation Dataset.xlsx')
)

In [0]:
# Preview the rows as loaded, before any cleaning.
display(pan_raw_df)

Pan_Numbers
VGLOD3180G
PHOXD7232L
MGEPH6532A
JJCHK4574O
XTQIJ2330L
HTJYM3835H
YQTAP6661X
hvofe5635y
hyuij7902r
idsmt3429e


In [0]:
# Show the column names and types of the loaded frame.
pan_raw_df.printSchema()

root
 |-- Pan_Numbers: string (nullable = true)



In [0]:
# Import only the column functions this notebook uses. A wildcard import from
# pyspark.sql.functions would shadow Python built-ins such as sum, min, max and
# round, and hides where names like col and when come from.
from pyspark.sql.functions import col, trim, upper, when

# Normalise every PAN: strip leading/trailing spaces, then upper-case it.
pan_raw_df = pan_raw_df.withColumn('Pan_Numbers', upper(trim(col('Pan_Numbers'))))
display(pan_raw_df)

Pan_Numbers
VGLOD3180G
PHOXD7232L
MGEPH6532A
JJCHK4574O
XTQIJ2330L
HTJYM3835H
YQTAP6661X
HVOFE5635Y
HYUIJ7902R
IDSMT3429E


In [0]:
# Keep only rows that actually carry a PAN. After the trim applied earlier, a
# whitespace-only cell is an empty string rather than null, so it must be
# excluded explicitly; otherwise it would be reported as an invalid PAN instead
# of a missing one.
pan_df1 = pan_raw_df.filter(
    col('Pan_Numbers').isNotNull() & (col('Pan_Numbers') != '')
)
display(pan_df1)

Pan_Numbers
VGLOD3180G
PHOXD7232L
MGEPH6532A
JJCHK4574O
XTQIJ2330L
HTJYM3835H
YQTAP6661X
HVOFE5635Y
HYUIJ7902R
IDSMT3429E


In [0]:
# Collapse repeated PANs so each distinct value appears once.
pan_df2 = pan_df1.drop_duplicates(subset=['Pan_Numbers'])
display(pan_df2)

Pan_Numbers
JQEYE9502I
PSOVG0368R
BCFXZ7028Y
QHVER2630M
UWARF2956P
RXMZU7510F
OMRUG9712D
TBDMP2648Z
NAFQJ3441M
WCVZB9633P


### Validation


In [0]:
def has_adj_characters(pan):
    """Return True when any two neighbouring characters of ``pan`` are equal."""
    # Pair every character with the one that follows it.
    for left, right in zip(pan, pan[1:]):
        if left == right:
            return True
    return False

In [0]:
def form_sequence(pan):
    """Return True when every character's code point is exactly one above its predecessor.

    A string shorter than two characters has no pair that could break the run,
    so it also yields True.
    """
    for prev, nxt in zip(pan, pan[1:]):
        if ord(nxt) - ord(prev) != 1:
            return False
    return True
    

In [0]:
# Slice check: indices 5 through 8 of a PAN hold its four digits.
print('AHGVE1276F'[5:9])

1276


In [0]:
import re

def validate_pan(pan):
    """Return True when ``pan`` is a well-formed PAN, otherwise False.

    A value is accepted only if all of the following hold:
      * it is exactly five uppercase letters, four digits and one uppercase letter;
      * no two adjacent characters are identical;
      * the five leading letters are not a consecutive ascending run (e.g. ABCDE);
      * the four digits are not a consecutive ascending run (e.g. 1234).

    Anything that is not a string (such as None from a null column value) is
    reported as invalid rather than raising.
    """
    if not isinstance(pan, str):
        return False

    # fullmatch, unlike match with a trailing '$', does not accept a value that
    # ends in a newline.
    if re.fullmatch(r'[A-Z]{5}[0-9]{4}[A-Z]', pan) is None:
        return False

    if has_adj_characters(pan):
        return False

    if form_sequence(pan[:5]) or form_sequence(pan[5:9]):
        return False

    return True


In [0]:
# Spot-check the validator on a sample PAN that passes every rule.
print(validate_pan('AHGVE1276F'))

True


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

In [0]:
# Wrap the plain-Python validator as a Spark UDF that returns booleans.
validate_pan_udf = udf(f=validate_pan, returnType=BooleanType())

In [0]:
from pyspark.sql.functions import col

# Attach the validator's boolean verdict to every de-duplicated PAN.
pan_df3 = pan_df2.withColumn(
    'Status',
    validate_pan_udf(col('Pan_Numbers')),
)
display(pan_df3)

Pan_Numbers,Status
JQEYE9502I,True
PSOVG0368R,True
BCFXZ7028Y,True
QHVER2630M,True
UWARF2956P,True
RXMZU7510F,True
OMRUG9712D,True
TBDMP2648Z,True
NAFQJ3441M,False
WCVZB9633P,False


In [0]:
# Build the labelled frame from pan_df2 so the cell can be re-run safely.
# Relabelling pan_df3 in place would, on a second run, compare the already
# textual Status with 'true' and mark every row 'Invalid PAN'. The boolean UDF
# result is also used directly as the condition instead of being compared to
# the string 'true'.
pan_df3 = pan_df2.withColumn(
    'Status',
    when(validate_pan_udf(col('Pan_Numbers')), 'Valid PAN').otherwise('Invalid PAN'),
)

In [0]:
# Show each PAN alongside its Status value.
display(pan_df3)

Pan_Numbers,Status
JQEYE9502I,Valid PAN
PSOVG0368R,Valid PAN
BCFXZ7028Y,Valid PAN
QHVER2630M,Valid PAN
UWARF2956P,Valid PAN
RXMZU7510F,Valid PAN
OMRUG9712D,Valid PAN
TBDMP2648Z,Valid PAN
NAFQJ3441M,Invalid PAN
WCVZB9633P,Invalid PAN


In [0]:
# Count how many PANs fall under each Status value.
display(pan_df3.groupBy('Status').count())

Status,count
Invalid PAN,5840
Valid PAN,3186


In [0]:

# Persist the labelled PANs as a table, using overwrite mode.
(
    pan_df3.write
    .mode("overwrite")
    .saveAsTable("PAN_Output_Validation_Result_Python")
)

In [0]:
# List the contents of the project folder.
display(dbutils.fs.ls('dbfs:/Workspace/PAN Card Validation Project'))

path,name,size,modificationTime
dbfs:/Workspace/PAN Card Validation Project/PAN Number Validation - Problem Statement.pdf,PAN Number Validation - Problem Statement.pdf,52994,1762592714398
dbfs:/Workspace/PAN Card Validation Project/PAN Number Validation Dataset.xlsx,PAN Number Validation Dataset.xlsx,163440,1762592714925
dbfs:/Workspace/PAN Card Validation Project/PAN Validation.ipynb,PAN Validation.ipynb,12692,1762663119971


### Create a summary report that provides the following:
1. Total records processed
2. Total valid PANs
3. Total invalid PANs
4. Total missing or incomplete PANs (if applicable)

In [0]:
# Rows that carried a PAN value (counted before de-duplication).
Total_Records_Processed = pan_df1.count()

# Split the labelled PANs by outcome.
Total_Valid_Pans = pan_df3.filter(col('Status') == 'Valid PAN').count()
Total_Invalid_Pans = pan_df3.filter(col('Status') == 'Invalid PAN').count()


In [0]:
# Missing PANs = rows of the raw frame that were filtered out before pan_df1.
Total_Missing_Pans=pan_raw_df.count()-Total_Records_Processed

In [0]:
from pyspark.sql import Row

# Assemble the report from the totals computed in the cells above rather than
# re-running every count: each extra count launches another Spark job
# (re-evaluating the Python validation UDF) and duplicates the metric
# definitions, which could then drift apart.
# Note: the processed total is counted before de-duplication, so it can exceed
# valid + invalid, which are counted over distinct PANs.
summary_report = spark.createDataFrame([
    Row(Metrics='Total_Records_Processed', Values=Total_Records_Processed),
    Row(Metrics='Total_Valid_PANs', Values=Total_Valid_Pans),
    Row(Metrics='Total_Invalids_PANs', Values=Total_Invalid_Pans),
    Row(Metrics='Total_Missing_PANs', Values=Total_Missing_Pans),
])
display(summary_report)

Metrics,Values
Total_Records_Processed,9035
Total_Valid_PANs,3186
Total_Invalids_PANs,5840
Total_Missing_PANs,965
