In [None]:
## import all libraries necessary for pipeline

import os
import json
import requests
import pandas as pd
import numpy as np
import pyodbc
from dotenv import load_dotenv
from loguru import logger
import sys
from sqlalchemy import create_engine

In [None]:
## reset or remove any handlers from logger instances
logger.remove()

In [None]:
## create a directory and file for each logging level

log_levels = ["DEBUG","INFO", "WARNING", "ERROR"]
for level in log_levels:
    os.makedirs(f"logs/{level.lower()}", exist_ok=True)

In [None]:
## create each logger for each level

logger.add(
        "logs/info/info.log",
        level="INFO",
       format= "{time:YYYY-MM-DD HH:mm:ss.SSS} | " \
        "PID:{process.id} | TID{thread.id}| "
        "{message}",
        filter= lambda record: record["level"].name == "INFO"
    )
logger.add(
        "logs/warning/warning.log",
        level="WARNING",
       format= "{time:YYYY-MM-DD HH:mm:ss.SSS} | " \
        "PID:{process.id} | TID{thread.id}| "
        "{message}",
        filter= lambda record: record["level"].name == "WARNING"
    )
logger.add(
        "logs/debug/debug.log",
        level="DEBUG",
       format= "{time:YYYY-MM-DD HH:mm:ss.SSS} | " \
        "PID:{process.id} | TID{thread.id}| "
        "{message}",
        filter= lambda record: record["level"].name == "DEBUG"
    )
logger.add(
        "logs/error/error.log",
        level="ERROR",
       format= "{time:YYYY-MM-DD HH:mm:ss.SSS} | " \
        "PID:{process.id} | TID{thread.id}| "
        "{message}",
        filter= lambda record: record["level"].name == "ERROR"
    )


16

In [None]:
## Load credentials API and database

load_success = load_dotenv()
try:

    if load_success:
        logger.info('Successfully loaded environement variables from .env file')
    else:
        logger.warning('No .env file found or loaded. Continuing without environment variables')
except ValueError as e:
    print(f"Error: {e}")


In [None]:
## get credentials for API 

API_KEY = os.getenv("API_KEY")
API_HOST = "linkedin-job-search-api.p.rapidapi.com"
try:
    if API_KEY is None:
        logger.error("API key not found in environment variables. Application may not function correctly.")
        raise ValueError
    else:
        logger.info("environmnet credentials were successfully retrieved.")
except ValueError as e:
    print(f"Error: {e}")

Extra Data 

In [None]:
## retrieve data from inkedin-job API using an API request

url = "https://linkedin-job-search-api.p.rapidapi.com/active-jb-7d"

headers = {
    "x-rapidapi-key": API_KEY,
    "x-rapidapi-host": API_HOST
}

all_results = []
limit = 100
total_needed = 1000

logger.info(f"Starting API fetch with total_needed ={total_needed} and limit = {limit}")

## return a total of 1000 job postings back and offset by 100
for offset in range(0, total_needed, limit):
    querystring = {
        "limit": str(limit),
        "offset": str(offset),
        "title_filter": "Data Engineer",
        "location_filter": "United States",
        "description_type": "text"
    }

    logger.debug(f"Fetching data with params: {querystring}")

    response = requests.get(url, headers=headers, params=querystring)

    if response.status_code == 200:
        data = response.json()
        if isinstance(data, dict) and "data" in data:
            all_results.extend(data["data"])
        else:
            all_results.extend(data)
    else:
        logger.error(f"Error {response.status_code}: {response.text}")
    logger.info(f"Completed fetch for offset={offset}, limit{limit}, total jobs fetched{all_results}")



In [None]:
## Check to see if results were found from API request

if all_results:
    logger.debug(f"api sample results {json.dumps(all_results[0], indent=4)}")
else:
    logger.error("No results found in API response")

In [None]:
## Flatten semi-structured JSON data into a flat, tabular format

data = pd.json_normalize(all_results)
logger.info("normalize data into a table")
logger.debug(f"shape of table{data.shape}")


In [None]:
## create columns for dataframe

columns = ['id','date_posted', 'title', 'organization','locations_derived','cities_derived','employment_type','remote_derived','salary_raw.value.minValue', 'salary_raw.value.maxValue', 'description_text']
Data_Jobs_data = data[columns]
logger.info("table updated with selected columns")
logger.debug(f"new table {Data_Jobs_data.head(5)}")

