In [1]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('interation4').getOrCreate()

In [26]:
# Importing data which has a header. Schema is automatically configured.
dataset = spark.read.csv('usa-stackoverflow-iteration4.csv', header=True, inferSchema=True)



In [27]:
# Let's see the data. You'll notice nulls.
dataset.show(10)

+----------+-----+----------+-------+------------------+---------------+--------------+-----------+-----------+---------------+---------------+---------------+--------------+-----------+--------+-------------+------------+---------+--------+---+
|Respondent|Hobby|OpenSource|Student|        Employment|FormalEducation|UndergradMajor|CompanySize|YearsCoding|YearsCodingProf|JobSatisfaction|ConvertedSalary|NumberMonitors|CheckInCode|WakeTime|HoursComputer|HoursOutside|SkipMeals|Exercise|Age|
+----------+-----+----------+-------+------------------+---------------+--------------+-----------+-----------+---------------+---------------+---------------+--------------+-----------+--------+-------------+------------+---------+--------+---+
|         1|  Yes|       Yes|     No|Employed full-time|             Nd|            Cd|          8|          4|              1|              1|         120000|             2|          5|       6|            4|           0|        1|       0|  2|
|         2|  Ye

In [3]:
dataset.columns

['Respondent',
 'Hobby',
 'OpenSource',
 'Student',
 'Employment',
 'FormalEducation',
 'UndergradMajor',
 'CompanySize',
 'YearsCoding',
 'YearsCodingProf',
 'JobSatisfaction',
 'ConvertedSalary',
 'NumberMonitors',
 'CheckInCode',
 'WakeTime',
 'HoursComputer',
 'HoursOutside',
 'SkipMeals',
 'Exercise',
 'Age']

In [4]:
dataset.describe().show()

+-------+------------------+-----+----------+--------------+------------------+---------------+--------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|        Respondent|Hobby|OpenSource|       Student|        Employment|FormalEducation|UndergradMajor|       CompanySize|       YearsCoding|   YearsCodingProf|   JobSatisfaction|  ConvertedSalary|    NumberMonitors|       CheckInCode|          WakeTime|     HoursComputer|      HoursOutside|         SkipMeals|          Exercise|               Age|
+-------+------------------+-----+----------+--------------+------------------+---------------+--------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------

In [5]:
dataset.printSchema()

root
 |-- Respondent: integer (nullable = true)
 |-- Hobby: string (nullable = true)
 |-- OpenSource: string (nullable = true)
 |-- Student: string (nullable = true)
 |-- Employment: string (nullable = true)
 |-- FormalEducation: string (nullable = true)
 |-- UndergradMajor: string (nullable = true)
 |-- CompanySize: integer (nullable = true)
 |-- YearsCoding: integer (nullable = true)
 |-- YearsCodingProf: integer (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- ConvertedSalary: decimal(7,0) (nullable = true)
 |-- NumberMonitors: integer (nullable = true)
 |-- CheckInCode: integer (nullable = true)
 |-- WakeTime: integer (nullable = true)
 |-- HoursComputer: integer (nullable = true)
 |-- HoursOutside: integer (nullable = true)
 |-- SkipMeals: integer (nullable = true)
 |-- Exercise: integer (nullable = true)
 |-- Age: integer (nullable = true)



In [6]:
dataset.count()

13832

In [7]:
dataset = dataset[dataset["Employment"]=="Employed full-time"]

In [8]:
dataset.count()

11995

In [9]:
dataset = dataset[dataset["ConvertedSalary"].isNotNull()]

In [10]:
dataset.count()

11482

In [36]:
from pyspark.sql.functions import isnan
dataset.filter((dataset["ConvertedSalary"] == "") | dataset["ConvertedSalary"].isNull() | isnan(dataset["ConvertedSalary"])).count()

919

In [11]:
dataset=dataset.na.drop()

In [12]:
dataset.count()

9959

In [13]:
bounds = dataset.approxQuantile("ConvertedSalary", [0.25, 0.75], 0)
IQR = bounds[1]-bounds[0]
lowerRange = bounds[0] - 1.5*IQR
upperRange = bounds[1]+ 1.5*IQR
dataset = dataset.filter((dataset['ConvertedSalary'] >= lowerRange) & (dataset['ConvertedSalary'] <= upperRange))

In [14]:
dataset.count()

9391

In [15]:
dataset.approxQuantile("ConvertedSalary", [0.5], 0)

[100000.0]

In [16]:
from pyspark.sql import functions as f
dataset = dataset.withColumn('NewSalaryLevel', f.when(f.col('ConvertedSalary') < 100000, 0).otherwise(1))

In [17]:
columns_to_drop = ['Student', 'Employment','ConvertedSalary','Respondent','Hobby','OpenSource']
dataset = dataset.drop(*columns_to_drop)

In [18]:
dataset.show(10)

+---------------+--------------+-----------+-----------+---------------+---------------+--------------+-----------+--------+-------------+------------+---------+--------+---+--------------+
|FormalEducation|UndergradMajor|CompanySize|YearsCoding|YearsCodingProf|JobSatisfaction|NumberMonitors|CheckInCode|WakeTime|HoursComputer|HoursOutside|SkipMeals|Exercise|Age|NewSalaryLevel|
+---------------+--------------+-----------+-----------+---------------+---------------+--------------+-----------+--------+-------------+------------+---------+--------+---+--------------+
|             Nd|            Cd|          8|          4|              1|              1|             2|          5|       6|            4|           0|        1|       0|  2|             1|
|             Bd|            We|          5|          3|              1|             -3|             2|          1|       2|            2|           1|        0|       3|  2|             0|
|             Bd|            Cd|          6|      

In [19]:
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)
# First create a string indexer (convert every string into a number, such as male = 0 and female = 1).
# A number will be assigned to every category in the column.
FormalEducation_indexer = StringIndexer(inputCol='FormalEducation',outputCol='FormalEducationIndex')

# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
# This makes it easier to process when you have multiple classes.
FormalEducation_encoder = OneHotEncoder(inputCol='FormalEducationIndex',outputCol='FormalEducationVec')

UndergradMajor_indexer = StringIndexer(inputCol='UndergradMajor',outputCol='UndergradMajorIndex')
UndergradMajor__encoder = OneHotEncoder(inputCol='UndergradMajorIndex',outputCol='UndergradMajorVec')

# assemble all of this as one vector in the features column. 
assembler = VectorAssembler(inputCols=[
 'FormalEducationVec',
 'UndergradMajorVec',
 'CompanySize',
 'YearsCoding',
 'YearsCodingProf',
 'JobSatisfaction',
 'NumberMonitors',
 'CheckInCode',
 'WakeTime',
 'HoursComputer',
 'HoursOutside',
 'SkipMeals',
 'Exercise',
 'Age'],outputCol='features')


# Train/test split. 
trainset, testset= dataset.randomSplit([0.8,0.2])

In [20]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [21]:
# Logistic Regression Model
# Note that survived is a categorial variable but didn't require any transformation.
# That's because it's already in the format of 1's and 0's. 
log_reg = LogisticRegression(featuresCol='features',labelCol='NewSalaryLevel')


# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline_log = Pipeline(stages=[FormalEducation_indexer,UndergradMajor_indexer,
                           FormalEducation_encoder,UndergradMajor__encoder,
                           assembler,log_reg])

# Note pipeline. Call it as you would call a machine learning object.
fit_model = pipeline_log.fit(trainset)

# Transform test data. 
predictions_log_reg = fit_model.transform(testset)

# Evaluate the model using the binary classifer.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval= BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='NewSalaryLevel')


# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
predictions_log_reg.select('NewSalaryLevel','prediction').show()


# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
print('Test Area Under ROC', my_eval.evaluate(predictions_log_reg ))

+--------------+----------+
|NewSalaryLevel|prediction|
+--------------+----------+
|             0|       0.0|
|             0|       1.0|
|             1|       1.0|
|             1|       0.0|
|             0|       0.0|
|             1|       1.0|
|             1|       1.0|
|             0|       0.0|
|             1|       0.0|
|             0|       0.0|
|             0|       0.0|
|             1|       0.0|
|             0|       0.0|
|             1|       1.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             1|       1.0|
|             1|       1.0|
+--------------+----------+
only showing top 20 rows

