In [None]:
from collections import defaultdict
from pymongo import MongoClient
from dotenv import load_dotenv
import pandas as pd
import json
import os
import re
import ast

In [None]:
# MongoDB connection settings. Each value can be overridden through an environment
# variable (optionally loaded from a .env file by load_dotenv); the defaults point at
# a MongoDB on localhost.
load_dotenv()
mongo_host = os.getenv('MONGO_HOST', '127.0.0.1')
mongo_port = os.getenv('MONGO_PORT', '27017')
mongo_db = os.getenv('MONGO_DB', 'raw_data')
connection_string = f"mongodb://{mongo_host}:{mongo_port}/"

In [None]:
# Name of the MongoDB collection to read postings from.
collection_name = "okjob_test"

In [None]:
# Open the MongoDB connection and select the source database and collection.
client = MongoClient(connection_string)
db = client.get_database(mongo_db)
collection = db.get_collection(collection_name)

In [None]:
# Peek at one raw document from the selected collection to see the available fields.
display(collection.find_one())

In [None]:
# Pull postings whose required fields are non-empty and whose Job-Category mentions "data".
# NOTE(review): {"$ne": ""} also matches documents where the field is missing or null,
# so NaN can still appear in these columns downstream.
query = {
    "$and": [
        {"sourceId": {"$ne": ""}},
        {"Job-Title": {"$ne": ""}},
        {"Location": {"$ne": ""}},
        {"Salary-Min": {"$ne": ""}},
        {"Salary-Max": {"$ne": ""}},
        {"LinkedIn-Job-Link": {"$ne": ""}},
        {"Job-Type": {"$ne": ""}},
        {"Job-Tags": {"$ne": ""}},
        {"Job-Category": {"$regex": "data", "$options": "i"}},  # case-insensitive search for "data" in Job-Category
    ]
}
projection = {
    "sourceId": 1,
    "Job-Title": 1,
    "Location": 1,
    "Salary-Min": 1,
    "Salary-Max": 1,
    "LinkedIn-Job-Link": 1,
    "Job-Type": 1,
    "Job-Tags": 1,
    "Job-Category": 1,
    "_id": 0
}

# Columns we asked MongoDB to return; passing them explicitly keeps the expected
# schema even when the query matches no documents.
fields = [field for field, include in projection.items() if include]
df_ok = pd.DataFrame(list(collection.find(query, projection)), columns=fields)
df_ok['Salary-Min'] = pd.to_numeric(df_ok['Salary-Min'], errors='coerce')
df_ok['Salary-Max'] = pd.to_numeric(df_ok['Salary-Max'], errors='coerce')
# Rows with missing salaries are kept on purpose (dropping them leaves too few rows);
# they are filled later.
df_ok.head(5)





In [None]:
# Number of postings returned by the MongoDB query.
len(df_ok)

In [None]:
# Derive a coarse seniority level for each posting from its job title.
import spacy
nlp = spacy.load('en_core_web_sm')

# Lower-cased lemmas that mark a title as senior / junior (built once, not per call).
SENIOR_LEMMAS = {'strategic', 'principal', 'staff', 'lead', 'senior', 'head'}
JUNIOR_LEMMAS = {'trainee', 'junior', 'apprentice'}
# "entry level" spans several tokens, so it can never equal a single token's lemma;
# it is matched on the raw text instead (this also covers "entry-level").
JUNIOR_PHRASE = re.compile(r'\bentry[\s-]*level\b', re.IGNORECASE)


def categorize_seniority(job_title):
    """Classify a job title as 'Senior', 'Junior' or 'Any'.

    'Senior' wins if any token lemma is a senior keyword. Otherwise the title is
    'Junior' if any lemma is a junior keyword or the title contains "entry level".
    Everything else is 'Any'. Non-string or blank titles (e.g. NaN from a missing
    field) also yield 'Any'.
    """
    if not isinstance(job_title, str) or not job_title.strip():
        return 'Any'
    lemmas = {token.lemma_.lower() for token in nlp(job_title)}
    if lemmas & SENIOR_LEMMAS:
        return 'Senior'
    if lemmas & JUNIOR_LEMMAS or JUNIOR_PHRASE.search(job_title):
        return 'Junior'
    return 'Any'