In [None]:
## preview the new table

Data_Jobs_data.head(5)

Unnamed: 0,id,date_posted,title,organization,locations_derived,cities_derived,employment_type,remote_derived,salary_raw.value.minValue,salary_raw.value.maxValue,description_text
0,1873929964,2025-09-20T21:32:43.954,Data Engineer,Best Job Tool,[United States],,[FULL_TIME],False,,,About The Company\n\nBering Straits Native Cor...
1,1873849392,2025-09-20T19:53:04,Data Engineer,Vinsys Information Technology Inc,"[Seattle, Washington, United States]",[Seattle],[FULL_TIME],False,,,Data Engineer II Global Marketing Technology\n...
2,1873849728,2025-09-20T19:52:52,Network / Data Center Engineer,Kanak Elite Services,"[Abilene, Texas, United States]",[Abilene],[FULL_TIME],False,,,Role: Data Center Technician\n\nLocation Abile...
3,1873849400,2025-09-20T19:52:50,Data Engineer Lead,Shrive Technologies,"[Fremont, California, United States]",[Fremont],[FULL_TIME],False,,,Microsoft Fabric.\nMicrosoft Azure: Azure Data...
4,1873849403,2025-09-20T19:52:50,Data Analytical Engineer,Vinsys Information Technology Inc,"[Malvern, Pennsylvania, United States]",[Malvern],[FULL_TIME],False,,,Writes ETL (Extract / Transform / Load) proces...


In [None]:
## check date_posted data type

print(Data_Jobs_data["date_posted"].dtype)

object


In [None]:
## change date_posted data type to date

Data_Jobs_data["date_posted"] = pd.to_datetime(Data_Jobs_data["date_posted"], errors="coerce")
Data_Jobs_data["date_posted"] = pd.to_datetime(Data_Jobs_data["date_posted"]).dt.normalize()

if pd.api.types.is_datetime64_any_dtype(Data_Jobs_data["date_posted"]):
    logger.info(
        f"'date_posted' column data type successfully changed to {Data_Jobs_data['date_posted'].dtype}"
    )
else:
    logger.error(
        f"Unsuccessful at changing 'date_posted' data type, current dtype is {Data_Jobs_data['date_posted'].dtype}"
    )
    raise ValueError
    
