# Perform data preprocessing using Spark

In [85]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import random
import re

In [2]:
# Cluster master URL for this notebook — adjust for your environment.
SPARK_MASTER = "spark://ip-172-31-13-80.ap-southeast-2.compute.internal:7077"
# spark-excel connector (Scala 2.11 build, per the artifact suffix) used to read the .xlsx input.
SPARK_EXCEL_PACKAGE = "com.crealytics:spark-excel_2.11:0.12.2"

spark = (
    SparkSession.builder
    .appName("Spark_data_clean")
    .config("spark.master", SPARK_MASTER)
    .config("spark.jars.packages", SPARK_EXCEL_PACKAGE)
    .config("spark.executor.cores", "1")
    # Spark has no "spark.executor.num" property (it is silently ignored); the documented
    # key for a fixed executor count is spark.executor.instances.
    # NOTE(review): on a standalone master the number of executors is governed by
    # spark.cores.max / spark.executor.cores rather than this key — confirm the intended limit.
    .config("spark.executor.instances", "1")
    .getOrCreate()
)

### 1. Load the raw data and add column headings

In [3]:
# Read the raw scrape without a header row, so the columns arrive as _c0, _c1, ...
excel_reader = (
    spark.read.format("com.crealytics.spark.excel")
    .option("useHeader", "false")
    .option("inferSchema", "true")
)
df = excel_reader.load("NZ_Admin_JOBS.xlsx")

In [5]:
# Positional column names from the header-less sheet -> descriptive names.
RAW_COLUMN_NAMES = {
    "_c0": "Job_Title",
    "_c1": "Web_site",
    "_c2": "Company_name",
    "_c3": "Location",
    "_c4": "Date",
    "_c5": "Classification",
}
for raw_name, nice_name in RAW_COLUMN_NAMES.items():
    df = df.withColumnRenamed(raw_name, nice_name)

