In [0]:
from pyspark.sql.functions import *

# Sample employee records. Several rows deliberately carry data-quality
# problems (empty/null names, malformed or missing emails, null or negative
# salaries, missing joining dates, partial addresses) for the cells below.
raw_data = [
    (1, "Alice", "alice@gmail.com", "IT", 90000, "2019-01-10", "Bangalore-560001"),
    (2, "Bob", None, "IT", None, "2020-05-20", "Chennai-600001"),
    (3, "", "charlie@bademail", "HR", 45000, None, "Hyderabad"),
    (4, "David", "david@yahoo.com", None, 70000, "2018-07-15", "Pune-411001"),
    (5, None, "eva@gmail.com", "FIN", 120000, "2016-03-01", None),
    (6, "Frank", "frank@company.com", "IT", -1, "2021-12-10", "Delhi-110001"),
    (7, "Grace", "grace@gmail", "HR", 50000, "2022-01-01", "Mumbai-400001"),
    (8, "Helen", "helen@gmail.com", "HR", None, None, "-560002"),
    (9, "Ian", "ian@yahoo.com", "IT", 85000, "2020-10-10", "Bangalore-560003"),
    (10, "Jane", None, "FIN", 95000, "2017-06-06", "Chennai"),
]

cols = ["emp_id", "emp_name", "email", "department", "salary", "joining_date", "address"]

# Column types are inferred from the Python values; joining_date starts as a
# string and is converted to a date (None stays null).
df = (
    spark.createDataFrame(raw_data, schema=cols)
    .withColumn("joining_date", to_date(col("joining_date")))
)
display(df)

In [0]:
from pyspark.sql.functions import *

# Split each email into username / domain and classify it:
#   MISSING -> email is null
#   VALID   -> matches a basic local@domain.tld shape (TLD of 2+ letters)
#   INVALID -> anything else (e.g. "charlie@bademail", which has no TLD)
email_profile_df = df.select(
    col("emp_id"),
    col("email"),
    # Text before the first "@" (null when email is null).
    split("email", "@").getItem(0).alias("email_username"),
    # Everything after the first "@"; regexp_extract gives "" when there is no match.
    regexp_extract(col("email"), "@(.+)", 1).alias("email_domain"),
    when(col("email").isNull(), "MISSING")
    .when(
        col("email").rlike("^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$"),
        "VALID",
    )
    .otherwise("INVALID")
    .alias("email_status"),
)
display(email_profile_df)

In [0]:
from pyspark.sql.functions import *


# Allowed top-level domains for the "valid_mail" check.
valid_domains = ["com", "org", "edu", "gov", "net", "co", "io", "in"]

# Require a full local@domain.tld shape (exactly one "@", no whitespace) that
# ends in an allowed TLD. A suffix-only check like r'\.(com|...)$' would also
# accept strings with no "@" at all (e.g. "example.com"). (?i) makes the match
# case-insensitive, since domain names are case-insensitive.
pattern = r"(?i)^[^@\s]+@[^@\s]+\.(" + "|".join(valid_domains) + r")$"

# NOTE: this rebinds email_profile_df, replacing the DataFrame built in the
# previous cell.
email_profile_df = (
    df.select(
        col("emp_id"),

        # Clean name: trim -> initcap. A null name becomes "" (blank), which is
        # also what an empty-string name already produces after trim/initcap.
        coalesce(initcap(trim(col("emp_name"))), lit("")).alias("customer_name"),

        # Email validation: rlike on a null email yields null, which falls
        # through to "invalid".
        col("email"),
        when(col("email").rlike(pattern), "valid").otherwise("invalid").alias("valid_mail"),
    )
)

display(email_profile_df)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Dense salary rank within each department (ties share a rank, no gaps).
# Non-positive salaries (e.g. the -1 in the raw data) are treated as missing,
# the same cleaning the top-earner cell applies. Missing and non-positive
# salaries therefore tie at the bottom and are both displayed as 0. Ranking on
# the raw value let -1 outrank a null salary that was displayed as 0.
w_rank = Window.partitionBy("department").orderBy(
    desc_nulls_last(when(col("salary") > 0, col("salary")))
)

dept_rank_df = (
    df.select(
        "emp_id",
        # Null department shown as "UNKNOWN" (the placeholder used by the
        # department summary) instead of an invisible single space.
        coalesce(col("department"), lit("UNKNOWN")).alias("department"),
        coalesce(when(col("salary") > 0, col("salary")), lit(0)).alias("salary"),
        dense_rank().over(w_rank).alias("dept_salary_rank"),
    )
)

