In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col
spark = SparkSession.builder.appName('dataPreparation').getOrCreate()

In [2]:
data = spark.read.csv("Datasets/Average Hourly Earnings by Industry ANZSIC06 and Sex Quarterly December 2018.csv",inferSchema=True,header=True)
data.show()

+-----------+-------------------+------+-----------------------------+------------------------+------------------------------------------------+
|YearQuarter|           Industry|Gender|Ordinary Time Hourly Earnings|Overtime Hourly Earnings|Total (Ordinary Time + Overtime) Hourly Earnings|
+-----------+-------------------+------+-----------------------------+------------------------+------------------------------------------------+
|     1989Q1|Forestry and Mining|  Male|                        13.97|                   16.74|                                           14.18|
|     1989Q2|Forestry and Mining|  Male|                        14.07|                   16.44|                                           14.24|
|     1989Q3|Forestry and Mining|  Male|                        14.04|                   16.64|                                           14.23|
|     1989Q4|Forestry and Mining|  Male|                         14.3|                    16.5|                                   

In [3]:
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-----------+--------+------+-----------------------------+------------------------+------------------------------------------------+
|YearQuarter|Industry|Gender|Ordinary Time Hourly Earnings|Overtime Hourly Earnings|Total (Ordinary Time + Overtime) Hourly Earnings|
+-----------+--------+------+-----------------------------+------------------------+------------------------------------------------+
|          0|       0|     0|                            0|                       0|                                               0|
+-----------+--------+------+-----------------------------+------------------------+------------------------------------------------+



In [4]:
results = data.select('Ordinary Time Hourly Earnings', 'Overtime Hourly Earnings', 'Total (Ordinary Time + Overtime) Hourly Earnings')

In [5]:
bounds = {
    c: dict(
        zip(["q1", "q3"], results.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in results.columns
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

{'Ordinary Time Hourly Earnings': {'q3': 26.93, 'q1': 15.78, 'upper': 43.655, 'lower': -0.9450000000000021}, 'Total (Ordinary Time + Overtime) Hourly Earnings': {'q3': 26.96, 'q1': 15.9, 'upper': 43.55, 'lower': -0.6899999999999995}, 'Overtime Hourly Earnings': {'q3': 28.57, 'q1': 18.6, 'upper': 43.525, 'lower': 3.645000000000003}}


In [6]:
import pyspark.sql.functions as f
results.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in results.columns
    ]
).describe().show()

+-------+-----------------------------+------------------------+------------------------------------------------+---------------------------------+----------------------------+----------------------------------------------------+
|summary|Ordinary Time Hourly Earnings|Overtime Hourly Earnings|Total (Ordinary Time + Overtime) Hourly Earnings|Ordinary Time Hourly Earnings_out|Overtime Hourly Earnings_out|Total (Ordinary Time + Overtime) Hourly Earnings_out|
+-------+-----------------------------+------------------------+------------------------------------------------+---------------------------------+----------------------------+----------------------------------------------------+
|  count|                         6120|                    6120|                                            6120|                             6120|                        6120|                                                6120|
|   mean|           21.840455882352916|      24.210566993464013|                

In [7]:
data1 = spark.read.csv("Datasets/Average Weekly Paid Hours (Employees) by Industry (ANZSIC06) and Sex (Qrtly-MarJunSepDec).csv",inferSchema=True,header=True)
data1.show()

+-----------+-------------------+------+---------------------------------------+----------------------------------+----------------------------------------------------------+
|YearQuarter|           Industry|Gender|Average Weekly Ordinary Time Hours Paid|Average Weekly Overtime Hours Paid|Average Total (Ordinary Time + Overtime) Weekly Hours Paid|
+-----------+-------------------+------+---------------------------------------+----------------------------------+----------------------------------------------------------+
|     1989Q1|Forestry and Mining|  Male|                                  37.74|                              3.15|                                                      40.9|
|     1989Q2|Forestry and Mining|  Male|                                   36.9|                              2.87|                                                     39.77|
|     1989Q3|Forestry and Mining|  Male|                                  37.22|                              2.81|          

In [8]:
data1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data1.columns]).show()

