In [53]:
from pyspark.sql import SparkSession

# Build the notebook's SparkSession, or reuse one that is already running.
# The master is set to "local[*]" and the web UI is explicitly enabled.
spark = (
    SparkSession.builder
    .appName("MySparkApp")
    .master("local[*]")
    .config("spark.ui.enabled", "true")
    .getOrCreate()
)

# Print the address of the Spark web UI so it can be opened from the browser.
ui_url = spark.sparkContext.uiWebUrl
print("Spark UI link:", ui_url)


Spark UI link: http://601293a625a9:4040


In [54]:
import os
# List whatever is present in the data directory to confirm the input files
# are visible before trying to read them.
data_dir = "/home/jovyan/data"
files = os.listdir(data_dir)
print(files)


['Departments.txt', 'Employees.txt']


In [None]:
from pyspark.sql.types import *

# Explicit schema for the employees file: the file has no header row, so the
# column names and types are declared here instead of being inferred.
emp_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("salary", DoubleType(), True),
    StructField("depid", IntegerType(), True),
])

# Explicit schema for the departments file (also without a header row).
dept_schema = StructType([
    StructField("depid", IntegerType(), True),
    StructField("depname", StringType(), True),
])

# Read both files with the CSV reader (.csv() works with .txt files as long
# as they are comma-separated).
#   header = "false"  -> the first line is data, not column names
#   schema(...)       -> enforce the structure declared above
# The file names must match the on-disk spelling exactly ("Employees.txt",
# "Departments.txt"); lower-case names fail on a case-sensitive filesystem.
employees = (
    spark.read
    .option("header", "false")
    .schema(emp_schema)
    .csv("/home/jovyan/data/Employees.txt")
)

departments = (
    spark.read
    .option("header", "false")
    .schema(dept_schema)
    .csv("/home/jovyan/data/Departments.txt")
)

# Mark both DataFrames for caching so repeated queries can reuse them.
employees.cache()
departments.cache()

# Register temporary views so the DataFrames can be queried with spark.sql().
employees.createOrReplaceTempView("employees")
departments.createOrReplaceTempView("departments")

# Quick sanity check of the first rows of each table.
employees.show(5, truncate=False)
departments.show(5, truncate=False)

+---+---------+--------+--------+-----+
|id |firstname|lastname|salary  |depid|
+---+---------+--------+--------+-----+
|1  |Bilal    |Al-Hakim|38424.0 |104  |
|2  |Ghani    |Tahir   |132534.0|110  |
|3  |Ahmed    |Saeed   |49967.0 |105  |
|4  |Majid    |Malik   |127049.0|109  |
|5  |Aqeel    |Camil   |64438.0 |102  |
+---+---------+--------+--------+-----+
only showing top 5 rows

+-----+----------------------+
|depid|depname               |
+-----+----------------------+
|101  |Human Resources       |
|102  |Information Technology|
|103  |Finance               |
|104  |Marketing             |
|105  |Operations            |
+-----+----------------------+
only showing top 5 rows



In [57]:
# Inner join: pair every employee with the name of their department,
# listed in employee-id order.
inner_join_sql = """
    SELECT e.id, e.firstname, e.lastname, e.salary, d.depname
    FROM employees e
    INNER JOIN departments d ON e.depid = d.depid
    ORDER BY e.id
"""
spark.sql(inner_join_sql).show()

+---+---------+----------+--------+--------------------+
| id|firstname|  lastname|  salary|             depname|
+---+---------+----------+--------+--------------------+
|  1|    Bilal|  Al-Hakim| 38424.0|           Marketing|
|  2|    Ghani|     Tahir|132534.0|      Administration|
|  3|    Ahmed|     Saeed| 49967.0|          Operations|
|  4|    Majid|     Malik|127049.0|               Legal|
|  5|    Aqeel|     Camil| 64438.0|Information Techn...|
|  6|   Khalid|     Tamer|120511.0|          Operations|
|  7|     Omar|     Wajih| 53527.0|Research & Develo...|
|  8|   Khalid|      Reza|124978.0|          Operations|
|  9|    Tariq|     Majid|118738.0|          Operations|
| 10|    Malik|     Samir| 37182.0|             Finance|
| 11|     Omid|     Majid| 81851.0|Information Techn...|
| 12|    Ezzat|     Usman| 52104.0|               Legal|
| 13|     Sara| Al-Rashid|125118.0|Research & Develo...|
| 14|     Hana|     Latif|132968.0|             Finance|
| 15|    Vahid|    Peyman|14880

