In [None]:
# Three scraped job-listing snapshots (all dated 2023-05-11 in their table names).
dach_df = spark.read.table(
    "default.li_jobs_dach_remote_software_engineer_2023_05_11_2"
)
hu_df = spark.read.table(
    "default.li_jobs_hu_remote_developer_2023_05_11"
)
de_df = spark.read.table(
    "default.li_jobs_emea_remote_data_engineer_2023_05_11"
)

# Stack the three sources, matching columns by name rather than by position.
combined = (
    dach_df
    .unionByName(hu_df)
    .unionByName(de_df)
)

In [None]:
# Render the combined DataFrame in the notebook for a visual sanity check.
combined.display()

In [None]:
# Total number of rows across the three unioned sources.
combined.count()

Out[10]: 4433

In [None]:
# Row count after dropping fully identical rows; compare with the plain count
# to see whether the raw union contains exact duplicates.
combined.distinct().count()

Out[11]: 4433

In [None]:
from pyspark.sql.functions import substring_index

# Truncate Job_URL to the part before its 6th "/" so that rows differing only
# in the URL tail become identical, then drop the resulting duplicate rows.
cleaned = (
    combined
    .withColumn("Job_URL", substring_index("Job_URL", "/", 6))
    .distinct()
)

In [None]:
# Inspect the de-duplicated rows with the truncated Job_URL values.
cleaned.display()

In [None]:
import requests

# NOTE(review): 'KEY' looks like a placeholder; a real key should presumably be
# injected from a secret store or environment variable rather than committed.
subscription_key = 'KEY'
endpoint = "https://api.bing.microsoft.com/v7.0/search"

# One shared session so repeated lookups reuse pooled connections.
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
        pool_connections=50,
        pool_maxsize=50)
# Mount the adapter for both schemes: `endpoint` is HTTPS, so mounting only
# 'http://' left every search call on the session's default HTTPS adapter and
# the pool sizes configured above were never applied to it.
session.mount('http://', adapter)
session.mount('https://', adapter)


def prepare_request(query: str, subscription_key: str = subscription_key, mkt: str = 'en-US') -> tuple[dict, dict]:
    """Build the header and query-parameter dicts for one search request.

    Args:
        query: Search text, sent as the ``q`` parameter.
        subscription_key: Value for the ``Ocp-Apim-Subscription-Key`` header.
            The default is the module-level key as it was when this function
            was defined.
        mkt: Market code, sent as the ``mkt`` parameter.

    Returns:
        ``(headers, params)``. ``params`` always asks for one answer and one
        result (``answerCount=1``, ``count=1``).
    """
    headers = {'Ocp-Apim-Subscription-Key': subscription_key}
    params = dict(q=query, mkt=mkt, answerCount=1, count=1)
    return headers, params


from typing import Optional