display(dept_rank_df)


In [0]:
# Employees holding the top-2 distinct salary levels in each department.
# Non-positive salaries are treated as missing (salary_clean = null). Such rows
# are dropped *before* ranking. Otherwise, in a department with fewer than two
# valid salaries, a null salary would receive a rank <= 2 and be reported as a
# top earner. Window comes from the import in an earlier cell.
top_earners_df = (
    df
    .withColumn("salary_clean", when(col("salary") > 0, col("salary")).otherwise(None))
    .filter(col("salary_clean").isNotNull())
    .withColumn(
        "rank",
        dense_rank().over(Window.partitionBy("department").orderBy(desc("salary_clean"))),
    )
    .filter(col("rank") <= 2)
    .select("emp_id", coalesce("department", lit("")).alias("department"), "salary_clean", "rank")
)

display(top_earners_df)

In [0]:
# Department-level summary; null departments are grouped as "UNKNOWN".
# headcount counts every employee. Pay figures use only positive salaries:
# null and non-positive values (e.g. -1) are excluded, so avg_salary is no
# longer dragged down by counting unknown salaries as 0. avg_salary is null
# for a department with no valid salary. sum/avg here are the PySpark
# functions (the star import shadows the builtins), and both skip nulls.
dept_summary_df = (
    df.groupBy(coalesce("department", lit("UNKNOWN")).alias("department"))
      .agg(
          count("*").alias("headcount"),
          coalesce(sum(when(col("salary") > 0, col("salary"))), lit(0)).alias("total_salary"),
          avg(when(col("salary") > 0, col("salary"))).alias("avg_salary"),
      )
)

display(dept_summary_df)


In [0]:
# Per-department average salary. Non-positive values (e.g. -1) are mapped to
# null and avg() skips nulls, so unknown salaries no longer pull the average
# down the way avg(coalesce(salary, 0)) did.
dept_avg_df = (
    df.groupBy("department")
      .agg(avg(when(col("salary") > 0, col("salary"))).alias("dept_avg"))
)

# Attach each employee's department average (the aggregate is small, so it is
# broadcast). eqNullSafe lets the null-department group match employees whose
# department is null. A plain equi-join never matches null keys: those rows got
# a null dept_avg, which coalesced to 0 and flagged them as above average.
salary_compare_df = (
    df.alias("e")
      .join(
          broadcast(dept_avg_df).alias("d"),
          col("e.department").eqNullSafe(col("d.department")),
          "left",
      )
      .select(
          col("e.emp_id").alias("emp_id"),
          col("e.department").alias("department"),
          col("e.salary").alias("salary"),
          col("d.dept_avg").alias("dept_avg"),
          # A missing salary or average compares as 0.
          (coalesce(col("e.salary"), lit(0)) > coalesce(col("d.dept_avg"), lit(0)))
          .alias("above_avg_flag"),
      )
)
display(salary_compare_df)

In [0]:
# Seniority rank within each department: the earliest joining date ranks 1.
# A missing joining date is ordered as if the employee joined today, so it
# sorts after every past date. rank() leaves gaps after ties.
exp_rank_df = df.select(
    "emp_id",
    "department",
    rank()
    .over(
        Window.partitionBy("department").orderBy(
            coalesce(col("joining_date"), current_date())
        )
    )
    .alias("experience_rank"),
)

display(exp_rank_df)


In [0]:
# Tenure thresholds in calendar years.
SENIOR_YEARS = 10
MID_YEARS = 5

# Tenure band per employee:
#   UNKNOWN -> no joining date
#   SENIOR  -> joined more than SENIOR_YEARS ago
#   MID     -> joined more than MID_YEARS ago
#   JUNIOR  -> otherwise
# Comparing against add_months(current_date(), -12 * N) gives exact calendar
# years. Fixed 3650/1825-day thresholds ignore leap days and fall a few days
# short of 10/5 years.
tenure_df = (
    df.select(
        "emp_id",
        "department",
        when(col("joining_date").isNull(), "UNKNOWN")
        .when(col("joining_date") < add_months(current_date(), -12 * SENIOR_YEARS), "SENIOR")
        .when(col("joining_date") < add_months(current_date(), -12 * MID_YEARS), "MID")
        .otherwise("JUNIOR")
        .alias("tenure_band"),
    )
)

display(tenure_df)


In [0]:
# Pair every employee with each *other* employee in the same department.
# The left join keeps employees without peers (peer columns null). A null
# department never compares equal, so those employees get no peers.
e1 = df.alias("e1")
e2 = df.alias("e2")