+-----------+--------+------+---------------------------------------+----------------------------------+----------------------------------------------------------+
|YearQuarter|Industry|Gender|Average Weekly Ordinary Time Hours Paid|Average Weekly Overtime Hours Paid|Average Total (Ordinary Time + Overtime) Weekly Hours Paid|
+-----------+--------+------+---------------------------------------+----------------------------------+----------------------------------------------------------+
|          0|       0|     0|                                      0|                                 0|                                                         0|
+-----------+--------+------+---------------------------------------+----------------------------------+----------------------------------------------------------+



In [9]:
results2 = data1.select('Average Weekly Ordinary Time Hours Paid', 'Average Weekly Overtime Hours Paid', 'Average Total (Ordinary Time + Overtime) Weekly Hours Paid')

In [10]:
bounds = {
    c: dict(
        zip(["q1", "q3"], results2.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in results2.columns
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

{'Average Weekly Overtime Hours Paid': {'q3': 1.15, 'q1': 0.27, 'upper': 2.4699999999999998, 'lower': -1.0499999999999998}, 'Average Weekly Ordinary Time Hours Paid': {'q3': 35.85, 'q1': 29.34, 'upper': 45.615, 'lower': 19.574999999999996}, 'Average Total (Ordinary Time + Overtime) Weekly Hours Paid': {'q3': 36.89, 'q1': 29.59, 'upper': 47.84, 'lower': 18.64}}


In [11]:
import pyspark.sql.functions as f
results2.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in results2.columns
    ]
).describe().show()

+-------+---------------------------------------+----------------------------------+----------------------------------------------------------+-------------------------------------------+--------------------------------------+--------------------------------------------------------------+
|summary|Average Weekly Ordinary Time Hours Paid|Average Weekly Overtime Hours Paid|Average Total (Ordinary Time + Overtime) Weekly Hours Paid|Average Weekly Ordinary Time Hours Paid_out|Average Weekly Overtime Hours Paid_out|Average Total (Ordinary Time + Overtime) Weekly Hours Paid_out|
+-------+---------------------------------------+----------------------------------+----------------------------------------------------------+-------------------------------------------+--------------------------------------+--------------------------------------------------------------+
|  count|                                   6120|                              6120|                                              

In [12]:
data2 = spark.read.csv("Datasets/Filled Jobs by Industry (ANZSIC06) by Sex and Status in Employment (Qrtly-MarJunSepDec).csv",inferSchema=True,header=True)
data2.show()

+-----------+-------------------+------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+
|YearQuarter|           Industry|Gender|Number of Working Proprietors|Number of Part-Time Paid Employees|Number of Full-Time Paid Employees|Total Number of People in Employment|
+-----------+-------------------+------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+
|     1989Q1|Forestry and Mining|  Male|                          800|                               800|                              9000|                               10700|
|     1989Q2|Forestry and Mining|  Male|                          800|                               900|                              8100|                                9800|
|     1989Q3|Forestry and Mining|  Male|                          800|                               800|     

In [13]:
data2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data2.columns]).show()

+-----------+--------+------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+
|YearQuarter|Industry|Gender|Number of Working Proprietors|Number of Part-Time Paid Employees|Number of Full-Time Paid Employees|Total Number of People in Employment|
+-----------+--------+------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+
|          0|       0|     0|                            0|                                 0|                                 0|                                   0|
+-----------+--------+------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+



In [14]:
results3 = data2.select('Number of Working Proprietors', 'Number of Part-Time Paid Employees', 'Number of Full-Time Paid Employees', 'Total Number of People in Employment')