def get_company_url(company_name: str, timeout: float = 10.0) -> Optional[str]:
    """Return the URL of the top web-search result for ``company_name``.

    Sends ``company_name`` as the query to ``endpoint`` through the shared
    ``session`` and reads ``webPages.value[0].url`` from the JSON response.

    Args:
        company_name: Text to search for. ``None`` or a blank string
            short-circuits to ``None`` without calling the API.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The first result's URL, or ``None`` when the input is empty, the
        request fails or times out, or the response contains no web-page result.
    """
    # Null/blank names (e.g. a null column value passed in by a UDF) cannot
    # produce a useful query, so do not spend an API call on them.
    if company_name is None or not str(company_name).strip():
        return None

    try:
        headers, params = prepare_request(company_name)

        # Without a timeout a stalled connection would block the caller forever.
        response = session.get(endpoint, headers=headers, params=params, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
        # Missing keys or an empty result list raise here and fall through to None.
        return payload['webPages']['value'][0]['url']
    except Exception:
        # Deliberately best-effort: this runs per row inside a Spark UDF, where
        # one failed lookup should yield a null value rather than fail the job.
        return None


In [None]:
# Smoke-test the lookup with a single known company name.
print(get_company_url("Zühlke Group"))

https://www.zuehlke.com/en


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Spark UDF wrapper: one company name in, one URL string (or null) out.
get_company_url_UDF = udf(
    lambda name: get_company_url(name),
    returnType=StringType(),
)

In [None]:
# NOTE(review): this reads a saved table instead of reusing `cleaned` from above,
# and no visible cell writes `cleaned` to this table — presumably it was
# persisted elsewhere. Confirm; otherwise a fresh Run All depends on hidden state.
cleaned_df = spark.read.table("cleaned_jobs_2023_05_11")

In [None]:
# Add a Company_URL column by running the search lookup on each row's Company.
# NOTE(review): this issues one lookup per row, not per distinct company, and
# the UDF may be re-run on every action — consider de-duplicating or caching.
enriched_df = cleaned_df.withColumn(
    'Company_URL',
    get_company_url_UDF('Company'),
)

In [None]:
# Inspect the rows together with the looked-up Company_URL column.
enriched_df.display()

In [None]:
def get_stakeholder(company, location=""):
    """Search for a LinkedIn profile of an HR / recruiting lead at ``company``.

    Builds a ``site:linkedin.com/in`` query restricted to talent-acquisition,
    recruitment or HR titles at manager/director level, optionally narrowed by
    ``location``, and returns whatever ``get_company_url`` yields for that
    query (the top result's URL, or ``None``).
    """
    query = f'site:linkedin.com/in ( "talent acquisition" | "recruitment" | "Human resources" | "HR" ) & ( manager | director ) {location} intitle:{company}'
    return get_company_url(query)


# Spark UDF wrapper around get_stakeholder; only the company is passed, so the
# location filter stays at its empty default.
get_company_stakeholder_UDF = udf(
    lambda name: get_stakeholder(name),
    returnType=StringType(),
)

In [None]:
# Example lookup with both a company and a location filter.
get_stakeholder("interactive brokers", "hungary")

'https://hu.linkedin.com/in/zsofia-lovas'

In [None]:
from pyspark.sql.types import BooleanType


from typing import Optional


def recruitment_agency_check(company, url, timeout=10.0) -> Optional[bool]:
    """Heuristically flag whether ``company`` looks like a recruitment agency.

    A company is flagged when one of the keywords ("recruitment", "talent",
    "recruiter") appears as a whole word in its name, or anywhere in the text
    of the page at ``url``. Both comparisons are case-insensitive.

    Args:
        company: Company name; ``None`` or a non-string skips the name check.
        url: Page to fetch and scan when the name alone is not conclusive.
        timeout: Seconds to wait for the page request before giving up.

    Returns:
        ``True`` if any keyword hint is found, ``False`` if the page was
        fetched and contains none, ``None`` if the name gave no hint and the
        page could not be checked (missing URL or failed request).
    """
    keywords = ("recruitment", "talent", "recruiter")

    # A keyword in the name is sufficient on its own, so answer before any
    # network I/O; previously a missing URL or failed fetch discarded this hit.
    if isinstance(company, str) and set(company.lower().split()) & set(keywords):
        return True

    if not url:
        return None

    try:
        # Without a timeout a stalled server would block the caller forever.
        response = session.get(url, timeout=timeout)
        response.raise_for_status()
        # Lower-case the page so "Recruitment"/"TALENT" match the lowercase
        # keywords, consistent with how the company name is compared.
        # NOTE(review): substring matching on a whole page is loose (e.g. any
        # careers page mentioning "talent") — confirm this is acceptable.
        page_text = response.text.lower()
        return any(keyword in page_text for keyword in keywords)
    except Exception:
        # Deliberately best-effort: this runs per row inside a Spark UDF, where
        # an unreachable site should yield a null value rather than fail the job.
        return None


# Spark UDF wrapper: (company name, company URL) in, boolean flag (or null) out.
recruitment_agency_check_UDF = udf(
    lambda name, page_url: recruitment_agency_check(name, page_url),
    returnType=BooleanType(),
)

In [None]:
# Example check against a company that is not expected to be a recruitment agency.
recruitment_agency_check("ALDI Magyarország", "https://www.aldi.hu/hu/homepage.html")

False

In [None]:
# Add the agency flag; each row triggers an HTTP fetch of its Company_URL.
enriched_df = enriched_df.withColumn(
    "Recruitment_Agency",
    recruitment_agency_check_UDF("Company", "Company_URL"),
)

In [None]:
# Inspect the final frame including the Recruitment_Agency flag.
enriched_df.display()