In [0]:
# Sanity check 1: is spark session available?
try:
    print("spark object:", spark)
    print("spark.version:", spark.version)
    # tiny Spark job to see core functionality
    spark.range(5).toDF("n").show()
except Exception as e:
    print("ERROR running spark test:", type(e).__name__, str(e))


spark object: <pyspark.sql.connect.session.SparkSession object at 0x7efc9b3519a0>
spark.version: 4.0.0
+---+
|  n|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:

%pip install pandas openpyxl pyarrow



Collecting openpyxl
  Downloading openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Downloading et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Downloading openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Downloading et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# CELL A - read Excel into pandas and convert to Spark DataFrame
excel_dbfs_path = "/Workspace/Users/rakshit.g@syrencloud.com/employees.xlsx"  
import pandas as pd
pdf = pd.read_excel(excel_dbfs_path, sheet_name=0)   
print("pandas dataframe shape:", pdf.shape)
display(pdf.head(10))

# convert to Spark DataFrame (keeps types simpler)
sdf = spark.createDataFrame(pdf)
print("spark schema:")
sdf.printSchema()
display(sdf.limit(10))


pandas dataframe shape: (7, 6)


Unnamed: 0,employee_id,name,department,salary,joining_year,bonus_percent
0,101,Alice,HR,50000,2018,5
1,102,Bob,Finance,60000,2019,7
2,103,Charlie,IT,75000,2017,10
3,104,David,Finance,62000,2020,6
4,105,Eva,IT,80000,2021,12
5,106,Frank,HR,52000,2018,5
6,107,Grace,IT,90000,2019,15


spark schema:
root
 |-- employee_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- joining_year: long (nullable = true)
 |-- bonus_percent: long (nullable = true)



DataFrame[employee_id: bigint, name: string, department: string, salary: bigint, joining_year: bigint, bonus_percent: bigint]

In [0]:
# CELL B 
from pyspark.sql import functions as F

# normalize column names 
def normalize_column_names(sdf):
    for c in sdf.columns:
        sdf = sdf.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))
    return sdf

sdf = normalize_column_names(sdf)

# Cast numeric columns to appropriate types (salary & bonus_percent -> double)
if 'employee_id' in sdf.columns:
    sdf = sdf.withColumn('employee_id', F.col('employee_id').cast('int'))
if 'salary' in sdf.columns:
    # remove commas/currency if any then cast to double
    sdf = sdf.withColumn('salary', F.regexp_replace(F.col('salary').cast('string'), '[,$]', '').cast('double'))
if 'joining_year' in sdf.columns:
    sdf = sdf.withColumn('joining_year', F.col('joining_year').cast('int'))
if 'bonus_percent' in sdf.columns:
    sdf = sdf.withColumn('bonus_percent', F.col('bonus_percent').cast('double'))

# Trim string columns
for name, dtype in sdf.dtypes:
    if dtype == 'string':
        sdf = sdf.withColumn(name, F.trim(F.col(name)))

print("After cleaning — schema:")
sdf.printSchema()
print("\nSample rows (first 10):")
display(sdf.limit(10))


After cleaning — schema:
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- joining_year: integer (nullable = true)
 |-- bonus_percent: double (nullable = true)


Sample rows (first 10):


DataFrame[employee_id: int, name: string, department: string, salary: double, joining_year: int, bonus_percent: double]

In [0]:
# CELL C - select only name and salary
sel_name_salary = sdf.select("name", "salary")
print("Selected columns and row count:", sel_name_salary.count())
display(sel_name_salary)


Selected columns and row count: 7


DataFrame[name: string, salary: double]

In [0]:
# CELL D - filter employees with salary > 60000
high_paid = sdf.filter(F.col("salary") > 60000)
print("Filtered count:", high_paid.count())
display(high_paid.orderBy(F.col("salary").desc()))


Filtered count: 4


DataFrame[employee_id: int, name: string, department: string, salary: double, joining_year: int, bonus_percent: double]

In [0]:
# CELL E - add bonus_amount and show results (rounded to 2 decimals)
from pyspark.sql import functions as F