peer_df = e1.join(
    e2,
    on=(col("e1.department") == col("e2.department")) & (col("e1.emp_id") != col("e2.emp_id")),
    how="left",
).select(
    col("e1.emp_id"),
    col("e1.salary"),
    col("e2.emp_id").alias("peer_id"),
    col("e2.salary").alias("peer_salary"),
)

display(peer_df)


In [0]:
# Employees earning more than 80000, via a left-semi join: keeps df's rows
# (and only df's columns) whose emp_id appears in the high-earner set.
# A null salary is coalesced to 0 and so never qualifies.
eligible_df = df.join(
    df.where(coalesce(col("salary"), lit(0)) > 80000).select("emp_id"),
    on="emp_id",
    how="left_semi",
)

display(eligible_df)


In [0]:
# Complement of the high-earner set, via a left-anti join: keeps df's rows
# whose emp_id does NOT appear among salaries above 80000. Null-salary rows
# are kept here because they are coalesced to 0 in the inner filter.
exception_df = df.join(
    df.where(coalesce(col("salary"), lit(0)) > 80000).select("emp_id"),
    on="emp_id",
    how="left_anti",
)

display(exception_df)


In [0]:
# Synthetic skill list per employee: two fixed skills plus the lower-cased
# department ("unknown" when the department is null).
skill_df = df.select(
    col("emp_id"),
    array(
        lit("spark"),
        lit("sql"),
        when(col("department").isNull(), lit("unknown")).otherwise(lower(col("department"))),
    ).alias("skills"),
)

display(skill_df)


In [0]:
# One row per (emp_id, skill). explode_outer would also emit a null-skill row
# for a null or empty array; skill_df as built above always has three elements.
skill_explode_df = skill_df.select(
    col("emp_id"),
    explode_outer(col("skills")).alias("skill"),
)

display(skill_explode_df)


In [0]:
# Serialise a cleaned subset of each employee to a JSON string.
# Placeholders: emp_name/department -> "UNKNOWN", salary -> 0 when null.
# email is passed through unchanged. With Spark's default settings to_json
# omits null fields, so a null email has no "email" key — confirm against the
# cluster's spark.sql.jsonGenerator.ignoreNullFields setting.
employee_json_df = df.select(
    col("emp_id"),
    to_json(
        struct(
            coalesce(col("emp_name"), lit("UNKNOWN")).alias("emp_name"),
            col("email"),
            coalesce(col("department"), lit("UNKNOWN")).alias("department"),
            coalesce(col("salary"), lit(0)).alias("salary"),
        )
    ).alias("employee_json"),
)

display(employee_json_df)



In [0]:
# Pull individual fields back out of the JSON string with JSONPath; the salary
# comes back as a string and is cast to int.
json_extract_df = employee_json_df.select(
    col("emp_id"),
    get_json_object(col("employee_json"), "$.department").alias("department"),
    get_json_object(col("employee_json"), "$.salary").cast("int").alias("salary"),
)

display(json_extract_df)


In [0]:
# DDL schema used to parse employee_json back into a struct. Field names match
# the keys written by to_json; absent keys parse as null.
schema = "emp_name STRING, email STRING, department STRING, salary INT"

json_struct_df = employee_json_df.select(
    col("emp_id"),
    from_json(col("employee_json"), schema).alias("emp_struct"),
)

display(json_struct_df)

In [0]:
# Flatten the parsed struct: every struct field becomes a top-level column
# next to emp_id.
json_flat_df = json_struct_df.select(col("emp_id"), col("emp_struct.*"))

display(json_flat_df)


In [0]:
# Lower-cased copies of name and department for case-insensitive searching.
search_df = json_flat_df.select(
    col("emp_id"),
    lower(col("emp_name")).alias("emp_name_search"),
    lower(col("department")).alias("department_search"),
)

display(search_df)


In [0]:
# Salary bands: HIGH >= 100000, MEDIUM >= 60000, LOW for other positive values.
# The JSON cell turned missing salaries into 0, and the raw data contains -1.
# Non-positive (or null) salaries are therefore banded as UNKNOWN instead of
# being silently reported as LOW.
final_df = json_flat_df.withColumn(
    "salary_band",
    when(col("salary").isNull() | (col("salary") <= 0), "UNKNOWN")
    .when(col("salary") >= 100000, "HIGH")
    .when(col("salary") >= 60000, "MEDIUM")
    .otherwise("LOW"),
)

display(final_df)
