# Class 3 Live Job Aggregator Project

## BRONZE LAYER ---> Ingesting Data to Databricks

In [0]:
import json
import os

import requests
from pyspark.sql import SparkSession

# Bronze stage: query the SerpApi Google Jobs engine once per role, tag every
# returned posting with the role that produced it, and append the raw postings
# to the bronze Delta table.

#defining the Spark Session
spark = SparkSession.builder.appName("JobETL_Bronze").getOrCreate()

#defining the API Key
# The key is read from the environment instead of being committed with the
# notebook. NOTE(review): the key that was previously hard-coded on this line
# has been exposed and should be rotated.
API_KEY = os.environ.get("SERPAPI_API_KEY")
if not API_KEY:
    raise RuntimeError(
        "SERPAPI_API_KEY is not set; export it (or load it from a secret store) "
        "before running the Bronze cell"
    )

roles = ["Data Engineer", "Python Developer", "ETL Developer", "Spark Engineer", "Data Analyst"]
location = "India"
all_jobs = []

SERPAPI_URL = "https://serpapi.com/search.json"
REQUEST_TIMEOUT_SECONDS = 30  # fail fast instead of hanging the cell on a stalled connection

#Running a Loop over all the Job Roles
for role in roles:
    params = {
        "engine": "google_jobs",
        "q": role,
        "location": location,
        "api_key": API_KEY,
    }
    res = requests.get(SERPAPI_URL, params=params, timeout=REQUEST_TIMEOUT_SECONDS)

    # Deliberately not using raise_for_status(): its message embeds the full
    # request URL, which would print the API key into the notebook output.
    if not res.ok:
        raise RuntimeError(
            f"SerpApi request for role '{role}' failed with HTTP {res.status_code}"
        )

    payload = res.json()
    jobs = payload.get("jobs_results", [])
    if not jobs:
        # Presumably the body carries an "error" message when there are no
        # results - TODO confirm against the SerpApi documentation.
        print(f"⚠️ No jobs returned for role '{role}': {payload.get('error', 'empty result set')}")
        continue

    print("✅Reading 🔴LIVE Data from Google Jobs API")
    for job in jobs:
        job["search_role"] = role
    all_jobs.extend(jobs)

# With nothing fetched there is no schema to infer and nothing worth writing;
# stop with a clear message rather than an obscure Spark/Delta failure.
if not all_jobs:
    raise RuntimeError("No job postings were fetched; nothing to write to the Bronze layer")

# Round-trip through JSON strings so Spark infers the nested schema itself.
bronze_df = spark.read.json(spark.sparkContext.parallelize([json.dumps(job) for job in all_jobs]))
bronze_df.write.format("delta").option("mergeSchema", "true").mode("append").save("/mnt/lakehouse/bronze/jobs_raw")
print("✅Data Written to Bronze Layer")

In [0]:
# display(bronze_df)

## SILVER LAYER ---> Cleaning and Structuring

In [0]:
from pyspark.sql.functions import *

# Silver stage, step 1: read the raw bronze table, project it down to the
# columns used downstream, and drop rows missing a title, company or location.
spark = (
    SparkSession.builder
    .appName("JobETL_Silver")
    .getOrCreate()
)

#reading Bronze Layer
print("✅Reading from Bronze Layer -------> Silver Layer")
bronze_path = "/mnt/lakehouse/bronze/jobs_raw"
df = spark.read.format("delta").load(bronze_path)

#cleaning and Structuring
required_columns = ["title", "company_name", "location"]
projected_columns = [
    "title",
    "company_name",
    "location",
    "description",
    "job_id",
    # Lift the nested posting date to a top-level column.
    "detected_extensions.posted_at as posted_at",
    "search_role",
]
silver_df = df.selectExpr(*projected_columns).dropna(subset=required_columns)


In [0]:
# display(silver_df)

In [0]:
# Silver stage, step 2: normalise company names, require a posting date,
# remove re-ingested duplicates, and persist the result.
silver_df = (
    silver_df
    .withColumn("company_name", trim(upper(col("company_name"))))
    .dropna(subset=["posted_at"])
    # The bronze table is append-only, so every pipeline run re-reads the
    # postings ingested by earlier runs. Keep one row per posting and role.
    .dropDuplicates(["job_id", "search_role"])
)

# This frame is rebuilt from the entire bronze table on every run, so it
# replaces the silver table; appending would add every earlier row again.
silver_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/mnt/lakehouse/silver/jobs")
print("✅ Data Written to Silver Layer!")
# display(silver_df)

## Golden LAYER ---> Generate KPI Tables

In [0]:
from pyspark.sql.functions import *

# Gold stage: publish the silver jobs table to the gold path and derive the
# KPI aggregates (postings per company, postings per location).
spark = SparkSession.builder.appName("JobETL_Gold").getOrCreate()

print("✅Reading from Silver Layer -------> Golden Layer")