Test Area Under ROC 0.7112227680671661


In [22]:
# Decision Tree Model

from pyspark.ml.classification import DecisionTreeClassifier
decision_tree = DecisionTreeClassifier(featuresCol='features',labelCol='NewSalaryLevel')


# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline_dec = Pipeline(stages=[FormalEducation_indexer,UndergradMajor_indexer,
                           FormalEducation_encoder,UndergradMajor__encoder,
                           assembler,decision_tree])

# Note pipeline. Call it as you would call a machine learning object.
fit_model_dec = pipeline_dec.fit(trainset)

# Transform test data. 
predictions_decision_tree = fit_model_dec.transform(testset)


# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
predictions_decision_tree.select('NewSalaryLevel','prediction').show()


# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
print('Test Area Under ROC', my_eval.evaluate(predictions_decision_tree ))

+--------------+----------+
|NewSalaryLevel|prediction|
+--------------+----------+
|             0|       0.0|
|             0|       1.0|
|             1|       0.0|
|             1|       0.0|
|             0|       0.0|
|             1|       0.0|
|             1|       1.0|
|             0|       0.0|
|             1|       0.0|
|             0|       1.0|
|             0|       0.0|
|             1|       1.0|
|             0|       1.0|
|             1|       0.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             1|       0.0|
|             1|       0.0|
+--------------+----------+
only showing top 20 rows

Test Area Under ROC 0.6910826728911085


In [23]:
# Random Forest Model
 
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol='features',labelCol='NewSalaryLevel')

# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline_rf = Pipeline(stages=[FormalEducation_indexer,UndergradMajor_indexer,
                           FormalEducation_encoder,UndergradMajor__encoder,
                           assembler,rf])

# Note pipeline. Call it as you would call a machine learning object.
fit_model_rf = pipeline_rf.fit(trainset)

# Transform test data. 
predictions_rf = fit_model_rf.transform(testset)


# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
predictions_rf.select('NewSalaryLevel','prediction').show()


# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
print('Test Area Under ROC', my_eval.evaluate(predictions_rf ))

+--------------+----------+
|NewSalaryLevel|prediction|
+--------------+----------+
|             0|       0.0|
|             0|       1.0|
|             1|       1.0|
|             1|       0.0|
|             0|       0.0|
|             1|       1.0|
|             1|       1.0|
|             0|       0.0|
|             1|       0.0|
|             0|       1.0|
|             0|       0.0|
|             1|       1.0|
|             0|       1.0|
|             1|       1.0|
|             0|       1.0|
|             0|       0.0|
|             0|       0.0|
|             0|       1.0|
|             1|       1.0|
|             1|       1.0|
+--------------+----------+
only showing top 20 rows

Test Area Under ROC 0.7119635037817102


In [24]:
# Gradient-boosted tree model
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(featuresCol='features',labelCol='NewSalaryLevel')

# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline_gbt = Pipeline(stages=[FormalEducation_indexer,UndergradMajor_indexer,
                           FormalEducation_encoder,UndergradMajor__encoder,
                           assembler,gbt])

# Note pipeline. Call it as you would call a machine learning object.
fit_model_gbt = pipeline_gbt.fit(trainset)

# Transform test data. 
predictions_gbt = fit_model_gbt.transform(testset)


# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
predictions_gbt.select('NewSalaryLevel','prediction').show()


# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
print('Test Area Under ROC', my_eval.evaluate(predictions_gbt ))

+--------------+----------+
|NewSalaryLevel|prediction|
+--------------+----------+
|             0|       0.0|
|             0|       1.0|
|             1|       0.0|
|             1|       1.0|
|             0|       0.0|
|             1|       0.0|
|             1|       1.0|
|             0|       0.0|
|             1|       0.0|
|             0|       1.0|
|             0|       0.0|
|             1|       0.0|
|             0|       1.0|
|             1|       0.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             0|       0.0|
|             1|       0.0|
|             1|       0.0|
+--------------+----------+
only showing top 20 rows

Test Area Under ROC 0.7117282777838093
