In [None]:
from pyspark.sql.types import LongType, FloatType, StringType
from pyspark.sql.functions import col, udf, desc
import re
from pyspark.sql import SparkSession

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named 'career'.
spark = (
    SparkSession.builder
    .appName('career')
    .getOrCreate()
)

In [None]:
# Line-delimited JSON export of the job listings; the path is relative to the working directory.
json_dataset = "marketing_sample_for_careerbuilder_usa-careerbuilder_job_listing__20200401_20200630__30k_data.ldjson"
# Spark infers the schema from the JSON records.
df = spark.read.json(path=json_dataset)

In [None]:
# Pull at most ten rows to the driver and display the first few as a pandas frame.
(
    df
    .limit(10)
    .toPandas()
    .head()
)

### Calculate the number of jobs posted per day, for each city

In [None]:
# Count postings for every (post_date, city) pair.
# NOTE: the variable and output path say "state" although the grouping key is
# "city"; the names are kept so anything that reads the existing path still works.
jobs_per_date_nd_each_state_df = df.groupBy(["post_date","city"]).count()

# mode('overwrite') makes the cell re-runnable: the default save mode raises an
# error when the output path already exists. The previous output is replaced.
(
    jobs_per_date_nd_each_state_df.write
    .mode('overwrite')
    .option('header',True)
    .csv('jobs_per_date_nd_each_state.csv')
)
jobs_per_date_nd_each_state_df.limit(10).toPandas().head()

### Calculate the average salary per job title and state

In [None]:
# Full-time working hours per year (40 h/week x 52 weeks), used to annualise
# hourly pay. The calendar-hour figure 8760 (24 x 365) would overstate an
# hourly wage's yearly equivalent by roughly 4.2x.
HOURS_IN_YEAR = 2080

In [None]:
def return_yearly_salary(salary: str)-> float:
    """
    Extract an annual salary figure from a free-text salary string.

    Only strings containing "$" are parsed. Thousands separators are removed
    and every number in the text (integer or decimal) is averaged, so a range
    such as "$50,000 - $60,000" yields its midpoint. When the text mentions
    "hour" (case-insensitive) the average is treated as an hourly rate and
    multiplied by HOURS_IN_YEAR.

    args: salary - salary in different formats; may be None or empty.
    return: annual salary in float, or 0.0 when no amount can be extracted.
    """
    # Spark passes null column values to a Python UDF as None.
    if not isinstance(salary, str) or "$" not in salary:
        return 0.0

    # A single pattern covers both "20" and "15.50". Choosing an integer-only or
    # decimal-only pattern based on the presence of "." loses amounts in mixed
    # strings ("$15.50 - $20") and in text that merely ends with a period.
    normalized = salary.replace(",", "")
    amounts = [float(number) for number in re.findall(r"\d+(?:\.\d+)?", normalized)]

    # "$" with no digits (e.g. "$ negotiable") must not divide by zero.
    if not amounts:
        return 0.0

    average_amount = sum(amounts) / len(amounts)
    if re.search('hour', salary, re.IGNORECASE):
        return float(average_amount * HOURS_IN_YEAR)
    return average_amount

In [None]:
# Wrap the salary parser as a Spark UDF that yields a FloatType column.
yearly_udf = udf(
    lambda salary_text: return_yearly_salary(salary_text),
    returnType=FloatType(),
)

In [None]:
# Add the parsed yearly salary next to the raw "salary_offered" text.
df = df.withColumn(
    "cleaned_yearly_salary",
    yearly_udf(col("salary_offered")),
)

In [None]:
# Rows whose salary could not be parsed carry 0.0 and are excluded so they do
# not drag the averages down.
average_salary_per_job_title_and_state_df = (
    df.filter(df.cleaned_yearly_salary!=0.0)
    .groupBy(["job_title","state"])
    .avg("cleaned_yearly_salary")
)

# mode('overwrite') makes the cell re-runnable: the default save mode raises an
# error when the output path already exists. The previous output is replaced.
(
    average_salary_per_job_title_and_state_df.write
    .mode('overwrite')
    .option('header',True)
    .csv('average_salary_per_job_title_and_state.csv')
)
average_salary_per_job_title_and_state_df.limit(10).toPandas().head()

### Identify the top 10 most active companies by number of positions opened

In [None]:
# Rank companies by their number of non-expired postings and keep the ten largest.
top_10_most_active_companies_by_number_of_positions_opened_df = (
    df.filter(df.has_expired=='false')
    .groupBy("company_name")
    .count()
    .sort(desc("count"))
    .limit(10)
)

# mode('overwrite') makes the cell re-runnable: the default save mode raises an
# error when the output path already exists. The previous output is replaced.
(
    top_10_most_active_companies_by_number_of_positions_opened_df.write
    .mode('overwrite')
    .option('header',True)
    .csv('top_10_most_active_companies_by_number_of_positions_opened.csv')
)

In [None]:
# Collect the ranking to the driver and render it as a pandas frame.
(
    top_10_most_active_companies_by_number_of_positions_opened_df
    .limit(10)
    .toPandas()
)

### Create a UDF to strip the HTML markup from the job description

In [None]:
# source: https://stackoverflow.com/questions/9662346/python-code-to-remove-html-tags-from-a-string
# Matches either a tag (the shortest "<...>" run on one line) or a character
# entity: named, decimal, or lower-case hexadecimal.
CLEANR = re.compile('<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')

def cleanhtml(raw_html: str) -> str:
    """
    Strip HTML tags and character entities from a string.

    args: raw_html - text that may contain HTML markup.
    return: the text with every CLEANR match removed and any remaining '/>'
            replaced by '.'; an empty string when the input is not a str.
    """
    if type(raw_html) is str:
        without_markup = CLEANR.sub('', raw_html)
        # A '/>' that was not part of a matched tag is turned into a full stop.
        return without_markup.replace('/>','.')
    return ""

In [None]:
# Wrap the HTML stripper as a Spark UDF that yields a StringType column.
html_cleaner_udf = udf(lambda z: cleanhtml(z),StringType())

# Keep only rows that actually have a description.
df = df.filter(df.job_description.isNotNull())

cleaned_job_description_df = df.withColumn("cleaned_job_description", html_cleaner_udf(col("job_description")))

# mode('overwrite') makes the cell re-runnable: the default save mode raises an
# error when the output path already exists. The previous output is replaced.
(
    cleaned_job_description_df.write
    .mode('overwrite')
    .option('header',True)
    .csv('html_cleaned_job_description.csv')
)


In [None]:
# Pull at most ten rows to the driver and display the first few as a pandas frame.
(
    cleaned_job_description_df
    .limit(10)
    .toPandas()
    .head()
)