In [15]:
bounds = {
    c: dict(
        zip(["q1", "q3"], results3.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in results3.columns
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

{'Number of Working Proprietors': {'q3': 9200.0, 'q1': 700.0, 'upper': 21950.0, 'lower': -12050.0}, 'Total Number of People in Employment': {'q3': 104200.0, 'q1': 22700.0, 'upper': 226450.0, 'lower': -99550.0}, 'Number of Part-Time Paid Employees': {'q3': 28500.0, 'q1': 4000.0, 'upper': 65250.0, 'lower': -32750.0}, 'Number of Full-Time Paid Employees': {'q3': 62300.0, 'q1': 14100.0, 'upper': 134600.0, 'lower': -58200.0}}


In [16]:
import pyspark.sql.functions as f
results3.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in results3.columns
    ]
).describe().show()

+-------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+---------------------------------+--------------------------------------+--------------------------------------+----------------------------------------+
|summary|Number of Working Proprietors|Number of Part-Time Paid Employees|Number of Full-Time Paid Employees|Total Number of People in Employment|Number of Working Proprietors_out|Number of Part-Time Paid Employees_out|Number of Full-Time Paid Employees_out|Total Number of People in Employment_out|
+-------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+---------------------------------+--------------------------------------+--------------------------------------+----------------------------------------+
|  count|                         6120|                              6120|                          

In [17]:
data3 = spark.read.csv("Datasets/Full-Time Equivalent Employees by Industry (ANZSIC06) and Sex (Qrtly-MarJunSepDec).csv",inferSchema=True,header=True)
data3.show()

+-----------+-------------------+------+----------------------------------------+
|YearQuarter|           Industry|Gender|Number of Full-Time Equivalent Employees|
+-----------+-------------------+------+----------------------------------------+
|     1989Q1|Forestry and Mining|  Male|                                    9400|
|     1989Q2|Forestry and Mining|  Male|                                    8600|
|     1989Q3|Forestry and Mining|  Male|                                    8300|
|     1989Q4|Forestry and Mining|  Male|                                    7900|
|     1990Q1|Forestry and Mining|  Male|                                    8100|
|     1990Q2|Forestry and Mining|  Male|                                    7500|
|     1990Q3|Forestry and Mining|  Male|                                    8100|
|     1990Q4|Forestry and Mining|  Male|                                    7800|
|     1991Q1|Forestry and Mining|  Male|                                    8500|
|     1991Q2|For

In [18]:
data3.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data3.columns]).show()

+-----------+--------+------+----------------------------------------+
|YearQuarter|Industry|Gender|Number of Full-Time Equivalent Employees|
+-----------+--------+------+----------------------------------------+
|          0|       0|     0|                                       0|
+-----------+--------+------+----------------------------------------+



In [19]:
results4 = data3.select('Number of Full-Time Equivalent Employees')

In [20]:
bounds = {
    c: dict(
        zip(["q1", "q3"], results4.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in results4.columns
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

{'Number of Full-Time Equivalent Employees': {'q3': 78900.0, 'q1': 17500.0, 'upper': 171000.0, 'lower': -74600.0}}


In [21]:
import pyspark.sql.functions as f
results4.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in results4.columns
    ]
).describe().show()

+-------+----------------------------------------+--------------------------------------------+
|summary|Number of Full-Time Equivalent Employees|Number of Full-Time Equivalent Employees_out|
+-------+----------------------------------------+--------------------------------------------+
|  count|                                    6120|                                        6120|
|   mean|                       94783.25163398693|                         0.08349673202614379|
| stddev|                       198506.0713472215|                         0.27665417744919085|
|    min|                                     700|                                           0|
|    max|                                 1592800|                                           1|
+-------+----------------------------------------+--------------------------------------------+



In [22]:
data4 = spark.read.csv("Datasets/Total Gross Earnings by Industry (ANZSIC06) (Qrtly-MarJunSepDec).csv",inferSchema=True,header=True)
data4.show()

+-----------+-------------------+--------------------+
|YearQuarter|           Industry|Total Gross Earnings|
+-----------+-------------------+--------------------+
|     1989Q1|Forestry and Mining|             6273000|
|     1989Q2|Forestry and Mining|             5601500|
|     1989Q3|Forestry and Mining|             5417300|
|     1989Q4|Forestry and Mining|             5228800|
|     1990Q1|Forestry and Mining|             5234500|
|     1990Q2|Forestry and Mining|             5004300|
|     1990Q3|Forestry and Mining|             5385500|
|     1990Q4|Forestry and Mining|             5388800|
|     1991Q1|Forestry and Mining|             5769000|
|     1991Q2|Forestry and Mining|             5560500|
|     1991Q3|Forestry and Mining|             5727100|
|     1991Q4|Forestry and Mining|             5879700|
|     1992Q1|Forestry and Mining|             6338500|
|     1992Q2|Forestry and Mining|             5993000|
|     1992Q3|Forestry and Mining|             5995200|
|     1992

In [23]:
data4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data4.columns]).show()