df_ok['jobLevel'] = df_ok['Job-Title'].apply(categorize_seniority)


In [None]:
# Inspect the distinct raw values of the fields that feed the keyword-based features.
for column in ['Job-Category', 'Job-Tags', 'Job-Type']:
    print(f"{column}: {df_ok[column].unique()}")


In [None]:
# Vocabularies used to tag postings: technical skills/tools (matched against Job-Tags)
# and work-site arrangements (matched against Job-Type).
# NOTE(review): "DataGovernance" is written without a space; confirm whether
# "Data Governance" was intended.
keywords_skills = [
    "SQL", "Structured Query Language", "Python", "R", "Docker",
    "AWS", "Amazon Web Services", "Azure", "Google Cloud Platform", "GCP",
    "Snowflake", "Hadoop", "Spark", "Kubernetes", "Jenkins",
    "BI", "Business Intelligence", "Tableau", "Power BI", "Looker",
    "ETL", "Extract Transform Load", "Informatica", "Talend", "SSIS",
    "CRM", "Customer Relationship Management", "Salesforce", "SAP", "Git",
    "NoSQL", "MongoDB", "Cassandra", "PostgreSQL", "MySQL",
    "Data Modeling", "Machine Learning", "ML", "AI", "Apache Kafka",
    "Redis", "Elasticsearch", "Kibana", "Ansible", "REST",
    "RESTful", "API", "GraphQL", "Linux", "Matplotlib",
    "Seaborn", "Jupyter Notebook", "Scikit-learn", "TensorFlow", "PyTorch",
    "Data Lakes", "Data Warehousing", "Agile", "Scrum", "Blockchain",
    "Edge Computing", "VMware", "SAS", "Flask", "Django",
    "Apache", "Airflow", "Luigi", "NLP", "Databricks",
    "redshift", "Excel", "HANA", "Oracle", "crypto",
    "BigQuery", "DataGovernance",
]

keywords_site = ["remote", "hybrid", "on-site", "flexible"]
# Tag free text with the keywords (skills, site types, ...) that occur in it.
def categorize_by_keywords(text, keywords):
    """Return the entries of ``keywords`` that occur in ``text``.

    Matching is case-insensitive and anchored on word boundaries, so "SQL" does not
    match inside "NoSQL". Multi-word and hyphenated keywords ("Power BI", "on-site")
    are matched as whole phrases; whitespace inside a phrase may be any run of
    whitespace in the text.

    Args:
        text: Free text to scan. Non-string values (e.g. NaN) are treated as empty.
        keywords: Iterable of keyword strings.

    Returns:
        A comma-separated string of the matched keywords, spelled as in
        ``keywords``, de-duplicated case-insensitively and ordered by first
        occurrence in ``text``. Returns "None" when nothing matched.
    """
    if not isinstance(text, str) or not text:
        return "None"
    found = {}  # lower-cased keyword -> (first match position, list order, keyword)
    for order, keyword in enumerate(keywords):
        key = keyword.lower()
        if not key.strip() or key in found:
            continue
        phrase = r'\s+'.join(re.escape(part) for part in keyword.split())
        match = re.search(r'(?<!\w)' + phrase + r'(?!\w)', text, flags=re.IGNORECASE)
        if match:
            found[key] = (match.start(), order, keyword)
    if not found:
        return "None"
    # Sorting by position gives a deterministic order (a set's order varies between runs).
    return ', '.join(keyword for _, _, keyword in sorted(found.values()))

# Extract skills from the tag text and work-site terms from the job type.
# Series.apply works column-wise and, unlike DataFrame.apply(axis=1), still returns
# a Series when the frame is empty.
df_ok['jobSkills'] = df_ok['Job-Tags'].apply(lambda tags: categorize_by_keywords(tags, keywords_skills))
df_ok['jobSite'] = df_ok['Job-Type'].apply(lambda job_type: categorize_by_keywords(job_type, keywords_site))





