In [1]:
import sys
import os

# Make the parent of the working directory importable so `src` can be found
notebook_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(notebook_dir, ".."))
sys.path.append(project_root)

from src.spark_session import local_spark_session

# Session shared by every cell below
spark = local_spark_session()

Deleted existing warehouse directory: ./temp/warehouse/


In [4]:
import random

# Departments that employees are randomly assigned to
departments = ["HR", "IT", "Finance", "Marketing"]

# Fixed seed so that "Restart Kernel -> Run All" regenerates exactly the same
# sample rows; a dedicated generator avoids touching the global `random` state.
SEED = 42
rng = random.Random(SEED)

# Generate sample employee data as a list of (emp_id, department, salary) tuples
row_count = 100
employee_data = [
    (
        i + 1,  # Unique employee ID, starting at 1
        rng.choice(departments),  # Random department
        rng.randint(30000, 120000),  # Random salary, both bounds inclusive
    )
    for i in range(row_count)
]

# Print first 5 records
for record in employee_data[:5]:
    print(record)


(1, 'HR', 77625)
(2, 'Marketing', 57035)
(3, 'Marketing', 108216)
(4, 'Marketing', 83747)
(5, 'Finance', 92952)


# Compare the salary of employees with the average salary within their department 

In [5]:
from pyspark.sql import Window
from pyspark.sql.functions import avg, col, when
from pyspark.sql.types import StructType,StructField,IntegerType,StringType
# Explicit schema for the (emp_id, department, salary) tuples in employee_data
expected_schema = StructType(
    [
        StructField("emp_id", IntegerType()),
        StructField("department", StringType()),
        StructField("salary", IntegerType()),
    ]
)
employee_df = spark.createDataFrame(employee_data, expected_schema)

# Attach each department's mean salary to every row of that department
department_window = Window.partitionBy(col("department"))
department_avg = avg(col("salary")).over(department_window)
avg_emp_salary = employee_df.withColumn("avg_salary", department_avg)

# Label every salary relative to its department average
# (the "comparision" spelling is the existing column/variable name and is kept as-is)
salary_label = (
    when(col("salary") > col("avg_salary"), "Higher")
    .when(col("salary").eqNullSafe(col("avg_salary")), "equal")
    .otherwise("lower")
)
comparision_df = avg_emp_salary.withColumn("comparision", salary_label)
comparision_df.show()

+------+----------+------+-----------------+-----------+
|emp_id|department|salary|       avg_salary|comparision|
+------+----------+------+-----------------+-----------+
|     5|   Finance| 92952|74120.44444444444|     Higher|
|     6|   Finance| 76872|74120.44444444444|     Higher|
|     9|   Finance| 82601|74120.44444444444|     Higher|
|    11|   Finance| 37446|74120.44444444444|      lower|
|    16|   Finance| 65255|74120.44444444444|      lower|
|    24|   Finance| 69857|74120.44444444444|      lower|
|    25|   Finance|112849|74120.44444444444|     Higher|
|    27|   Finance| 77618|74120.44444444444|     Higher|
|    30|   Finance| 70417|74120.44444444444|      lower|
|    34|   Finance|109041|74120.44444444444|     Higher|
|    42|   Finance| 52419|74120.44444444444|      lower|
|    45|   Finance| 94804|74120.44444444444|     Higher|
|    47|   Finance| 56671|74120.44444444444|      lower|
|    52|   Finance| 99107|74120.44444444444|     Higher|
|    72|   Finance| 74527|74120

# nth highest salary for each department

In [None]:


from pyspark.sql.functions import desc, asc, dense_rank, lit
from pyspark.sql.dataframe import DataFrame

# Per-department window, highest salary first
dept_window = (
    Window
    .partitionBy("department")
    .orderBy(desc("salary"))
)


def nth_highest_salary(spark, input_df: DataFrame, rank: int) -> DataFrame:
    """Return the rows holding the `rank`-th highest salary in each department.

    Rows are ranked with `dense_rank()` over the module-level `dept_window`
    (partitioned by department, salary descending), so rows with equal
    salaries share a rank and are all returned.

    Args:
        spark: Spark session; accepted but not used by this function.
        input_df: DataFrame with `emp_id`, `department` and `salary` columns.
        rank: Dense rank to keep; 1 selects the highest salary.

    Returns:
        DataFrame with the `emp_id`, `department` and `salary` columns of the
        matching rows.
    """
    salary_rank = dense_rank().over(dept_window)
    return (
        input_df.withColumn("rank", salary_rank)
        .filter(col("rank") == lit(rank))
        .select("emp_id", "department", "salary")
    )


# Show the highest, then the second-highest, salary per department
for n in (1, 2):
    df = nth_highest_salary(spark, employee_df, n)
    df.show()