In [58]:
# Group By: head-count and average salary (rounded to 2 decimals) for each
# department, with the highest-paying department first.
dept_summary_sql = """
    SELECT 
        d.depname,
        COUNT(e.id) AS emp_count,
        ROUND(AVG(e.salary), 2) AS avg_salary
    FROM employees e
    JOIN departments d ON e.depid = d.depid
    GROUP BY d.depname
    ORDER BY avg_salary DESC
"""
spark.sql(dept_summary_sql).show()

+--------------------+---------+----------+
|             depname|emp_count|avg_salary|
+--------------------+---------+----------+
|    Customer Service|       85|  94439.66|
|             Finance|       90|  92167.39|
|Research & Develo...|      107|  91773.88|
|Information Techn...|      107|   90409.1|
|           Marketing|      106|  90178.72|
|          Operations|      100|  89310.71|
|               Legal|      108|  88799.69|
|      Administration|       97|  88525.68|
|     Human Resources|       95|  87789.64|
|               Sales|      106|  86817.71|
+--------------------+---------+----------+



In [33]:
# Salary ranking within each department (from highest to lowest).
# RANK() restarts at 1 for every department (PARTITION BY depname) and orders
# employees by descending salary inside that department.
# The outer ORDER BY is what makes the displayed rows deterministic: without
# it the row order of the result is unspecified, so show(20) could print any
# 20 rows. Columns are qualified with their table alias for clarity.
spark.sql("""
    SELECT
        e.firstname,
        e.lastname,
        e.salary,
        d.depname,
        RANK() OVER (PARTITION BY d.depname ORDER BY e.salary DESC) AS salary_rank
    FROM employees e
    JOIN departments d ON e.depid = d.depid
    ORDER BY depname, salary_rank
""").show(20)

+---------+---------+--------+--------------+-----------+
|firstname| lastname|  salary|       depname|salary_rank|
+---------+---------+--------+--------------+-----------+
|     Noor|    Fahad|149191.0|Administration|          1|
|     Omar|    Pasha|149145.0|Administration|          2|
|    Vahid|   Peyman|148801.0|Administration|          3|
|   Hassan|    Salim|146382.0|Administration|          4|
|     Imad|    Nasir|145987.0|Administration|          5|
|    Jamal|  Al-Qawi|144090.0|Administration|          6|
|    Zaher|    Kamil|139406.0|Administration|          7|
|     Adel|    Tahir|138479.0|Administration|          8|
|    Camil|    Qasim|138451.0|Administration|          9|
|  Ibrahim|    Ahmed|137346.0|Administration|         10|
|     Omar|  Al-Aziz|135217.0|Administration|         11|
|   Qassem|  Al-Alim|135161.0|Administration|         12|
|     Emir|    Basil|133027.0|Administration|         13|
|    Ghani|    Tahir|132534.0|Administration|         14|
|    Bilal|   

In [60]:
# Top 3 salaries per department: number the employees of each department by
# descending salary in a subquery, then keep only rows numbered 1 to 3.
top3_sql = """
    SELECT *
    FROM (
        SELECT 
            firstname,
            lastname,
            salary,
            depname,
            ROW_NUMBER() OVER (PARTITION BY depname ORDER BY salary DESC) AS rn
        FROM employees e
        JOIN departments d ON e.depid = d.depid
    ) ranked
    WHERE rn <= 3
    ORDER BY depname, salary DESC
"""
spark.sql(top3_sql).show(30)

+---------+----------+--------+--------------------+---+
|firstname|  lastname|  salary|             depname| rn|
+---------+----------+--------+--------------------+---+
|     Noor|     Fahad|149191.0|      Administration|  1|
|     Omar|     Pasha|149145.0|      Administration|  2|
|    Vahid|    Peyman|148801.0|      Administration|  3|
|    Jamal|     Yamin|149534.0|    Customer Service|  1|
|    Camil|      Omar|148283.0|    Customer Service|  2|
|    Salim|       Ali|145887.0|    Customer Service|  3|
|  Youssef| Al-Farooq|149578.0|             Finance|  1|
|     Hana|Al-Khaleef|147175.0|             Finance|  2|
|    Karim|  Al-Latif|145439.0|             Finance|  3|
|    Tamer|     Bilal|149548.0|     Human Resources|  1|
| Muhammad|     Saadi|149393.0|     Human Resources|  2|
|    Hakim| Al-Rashid|148807.0|     Human Resources|  3|
|    Saeed|      Omar|149948.0|Information Techn...|  1|
|   Jaafar| Al-Siddiq|149183.0|Information Techn...|  2|
|    Aqeel|   Mansour|149180.0|

In [52]:
# Stop the SparkSession; cells that use `spark` cannot run after this
# until the session is created again.
spark.stop()    