In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, isnan, when, count, avg, min, max, stddev, year, row_number, to_date, when
from pyspark.sql.window import Window

spark = SparkSession.builder. \
    appName("pyspark-1"). \
    getOrCreate()

### Read data

In [3]:
df = spark.read.csv("/dataset/nyc-jobs.csv", header=True)
#df.printSchema()

#df.show(5)

#print(f"Total number for rows: {df.count()}")

#df.describe().show()

#df.select([count(when(col(c).isNull() | isnan(c) | (col(c) == ''), c)).alias(c) for c in df.columns]).show()

#df.select("Job Category").distinct().show(10)


### Sample function

In [4]:
def get_salary_frequency(df) -> list:
    row_list = df.select('Salary Frequency').distinct().collect()
    return [row['Salary Frequency'] for row in row_list]

In [5]:
def clean_data(df):
    df = df.withColumn("Salary Range From", col("Salary Range From").cast("double"))\
        .withColumn("Salary Range To", col("Salary Range To").cast("double"))
    df = df.withColumn("Salary Mean", (col("Salary Range From")+col("Salary Range To")) / 2)\
            .withColumn("Posting Date", to_date(col("Posting Date"), 'yyyy-MM-dd'))
    return df

In [6]:
def top_10_jobs_per_category(df): 
    return df.groupBy('Job Category').count().orderBy("count", ascending=False).limit(10)

In [7]:
def salary_distribution_per_category(df):
    return df.groupBy('Job Category').agg(avg("Salary Mean").alias("avg_salary"),
                                         min("Salary Mean").alias("min_salary"),
                                         max("Salary Mean").alias("max_salary"),
                                         stddev("Salary Mean").alias("stddev_salary")
                                         ).orderBy("avg_salary", ascending=False)

In [8]:
def highest_salary_per_agency(df):
    window_specs = Window.partitionBy("Agency").orderBy(col("Salary Mean").desc())
    return df.withColumn("rank", row_number().over(window_specs)
                        ).filter(col("rank") == 1
                                ).select("Agency", "Job Category", "Salary Mean")
    

In [9]:
def last_2year_avg_salary_agency(df):
    max_year = df.select(year(col("Posting Date")).alias("year")).agg(max("year").alias("max_year")).collect()[0]["max_year"]
    recent_df = df.filter(year(col("Posting Date")) > (max_year-2))
    return recent_df.groupBy("Agency").agg(avg("Salary Mean").alias("avg_salary")
                                          ).orderBy(col("avg_salary").desc())


In [10]:
def highest_paid_skills_us(df):
    skills_keywords = ['Python', 'SQL','Java','Spark','AWS','Azure','GCP','Kubernetes','Docker','Tableau','Power BI', 'Snowflake','Databricks', 'Pandas']
    for skill in skills_keywords:
        df = df.withColumn(skill, when(col("Job Description").rlike(f"(?i){skill}"), 1).otherwise(0))
    paid_per_skills = {}
    for skill in skills_keywords:
        paid_per_skills[skill] = df.filter(col(skill) == 1).agg(avg("Salary Mean").alias(f"{skill}_avg_salary")).collect()[0][0]
    return pd.DataFrame(paid_per_skills.items(), columns=['Skills', 'Salary'])
    

In [11]:
def remove_useless_values(df):
    total_rows = df.count()
    null_counts = df.select([
        count(
            when(col(c).isNull() | isnan(col(c)) | (col(c) == ''), c)
        ).alias(c) 
        for c in df.columns
    ])
    null_ratios = null_counts.collect()[0].asDict()
    column_to_drop = [col_name for col_name, nulls in null_ratios.items() if nulls/total_rows > 0.5]
    print("Drpping Columns : ", column_to_drop)
    
    return df.drop(*column_to_drop)

In [16]:
def write_data_to_target(df, output_path:str):
    df.write.mode("overwrite").option("header", "true").csv(output_path)

In [13]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
sns.set(style="whitegrid")
def visualize_result(df, x_col, y_col=None, title=''):
    if isinstance(df, DataFrame):
        pd_df = df.toPandas()
    else:
        pd_df = df
    plt.figure(figsize=(12, 6))
    sns.barplot(data=pd_df, x=x_col, y=y_col, palette="magma")
    plt.title(title)
    plt.tight_layout()
    plt.show()
    

In [20]:
#df=remove_useless_values(df)
df=clean_data(df)
df.count()
output_path = "./output/process_file"
write_data_to_target(df, output_path)
'''
rec_df = top_10_jobs_per_category(df)
rec_df.show(5)
visualize_result(rec_df, x_col='count', y_col="Job Category", title="Top 10 Salary")

salary_per_cat_df=salary_distribution_per_category(df).limit(10)
salary_per_cat_df.show(5)
visualize_result(salary_per_cat_df, x_col='avg_salary', y_col='Job Category', title='Average Salary distribution per catagory')
visualize_result(salary_per_cat_df, x_col='min_salary', y_col='Job Category', title='Min Salary distribution per catagory')
visualize_result(salary_per_cat_df, x_col='max_salary', y_col='Job Category', title='max Salary distribution per catagory')
visualize_result(salary_per_cat_df, x_col='stddev_salary', y_col='Job Category', title='stddevSalary distribution per catagory')

highest_sal_df = highest_salary_per_agency(df)
visualize_result(highest_sal_df.limit(10), x_col='Salary Mean', y_col='Agency', title='highest Salary as per Agency')

last2_year_avg_sal_df = last_2year_avg_salary_agency(df)
visualize_result(last2_year_avg_sal_df.limit(10), x_col='avg_salary', y_col='Agency', title='Average Salary as per Agency')

skills_paid_df = highest_paid_skills_us(df)
visualize_result(skills_paid_df, x_col='Salary', y_col='Skills', title='Sallary Based on Skills')
'''

'\nrec_df = top_10_jobs_per_category(df)\nrec_df.show(5)\nvisualize_result(rec_df, x_col=\'count\', y_col="Job Category", title="Top 10 Salary")\n\nsalary_per_cat_df=salary_distribution_per_category(df).limit(10)\nsalary_per_cat_df.show(5)\nvisualize_result(salary_per_cat_df, x_col=\'avg_salary\', y_col=\'Job Category\', title=\'Average Salary distribution per catagory\')\nvisualize_result(salary_per_cat_df, x_col=\'min_salary\', y_col=\'Job Category\', title=\'Min Salary distribution per catagory\')\nvisualize_result(salary_per_cat_df, x_col=\'max_salary\', y_col=\'Job Category\', title=\'max Salary distribution per catagory\')\nvisualize_result(salary_per_cat_df, x_col=\'stddev_salary\', y_col=\'Job Category\', title=\'stddevSalary distribution per catagory\')\n\nhighest_sal_df = highest_salary_per_agency(df)\nvisualize_result(highest_sal_df.limit(10), x_col=\'Salary Mean\', y_col=\'Agency\', title=\'highest Salary as per Agency\')\n\nlast2_year_avg_sal_df = last_2year_avg_salary_age

### Example of test function

In [None]:
mock_data = [('A', 'Annual'), ('B', 'Daily')]
expected_result = ['Annual', 'Daily']

In [None]:
def test_get_salary_frequency(mock_data: list, 
                              expected_result: list,
                              schema: list = ['id', 'Salary Frequency']):  
    mock_df = spark.createDataFrame(data = mock_data, schema = schema)
    #assert get_salary_frequency(mock_df) == expected_result