In [5]:
import sys; 
sys.path.insert(0, '..')

import findspark
findspark.init()

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.window import Window
import pyspark.sql.functions as f
from pyspark.sql.functions import when
from pyspark.sql.types import IntegerType,BooleanType,DateType

# Module-level Spark session used by file_read; getOrCreate() reuses an
# existing session in this process if there is one.
spark = (
    SparkSession.builder
    .appName("pyspark-ETL")
    .getOrCreate()
)

def file_read(filename):
    """Read a CSV file with a header row into a Spark DataFrame.

    filename : File name with path to extract data

    Column types are inferred from the data, and '"' is used as the escape
    character so doubled quotes inside quoted fields are read correctly.
    Uses the module-level ``spark`` session.
    """
    reader = spark.read.option('inferSchema', 'True').option('escape', '"')
    return reader.csv(filename, header=True)
    
def gap_filling_by_mode(df_spark, grp_by_cols, req_col):
    """Replace NULLs in ``req_col`` with that column's mode within each group.

    df_spark : Input spark dataframe where missing data need to be populated
    grp_by_cols : List of columns defining the groups over which the mode of
        ``req_col`` is computed
    req_col : Column whose NULL values need to be replaced

    Returns a DataFrame with exactly the columns of ``df_spark``, in the same
    order. Non-NULL values of ``req_col`` are left unchanged. Rows whose group
    has no non-NULL ``req_col`` value, or whose grouping key contains a NULL
    (the equi-join below cannot match NULL keys), keep their NULL.
    """
    keys = list(grp_by_cols)
    mode_col = req_col + "_new"
    main_cols = df_spark.columns

    # Count each non-NULL value of req_col per group. NULLs are excluded so a
    # group in which NULL is the most frequent "value" still yields a real
    # mode; otherwise the coalesce below would fill nothing for that group.
    value_counts = (
        df_spark.filter(f.col(req_col).isNotNull())
        .groupby(keys + [req_col])
        .count()
    )

    # Rank values by frequency within each group. The secondary ordering on
    # the value itself breaks ties deterministically (row_number alone would
    # pick an arbitrary winner among equally frequent values).
    window_spec = Window.partitionBy(*keys).orderBy(
        f.col("count").desc(), f.col(req_col).asc()
    )
    modes = (
        value_counts.withColumn("rank", f.row_number().over(window_spec))
        .filter(f.col("rank") == 1)
        # Keep only the join keys and the mode, so the helper columns
        # ("count", "rank") never collide with columns of the input dataframe.
        .select(*keys, f.col(req_col).alias(mode_col))
    )

    # Left join keeps every input row; coalesce only touches NULL entries.
    filled = df_spark.join(modes, on=keys, how="left").withColumn(
        req_col, f.coalesce(f.col(req_col), f.col(mode_col))
    )
    return filled.select(main_cols)
    
def get_avg_sal(df_spark, sal_from, sal_to):
    """Add a ``salary_avg`` column holding the midpoint of a salary range.

    df_spark : Input spark dataframe
    sal_from : Name of the lower salary-range column
    sal_to : Name of the upper salary-range column

    Both range columns are cast to double in place, and ``salary_avg`` is
    (sal_from + sal_to) / 2. A NULL in either bound, or a value that cannot be
    cast, yields a NULL average.
    """
    # Cast to double rather than integer: an integer cast drops the fractional
    # part, which is a large relative error for small amounts such as hourly
    # rates (these are later multiplied up by get_daily_avg_sal).
    df_spark = df_spark.withColumn(sal_from, df_spark[sal_from].cast("double"))
    df_spark = df_spark.withColumn(sal_to, df_spark[sal_to].cast("double"))

    return df_spark.withColumn(
        "salary_avg", (df_spark[sal_from] + df_spark[sal_to]) / 2.0
    )
    
def get_daily_avg_sal(df_spark, sal_avg, freq_col="Salary Frequency",
                      days_per_year=12 * 22.00, hours_per_day=8.00):
    """Add ``daily_salary_avg``: the ``sal_avg`` column normalized to a day.

    df_spark : Input spark dataframe
    sal_avg : Name of the average-salary column to normalize
    freq_col : Column holding the salary frequency ("Annual", "Hourly", ...)
    days_per_year : Working days per year used to divide "Annual" salaries
        (default: 12 months * 22 working days)
    hours_per_day : Working hours per day used to multiply "Hourly" salaries

    Any other frequency value, or a NULL frequency, passes the salary through
    unchanged.
    """
    salary = df_spark[sal_avg]
    frequency = df_spark[freq_col]
    daily = (
        when(frequency == "Annual", salary / days_per_year)
        .when(frequency == "Hourly", salary * hours_per_day)
        .otherwise(salary)
    )
    return df_spark.withColumn("daily_salary_avg", daily)
  
def get_year_month(df_spark, date_col, year_col, month_col, timestamp_format=None):
    """Add integer year and month columns derived from ``date_col``.

    df_spark : Input spark dataframe
    date_col : Timestamp/date column, or string column parseable as one
    year_col : Name of the new year column
    month_col : Name of the new month column
    timestamp_format : Optional Spark datetime pattern used to parse a string
        ``date_col`` (e.g. 'yyyy-MM-dd HH:mm:ss.SSS'). When None, Spark's
        default timestamp casting rules are applied, which accept ISO-like
        strings and leave an existing timestamp column as-is.

    Unparseable values yield NULL year/month under Spark's default (non-ANSI)
    settings.
    """
    # Note: Oracle-style patterns such as 'hh24:mi:ss.ff3' are not valid Spark
    # patterns ('hh' is the 12-hour clock; 'i' and 'f' are not pattern
    # letters). The Spark equivalent is 'HH:mm:ss.SSS'.
    source = df_spark[date_col]
    if timestamp_format is None:
        timestamp = f.to_timestamp(source)
    else:
        timestamp = f.to_timestamp(source, timestamp_format)

    # Parse once and derive both parts from the same expression.
    return (
        df_spark.withColumn(year_col, f.year(timestamp).cast(IntegerType()))
        .withColumn(month_col, f.month(timestamp).cast(IntegerType()))
    )
    