# add bonus_amount column
sdf = sdf.withColumn("bonus_amount", F.col("salary") * F.col("bonus_percent") / 100.0)

# display selected columns with bonus rounded
display(
    sdf.select(
        "employee_id",
        "name",
        "department",
        "salary",
        "bonus_percent",
        F.round(F.col("bonus_amount"), 2).alias("bonus_amount")
    ).orderBy(F.col("salary").desc())
)


DataFrame[employee_id: int, name: string, department: string, salary: double, bonus_percent: double, bonus_amount: double]

In [0]:
# CELL F - rename joining_year -> year_joined and drop bonus_percent
sdf = sdf.withColumnRenamed("joining_year", "year_joined")

# drop bonus_percent as requested by the assignment
if "bonus_percent" in sdf.columns:
    sdf = sdf.drop("bonus_percent")

print("Updated schema:")
sdf.printSchema()

print("\nSample rows (first 8):")
display(sdf.limit(8))


Updated schema:
root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- year_joined: integer (nullable = true)
 |-- bonus_amount: double (nullable = true)


Sample rows (first 8):


DataFrame[employee_id: int, name: string, department: string, salary: double, year_joined: int, bonus_amount: double]

In [0]:
# CELL G - Average salary per department
avg_salary_df = sdf.groupBy("department").avg("salary")

print("Average Salary per Department:")
avg_salary_df.show()


Average Salary per Department:
+----------+-----------------+
|department|      avg(salary)|
+----------+-----------------+
|        HR|          51000.0|
|   Finance|          61000.0|
|        IT|81666.66666666667|
+----------+-----------------+



In [0]:
# CELL H: count per department
from pyspark.sql import functions as F

count_dept = sdf.groupBy("department").agg(F.count("*").alias("num_employees"))
print("Count of employees per department:")
count_dept.show()


Count of employees per department:
+----------+-------------+
|department|num_employees|
+----------+-------------+
|        HR|            2|
|   Finance|            2|
|        IT|            3|
+----------+-------------+



In [0]:
# CELL I - max salary, top by salary (desc), and sort by year_joined (asc)
from pyspark.sql import functions as F

# 1) Maximum salary
max_row = sdf.agg(F.max("salary").alias("max_salary")).collect()[0]
print("Maximum salary:", max_row["max_salary"])

# 2) Top employees by salary (descending)
print("\nTop employees by salary (desc):")
display(
    sdf.orderBy(F.col("salary").desc())
       .select("employee_id", "name", "department", "salary", "year_joined", "bonus_amount")
       .limit(10)
)

# 3) Employees sorted by year_joined (ascending)
if "year_joined" in sdf.columns:
    print("\nEmployees sorted by year_joined (asc):")
    display(
        sdf.orderBy(F.col("year_joined").asc())
           .select("employee_id", "name", "department", "salary", "year_joined")
           .limit(20)
    )
else:
    print("\nNo year_joined column found.")


Maximum salary: 90000.0

Top employees by salary (desc):


DataFrame[employee_id: int, name: string, department: string, salary: double, year_joined: int, bonus_amount: double]


Employees sorted by year_joined (asc):


DataFrame[employee_id: int, name: string, department: string, salary: double, year_joined: int]

In [0]:
# JOIN CELL J - create dept details and left-join with employees
dept_data = [("HR","Human Resources","New York"),
             ("Finance","Finance Dept","London"),
             ("IT","Information Technology","San Francisco")]
dept_cols = ["department","dept_name","location"]

dept_df = spark.createDataFrame(dept_data, schema=dept_cols)
print("Dept table:")
display(dept_df)

joined = sdf.join(dept_df, on="department", how="left")
print("Joined sample (first 20 rows):")
display(joined.orderBy("employee_id").limit(20))


Dept table:


DataFrame[department: string, dept_name: string, location: string]

Joined sample (first 20 rows):


DataFrame[department: string, employee_id: int, name: string, salary: double, year_joined: int, bonus_amount: double, dept_name: string, location: string]

In [0]:
# Cell K
from pyspark.sql import functions as F


print("Joined schema:")
joined.printSchema()

print("\nFirst 20 joined rows:")
display(joined.orderBy("employee_id").limit(20))