+-----------+--------+--------------------+
|YearQuarter|Industry|Total Gross Earnings|
+-----------+--------+--------------------+
|          0|       0|                   0|
+-----------+--------+--------------------+



In [24]:
results5 = data4.select('Total Gross Earnings')

In [25]:
bounds = {
    c: dict(
        zip(["q1", "q3"], results5.approxQuantile(c, [0.25, 0.75], 0))
    )
    for c in results5.columns
}

for c in bounds:
    iqr = bounds[c]['q3'] - bounds[c]['q1']
    bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
    bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
print(bounds)

{'Total Gross Earnings': {'q3': 101427500.0, 'q1': 27123500.0, 'upper': 212883500.0, 'lower': -84332500.0}}


In [26]:
import pyspark.sql.functions as f
results5.select(
    "*",
    *[
        f.when(
            f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
            0
        ).otherwise(1).alias(c+"_out") 
        for c in results5.columns
    ]
).describe().show()

+-------+--------------------+------------------------+
|summary|Total Gross Earnings|Total Gross Earnings_out|
+-------+--------------------+------------------------+
|  count|                2040|                    2040|
|   mean|1.2021931794117647E8|     0.07549019607843137|
| stddev| 2.537836196783933E8|      0.2642454439277016|
|    min|             4721200|                       0|
|    max|          1955658900|                       1|
+-------+--------------------+------------------------+



In [27]:
data = data.filter((data.Gender != 'Both Sexes'))
data1 = data1.filter((data1.Gender != 'Both Sexes'))
data2 = data2.filter((data2.Gender != 'Both Sexes'))
data3 = data3.filter((data3.Gender != 'Both Sexes'))

In [28]:
data.createOrReplaceTempView('data') 
data1.createOrReplaceTempView('data1') 
data2.createOrReplaceTempView('data2') 
data3.createOrReplaceTempView('data3') 
data4.createOrReplaceTempView('data4') 

In [29]:
combined4 = data.join(data1, ['YearQuarter', 'Industry', 'Gender'])
combined3 = combined4.join(data2, ['YearQuarter', 'Industry', 'Gender'])
combined2 = combined3.join(data3, ['YearQuarter', 'Industry', 'Gender'])
combined1 = combined2.join(data4, ['YearQuarter', 'Industry'])
combined1.show()

+-----------+-------------------+------+-----------------------------+------------------------+------------------------------------------------+---------------------------------------+----------------------------------+----------------------------------------------------------+-----------------------------+----------------------------------+----------------------------------+------------------------------------+----------------------------------------+--------------------+
|YearQuarter|           Industry|Gender|Ordinary Time Hourly Earnings|Overtime Hourly Earnings|Total (Ordinary Time + Overtime) Hourly Earnings|Average Weekly Ordinary Time Hours Paid|Average Weekly Overtime Hours Paid|Average Total (Ordinary Time + Overtime) Weekly Hours Paid|Number of Working Proprietors|Number of Part-Time Paid Employees|Number of Full-Time Paid Employees|Total Number of People in Employment|Number of Full-Time Equivalent Employees|Total Gross Earnings|
+-----------+-------------------+------+----

In [30]:
combined = combined1.toPandas()

In [31]:
combined0 = combined.pivot_table(index=['YearQuarter', 'Industry'], columns='Gender')

In [32]:
df = spark.createDataFrame(combined0.reset_index(drop=False))
df.show()
df.dtypes

+-------------------+--------------------+------------------------------------------------------------------------+----------------------------------------------------------------------+-----------------------------------------------------+---------------------------------------------------+------------------------------------------------+----------------------------------------------+------------------------------------------------------+----------------------------------------------------+------------------------------------------------+----------------------------------------------+------------------------------------------------+----------------------------------------------+-------------------------------------------+-----------------------------------------+-------------------------------------------+-----------------------------------------+--------------------------------------+------------------------------------+--------------------------------------------------------------+-