In [None]:
# Keyword lists per job category. categorize_job_titles scores a category by how many
# title words appear in its list; on ties, words earlier in the list count as stronger.
keywords_title = {
    "data administrator": ["data", "administrator", "entry", "protection", "officer", "clerk", "admin",
                           "migration", "cleanser", "inputter", "coordinator", "assistant", "processor",
                           "auditor", "governance", "apprentice", "executive", "manager"],
    "data engineer": ["data", "engineer", "developer", "engineering", "modeller", "technical"],
    "data analyst": ["data", "analyst", "analytics", "analysis", "investigation", "workstream",
                     "visualisation", "insight", "consultant"],
    "database administrator": ["database", "administrator", "assistant", "manager"],
    "data scientist": ["data", "scientist", "science", "engineer"],
    "data center": ["data", "center", "cabling", "installer", "installation", "engineer"],
    "data test": ["data", "test", "tester", "automation", "processing"],
    "data architect": ["data", "architect"],
    "manager": ["head", "manager", "director", "procurement", "management"]
}
def categorize_job_titles(job_title, keywords_title):
    """Assign a job title to the best-matching category in ``keywords_title``.

    The title is lower-cased. Every character that is not a letter, digit or
    whitespace becomes a space (so "Data/Analyst" yields "data" and "analyst"), and
    the result is split into words. Each category scores one point per word found in
    its keyword list.

    The highest score wins. Ties go to the category whose matched words have the
    smallest sum of list positions. Any remaining tie goes to the category that comes
    first in ``keywords_title``.

    Args:
        job_title: Text to classify. Non-string values (e.g. NaN) yield 'Other'.
        keywords_title: Mapping of category name -> list of keywords.

    Returns:
        The winning category name, or 'Other' when no word matches.
    """
    if not isinstance(job_title, str):
        return 'Other'
    # Replace (rather than delete) punctuation so "data-engineer" or "data/bi" still
    # split into separate words; a raw string avoids the invalid "\s" escape warning.
    words = re.sub(r'[^a-z0-9\s]', ' ', job_title.lower()).split()

    matches = defaultdict(int)     # category -> number of matched words
    index_sums = defaultdict(int)  # category -> sum of matched keyword positions (tie-break)
    for category, keywords in keywords_title.items():
        for word in words:
            if word in keywords:
                matches[category] += 1
                index_sums[category] += keywords.index(word)

    if not matches:
        return 'Other'

    max_matches = max(matches.values())
    candidates = [category for category, count in matches.items() if count == max_matches]
    # min() keeps the first of equally-ranked candidates, i.e. keywords_title order.
    return min(candidates, key=lambda category: index_sums[category])
# Classify each posting from its title plus the scraped category text.
# Missing values become empty strings so one incomplete record cannot break the concatenation.
job_text = df_ok['Job-Title'].fillna('').astype(str) + ' ' + df_ok['Job-Category'].fillna('').astype(str)
df_ok['jobCategory'] = job_text.apply(lambda text: categorize_job_titles(text, keywords_title))


In [None]:
# Keep only the columns needed downstream and drop exact duplicate postings.
# Building a new frame (instead of inplace=True on a column selection) avoids SettingWithCopyWarning.
df_ok = df_ok[['jobCategory', 'jobLevel', 'jobSkills', 'jobSite', 'Salary-Min', 'Salary-Max',
               'sourceId', 'Job-Title', 'LinkedIn-Job-Link', 'Location']].drop_duplicates()

In [None]:
# Number of postings left after column selection and de-duplication.
len(df_ok)

In [None]:
def clean_sort_and_deduplicate(text):
    """Normalise a comma-separated list: lower-case, trim, de-duplicate and sort.

    Non-string values (e.g. NaN) are returned unchanged. Empty items produced by
    stray or trailing commas are dropped, so "b,,a," becomes "a, b".

    Args:
        text: Comma-separated string, or any non-string value.

    Returns:
        The normalised string joined with ", ", or ``text`` itself if it is not a str.
    """
    if not isinstance(text, str):
        return text
    items = {part.strip() for part in text.lower().split(',')}
    items.discard('')
    return ', '.join(sorted(items))


In [None]:
# Normalise every jobSite value (lower-cased, de-duplicated, alphabetically sorted terms).
df_ok.loc[:, 'jobSite'] = df_ok.loc[:, 'jobSite'].map(clean_sort_and_deduplicate)

