In [78]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, FloatType, IntegerType, DateType
import pyspark.sql.functions as F
import re
import yaml

# Location of the YAML configuration file (resolved relative to the working directory)
yaml_file_path = 'config.yaml'

# Parse the configuration once; the functions below read their settings from `config`.
with open(yaml_file_path, 'r') as yaml_file:
    config = yaml.safe_load(yaml_file)

# Build (or reuse) the Spark session, registering the jar(s) listed under
# config['spark']['path'] via the "spark.jars" setting.
spark = (
    SparkSession.builder
    .appName("final_project")
    .config("spark.jars", config['spark']['path'])
    .getOrCreate()
)

In [79]:
def extract():
    """Read the raw CSV named by ``config['csv']['path']`` into a DataFrame.

    The first line of the file is used as the header and schema inference is
    disabled, so every column is loaded as a string; type conversion is left
    to later steps.

    Returns:
        The DataFrame produced by ``spark.read.csv``.

    Raises:
        Exception: if the config lookup or the read fails. The message is
            prefixed with the stage name and the original error is attached
            as ``__cause__``.
    """
    try:
        csv_path = config['csv']['path']
        return spark.read.csv(csv_path, header=True, inferSchema=False)
    except Exception as e:
        # The shared Spark session is left running so the notebook can retry
        # the step without rebuilding it.
        raise Exception(f"An error occurred during data extraction: {str(e)}") from e

In [80]:
# Run the extract step and preview the first three rows of the raw data.
df=extract()
df.show(3)

+---------------+-------------+--------------+------------+--------+---------------+--------+---------+---------+------------+----------------+----------+--------------+----------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         Job Id|   experience|qualifications|Salary Range|location|        country|latitude|longitude|Work Type|Company Size|Job Posting Date|preference|Contact Person|         contact|           Job Title|                Role|         Job Portal|     Job Description|            benefits|              skills|    responsibilities|             company|     Company Profile|
+---------------+-------------+--------------+------------+--------+---------------+--------+---------+---------+------------+----------------+----------+--------------+----------------+--------------------+--------------------+-------------------+--