[("('YearQuarter', '')", 'string'),
 ("('Industry', '')", 'string'),
 ("('Average Total (Ordinary Time + Overtime) Weekly Hours Paid', 'Female')",
  'double'),
 ("('Average Total (Ordinary Time + Overtime) Weekly Hours Paid', 'Male')",
  'double'),
 ("('Average Weekly Ordinary Time Hours Paid', 'Female')", 'double'),
 ("('Average Weekly Ordinary Time Hours Paid', 'Male')", 'double'),
 ("('Average Weekly Overtime Hours Paid', 'Female')", 'double'),
 ("('Average Weekly Overtime Hours Paid', 'Male')", 'double'),
 ("('Number of Full-Time Equivalent Employees', 'Female')", 'bigint'),
 ("('Number of Full-Time Equivalent Employees', 'Male')", 'bigint'),
 ("('Number of Full-Time Paid Employees', 'Female')", 'bigint'),
 ("('Number of Full-Time Paid Employees', 'Male')", 'bigint'),
 ("('Number of Part-Time Paid Employees', 'Female')", 'bigint'),
 ("('Number of Part-Time Paid Employees', 'Male')", 'bigint'),
 ("('Number of Working Proprietors', 'Female')", 'bigint'),
 ("('Number of Working Propri

In [33]:
import re

def clean_df_names(df):
    l = df.columns
    cols = [c.replace(' ','').replace('(','').replace(')','').replace(',', '').replace("'", '').replace("+", '').strip() for c in l]
    return df.toDF(*cols)

clean_df = clean_df_names(df)

clean_df.show()

+-----------+--------------------+-----------------------------------------------------+---------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------------+----------------------------------+------------------------------------------+----------------------------------------+------------------------------------+----------------------------------+------------------------------------+----------------------------------+--------------------------------+------------------------------+--------------------------------+------------------------------+----------------------------+--------------------------+---------------------------------------------+-------------------------------------------+------------------------+----------------------+-------------------------------------+-----------------------------------+
|YearQuarter|            Industry|AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidFe

In [34]:
clean_df.dtypes

[('YearQuarter', 'string'),
 ('Industry', 'string'),
 ('AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidFemale', 'double'),
 ('AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidMale', 'double'),
 ('AverageWeeklyOrdinaryTimeHoursPaidFemale', 'double'),
 ('AverageWeeklyOrdinaryTimeHoursPaidMale', 'double'),
 ('AverageWeeklyOvertimeHoursPaidFemale', 'double'),
 ('AverageWeeklyOvertimeHoursPaidMale', 'double'),
 ('NumberofFull-TimeEquivalentEmployeesFemale', 'bigint'),
 ('NumberofFull-TimeEquivalentEmployeesMale', 'bigint'),
 ('NumberofFull-TimePaidEmployeesFemale', 'bigint'),
 ('NumberofFull-TimePaidEmployeesMale', 'bigint'),
 ('NumberofPart-TimePaidEmployeesFemale', 'bigint'),
 ('NumberofPart-TimePaidEmployeesMale', 'bigint'),
 ('NumberofWorkingProprietorsFemale', 'bigint'),
 ('NumberofWorkingProprietorsMale', 'bigint'),
 ('OrdinaryTimeHourlyEarningsFemale', 'double'),
 ('OrdinaryTimeHourlyEarningsMale', 'double'),
 ('OvertimeHourlyEarningsFemale', 'double'),
 ('OvertimeHourlyEarningsMale', 

In [35]:
new_df = clean_df.withColumn("AverageHourlyPayDifference" , clean_df['TotalOrdinaryTimeOvertimeHourlyEarningsMale'] - clean_df['TotalOrdinaryTimeOvertimeHourlyEarningsFemale'])
new_df.show()

+-----------+--------------------+-----------------------------------------------------+---------------------------------------------------+----------------------------------------+--------------------------------------+------------------------------------+----------------------------------+------------------------------------------+----------------------------------------+------------------------------------+----------------------------------+------------------------------------+----------------------------------+--------------------------------+------------------------------+--------------------------------+------------------------------+----------------------------+--------------------------+---------------------------------------------+-------------------------------------------+------------------------+----------------------+-------------------------------------+-----------------------------------+--------------------------+
|YearQuarter|            Industry|AverageTotalOrdinaryTi

In [36]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [37]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=new_df['AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidFemale','AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidMale','AverageWeeklyOrdinaryTimeHoursPaidFemale','AverageWeeklyOrdinaryTimeHoursPaidMale','AverageWeeklyOvertimeHoursPaidFemale','AverageWeeklyOvertimeHoursPaidMale','NumberofFull-TimeEquivalentEmployeesFemale','NumberofFull-TimeEquivalentEmployeesMale','NumberofFull-TimePaidEmployeesFemale','NumberofFull-TimePaidEmployeesMale','NumberofPart-TimePaidEmployeesFemale','NumberofPart-TimePaidEmployeesMale','NumberofWorkingProprietorsFemale','NumberofWorkingProprietorsMale','OrdinaryTimeHourlyEarningsFemale','OrdinaryTimeHourlyEarningsMale','OvertimeHourlyEarningsFemale','OvertimeHourlyEarningsMale','TotalOrdinaryTimeOvertimeHourlyEarningsFemale','TotalOrdinaryTimeOvertimeHourlyEarningsMale','TotalGrossEarningsFemale','TotalGrossEarningsMale','TotalNumberofPeopleinEmploymentFemale','TotalNumberofPeopleinEmploymentMale'],outputCol='features')
output = assembler.transform(data)

TypeError: Invalid param value given for param "inputCols". Could not convert DataFrame[AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidFemale: double, AverageTotalOrdinaryTimeOvertimeWeeklyHoursPaidMale: double, AverageWeeklyOrdinaryTimeHoursPaidFemale: double, AverageWeeklyOrdinaryTimeHoursPaidMale: double, AverageWeeklyOvertimeHoursPaidFemale: double, AverageWeeklyOvertimeHoursPaidMale: double, NumberofFull-TimeEquivalentEmployeesFemale: bigint, NumberofFull-TimeEquivalentEmployeesMale: bigint, NumberofFull-TimePaidEmployeesFemale: bigint, NumberofFull-TimePaidEmployeesMale: bigint, NumberofPart-TimePaidEmployeesFemale: bigint, NumberofPart-TimePaidEmployeesMale: bigint, NumberofWorkingProprietorsFemale: bigint, NumberofWorkingProprietorsMale: bigint, OrdinaryTimeHourlyEarningsFemale: double, OrdinaryTimeHourlyEarningsMale: double, OvertimeHourlyEarningsFemale: double, OvertimeHourlyEarningsMale: double, TotalOrdinaryTimeOvertimeHourlyEarningsFemale: double, TotalOrdinaryTimeOvertimeHourlyEarningsMale: double, TotalGrossEarningsFemale: bigint, TotalGrossEarningsMale: bigint, TotalNumberofPeopleinEmploymentFemale: bigint, TotalNumberofPeopleinEmploymentMale: bigint] to list of strings

In [None]:
final_data = output.select("features",'AverageHourlyPayDifference')

In [None]:
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

dtc = DecisionTreeClassifier(labelCol='AverageHourlyPayDifference',featuresCol='features')
rfc = RandomForestClassifier(labelCol='AverageHourlyPayDifference',featuresCol='features')
gbt = GBTClassifier(labelCol='AverageHourlyPayDifference',featuresCol='features')

In [None]:
pipeline = Pipeline(stages=[industry_indexer,
                           assembler,lr])

In [None]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [None]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_binary_eval = BinaryClassificationEvaluator(labelCol = 'AverageHourlyPayDifference')

In [None]:
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))

print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='AverageHourlyPayDifference', rawPredictionCol='prediction')
print("GBT")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_evaluator = MulticlassClassificationEvaluator(labelCol="PrivateIndex", predictionCol="prediction", metricName="accuracy")

In [None]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

In [None]:
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))