In [2]:
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .appName("IAS") \
    .master("spark://127.0.0.1:7077") \
    .config("spark.driver.memory", "4G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .getOrCreate()

In [4]:
spark

In [42]:
data = spark.read.csv("IAS.csv", header=True, inferSchema=True, nullValue='')
data.show(5, truncate=False)

+------+--------------------------+------+----------+-------------------+---------------------+--------------+-----+------+------+-------------+---------------+---------------+----------------------------------------+----------------------------------------------------+----------------+---------------------+---------+
|ID    |Name                      |Gender|DOB       |Date of Appointment|Source of Recruitment|Allotment Year|Cadre|Qual 1|Qual 2|Spec 1       |Spec 2         |Domicile       |Current Post                            |Department                                          |Location        |With Effect From Date|Pay Level|
+------+--------------------------+------+----------+-------------------+---------------------+--------------+-----+------+------+-------------+---------------+---------------+----------------------------------------+----------------------------------------------------+----------------+---------------------+---------+
|007600|Shri Pradip Kumar Tripathi|Male 

In [43]:
data.limit(5).toPandas()

Unnamed: 0,ID,Name,Gender,DOB,Date of Appointment,Source of Recruitment,Allotment Year,Cadre,Qual 1,Qual 2,Spec 1,Spec 2,Domicile,Current Post,Department,Location,With Effect From Date,Pay Level
0,7600,Shri Pradip Kumar Tripathi,Male,1964-06-18,1987-08-24,RR,1987,UT,B.E.,M.Tech,Civil Engg,Structural Engg,Uttar Pradesh,Secretary (Coordination),Cabinet Secretariat,New Delhi,2022-02-05,Level 17
1,18400,Shri Naresh Kumar,Male,1963-11-18,1987-08-24,RR,1987,UT,B.E.,M.B.A.,Mech. Engg.,Public Admn,Delhi,Chief Secretary,Govt. of National capital Territory of Delhi (...,Delhi,2022-04-21,Level 17
2,18800,Shri Chetan Bhushan Sanghi,Male,1965-06-26,1988-08-25,RR,1988,UT,M.A.,B.Sc.,Public Policy,Hons,Andhra Pradesh,Financial Commissioner,Govt. of National capital Territory of Delhi (...,New Delhi,2021-01-04,Level 17
3,19002,Dr.(Ms.) Renu Sharma,Female,1964-10-19,1988-08-25,RR,1988,UT,M.Phil,Ph.D.,Political Sc.,Political Sc.,Delhi,Chief Secretary to Government of Mizoram,,Aizawl (Mizoram),2021-02-11,Level 17
4,8200,Shri Atal Dulloo,Male,1966-10-24,1989-08-21,RR,1989,UT,B.Tech,,Civil Engg,,Jammu & Kashmir,Chief Secretary,,Jammu & Kashmir,2023-01-12,Level 17


In [44]:
data.groupBy('Gender').count().orderBy('count', ascending=False).show()
data.groupBy('Source of Recruitment').count().orderBy('count', ascending=False).show()
data.groupBy('Cadre').count().orderBy('count', ascending=False).show()
data.groupBy('Pay Level').count().orderBy('count', ascending=False).show()

+------+-----+
|Gender|count|
+------+-----+
|  Male| 3769|
|Female| 1773|
+------+-----+

+---------------------+-----+
|Source of Recruitment|count|
+---------------------+-----+
|                   RR| 3987|
|                  SCS| 1419|
|              Non-SCS|  136|
+---------------------+-----+

+-----+-----+
|Cadre|count|
+-----+-----+
|   UP|  574|
|   UT|  412|
|   MP|  393|
|   MH|  346|
|   TN|  333|
|   WB|  303|
|   KN|  273|
|   RJ|  262|
|   GJ|  260|
|   BH|  244|
|   AM|  213|
|   AP|  210|
|   JH|  192|
|   PB|  191|
|   OR|  189|
|   HY|  174|
|   TG|  170|
|   CG|  169|
|   KL|  152|
|   HP|  113|
+-----+-----+
only showing top 20 rows

+---------+-----+
|Pay Level|count|
+---------+-----+
| Level 12| 1228|
| Level 11| 1177|
| Level 14|  855|
| Level 13|  851|
| Level 10|  660|
| Level 15|  416|
| Level 17|  353|
|    N.A. |    1|
| Level 18|    1|
+---------+-----+



In [45]:
from pyspark.sql.functions import trim
from pyspark.sql.functions import col

# Keep only rows where 'Pay Level' is not 'N.A.' and not null
data = data.filter((trim(col('Pay Level')) != 'N.A.') & (col('Pay Level').isNotNull()))
data = data.filter((trim(col('Pay Level')) != 'Level 18') & (col('Pay Level').isNotNull()))

In [46]:
data.groupBy('Pay Level').count().orderBy('count', ascending=False).show()


+---------+-----+
|Pay Level|count|
+---------+-----+
| Level 12| 1228|
| Level 11| 1177|
| Level 14|  855|
| Level 13|  851|
| Level 10|  660|
| Level 15|  416|
| Level 17|  353|
+---------+-----+



In [47]:
from pyspark.sql.functions import col, count, when
from pyspark.sql import DataFrame

def missing_percentage(df: DataFrame):
    total_rows = df.count()
    return df.select([
        (100 * count(when(col(c).isNull(), c)) / total_rows).alias(c)
        for c in df.columns
    ])

# Show percentage of missing values per column, rounded to 2 decimals
missing_percentage(data).toPandas().round(2).T


Unnamed: 0,0
ID,0.0
Name,0.0
Gender,0.0
DOB,0.0
Date of Appointment,0.0
Source of Recruitment,0.0
Allotment Year,0.0
Cadre,0.0
Qual 1,0.09
Qual 2,68.23


In [48]:
from pyspark.sql.functions import col, to_date, year, current_date, datediff, floor, when

data = data.withColumn("Date of Appointment", to_date(col("Date of Appointment"), "yyyy-MM-dd"))
data = data.withColumn("DOB", to_date(col("DOB"), "yyyy-MM-dd"))
data = data.withColumn("With Effect From Date", to_date(col("With Effect From Date"), "yyyy-MM-dd"))

data = data.withColumn("Allotment Year", year(col("Date of Appointment")))

# Age in years
data = data.withColumn("Age", floor(datediff(current_date(), col("DOB")) / 365))

# Service Tenure in years
data = data.withColumn("Service Tenure", floor(datediff(current_date(), col("Date of Appointment")) / 365))

# Current Tenure with logic for NaT and negative values
data = data.withColumn("Current Tenure",
    when(col("With Effect From Date").isNull(), None)
    .otherwise(
        when(datediff(current_date(), col("With Effect From Date")) < 0, None)
        .otherwise(floor(datediff(current_date(), col("With Effect From Date")) / 365))
    )
)

data.select("DOB", "Date of Appointment", "With Effect From Date", "Age", "Service Tenure", "Current Tenure").show(5, truncate=False)


+----------+-------------------+---------------------+---+--------------+--------------+
|DOB       |Date of Appointment|With Effect From Date|Age|Service Tenure|Current Tenure|
+----------+-------------------+---------------------+---+--------------+--------------+
|1964-06-18|1987-08-24         |2022-02-05           |61 |37            |3             |
|1963-11-18|1987-08-24         |2022-04-21           |61 |37            |3             |
|1965-06-26|1988-08-25         |2021-01-04           |60 |36            |4             |
|1964-10-19|1988-08-25         |2021-02-11           |60 |36            |4             |
|1966-10-24|1989-08-21         |2023-01-12           |58 |35            |2             |
+----------+-------------------+---------------------+---+--------------+--------------+
only showing top 5 rows



In [49]:
from pyspark.sql.functions import col, count

# Calculate mode (most frequent value) of 'Qual 1'
mode_value = data.groupBy("Qual 1") \
    .count() \
    .orderBy(col("count").desc()) \
    .first()[0]  # extract the top value

from pyspark.sql.functions import when

data = data.withColumn(
    "Qual 1",
    when(col("Qual 1").isNull(), mode_value).otherwise(col("Qual 1"))
)

mode_value = data.groupBy("Location") \
    .count() \
    .orderBy(col("count").desc()) \
    .first()[0]  # extract the top value

from pyspark.sql.functions import when

data = data.withColumn(
    "Location",
    when(col("Location").isNull(), mode_value).otherwise(col("Location"))
)

mode_value = data.groupBy("Domicile") \
    .count() \
    .orderBy(col("count").desc()) \
    .first()[0]  # extract the top value

from pyspark.sql.functions import when

data = data.withColumn(
    "Domicile",
    when(col("Domicile").isNull(), mode_value).otherwise(col("Domicile"))
)


In [50]:
data = data.withColumn(
    "Spec 1",
    when(col("Spec 1").isNull(), "Unknown").otherwise(col("Spec 1"))
)

data = data.withColumn(
    "Qual 2",
    when(col("Qual 2").isNull(), "Unknown").otherwise(col("Qual 2"))
)

data = data.withColumn(
    "Spec 2",
    when(col("Spec 2").isNull(), "Unknown").otherwise(col("Spec 2"))
)

In [51]:
from pyspark.sql.functions import udf, col, greatest
from pyspark.sql.types import IntegerType, StringType

def get_qual_type(x):
    if x is None:
        return 0
    x = x.strip()
    if x == 'Unknown':
        return 0
    elif x in ['M.B.B.S.', 'B.H.M.S.']:
        return 1
    elif x == 'Ph.D.':
        return 3
    elif 'M.' in x:
        return 2
    elif 'P' in x and 'G' in x:
        return 2
    else:
        return 1

qual_type_udf = udf(get_qual_type, IntegerType())

data = data.withColumn("qual1_type", qual_type_udf(col("Qual 1")))
data = data.withColumn("qual2_type", qual_type_udf(col("Qual 2")))

data = data.withColumn("qual_code", greatest(col("qual1_type"), col("qual2_type")))

from pyspark.sql.functions import when

data = data.withColumn(
    "Qual Type",
    when(col("qual_code") == 1, "Graduate")
    .when(col("qual_code") == 2, "Post Graduate")
    .when(col("qual_code") == 3, "Doctorate")
    .otherwise('Unknown')
)

data = data.drop("qual1_type", "qual2_type", "qual_code")


In [52]:
from pyspark.sql.functions import col, when, lower, trim, udf
from pyspark.sql.types import BooleanType, StringType

def is_secretary_udf_func(x):
    if x is not None:
        x = x.lower().strip()
        return ('secretary' in x) or ('adviser' in x) or ('advisor' in x) or ('electoral' in x)
    return False

is_secretary_udf = udf(is_secretary_udf_func, BooleanType())

data = data.withColumn("secretary_flag", is_secretary_udf(col("Current Post")))

data = data.withColumn(
    "Department",
    when(col("secretary_flag"), "Secretariat").otherwise(col("Department"))
)


In [53]:
from pyspark.sql.functions import regexp_replace

data = data.withColumn("Location", trim(col("Location")))
data = data.withColumn("Location", regexp_replace(col("Location"), "Govt\. of", ""))
data = data.withColumn("Location", regexp_replace(col("Location"), "Govt of", ""))

data = data.drop("secretary_flag")


In [54]:
from pyspark.sql.functions import col, when, lower, trim, lit, regexp_replace

data = data.withColumn("is_training", col("Current Post").contains("On Training"))

data = data.withColumn(
    "is_waiting",
    when(col("Current Post").isNotNull(), lower(col("Current Post")).contains("waiting")).otherwise(False)
)

data = data.withColumn(
    "lbsnaa_flag",
    when(col("Location").isNotNull(), col("Location").contains("LBSNAA ")).otherwise(False)
)
    

In [55]:
data = data.withColumn(
    "Department",
    when(col("is_training") | col("is_waiting"), "Unknown").otherwise(col("Department"))
)

data = data.withColumn(
    "Current Post",
    when(col("is_training") & col("lbsnaa_flag"), col("Current Post") + lit(" LBSNAA"))
    .otherwise(col("Current Post"))
)

data = data.withColumn(
    "Location",
    when(col("lbsnaa_flag"), regexp_replace(col("Location"), "LBSNAA ", ""))
    .otherwise(col("Location"))
)

data = data.withColumn(
    "Current Post",
    when(col("Current Post").isNull(), "Unknown").otherwise(col("Current Post"))
)

data = data.drop("is_training", "is_waiting", "lbsnaa_flag")


In [56]:
data = data.withColumn(
    "is_suspended_leave_deputation",
    when(
        col("Current Post").isNotNull(),
        lower(trim(col("Current Post"))).contains("suspension") |
        lower(trim(col("Current Post"))).contains("leave") |
        lower(trim(col("Current Post"))).contains("deputation")
    ).otherwise(False)
)

data = data.withColumn(
    "Department",
    when(col("is_suspended_leave_deputation"), "Unknown").otherwise(col("Department"))
)

data = data.withColumn(
    "Location",
    when(col("is_suspended_leave_deputation"), "Unknown").otherwise(col("Location"))
)

data = data.drop("is_suspended_leave_deputation")

In [57]:
from pyspark.sql.functions import when, col, lower, lit, trim, isnull, coalesce

data = data.withColumn("training", col("Current Post").contains("On Training"))

data = data.withColumn(
    "waiting",
    when(col("Current Post").isNotNull(), lower(col("Current Post")).contains("waiting")).otherwise(False)
)

data = data.withColumn(
    "suspended",
    when(col("Current Post").isNotNull(),
         lower(trim(col("Current Post"))).contains("suspension") |
         lower(trim(col("Current Post"))).contains("leave") |
         lower(trim(col("Current Post"))).contains("deputation")
    ).otherwise(False)
)

data = data.withColumn("null_post", col("Current Post").isNull())
data = data.withColumn("null_dept", col("Department").isNull())
data = data.withColumn("null_loc", col("Location").isNull())

# Combine status flags
data = data.withColumn("status1", col("training") | col("waiting") | col("suspended"))

# Set Department to NULL where training or waiting
data = data.withColumn(
    "Department",
    when(col("training") | col("waiting"), None).otherwise(col("Department"))
)

# Compute sec_tr_susp_leave_dept (null_dept & (training | waiting | suspended))
data = data.withColumn(
    "Department",
    when(col("null_dept") & col("status1"), "Unknown")
    .when(col("null_dept") & ~(col("training") | col("waiting") | col("suspended")), "Unknown")
    .otherwise(col("Department"))
)



In [58]:
# Set Location = 'Unknown' where (suspended | waiting | null_post)
data = data.withColumn(
    "Location",
    when(col("suspended") | col("waiting") | col("null_post"), "Unknown")
    .otherwise(col("Location"))
)

# For remaining null Location values, also set to Unknown
data = data.withColumn(
    "Location",
    when(col("null_loc") & ~(col("suspended") | col("waiting") | col("null_post")), "Unknown")
    .otherwise(col("Location"))
)



In [59]:
data = data.withColumn("status2", col("Current Post") == "Unknown")

# Default Job Status = Active
data = data.withColumn("Job Status", lit("Active"))

# Set to Inactive where (training | waiting | suspended | Current Post == N/A)
data = data.withColumn(
    "Job Status",
    when(col("status1") | col("status2"), "Inactive")
    .otherwise(col("Job Status"))
)

# Fill null 'With Effect From Date' with 'Unknown'
data = data.withColumn(
    "With Effect From Date",
    when(col("With Effect From Date").isNull(), "Unknown")
    .otherwise(col("With Effect From Date"))
)

# Set 'Current Tenure' = 'Unknown' where waiting or suspended
data = data.withColumn(
    "Current Tenure",
    when(col("waiting") | col("suspended"), "Unknown")
    .otherwise(col("Current Tenure"))
)

# If Current Tenure is null & With Effect From Date ≠ Unknown
data = data.withColumn(
    "Job Status",
    when(
        col("Current Tenure").isNull() & (col("With Effect From Date") != "Unknown"),
        "Inactive"
    ).otherwise(col("Job Status"))
)

# If Current Tenure is null and Current Post == Unknown
data = data.withColumn(
    "Job Status",
    when(
        col("Current Tenure").isNull() & (col("Current Post") == "Unknown"),
        "Inactive"
    ).otherwise(col("Job Status"))
)

# Final fill for remaining nulls in Current Tenure
data = data.withColumn(
    "Current Tenure",
    coalesce(col("Current Tenure"), lit("Unknown"))
)

data = data.withColumn(
    "Current Tenure",
    when(col("Current Tenure") == "Unknown", 0).otherwise(col("Current Tenure"))
)

data = data.withColumn(
    "Current Tenure",
    col("Current Tenure").cast("int")
)

In [61]:
data = data.drop('training','waiting','suspended','null_post','null_dept','null_loc','status1',"status2")

In [64]:
data.show(5)

+------+--------------------+------+----------+-------------------+---------------------+--------------+-----+------+-------+-------------+---------------+---------------+--------------------+--------------------+----------------+---------------------+---------+---+--------------+--------------+-------------+----------+
|    ID|                Name|Gender|       DOB|Date of Appointment|Source of Recruitment|Allotment Year|Cadre|Qual 1| Qual 2|       Spec 1|         Spec 2|       Domicile|        Current Post|          Department|        Location|With Effect From Date|Pay Level|Age|Service Tenure|Current Tenure|    Qual Type|Job Status|
+------+--------------------+------+----------+-------------------+---------------------+--------------+-----+------+-------+-------------+---------------+---------------+--------------------+--------------------+----------------+---------------------+---------+---+--------------+--------------+-------------+----------+
|007600|Shri Pradip Kumar...|  Mal

In [66]:

mode_value = data.groupBy("Department") \
    .count() \
    .orderBy(col("count").desc()) \
    .first()[0]  # extract the top value

data = data.withColumn(
    "Department",
    when(col("Department").isNull(), mode_value).otherwise(col("Department"))
)

from pyspark.sql import functions as F

data = data.withColumn(
    "With Effect From Date",
    F.when(F.col("With Effect From Date") == "Unknown", F.col("Date of Appointment"))  
     .otherwise(F.col("With Effect From Date"))                         
)

missing_percentage(data).toPandas().round(2).T


Unnamed: 0,0
ID,0.0
Name,0.0
Gender,0.0
DOB,0.0
Date of Appointment,0.0
Source of Recruitment,0.0
Allotment Year,0.0
Cadre,0.0
Qual 1,0.0
Qual 2,0.0


In [74]:
data.toPandas().to_csv("IAS_Cleaned.csv", index=False)