def remove_special_chars(df_spark, col_name):
    """Keep only ASCII letters, digits, '.' and spaces in ``col_name``.

    df_spark : Input spark dataframe
    col_name : Column whose values are cleaned in place

    Every other character (punctuation, newlines, non-ASCII letters, ...) is
    deleted outright rather than replaced, and NULL values stay NULL.
    """
    disallowed = "[^a-zA-Z0-9. ]"
    cleaned_values = f.regexp_replace(df_spark[col_name], disallowed, "")
    return df_spark.withColumn(col_name, cleaned_values)
    
def one_hot_encoding(df_spark, col_name, prefix=""):
    """One-hot encode ``col_name`` into one 0/1 integer column per category.

    df_spark : Input spark dataframe
    col_name : Column value need to be encoded
    prefix : Optional string prepended to each new column name. The default ""
        uses the bare category value as the column name; a non-empty prefix
        avoids overwriting an existing column whose name equals a category.

    The source column itself is kept; callers drop it if it is redundant.
    NULL gets no indicator column (it cannot name a column, and
    ``col == NULL`` is never true), so rows with a NULL score 0 in every
    indicator. New columns are added in sorted category order so the output
    schema is deterministic.
    """
    # collect() runs a Spark job and brings every distinct value to the driver.
    distinct_rows = df_spark.select(col_name).distinct().collect()
    categories = sorted(
        row[col_name] for row in distinct_rows if row[col_name] is not None
    )
    for category in categories:
        # str() so non-string categories (e.g. numbers) still make valid names.
        df_spark = df_spark.withColumn(
            prefix + str(category),
            when(df_spark[col_name] == category, 1).otherwise(0),
        )
    return df_spark
    
def drop_feauture(df_spark, col_list):
    """Return ``df_spark`` without the columns named in ``col_list``.

    df_spark : Input spark dataframe
    col_list : LIST of columns to be removed; names that are not present in
        the dataframe are a no-op for DataFrame.drop
    """
    result = df_spark
    for column_name in col_list:
        result = result.drop(column_name)
    return result
    
def write_data(df_spark, filename):
    """Write ``df_spark`` as UTF-8 CSV with a header row to ``filename``.

    df_spark : Input spark dataframe
    filename : Target file path; existing output there is overwritten

    The data is coalesced to a single partition so a single part file is
    produced inside the ``filename`` output directory Spark creates.
    """
    single_partition = df_spark.coalesce(1)
    writer = (
        single_partition.write.mode('overwrite')
        .option('header', True)
        .option("encoding", "UTF-8")
        .option("quote", '"')
    )
    writer.csv(filename)
    
def main(input_path="/dataset/nyc-jobs.csv", output_path="/dataset/output.csv"):
    """Run the NYC jobs ETL pipeline end to end.

    input_path : CSV file to extract data from (default: the original path)
    output_path : Target path handed to write_data (default: the original path)

    Reading goes through file_read, which uses the module-level ``spark``
    session.
    """
    # Extract the raw data
    df_spark = file_read(input_path)

    # Categorical imputation: fill missing 'Full-Time/Part-Time indicator' and
    # 'Posting Date' values with the mode of their group
    df_spark = gap_filling_by_mode(df_spark, ['Salary Frequency'], 'Full-Time/Part-Time indicator')
    df_spark = gap_filling_by_mode(df_spark, ['Posting Type', 'Agency'], 'Posting Date')

    # Feature salary_avg: midpoint of the salary range of each posting
    df_spark = get_avg_sal(df_spark, "Salary Range From", "Salary Range To")

    # Feature daily_salary_avg: salary normalized to a daily amount
    # irrespective of the posting's Salary Frequency
    df_spark = get_daily_avg_sal(df_spark, "salary_avg")

    # Year/month features for the posting date and the processing date
    df_spark = get_year_month(df_spark, "Posting Date", "Job_Post_Year", "Job_Post_Month")
    df_spark = get_year_month(df_spark, "Process Date", "data_processed_year", "data_processed_month")

    # Clean the "Preferred Skills" column by removing special characters
    df_spark = remove_special_chars(df_spark, "Preferred Skills")

    # Cache the cleaned data: each one_hot_encoding call below runs a
    # collect() action and write_data runs another; without a cache every
    # action recomputes the CSV read plus both mode aggregations and joins.
    cleaned = df_spark.cache()
    try:
        # One-hot encode Posting Type and Salary Frequency to turn these
        # categorical features into numerical ones
        encoded = one_hot_encoding(cleaned, 'Posting Type')
        encoded = one_hot_encoding(encoded, 'Salary Frequency')

        # Feature selection: drop columns made redundant by the encoding
        # ('Posting Type', 'Salary Frequency') and by daily_salary_avg
        encoded = drop_feauture(encoded, ['Posting Type', 'Salary Frequency', 'salary_avg'])

        # Load: write the result as CSV
        write_data(encoded, output_path)
    finally:
        # Release the cached data even if a step fails or is interrupted
        cleaned.unpersist()

#if __name__ == "__main__":
    #main()

In [6]:
# Run the ETL pipeline end to end with the default input/output paths.
main()

KeyboardInterrupt: 