In [8]:
# Preview the first rows of the renamed raw data.
df.show(n=5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           Job_Title|            Web_site|        Company_name|            Location|                Date|      Classification|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       Administrator|https://www.seek....|                    |location: Bay of ...|Featured,at,Priva...|classification: A...|
|        Receptionist|https://www.seek....|Avenues Orthodontics|location: Bay of ...|         Featured,at|classification: A...|
|Prosecutions Supp...|https://www.seek....|  New Zealand Police|location: Aucklan...|           4d ago,at|classification: A...|
|Early Childhood C...|https://www.seek....|Kew Pacific Islan...|location: Southla...|           1h ago,at|classification: A...|
|Business Support ...|https://www.seek....|                    |location: Canterb...|4d ago,at,Private..

### 2. Split the "Location" into "Location" and "Area"

In [93]:
def split_location(location, index):
    """Split a raw location cell into its region (index 0) or area (index 1) part.

    Raw cells look like "location: <region>area: <area>", where the "area:" part
    is optional. The "location: " label is removed when present.

    Args:
        location: raw cell text, or None.
        index: which "area:"-separated part to return (0 = region, 1 = area).

    Returns:
        The requested part, unstripped. Returns None when ``location`` is None or
        the requested part does not exist (e.g. index 1 with no "area:" part).
    """
    if location is None:
        return None
    # Keep only the text after the first "location: " label, wherever it appears.
    _, label, rest = location.partition("location: ")
    if label:
        location = rest
    parts = location.split("area:")
    if index >= len(parts):
        return None
    return parts[index]

# Wrap the parser as a Spark UDF (string result), then derive region and area columns.
split_location = f.udf(split_location)
location_col = f.col("Location")
df_location = (
    df
    .withColumn('loc', split_location(location_col, f.lit(0)))
    .withColumn('area', split_location(location_col, f.lit(1)))
)
# The URL column is not needed downstream; the raw Location is replaced by the parsed one.
df_loc = (
    df_location
    .drop("Web_site", "Location")
    .withColumnRenamed("loc", "Location")
)
df_loc.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           Job_Title|        Company_name|                Date|      Classification|            Location|                area|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       Administrator|                    |Featured,at,Priva...|classification: A...|Bay of PlentyBay ...|    TaurangaTauranga|
|        Receptionist|Avenues Orthodontics|         Featured,at|classification: A...|Bay of PlentyBay ...|    TaurangaTauranga|
|Prosecutions Supp...|  New Zealand Police|           4d ago,at|classification: A...|    AucklandAuckland|                null|
|Early Childhood C...|Kew Pacific Islan...|           1h ago,at|classification: A...|  SouthlandSouthland| InvercargillInve...|
|Business Support ...|                    |4d ago,at,Private...|classification: A...|CanterburyCanterbur

### 3. Remove duplicated text from "Location" and "area"

In [94]:
def remove_duplicate(x):
    """Collapse a value that was captured twice, e.g. "AucklandAuckland" -> "Auckland".

    The input is stripped. If its two halves are the same text (ignoring
    surrounding whitespace, so "Tauranga Tauranga" also qualifies), one copy is
    returned. Otherwise the stripped text is returned unchanged rather than
    truncated, so values that were not duplicated are not cut in half.

    Args:
        x: text to de-duplicate, or None.

    Returns:
        The de-duplicated text, or None when ``x`` is None.
    """
    if x is None:
        return None
    x = x.strip()
    half = len(x) // 2
    first, second = x[:half].strip(), x[half:].strip()
    if first == second:
        return first
    return x

# Register the de-duplicator as a Spark UDF; it is reused for the classification columns later.
remove_duplicate = f.udf(remove_duplicate)
df_location_no_dup = df_loc
for dup_col in ('Location', 'area'):
    dup_ref = f.col(dup_col)
    # f.when without otherwise() leaves null inputs as null.
    df_location_no_dup = df_location_no_dup.withColumn(
        dup_col, f.when(dup_ref.isNotNull(), remove_duplicate(dup_ref))
    )

df_location_no_dup.show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+------------+
|           Job_Title|        Company_name|                Date|      Classification|     Location|        area|
+--------------------+--------------------+--------------------+--------------------+-------------+------------+
|       Administrator|                    |Featured,at,Priva...|classification: A...|Bay of Plenty|    Tauranga|
|        Receptionist|Avenues Orthodontics|         Featured,at|classification: A...|Bay of Plenty|    Tauranga|
|Prosecutions Supp...|  New Zealand Police|           4d ago,at|classification: A...|     Auckland|        null|
|Early Childhood C...|Kew Pacific Islan...|           1h ago,at|classification: A...|    Southland|Invercargill|
|Business Support ...|                    |4d ago,at,Private...|classification: A...|   Canterbury|Christchurch|
+--------------------+--------------------+--------------------+--------------------+-----------

### 4. Fix the "posted date" (convert to days since posting)

In [95]:
# Keep only the "<number><unit>" posting-age token, e.g. "4d ago,at" -> "4d".
# regexp_extract yields "" when there is no match (e.g. "Featured,at").
# Raw string: "\d" is a regex class, not a Python escape.
df_date_raw = df_location_no_dup.withColumn('Date', f.regexp_extract(f.col('Date'), r'(\d+[a-z])', 1))
df_date_raw.show(5)

+--------------------+--------------------+----+--------------------+-------------+------------+
|           Job_Title|        Company_name|Date|      Classification|     Location|        area|
+--------------------+--------------------+----+--------------------+-------------+------------+
|       Administrator|                    |    |classification: A...|Bay of Plenty|    Tauranga|
|        Receptionist|Avenues Orthodontics|    |classification: A...|Bay of Plenty|    Tauranga|
|Prosecutions Supp...|  New Zealand Police|  4d|classification: A...|     Auckland|        null|
|Early Childhood C...|Kew Pacific Islan...|  1h|classification: A...|    Southland|Invercargill|
|Business Support ...|                    |  4d|classification: A...|   Canterbury|Christchurch|
+--------------------+--------------------+----+--------------------+-------------+------------+
only showing top 5 rows



In [96]:
def apply_posted_time(x, days_per_month=30):
    """Convert a short posting-age token into an age in days, returned as a string.

    Args:
        x: token such as "4d", "1h" or "9m", or "" when no token was extracted.
            May be None for a null cell.
        days_per_month: multiplier applied to "<n>m" tokens.

    Returns:
        "<n>" for "<n>d"; str(n * days_per_month) for "<n>m"; "0" for anything
        else (e.g. hours or an empty token); None when ``x`` is None. A string is
        always returned so the UDF's string column has one consistent type.
    """
    if x is None:
        return None
    if x.endswith('d'):
        return x[:-1]
    # NOTE(review): "m" is treated as months here; if the listing site uses "m" for
    # minutes, these rows should map to "0" instead — confirm against the raw data.
    if x.endswith('m'):
        return str(int(x[:-1]) * days_per_month)
    return "0"
# Wrap the converter as a Spark UDF (default string return type) and overwrite Date with it.
apply_posted_time = f.udf(apply_posted_time)
posted_age = apply_posted_time(f.col("Date"))
df_date = df_date_raw.withColumn("Date", posted_age)

df_date.show(5)

+--------------------+--------------------+----+--------------------+-------------+------------+
|           Job_Title|        Company_name|Date|      Classification|     Location|        area|
+--------------------+--------------------+----+--------------------+-------------+------------+
|       Administrator|                    |   0|classification: A...|Bay of Plenty|    Tauranga|
|        Receptionist|Avenues Orthodontics|   0|classification: A...|Bay of Plenty|    Tauranga|
|Prosecutions Supp...|  New Zealand Police|   4|classification: A...|     Auckland|        null|
|Early Childhood C...|Kew Pacific Islan...|   0|classification: A...|    Southland|Invercargill|
|Business Support ...|                    |   4|classification: A...|   Canterbury|Christchurch|
+--------------------+--------------------+----+--------------------+-------------+------------+
only showing top 5 rows



### 5. Fix the "Company name"

In [97]:
# Rows without a company name get the placeholder "Private Advertiser".
# Null and whitespace-only names are treated the same as empty strings.
company_col = f.col("Company_name")
company_missing = company_col.isNull() | (f.trim(company_col) == "")
df_company_name = df_date.withColumn(
    "Company_name",
    f.when(company_missing, f.lit("Private Advertiser")).otherwise(company_col),
)
df_company_name.show(5)

+--------------------+--------------------+----+--------------------+-------------+------------+
|           Job_Title|        Company_name|Date|      Classification|     Location|        area|
+--------------------+--------------------+----+--------------------+-------------+------------+
|       Administrator|  Private Advertiser|   0|classification: A...|Bay of Plenty|    Tauranga|
|        Receptionist|Avenues Orthodontics|   0|classification: A...|Bay of Plenty|    Tauranga|
|Prosecutions Supp...|  New Zealand Police|   4|classification: A...|     Auckland|        null|
|Early Childhood C...|Kew Pacific Islan...|   0|classification: A...|    Southland|Invercargill|
|Business Support ...|  Private Advertiser|   4|classification: A...|   Canterbury|Christchurch|
+--------------------+--------------------+----+--------------------+-------------+------------+
only showing top 5 rows



### 6. Fix the "salary"

In [98]:
def get_salary(x):
    """Return ``x`` unchanged when it contains at least one digit, otherwise None.

    None input yields None.
    """
    if x is None:
        return None
    return x if any(map(str.isdigit, x)) else None
# Wrap as a Spark UDF (string result) and keep the Classification text only where it holds a digit.
get_salary = f.udf(get_salary)
salary_text = get_salary(f.col("Classification"))
df_salary = df_company_name.withColumn('salary', salary_text)
df_salary.show(15) 

+--------------------+--------------------+----+--------------------+-------------+--------------------+------------+
|           Job_Title|        Company_name|Date|      Classification|     Location|                area|      salary|
+--------------------+--------------------+----+--------------------+-------------+--------------------+------------+
|       Administrator|  Private Advertiser|   0|classification: A...|Bay of Plenty|            Tauranga|        null|
|        Receptionist|Avenues Orthodontics|   0|classification: A...|Bay of Plenty|            Tauranga|        null|
|Prosecutions Supp...|  New Zealand Police|   4|classification: A...|     Auckland|                null|        null|
|Early Childhood C...|Kew Pacific Islan...|   0|classification: A...|    Southland|        Invercargill|        null|
|Business Support ...|  Private Advertiser|   4|classification: A...|   Canterbury|        Christchurch|        null|
|     Support Officer|Ministry for Prim...| 270|classifi

In [99]:
# Multiplier applied when both parsed amounts are below 1000 (presumably hourly rates,
# e.g. 40 h x 44 weeks = 1760 h/year). NOTE(review): confirm the intended basis.
HOURLY_TO_ANNUAL_FACTOR = 1760
# Range used to impute a salary when none can be parsed.
IMPUTED_SALARY_MIN = 35000
IMPUTED_SALARY_MAX = 55000
# Amounts written like "50k" / "60K".
_THOUSANDS_SUFFIX = re.compile(r"\d[kK]\b")


def low_high_salary(x, index):
    """Parse free-text salary into a (low, high) pair and return one bound.

    Args:
        x: raw salary text, e.g. "$50,000 - $60,000", "$25 - $30 per hour",
            "$50k - $60k", or None.
        index: 0 for the low bound, 1 for the high bound.

    Returns:
        The selected bound formatted as an integer string ("%.0f").

    Rules:
        * Commas are removed. The first one or two numbers are used, and a single
          number serves as both bounds.
        * Amounts below 1000 are scaled by 1000 when the text uses a "k" suffix.
        * If both bounds are below 1000, they are multiplied by HOURLY_TO_ANNUAL_FACTOR.
        * If only the low bound is below 1000, it is multiplied by 1000.
        * If ``x`` is None or holds no parseable number, a random pair within
          [IMPUTED_SALARY_MIN, IMPUTED_SALARY_MAX] is drawn. Each call draws anew,
          so index 0 and index 1 for the same input do not come from one pair.
    """
    output = []
    if x is not None:
        text = x.replace(',', '')
        # Must inspect the text before letters are blanked out below; checking
        # afterwards can never see the "k".
        in_thousands = bool(_THOUSANDS_SUFFIX.search(text))
        digits = ''.join(ch if ch in '0123456789.' else ' ' for ch in text)
        # Drop stray dots after blanks (e.g. from "p.a."), and join a blank followed
        # by "0" (presumably space-separated thousands such as "50 000").
        digits = digits.replace(' .', '').replace(' 0', '0')
        numbers = []
        for token in digits.split():
            try:
                numbers.append(float(token))
            except ValueError:
                # A lone "." or "1.2.3" is not a number; skip it instead of failing the job.
                continue
        if numbers:
            output = [numbers[0], numbers[1] if len(numbers) > 1 else numbers[0]]
        if output and in_thousands:
            output = [v * 1000 if v < 1000 else v for v in output]
        if output and output[0] < 1000 and output[1] < 1000:
            output = [v * HOURLY_TO_ANNUAL_FACTOR for v in output]
        if output and output[0] < 1000 and output[1] > 1000:
            output[0] = output[0] * 1000
    if not output:
        lo = random.uniform(IMPUTED_SALARY_MIN, IMPUTED_SALARY_MAX)
        output = [lo, random.uniform(lo, IMPUTED_SALARY_MAX)]
    return "%.0f" % output[index]

# Wrap the salary parser as a Spark UDF (string result) and derive both bounds.
low_high_salary = f.udf(low_high_salary)
salary_col = f.col("salary")
# NOTE(review): each bound is a separate UDF call. For rows without a parseable salary,
# the two calls draw independent random values. high_salary can therefore be below
# low_salary, and the numbers change on every action (compare successive show() outputs).
# TODO: derive both bounds from one deterministic per-row draw.
df_low_high_salary = (
    df_salary
    .withColumn('low_salary', low_high_salary(salary_col, f.lit(0)))
    .withColumn('high_salary', low_high_salary(salary_col, f.lit(1)))
    .drop("salary")
)
df_low_high_salary.show(15) 

+--------------------+--------------------+----+--------------------+-------------+--------------------+----------+-----------+
|           Job_Title|        Company_name|Date|      Classification|     Location|                area|low_salary|high_salary|
+--------------------+--------------------+----+--------------------+-------------+--------------------+----------+-----------+
|       Administrator|  Private Advertiser|   0|classification: A...|Bay of Plenty|            Tauranga|     37832|      54463|
|        Receptionist|Avenues Orthodontics|   0|classification: A...|Bay of Plenty|            Tauranga|     48893|      47247|
|Prosecutions Supp...|  New Zealand Police|   4|classification: A...|     Auckland|                null|     52253|      43147|
|Early Childhood C...|Kew Pacific Islan...|   0|classification: A...|    Southland|        Invercargill|     35152|      52907|
|Business Support ...|  Private Advertiser|   4|classification: A...|   Canterbury|        Christchurch|

### 7. Fix the classification

In [100]:
def split_classification(x, index):
    """Split a raw classification cell into its main (index 0) or sub (index 1) part.

    Raw cells look like "classification: <main>subClassification: <sub>". Everything
    up to and including the first "classification: " label is dropped.

    Args:
        x: raw cell text, or None.
        index: which "subClassification: "-separated part to return.

    Returns:
        The requested part, unstripped. Returns None when ``x`` is None, lacks the
        "classification: " label, or has no such part (e.g. index 1 without a
        sub-classification). Previously that last case raised IndexError.
    """
    prefix = "classification: "
    if x is None or prefix not in x:
        return None
    body = x.split(prefix, 1)[1]
    parts = body.split("subClassification: ")
    if index >= len(parts):
        return None
    return parts[index]

# Wrap the parser as a Spark UDF (string result) and replace Classification with its parts.
split_classification = f.udf(split_classification)
raw_classification = f.col("Classification")
df_classification = (
    df_low_high_salary
    .withColumn('classification_dup', split_classification(raw_classification, f.lit(0)))
    .withColumn('sub-Classification', split_classification(raw_classification, f.lit(1)))
    .drop("Classification")
    .withColumnRenamed("classification_dup", "Classification")
)
df_classification.show(5) 

+--------------------+--------------------+----+-------------+------------+----------+-----------+--------------------+--------------------+
|           Job_Title|        Company_name|Date|     Location|        area|low_salary|high_salary|      Classification|  sub-Classification|
+--------------------+--------------------+----+-------------+------------+----------+-----------+--------------------+--------------------+
|       Administrator|  Private Advertiser|   0|Bay of Plenty|    Tauranga|     50463|      48839|Administration & ...|Office Management...|
|        Receptionist|Avenues Orthodontics|   0|Bay of Plenty|    Tauranga|     53946|      43513|Administration & ...|ReceptionistsRece...|
|Prosecutions Supp...|  New Zealand Police|   4|     Auckland|        null|     49858|      48148|Administration & ...|          OtherOther|
|Early Childhood C...|Kew Pacific Islan...|   0|    Southland|Invercargill|     50495|      49457|Administration & ...|Administrative As...|
|Business Sup

In [104]:
# Apply the de-duplication UDF to both classification columns; nulls stay null.
df_classification_no_dup = df_classification
for dup_col in ('Classification', 'sub-Classification'):
    dup_ref = f.col(dup_col)
    df_classification_no_dup = df_classification_no_dup.withColumn(
        dup_col, f.when(dup_ref.isNotNull(), remove_duplicate(dup_ref))
    )

df_classification_no_dup.show(5, truncate=False)

+------------------------------------+----------------------------------------+----+-------------+------------+----------+-----------+-------------------------------+-----------------------------+
|Job_Title                           |Company_name                            |Date|Location     |area        |low_salary|high_salary|Classification                 |sub-Classification           |
+------------------------------------+----------------------------------------+----+-------------+------------+----------+-----------+-------------------------------+-----------------------------+
|Administrator                       |Private Advertiser                      |0   |Bay of Plenty|Tauranga    |45015     |48696      |Administration & Office Support|Office Management            |
|Receptionist                        |Avenues Orthodontics                    |0   |Bay of Plenty|Tauranga    |42589     |51762      |Administration & Office Support|Receptionists                |
|Prosecutions S

In [113]:
# Write one CSV part file (one partition) with a header into the output directory.
# mode("overwrite") keeps the cell re-runnable; the default save mode fails if the path exists.
(
    df_classification_no_dup
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("Spark_cleaned_Admin_JOBS.csv")
)