23/11/01 17:55:20 INFO InMemoryFileIndex: It took 2 ms to list leaf files for 1 paths.
23/11/01 17:55:20 INFO InMemoryFileIndex: It took 1 ms to list leaf files for 1 paths.
23/11/01 17:55:20 INFO FileSourceStrategy: Pushed Filters: 
23/11/01 17:55:20 INFO FileSourceStrategy: Post-Scan Filters: (length(trim(value#6314, None)) > 0)
23/11/01 17:55:20 INFO MemoryStore: Block broadcast_139 stored as values in memory (estimated size 200.2 KiB, free 433.9 MiB)
23/11/01 17:55:20 INFO MemoryStore: Block broadcast_139_piece0 stored as bytes in memory (estimated size 34.6 KiB, free 433.9 MiB)
23/11/01 17:55:20 INFO BlockManagerInfo: Added broadcast_139_piece0 in memory on 172.16.5.112:38541 (size: 34.6 KiB, free: 434.3 MiB)
23/11/01 17:55:20 INFO SparkContext: Created broadcast 139 from csv at <unknown>:0
23/11/01 17:55:20 INFO FileSourceScanExec: Planning scan with bin packing, max size: 55434439 bytes, open cost is considered as scanning 4194304 bytes.
23/11/01 17:55:20 INFO SparkContext: Star

In [81]:
def clean():
    """Extract the raw data, drop unused columns and apply standard types.

    Steps:
      * drop the contact / benefits / company-profile columns;
      * cast "Job Id" to long, "latitude" and "longitude" to float,
        "Company Size" to integer and "Job Posting Date" to date;
      * replace negative company sizes with their absolute value.

    Returns:
        The cleaned DataFrame.

    Raises:
        Exception: if extraction or any cleaning step fails. The message is
            prefixed with the stage name and the original error is attached
            as ``__cause__``.
    """
    try:
        df = extract()

        # Columns that are not needed downstream. The names are matched by
        # Spark's column resolver, which is case-insensitive by default, so
        # "Contact"/"Benefits" also remove lower-case "contact"/"benefits".
        dropped = ["Contact Person", "Contact", "Benefits", "Company Profile"]
        df = df.drop(*dropped)

        # "Company Size" is cast to integer *before* the sign fix so that the
        # comparison is done on numbers instead of relying on implicit
        # string-to-number coercion; F.abs turns negative sizes positive.
        return (
            df.withColumn("Job Id", F.col("Job Id").cast(LongType()))
            .withColumn("latitude", F.col("latitude").cast(FloatType()))
            .withColumn("longitude", F.col("longitude").cast(FloatType()))
            .withColumn("Company Size", F.abs(F.col("Company Size").cast(IntegerType())))
            .withColumn("Job Posting Date", F.col("Job Posting Date").cast(DateType()))
        )
    except Exception as e:
        # The shared Spark session is left running so the notebook can retry.
        raise Exception(f"An error occurred during data cleaning: {str(e)}") from e

In [82]:
# Plain-Python helper; transform() wraps it as a Spark UDF.
def calculate_average(range_str):
    """Return the midpoint of a two-number range embedded in a string.

    Every run of digits in ``range_str`` is extracted; when exactly two are
    found they are treated as the lower and upper bound and their mean is
    returned. Any non-digit text around the numbers (currency symbols,
    unit suffixes, separators) is ignored.

    Args:
        range_str: text such as ``"$59K-$99K"``. ``None`` and other
            non-string values are accepted and yield ``None`` so that a
            missing cell does not abort the whole Spark job.

    Returns:
        The average of the two numbers as a ``float``, or ``None`` when the
        input is not a string or does not contain exactly two numbers.
    """
    # Null cells reach a Python UDF as None; re.findall would raise TypeError.
    if not isinstance(range_str, str):
        return None

    numbers = re.findall(r'\d+', range_str)
    if len(numbers) != 2:
        return None

    lower, upper = (int(n) for n in numbers)
    return (lower + upper) / 2


In [83]:
def _label_by_quartiles(df, source_col, target_col, low_label, mid_label, high_label):
    """Add ``target_col`` labelling rows by ``source_col``'s 25th/75th percentiles.

    Values at or below the 25th percentile get ``low_label``, values above it
    up to and including the 75th percentile get ``mid_label``, and values
    above the 75th percentile get ``high_label``. Rows whose value is null
    match none of the conditions and get "Uncategorized".
    """
    # relativeError=0.0 requests exact quantiles.
    quartiles = df.stat.approxQuantile(source_col, [0.25, 0.75], 0.0)
    if len(quartiles) < 2:
        # No non-null values to compute quantiles from: nothing can be ranked.
        return df.withColumn(target_col, F.lit("Uncategorized"))

    q1, q3 = quartiles
    value = F.col(source_col)
    return df.withColumn(
        target_col,
        F.when(value <= q1, low_label)
        .when((value > q1) & (value <= q3), mid_label)
        .when(value > q3, high_label)
        .otherwise("Uncategorized"),
    )


def transform():
    """Clean the data and derive the analysis columns.

    Added / changed columns:
      * "Average Salary" (int): midpoint of the two numbers in "Salary Range";
      * "Preference": the value "Both" is replaced by "Male or Female";
      * "CompanyTier": quartile bucket of "Company Size";
      * "SalaryLevel": quartile bucket of "Average Salary";
      * "QualificationCategory": Masters / Bachelors / PhD by the first letter
        of "qualifications", otherwise "Uncategorized".

    Side effects:
        Prints a preview of the intermediate DataFrame via ``show()``.

    Returns:
        The transformed DataFrame.

    Raises:
        Exception: if cleaning or any transformation step fails. The message
            is prefixed with the stage name and the original error is
            attached as ``__cause__``.
    """
    try:
        df = clean()

        # Declare the UDF's return type so the float result is not round-tripped
        # through the default string type before being truncated to an integer.
        calculate_average_udf = F.udf(calculate_average, "double")
        new_df = df.withColumn(
            "Average Salary",
            calculate_average_udf(F.col("Salary Range")).cast("int"),
        )

        # Changing gender preference from "Both" to "Male or Female"
        new_df = new_df.withColumn(
            "Preference",
            F.when(F.col("Preference") == "Both", "Male or Female").otherwise(F.col("Preference")),
        )
        # Preview kept so the cell output stays the same as before.
        new_df.show()

        # Dividing companies into tiers according to the company size
        new_df = _label_by_quartiles(
            new_df, "Company Size", "CompanyTier",
            "Tier-1 (Low)", "Tier-2 (Medium)", "Tier-3 (High)",
        )

        # Dividing jobs into three categories according to their salary
        new_df = _label_by_quartiles(
            new_df, "Average Salary", "SalaryLevel",
            "Low Pay", "Average Pay", "High Pay",
        )

        # Differentiate qualifications according to their initials
        qualification = F.col("qualifications")
        new_df = new_df.withColumn(
            "QualificationCategory",
            F.when(qualification.startswith("M"), "Masters")
            .when(qualification.startswith("B"), "Bachelors")
            .when(qualification.startswith("P"), "PhD")
            .otherwise("Uncategorized"),
        )

        return new_df

    except Exception as e:
        # The shared Spark session is left running so the notebook can retry.
        raise Exception(f"An error occurred during data transformation: {str(e)}") from e

In [84]:
# Run the extract -> clean -> transform chain and print the resulting schema.
dff=transform()
dff.printSchema()

23/11/01 17:55:46 INFO InMemoryFileIndex: It took 0 ms to list leaf files for 1 paths.
23/11/01 17:55:46 INFO InMemoryFileIndex: It took 0 ms to list leaf files for 1 paths.
23/11/01 17:55:46 INFO FileSourceStrategy: Pushed Filters: 
23/11/01 17:55:46 INFO FileSourceStrategy: Post-Scan Filters: (length(trim(value#6494, None)) > 0)
23/11/01 17:55:46 INFO MemoryStore: Block broadcast_144 stored as values in memory (estimated size 200.2 KiB, free 433.2 MiB)
23/11/01 17:55:46 INFO MemoryStore: Block broadcast_144_piece0 stored as bytes in memory (estimated size 34.6 KiB, free 433.2 MiB)
23/11/01 17:55:46 INFO BlockManagerInfo: Added broadcast_144_piece0 in memory on 172.16.5.112:38541 (size: 34.6 KiB, free: 434.2 MiB)
23/11/01 17:55:46 INFO SparkContext: Created broadcast 144 from csv at <unknown>:0
23/11/01 17:55:46 INFO FileSourceScanExec: Planning scan with bin packing, max size: 55434439 bytes, open cost is considered as scanning 4194304 bytes.
23/11/01 17:55:46 INFO SparkContext: Star

+----------------+-------------+--------------+------------+--------------------+---------------+--------+---------+---------+------------+----------------+--------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|          Job Id|   experience|qualifications|Salary Range|            location|        country|latitude|longitude|Work Type|Company Size|Job Posting Date|    Preference|           Job Title|                Role|         Job Portal|     Job Description|              skills|    responsibilities|             company|Average Salary|
+----------------+-------------+--------------+------------+--------------------+---------------+--------+---------+---------+------------+----------------+--------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|

23/11/01 17:55:46 INFO MemoryStore: Block broadcast_149_piece0 stored as bytes in memory (estimated size 34.6 KiB, free 432.4 MiB)
23/11/01 17:55:46 INFO BlockManagerInfo: Added broadcast_149_piece0 in memory on 172.16.5.112:38541 (size: 34.6 KiB, free: 434.1 MiB)
23/11/01 17:55:46 INFO SparkContext: Created broadcast 149 from approxQuantile at <unknown>:0
23/11/01 17:55:46 INFO FileSourceScanExec: Planning scan with bin packing, max size: 55434439 bytes, open cost is considered as scanning 4194304 bytes.
23/11/01 17:55:46 INFO SparkContext: Starting job: approxQuantile at <unknown>:0
23/11/01 17:55:46 INFO DAGScheduler: Registering RDD 484 (approxQuantile at <unknown>:0) as input to shuffle 15
23/11/01 17:55:46 INFO DAGScheduler: Got job 56 (approxQuantile at <unknown>:0) with 4 output partitions
23/11/01 17:55:46 INFO DAGScheduler: Final stage: ResultStage 72 (approxQuantile at <unknown>:0)
23/11/01 17:55:46 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 71)
23/11/01

root
 |-- Job Id: long (nullable = true)
 |-- experience: string (nullable = true)
 |-- qualifications: string (nullable = true)
 |-- Salary Range: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- Work Type: string (nullable = true)
 |-- Company Size: integer (nullable = true)
 |-- Job Posting Date: date (nullable = true)
 |-- Preference: string (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Job Portal: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- skills: string (nullable = true)
 |-- responsibilities: string (nullable = true)
 |-- company: string (nullable = true)
 |-- Average Salary: integer (nullable = true)
 |-- CompanyTier: string (nullable = false)
 |-- SalaryLevel: string (nullable = false)
 |-- QualificationCategory: string (nullable = false)



23/11/01 17:55:51 INFO DAGScheduler: Job 57 finished: approxQuantile at <unknown>:0, took 2.677026 s
                                                                                

In [86]:
def load():
    """Run the transform step and write the result to Postgres over JDBC.

    Connection settings (url, driver, dbtable, user, password) are read from
    ``config['postgres']``. The write uses ``overwrite`` mode.

    Returns:
        The transformed DataFrame that was written.

    Raises:
        Exception: if the transformation or the write fails. The message is
            prefixed with the stage name and the original error is attached
            as ``__cause__``.
    """
    try:
        dff = transform()

        # Load the clean data in postgres
        pg = config['postgres']
        (
            dff.write.format('jdbc')
            .options(
                url=pg["url"],
                driver=pg["driver"],
                dbtable=pg["dbtable"],
                user=pg["user"],
                password=pg["password"],
            )
            .mode('overwrite')
            .save()
        )
        return dff
    except Exception as e:
        # The shared Spark session is left running so the notebook can retry.
        raise Exception(f"An error occurred during loading the data: {str(e)}") from e


In [87]:
# Run the whole pipeline and write to Postgres; as the cell's last expression,
# the returned DataFrame is displayed as the cell output.
load()

23/11/01 17:56:54 INFO InMemoryFileIndex: It took 1 ms to list leaf files for 1 paths.
23/11/01 17:56:54 INFO InMemoryFileIndex: It took 1 ms to list leaf files for 1 paths.
23/11/01 17:56:54 INFO FileSourceStrategy: Pushed Filters: 
23/11/01 17:56:54 INFO FileSourceStrategy: Post-Scan Filters: (length(trim(value#7008, None)) > 0)
23/11/01 17:56:54 INFO MemoryStore: Block broadcast_155 stored as values in memory (estimated size 200.2 KiB, free 433.9 MiB)
23/11/01 17:56:54 INFO MemoryStore: Block broadcast_155_piece0 stored as bytes in memory (estimated size 34.6 KiB, free 433.9 MiB)
23/11/01 17:56:54 INFO BlockManagerInfo: Added broadcast_155_piece0 in memory on 172.16.5.112:38541 (size: 34.6 KiB, free: 434.3 MiB)
23/11/01 17:56:54 INFO SparkContext: Created broadcast 155 from csv at <unknown>:0
23/11/01 17:56:54 INFO FileSourceScanExec: Planning scan with bin packing, max size: 55434439 bytes, open cost is considered as scanning 4194304 bytes.
23/11/01 17:56:54 INFO SparkContext: Star

+----------------+-------------+--------------+------------+--------------------+---------------+--------+---------+---------+------------+----------------+--------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|          Job Id|   experience|qualifications|Salary Range|            location|        country|latitude|longitude|Work Type|Company Size|Job Posting Date|    Preference|           Job Title|                Role|         Job Portal|     Job Description|              skills|    responsibilities|             company|Average Salary|
+----------------+-------------+--------------+------------+--------------------+---------------+--------+---------+---------+------------+----------------+--------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|

23/11/01 17:56:55 INFO BlockManagerInfo: Removed broadcast_158_piece0 on 172.16.5.112:38541 in memory (size: 34.6 KiB, free: 434.3 MiB)
23/11/01 17:56:56 INFO Executor: Finished task 1.0 in stage 77.0 (TID 384). 1919 bytes result sent to driver
23/11/01 17:56:56 INFO TaskSetManager: Finished task 1.0 in stage 77.0 (TID 384) in 938 ms on 172.16.5.112 (executor driver) (1/16)
23/11/01 17:56:56 INFO Executor: Finished task 2.0 in stage 77.0 (TID 385). 1876 bytes result sent to driver
23/11/01 17:56:56 INFO TaskSetManager: Finished task 2.0 in stage 77.0 (TID 385) in 957 ms on 172.16.5.112 (executor driver) (2/16)
23/11/01 17:56:56 INFO Executor: Finished task 8.0 in stage 77.0 (TID 391). 1876 bytes result sent to driver
23/11/01 17:56:56 INFO Executor: Finished task 4.0 in stage 77.0 (TID 387). 1876 bytes result sent to driver
23/11/01 17:56:56 INFO TaskSetManager: Finished task 8.0 in stage 77.0 (TID 391) in 1065 ms on 172.16.5.112 (executor driver) (3/16)
23/11/01 17:56:56 INFO TaskSetM

DataFrame[Job Id: bigint, experience: string, qualifications: string, Salary Range: string, location: string, country: string, latitude: float, longitude: float, Work Type: string, Company Size: int, Job Posting Date: date, Preference: string, Job Title: string, Role: string, Job Portal: string, Job Description: string, skills: string, responsibilities: string, company: string, Average Salary: int, CompanyTier: string, SalaryLevel: string, QualificationCategory: string]

23/11/01 17:59:31 INFO BlockManagerInfo: Removed broadcast_167_piece0 on 172.16.5.112:38541 in memory (size: 26.0 KiB, free: 434.4 MiB)
23/11/01 17:59:31 INFO BlockManagerInfo: Removed broadcast_166_piece0 on 172.16.5.112:38541 in memory (size: 34.6 KiB, free: 434.4 MiB)