print("\nFirst 5 rows (explicit):")
display(joined.limit(5))

# total count
total = joined.count()
print(f"\nTotal rows in joined DF: {total}")


rows = joined.collect()
print("\nCollected sample (first 5 shown):")
for r in rows[:5]:
    print(r)

print("\nTake first 3 (take):")
for r in joined.take(3):
    print(r)




Joined schema:
root
 |-- department: string (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- year_joined: integer (nullable = true)
 |-- bonus_amount: double (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- location: string (nullable = true)


First 20 joined rows:


DataFrame[department: string, employee_id: int, name: string, salary: double, year_joined: int, bonus_amount: double, dept_name: string, location: string]


First 5 rows (explicit):


DataFrame[department: string, employee_id: int, name: string, salary: double, year_joined: int, bonus_amount: double, dept_name: string, location: string]


Total rows in joined DF: 7

Collected sample (first 5 shown):
Row(department='HR', employee_id=101, name='Alice', salary=50000.0, year_joined=2018, bonus_amount=2500.0, dept_name='Human Resources', location='New York')
Row(department='Finance', employee_id=102, name='Bob', salary=60000.0, year_joined=2019, bonus_amount=4200.0, dept_name='Finance Dept', location='London')
Row(department='IT', employee_id=103, name='Charlie', salary=75000.0, year_joined=2017, bonus_amount=7500.0, dept_name='Information Technology', location='San Francisco')
Row(department='Finance', employee_id=104, name='David', salary=62000.0, year_joined=2020, bonus_amount=3720.0, dept_name='Finance Dept', location='London')
Row(department='IT', employee_id=105, name='Eva', salary=80000.0, year_joined=2021, bonus_amount=9600.0, dept_name='Information Technology', location='San Francisco')

Take first 3 (take):
Row(department='HR', employee_id=101, name='Alice', salary=50000.0, year_joined=2018, bonus_amount=2500.0, d

In [0]:
# CELL L Download links for small outputs (run in Databricks notebook)
import base64, io
from IPython.display import HTML, display

def make_download_link_from_pdf(df, filename):
    csv = df.to_csv(index=False)
    b64 = base64.b64encode(csv.encode()).decode()
    href = f'<a href="data:text/csv;base64,{b64}" download="{filename}">Download {filename}</a>'
    display(HTML(href))

# joined and cleaned DataFrames should exist in the notebook as Spark DataFrames:
# convert to pandas 
pdf_joined = joined.toPandas()
pdf_clean = sdf.toPandas()

make_download_link_from_pdf(pdf_joined, "employees_with_dept.csv")
make_download_link_from_pdf(pdf_clean, "cleaned_employee.csv")

# optional: aggregates
if 'avg_salary_df' in globals():
    make_download_link_from_pdf(avg_salary_df.toPandas(), "avg_salary_by_dept.csv")
if 'count_dept' in globals():
    make_download_link_from_pdf(count_dept.toPandas(), "count_by_dept.csv")


[0;31m---------------------------------------------------------------------------[0m
[0;31mRuntimeError[0m                              Traceback (most recent call last)
File [0;32m<command-5372983276421612>, line 21[0m
[1;32m     19[0m [38;5;66;03m# 1) check core DF 'sdf' exists[39;00m
[1;32m     20[0m [38;5;28;01mif[39;00m [38;5;124m'[39m[38;5;124msdf[39m[38;5;124m'[39m [38;5;129;01mnot[39;00m [38;5;129;01min[39;00m [38;5;28mglobals[39m():
[0;32m---> 21[0m     [38;5;28;01mraise[39;00m [38;5;167;01mRuntimeError[39;00m([38;5;124m"[39m[38;5;124mSpark DataFrame [39m[38;5;124m'[39m[38;5;124msdf[39m[38;5;124m'[39m[38;5;124m not found in this session. Please re-run the cell that loads employees.xlsx into [39m[38;5;124m'[39m[38;5;124msdf[39m[38;5;124m'[39m[38;5;124m first.[39m[38;5;124m"[39m)
[1;32m     23[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124m'[39m[38;5;124msdf[39m[38;5;124m'[39m[38;5;124m found. Row count (approx):