In [None]:
# Working (notebook) column name -> column name in the target Postgres table.
# Dict order matters: it fixes the column order of the Postgres frame built next.
column_mappings = {
    "sourceId": "source_id",
    "Job-Title": "job_title_name",
    "jobLevel": "experience_level",
    "Salary-Min": "salary_min",
    "Salary-Max": "salary_max",
    "LinkedIn-Job-Link": "joboffer_url",
    "Location": "location_country",
    "jobSite": "job_site",
    "jobSkills": "skills",
    "jobCategory": "categories",
}
# Columns added to every row with a constant value.
additional_columns = {"data_source_name": "ok"}

df_ok = df_ok.rename(columns=column_mappings)
df_ok.head()

In [None]:
# Select the renamed columns in mapping order. .copy() makes this an independent frame,
# so column assignments in later cells don't write into a slice of df_ok
# (SettingWithCopyWarning).
df_ok_postgres = df_ok[list(column_mappings.values())].copy()

In [None]:
# Add the constant-valued columns from additional_columns to every row.
# assign() returns a new frame instead of writing into a possible view of df_ok.
df_ok_postgres = df_ok_postgres.assign(**additional_columns)

In [None]:
# Fill missing salaries with the mean salary of the posting's job category.
# A category whose salaries are all missing keeps NaN here. Rows with a missing
# category keep their own value, rather than being overwritten by the groupby result.
salary_columns = ['salary_min', 'salary_max']
category_means = df_ok_postgres.groupby('categories')[salary_columns].transform('mean')
df_ok_postgres = df_ok_postgres.assign(
    **{col: df_ok_postgres[col].fillna(category_means[col]) for col in salary_columns}
)

In [None]:
# Preview the Postgres-ready frame (rich display instead of plain-text print).
df_ok_postgres.head(20)

In [None]:
# Salaries must be positive integers. Unparseable or missing values become 0, and any
# row with a non-positive bound is dropped.
# NOTE(review): astype(int) truncates the per-category mean fill; consider rounding first.
salary_as_int = {
    col: pd.to_numeric(df_ok_postgres[col], errors='coerce').fillna(0).astype(int)
    for col in ('salary_min', 'salary_max')
}
df_ok_postgres = df_ok_postgres.assign(**salary_as_int)
has_positive_salary = (df_ok_postgres['salary_min'] > 0) & (df_ok_postgres['salary_max'] > 0)

# Text columns are stored as strings. astype() returns a new frame, so the cast never
# writes into the filtered view (no SettingWithCopyWarning).
string_columns = ['source_id', 'experience_level', 'joboffer_url',
                  'location_country', 'data_source_name', 'job_site']
df_ok_postgres = df_ok_postgres.loc[has_positive_salary].astype({col: str for col in string_columns})

In [None]:
# Replace the place text in location_country with the country returned by Nominatim.
from geopy.exc import GeopyError
from geopy.extra.rate_limiter import RateLimiter
from geopy.geocoders import Nominatim

# NOTE(review): Nominatim's usage policy asks for an application-specific User-Agent;
# this generic value may be rejected — consider a project-specific one.
geolocator = Nominatim(user_agent="geoapiExercises")
# The public Nominatim service allows at most one request per second.
geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1)


def city_to_country(city):
    """Return the country for a place name via Nominatim, or None if it cannot be resolved.

    The country is taken as the last comma-separated component of the geocoded
    address, with surrounding whitespace stripped.
    """
    if not isinstance(city, str) or not city.strip():
        return None
    try:
        location = geocode(city)
    except GeopyError:
        # Geocoding service/network failure: treat as unresolvable instead of aborting the run.
        return None
    if location is None:
        return None
    return location.address.split(',')[-1].strip()


# Geocode each distinct location once instead of once per row.
country_by_location = {
    place: city_to_country(place)
    for place in df_ok_postgres['location_country'].dropna().unique()
}
df_ok_postgres = df_ok_postgres.assign(
    location_country=df_ok_postgres['location_country'].map(country_by_location)
)


In [None]:
# placeholder for currency and published date