+------+----------+------+
|emp_id|department|salary|
+------+----------+------+
|    96|   Finance|116959|
|    56|        HR|117795|
|    67|        IT|111359|
|    85| Marketing|119723|
+------+----------+------+

+------+----------+------+
|emp_id|department|salary|
+------+----------+------+
|    25|   Finance|112849|
|    50|        HR|117185|
|    55|        IT|108593|
|    71| Marketing|119598|
+------+----------+------+



In [12]:
%%markdown
# Write PySpark code to filter employees whose salary is between 70000 and 80000.


# Write PySpark code to filter employees whose salary is between 70000 and 80000.


In [14]:
# Keep only employees whose salary falls in the inclusive 70000-80000 band
salary_band = col("salary").between(lit(70000), lit(80000))
employee_df.filter(salary_band).show()

+------+----------+------+
|emp_id|department|salary|
+------+----------+------+
|     1|        HR| 77625|
|     6|   Finance| 76872|
|     8|        IT| 79448|
|    27|   Finance| 77618|
|    30|   Finance| 70417|
|    32|        HR| 72176|
|    33|        IT| 70707|
|    36|        IT| 76538|
|    69| Marketing| 72629|
|    72|   Finance| 74527|
|    79|   Finance| 71154|
|    88|        IT| 71422|
|    91|   Finance| 77029|
|    95| Marketing| 75785|
|    98|   Finance| 72506|
+------+----------+------+



In [1]:
%%markdown 
# count number of null values

# count number of null values


In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Sample employees with NULLs scattered across name, department and salary
data = [
    (1, "Alice", "HR", 50000),
    (2, "Bob", "IT", None),  # salary is NULL
    (3, None, "Finance", 70000),  # name is NULL
    (4, "Charlie", None, 60000),  # department is NULL
    (5, "David", "IT", 65000),
    (6, "Emma", "HR", None),  # salary is NULL
    (7, None, None, None),  # name, department and salary are all NULL
]

# Every column is declared nullable
schema = (
    StructType()
    .add(StructField("emp_id", IntegerType(), True))
    .add(StructField("emp_name", StringType(), True))
    .add(StructField("department", StringType(), True))
    .add(StructField("salary", IntegerType(), True))
)

# NOTE(review): `spark` is already bound by the first cell; this rebinds it to
# whatever `local_spark_session()` returns — confirm the second call is intended.
spark = local_spark_session()
df = spark.createDataFrame(data, schema=schema)


def col_null_cnt(spark, input_df: DataFrame) -> DataFrame:
    """Count the NULL values in every column of `input_df`.

    Args:
        spark: Spark session; kept for signature compatibility, not used here.
        input_df: DataFrame whose columns are inspected.

    Returns:
        A single-row DataFrame with one `null_cnt_<column>` column per input
        column, holding that column's NULL count (0 when there are none,
        including when `input_df` has no rows).
    """
    # Imported locally under a namespace so the aggregation does not depend on
    # `from pyspark.sql.functions import *` having shadowed the builtin `sum`.
    from pyspark.sql import functions as F

    # `when` without `otherwise` yields NULL for non-matching rows and `count`
    # skips NULLs, so this counts exactly the NULL cells. Unlike summing 0/1
    # flags, it yields 0 rather than NULL when the DataFrame has no rows.
    null_counts = [
        F.count(F.when(F.col(cl).isNull(), F.lit(1))).alias(f"null_cnt_{cl}")
        for cl in input_df.columns
    ]
    return input_df.select(null_counts)


# Display the per-column NULL counts for the sample frame
null_counts_df = col_null_cnt(spark, df)
null_counts_df.show()


+---------------+-----------------+-------------------+---------------+
|null_cnt_emp_id|null_cnt_emp_name|null_cnt_department|null_cnt_salary|
+---------------+-----------------+-------------------+---------------+
|              0|                2|                  2|              3|
+---------------+-----------------+-------------------+---------------+

+------+--------+----------+------+
|emp_id|emp_name|department|salary|
+------+--------+----------+------+
|     0|       2|         2|     3|
+------+--------+----------+------+



In [42]:
# Alternative: cast each isNull flag to 0/1 and total it per column
null_flag_sums = [
    sum(col(name).isNull().cast("int")).alias(f"cnt_{name}")
    for name in df.columns
]
df.groupBy().agg(*null_flag_sums).show()

+----------+------------+--------------+----------+
|cnt_emp_id|cnt_emp_name|cnt_department|cnt_salary|
+----------+------------+--------------+----------+
|         0|           2|             2|         3|
+----------+------------+--------------+----------+



In [2]:
# Stop the Spark session once all examples above have run
spark.stop()