df = spark.read.format("delta").load("/mnt/lakehouse/silver/jobs")

#writing the golden layer to a file
# `df` is a full snapshot of the silver table, so it replaces the gold table;
# appending would add the whole snapshot again on every run.
df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/mnt/lakehouse/gold/jobs")

#KPI 1 - Top Companies
top_companies = df.groupBy("company_name").count().orderBy(col("count").desc())
# display(top_companies)

#KPI 2 - Jobs by City
# Assigned to a name so the aggregate can be displayed or reused; the
# expression was previously built and immediately discarded.
jobs_by_city = df.groupBy("location").agg(count("*").alias("job_count")).orderBy(col("job_count").desc())
# display(jobs_by_city)


In [0]:
from pyspark.sql.functions import *

# Publish the silver jobs data as the managed table `JobAggregatorTable`
# so it can be queried from %sql cells.
spark = SparkSession.builder.appName("JobETL_Gold").getOrCreate()

df = spark.read.format("delta").load("/mnt/lakehouse/silver/jobs")

# `df` is the complete silver snapshot, so the table is replaced on each run;
# appending would insert every existing row again and inflate all counts.
df.write.mode("overwrite").saveAsTable("JobAggregatorTable")
print("✅Data Successfully Written to Table ---> JobAggregatorTable")

In [0]:
# Final status banner printed once the pipeline cells above have run.
completion_banner = "😍Congratulations!!!!! Full ETL Job Aggregator Project is Completed and LIVE!!!!"
print(completion_banner)

In [0]:
%sql
-- List the recorded history of JobAggregatorTable.
DESCRIBE HISTORY JobAggregatorTable

In [0]:
%sql
-- Return every column of every row currently in JobAggregatorTable.
SELECT *
FROM JobAggregatorTable

# Data Analytics Part: Visualising Business Scenarios

In [0]:
import matplotlib.pyplot as plt

In [0]:
# Get (or create) the Spark session under an app name that identifies the plotting stage.
spark = (
    SparkSession.builder
    .appName("VisualsFromGold")
    .getOrCreate()
)

In [0]:
# Read the gold-layer jobs table, keep only rows that carry a posting date,
# and show the result.
gold_jobs_path = "/mnt/lakehouse/gold/jobs"
df_companies = (
    spark.read.format("delta")
    .load(gold_jobs_path)
    .dropna(subset=["posted_at"])
)
display(df_companies)

In [0]:
# Convert to Pandas for plotting
# `df_companies` holds one row per job posting and has no column named "c",
# so the per-company posting count is computed here before ranking.
pdf = (
    df_companies
    .groupBy("company_name")
    .count()
    .withColumnRenamed("count", "job_count")
    .orderBy("job_count", ascending=False)  # largest employers first
    .limit(10)                               # only the small top-10 result is pulled to the driver
    .toPandas()
)

In [0]:
# Show the pandas frame `pdf` built in the previous cell for a quick visual check.
display(pdf)

In [0]:
#creating a plot
# Bar lengths must be numeric posting counts, so the row-level jobs frame is
# aggregated per company here rather than plotting the free-text `title`
# column. Aggregating from `df_companies` keeps this cell independent of the
# column layout of `pdf`.
top_companies_pdf = (
    df_companies
    .groupBy("company_name")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
    .toPandas()
)

plt.figure(figsize=(10,5))
# barh draws the first entry at the bottom; reverse so the largest bar is on top.
plt.barh(top_companies_pdf['company_name'][::-1], top_companies_pdf['count'][::-1], color='skyblue')
plt.xlabel('Number of Jobs')
plt.ylabel('Top Companies Hiring')
plt.title('Top 10 Companies by Job Postings')
plt.grid(True,axis='x',linestyle='--',alpha=0.5)
plt.tight_layout()
plt.show()



## Job Demand by City Plot

In [0]:
# Load the gold jobs table and keep only postings that carry a posting date.
df_cities = spark.read.format("delta").load("/mnt/lakehouse/gold/jobs")
df_cities = df_cities.dropna(subset=["posted_at"])

# Count postings per location in Spark and bring only the ranked summary to
# pandas. The chart needs numeric bar lengths, and `posted_at` is free text,
# so sorting on it gives no meaningful ranking.
TOP_N_CITIES = 15  # cap the number of bars so the axis labels stay readable
pdf_city = (
    df_cities
    .groupBy("location")
    .count()
    .withColumnRenamed("count", "job_count")
    .orderBy("job_count", ascending=False)
    .limit(TOP_N_CITIES)
    .toPandas()
)

# Plot
plt.figure(figsize=(10, 6))
# barh draws the first entry at the bottom; reverse so the busiest city is on top.
plt.barh(pdf_city['location'][::-1], pdf_city['job_count'][::-1], color='lightgreen')
plt.xlabel("Job Postings")
plt.ylabel("City")
plt.title("Job Demand by City")
plt.grid(True, axis='x', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()