logger.debug(f"column date_posted is now displayed as{Data_Jobs_data.head(1)}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data["date_posted"] = pd.to_datetime(Data_Jobs_data["date_posted"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data["date_posted"] = pd.to_datetime(Data_Jobs_data["date_posted"]).dt.normalize()


In [None]:
## change the column cities_derived from a list to a string separated by commas

logger.debug("Starting transformation of 'cities_derived' column.")

before_dtype = Data_Jobs_data["cities_derived"].dtype

logger.info(f"Original dtype of 'cities_derived': {before_dtype}")

Data_Jobs_data["cities_derived"] = Data_Jobs_data["cities_derived"].apply(lambda x: ", ".join(x) if isinstance(x, list) else x)

after_dtype = Data_Jobs_data["cities_derived"].dtype

logger.info(f"Transformation successful. New dtype of 'cities_derived': {after_dtype}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data["cities_derived"] = Data_Jobs_data["cities_derived"].apply(lambda x: ", ".join(x) if isinstance(x, list) else x)


In [None]:
## preview cities_derived column

Data_Jobs_data["cities_derived"].head(5)

0       None
1    Seattle
2    Abilene
3    Fremont
4    Malvern
Name: cities_derived, dtype: object

In [None]:
## get the first text for employment_type

logger.debug(f"dataframe before transformation{Data_Jobs_data['employment_type'].head(5)}")
Data_Jobs_data['employment_type'] = Data_Jobs_data['employment_type'].str[0]
logger.debug(f"dataframe after transformation{Data_Jobs_data['employment_type'].head(5)}")


In [None]:
## change format of salary_raw.value.minValue from decimal to currency

logger.debug(f"dataframe before transformation{Data_Jobs_data['salary_raw.value.minValue'].head(2)}")
Data_Jobs_data['salary_raw.value.minValue'] = Data_Jobs_data['salary_raw.value.minValue'].apply(lambda x: "${:,.2f}".format(x) if pd.notnull(x) else x)
logger.debug(f"dataframe after transformation{Data_Jobs_data['salary_raw.value.minValue'].head(2)}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data['salary_raw.value.minValue'] = Data_Jobs_data['salary_raw.value.minValue'].apply(lambda x: "${:,.2f}".format(x) if pd.notnull(x) else x)


In [None]:
## change format of salary_raw.value.maxValue from decimal  to currency

logger.debug(f"dataframe before transformation{Data_Jobs_data['salary_raw.value.maxValue'].head(2)}")
Data_Jobs_data['salary_raw.value.maxValue'] = Data_Jobs_data['salary_raw.value.maxValue'].apply(lambda x: "${:,.2f}".format(x) if pd.notnull(x) else x)
logger.debug(f"dataframe after transformation{Data_Jobs_data['salary_raw.value.maxValue'].head(2)}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data['salary_raw.value.maxValue'] = Data_Jobs_data['salary_raw.value.maxValue'].apply(lambda x: "${:,.2f}".format(x) if pd.notnull(x) else x)


In [None]:
## turn locations derived column into a string

logger.debug(f"dataframe before transformation{Data_Jobs_data['locations_derived'].head(2)}")
Data_Jobs_data['locations_derived'] = Data_Jobs_data['locations_derived'].str[0]
logger.debug(f"dataframe after transformation{Data_Jobs_data['locations_derived'].head(2)}")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  Data_Jobs_data['locations_derived'] = Data_Jobs_data['locations_derived'].str[0]


In [None]:
## split values in locations_derived into separate columns  and drop columns that are not states

logger.debug(f"dataframe before transformation{Data_Jobs_data['locations_derived'].head(2)}")
Data_Jobs_data_expanded = Data_Jobs_data['locations_derived'].str.split(',', expand = True)
Data_Jobs_data_expanded_with = Data_Jobs_data.join(Data_Jobs_data_expanded)
Data_Jobs_data_expanded_with
Data_Jobs_data = Data_Jobs_data_expanded_with.drop([2, 0], axis=1)
logger.debug(f"dataframe after transformation{Data_Jobs_data['locations_derived'].head(2)}")

In [None]:
## permantely change the column that have states to the name state

Data_Jobs_data.rename(columns={1: 'state'}, inplace= True)
logger.debug(f"new column created {Data_Jobs_data['state'].head(3)}")

In [None]:
## create new column names for dataframe that includes the column state

columns = ['id','date_posted','title','organization','locations_derived','state','cities_derived','employment_type','remote_derived','salary_raw.value.minValue','salary_raw.value.maxValue','description_text']
Data_Jobs_data[columns]
pd.set_option('display.max_rows', 10)
pd.set_option('display.max_columns', 10)

logger.debug(f"Dataframe with updated columns {Data_Jobs_data.head(4)}")

pd.reset_option('display.max_rows')
pd.reset_option('display.max_columns')

In [None]:
## extract only the state from the locations_derived table
def extract_state(location):
    if pd.isna(location):  # handle NaN
        return None
    parts = location.split(",")
    if len(parts) == 3:
        return parts[1].strip()
    elif len(parts) == 2:
        return parts[0].strip()
    else:
        return location.strip()

Data_Jobs_data["state"] = Data_Jobs_data["locations_derived"].apply(extract_state)
logger.debug(f"dataframe after extracting state {Data_Jobs_data["state"].head(2)}")
        
        
    

In [None]:
## remove the locations_derived column

logger.debug(f"dataframe before transformation{Data_Jobs_data['locations_derived'].head(2)}")
Data_Jobs_data = Data_Jobs_data.drop('locations_derived', axis= 1)

pd.set_option('display.max_rows', 10)
pd.set_option('display.max_columns', 10)

logger.debug(f"dataframe before transformation{Data_Jobs_data.head(1)}")

pd.reset_option('display.max_rows')
pd.reset_option('display.max_columns')

In [None]:
## insert state column right before cities column

logger.debug(f"dataframe before transformation{Data_Jobs_data['state'].head(2)}")
cols = list(Data_Jobs_data.columns)
cols.insert(4, cols.pop(cols.index('state')))
Data_Jobs_data = Data_Jobs_data[cols]
logger.debug(f"dataframe after transformation{Data_Jobs_data['state'].head(2)}")

In [None]:
## change description_text data type from an object to a string

logger.debug(f"dataframe before transformation{Data_Jobs_data['description_text'].dtype}")
Data_Jobs_data['description_text'] = Data_Jobs_data['description_text'].astype('string')
after_dtype = Data_Jobs_data["description_text"].dtype
logger.debug(f"'description_text' dtype after transformation: {after_dtype}")

In [None]:
## separate each individual skill by a comma for each row

import re
import string

skills_list = [
    "python", "sql", "spark", "aws", "azure", "snowflake", "data processing",
    "data storage", "data management", "data ingestion", "data preparation",
    "data provisioning", "real-time processing", "informatica",
    "bachelor's degree", "engineering", "mathematics", "computer science",
    "ai/ml", "ci/cd", "postgres", "nosql", "data warehouse", "rdbms",
    "datalake", "github", "devops", "mapreduce", "hive", "emr", "kafka",
    "gurobi", "docker", "kubernetes", "big data", "c++", "javascript",
    "cassandra", "pandas", "data pipelines", "java", "lake houses",
    "apache iceberg", "tableau", "power bi", "data modelling",
    "data modeling", "data models", "apache beam", "bigquery", "gcp",
    "machine learning", "google cloud", "data pipeline", "fabric",
    "data warehousing", "linux", "windows", "unix", "databricks", "etl"
]
logger.debug(f"dataframe before transformation{Data_Jobs_data['description_text'].head(1)}")

def clean_text(text):
    text = text.lower()
    text = text.translate(str.maketrans("", "", string.punctuation))  # remove punctuation
    return text

def extract_skills(text):
    if pd.isna(text):
        return None
    
    text = clean_text(text)
    found = []
    
    for skill in skills_list:
        # word boundary match if it's a single word
        if " " not in skill and "/" not in skill and "-" not in skill:
            if re.search(rf"\b{re.escape(skill)}\b", text):
                found.append(skill)
        else:
            # direct substring match for multi-word or special cases
            if skill in text:
                found.append(skill)
    
    return found if found else None
Data_Jobs_data["skills_requirements"] = Data_Jobs_data["description_text"].apply(extract_skills)
logger.debug(f"dataframe after transformation{Data_Jobs_data["skills_requirements"].head(1)}")

In [None]:
## remove the column description_text

logger.debug(f"dataframe before dropping description_text column{Data_Jobs_data.columns}")
Data_Jobs_data = Data_Jobs_data.drop('description_text', axis= 1)
logger.debug(f"dataframe after dropping description_text column{Data_Jobs_data.columns}")

In [None]:
## join all skills together for each posting

logger.debug(f"dataframe before skills requirements transformation{Data_Jobs_data["skills_requirements"].head(2)}")
Data_Jobs_data["skills_requirements"] = (
    Data_Jobs_data["skills_requirements"]
    .apply(lambda x: ", ".join(x) if isinstance(x, list) else x)
)
logger.debug(f"dataframe after skills requirements transformation{Data_Jobs_data["skills_requirements"].head(2)}")

In [73]:
logger.info("dataframe transformation complete")
Data_Jobs_data

Unnamed: 0,id,date_posted,title,organization,state,cities_derived,employment_type,remote_derived,salary_raw.value.minValue,salary_raw.value.maxValue,skills_requirements
0,1873929964,2025-09-20,Data Engineer,Best Job Tool,,,[FULL_TIME],False,,,"python, sql, spark, data management, engineeri..."
1,1873849392,NaT,Data Engineer,Vinsys Information Technology Inc,Washington,Seattle,[FULL_TIME],False,,,"python, sql, spark, azure, snowflake, data man..."
2,1873849728,NaT,Network / Data Center Engineer,Kanak Elite Services,Texas,Abilene,[FULL_TIME],False,,,
3,1873849400,NaT,Data Engineer Lead,Shrive Technologies,California,Fremont,[FULL_TIME],False,,,"python, sql, spark, azure, engineering, kafka,..."
4,1873849403,NaT,Data Analytical Engineer,Vinsys Information Technology Inc,Pennsylvania,Malvern,[FULL_TIME],False,,,"python, sql, spark, aws, data processing, gith..."
...,...,...,...,...,...,...,...,...,...,...,...
995,1872288869,NaT,Data Engineer - Senior Manager,PwC,United States,,[FULL_TIME],False,"$124,000.00","$280,000.00","aws, azure, snowflake, data processing, engine..."
996,1872288831,NaT,Data Engineer - Senior Manager,PwC,United States,,[FULL_TIME],False,"$124,000.00","$280,000.00","aws, azure, snowflake, data processing, engine..."
997,1872289378,NaT,Mechanical Engineer - Data Center Components,Hyper Solutions,Virginia,,[FULL_TIME],False,,,engineering
998,1872288075,NaT,GCP Data Engineer - Senior Manager,PwC,United States,,[FULL_TIME],False,"$124,000.00","$280,000.00","sql, spark, snowflake, data processing, data s..."


In [76]:
SQL_SERVER_DATABASE = "Linkedin_jobs"
SQL_SERVER_HOST = os.getenv("SQL_SERVER_HOST")
SQL_SERVER_DATABASE = os.getenv("SQL_SERVER_DATABASE")
print(SQL_SERVER_DATABASE)

Linkedin_jobs


In [None]:
## create connection to SQL SERVER

SQL_SERVER_HOST = "LPJZ5KFY3"
SQL_SERVER_DATABASE = "Linkedin_jobs"
conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    f"SERVER={SQL_SERVER_HOST};"
    f"DATABASE={SQL_SERVER_DATABASE};"
    "Trusted_Connection=yes;"
)

conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

cursor.execute("SELECT @@VERSION;")
print(cursor.fetchone())

('Microsoft SQL Server 2022 (RTM-CU20-GDR) (KB5065220) - 16.0.4212.1 (X64) \n\tAug 13 2025 16:40:40 \n\tCopyright (C) 2022 Microsoft Corporation\n\tDeveloper Edition (64-bit) on Windows 10 Enterprise 10.0 <X64> (Build 26100: ) (Hypervisor)\n',)


In [None]:
## Create table in Linkedin_jobs database

create_table_sql = """
IF NOT EXISTS (
    SELECT 1 FROM sysobjects 
    WHERE name = 'Linkedin_jobs' AND xtype = 'U'
)
BEGIN

    CREATE TABLE Linkedin_jobs(
        id INTEGER PRIMARY KEY,
        date_posted DATE,
        title varchar(300),
        organization varchar(300),
        state varchar(300),
        cities_derived varchar(300),
        employment_type varchar(300),
        remote_derived BIT DEFAULT 0,
        salary_raw_value_minValue MONEY, 
        salary_raw_value_maxValue MONEY,
        skills_requirements varchar(MAX)
    );
END
"""
cursor.execute(create_table_sql)
conn.commit()
conn.close()

In [None]:
## create SQL SERVER engine

engine = create_engine(f'mssql+pyodbc://{SQL_SERVER_HOST}/{SQL_SERVER_DATABASE}?driver=ODBC+Driver+17+for+SQL+Server')

In [82]:
# Ensure a copy (avoids SettingWithCopyWarning)
Data_Jobs_data = Data_Jobs_data.copy()

# Rename columns to match SQL Server table
Data_Jobs_data = Data_Jobs_data.rename(columns={
    "salary_raw.value.minValue": "salary_raw_value_minValue",
    "salary_raw.value.maxValue": "salary_raw_value_maxValue"
})

# Also make sure 'employment_type' is a string, not a list
if Data_Jobs_data["employment_type"].apply(lambda x: isinstance(x, list)).any():
    Data_Jobs_data["employment_type"] = Data_Jobs_data["employment_type"].apply(
        lambda x: ", ".join(x) if isinstance(x, list) else x
    )

# Insert into SQL Server
table_name = 'Linkedin_jobs'
try:
    Data_Jobs_data.to_sql(table_name, engine, if_exists='append', index=False, chunksize=1000)
    print(f"✅ Data successfully inserted into {table_name}.")
    logger.info("data successfully loaded into database")
except Exception as e:
    logger.error(f"Error inserting data: {e}")


✅ Data successfully inserted into Linkedin_jobs.
