In [1]:
## Set Python - Spark environment.
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
import pyspark
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [4]:
## Create  SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkConf
spark = SparkSession.builder\
        .appName("SparkML_Classification-Students")\
        .master('local[*]')\
        .enableHiveSupport()\
        .config('spark.sql.warehouse.dir','hdfs://bigdata:8020/user/2019B42/spark-warehouse')\
        .getOrCreate()

In [5]:
# importing libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

Question 1: Create a dataframe for above csv data.
(First line in the dataset is header, comma is a part of data in few fields, fields are escaped with double-quote(“)).

In [6]:
## Read data and create a dataframe
dataDF = spark.read.csv(header=True,nullValue="NA",escape='"',quote='"',inferSchema=True,
                         ignoreLeadingWhiteSpace=True,ignoreTrailingWhiteSpace=True,                                                  
                         path="file:///home/2019B42/PartB/cute_dataset_final1.csv") 

In [16]:
dataDF.printSchema()

root
 |-- ROW_ID: integer (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: double (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- CASE_STATUS: string (nullable = true)



Question 2: Verify summary of the dataframe (how many rows and columns).

In [14]:
print("No. of Columns = {}".format(len(dataDF.columns)))
print('No. of Rows = {}'.format(dataDF.count()))

No. of Columns = 11
No. of Rows = 179734


Question 3: Derive the summary statistics

In [13]:
dataDF.describe().show()

+-------+-----------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+-----------+
|summary|           ROW_ID|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|   PREVAILING_WAGE|              YEAR|            WORKSITE|               lon|              lat|CASE_STATUS|
+-------+-----------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+------------------+-----------------+-----------+
|  count|           179734|              179729|              178709|              179732|            179734|            179727|            179734|              179734|            173360|           173360|     179734|
|   mean|1502112.807610135|                null|                null|   55557.88888888889|              null|138044.84623122835|

Question 4: Find the count of distinct values in each column.

In [24]:
## find distinct values of columns
print("Distinct values in ROW_ID in the dataset are {}".format(dataDF.select('ROW_ID').distinct().count()))
print("Distinct values in EMPLOYER_NAME in the dataset are {}".format(dataDF.select('EMPLOYER_NAME').distinct().count()))
print("Distinct values in SOC_NAME in the dataset are {}".format(dataDF.select('SOC_NAME').distinct().count()))
print("Distinct values in JOB_TITLE in the dataset are {}".format(dataDF.select('JOB_TITLE').distinct().count()))
print("Distinct values in FULL_TIME_POSITION in the dataset are {}".format(dataDF.select('FULL_TIME_POSITION').distinct().count()))
print("Distinct values in PREVAILING_WAGE in the dataset are {}".format(dataDF.select('PREVAILING_WAGE').distinct().count()))
print("Distinct values in YEAR in the dataset are {}".format(dataDF.select('YEAR').distinct().count()))
print("Distinct values in WORKSITE in the dataset are {}".format(dataDF.select('WORKSITE').distinct().count()))
print("Distinct values in lon in the dataset are {}".format(dataDF.select('lon').distinct().count()))
print("Distinct values in lat in the dataset are {}".format(dataDF.select('lat').distinct().count()))
print("Distinct values in CASE_STATUS in the dataset are {}".format(dataDF.select('CASE_STATUS').distinct().count()))

Distinct values in ROW_ID in the dataset are 179734
Distinct values in EMPLOYER_NAME in the dataset are 45388
Distinct values in SOC_NAME in the dataset are 1156
Distinct values in JOB_TITLE in the dataset are 38452
Distinct values in FULL_TIME_POSITION in the dataset are 2
Distinct values in PREVAILING_WAGE in the dataset are 17581
Distinct values in YEAR in the dataset are 6
Distinct values in WORKSITE in the dataset are 6522
Distinct values in lon in the dataset are 2393
Distinct values in lat in the dataset are 2395
Distinct values in CASE_STATUS in the dataset are 4


Question 5: List EMPLOYER_NAME and YEAR in the descending order of the Approved applications count (Approved applications are obtained using CASE_STATUS = 'CERTIFIED').

In [63]:
dataDF.where("CASE_STATUS = 'CERTIFIED'").groupBy('EMPLOYER_NAME','YEAR').agg(
    count("*").alias("approvedcount")).orderBy(desc("approvedcount")).show()
data_DF_Approved = dataDF.where("CASE_STATUS = 'CERTIFIED'").groupBy('EMPLOYER_NAME','YEAR').agg(
    count("*").alias("approvedcount")).orderBy(desc("approvedcount"))

+--------------------+----+-------------+
|       EMPLOYER_NAME|YEAR|approvedcount|
+--------------------+----+-------------+
|     INFOSYS LIMITED|2015|         2015|
|     INFOSYS LIMITED|2013|         1921|
|     INFOSYS LIMITED|2016|         1560|
|     INFOSYS LIMITED|2014|         1411|
|TATA CONSULTANCY ...|2015|         1010|
|     INFOSYS LIMITED|2012|          957|
|CAPGEMINI AMERICA...|2016|          898|
|TATA CONSULTANCY ...|2014|          873|
|TATA CONSULTANCY ...|2016|          757|
|       WIPRO LIMITED|2015|          588|
|       WIPRO LIMITED|2016|          560|
|       ACCENTURE LLP|2015|          549|
|       ACCENTURE LLP|2016|          549|
|IBM INDIA PRIVATE...|2015|          548|
|TATA CONSULTANCY ...|2013|          511|
|       WIPRO LIMITED|2014|          494|
|IBM INDIA PRIVATE...|2016|          454|
|DELOITTE CONSULTI...|2015|          451|
|DELOITTE CONSULTI...|2016|          431|
|DELOITTE CONSULTI...|2014|          395|
+--------------------+----+-------

In [64]:
data_DF_Approved.show(2)

+---------------+----+-------------+
|  EMPLOYER_NAME|YEAR|approvedcount|
+---------------+----+-------------+
|INFOSYS LIMITED|2015|         2015|
|INFOSYS LIMITED|2013|         1921|
+---------------+----+-------------+
only showing top 2 rows



Question 6: Observe the above results and list the EMPLOYER_NAME that had the maximum approved applications for each year for 2012, 2013, 2014, 2015 and 2016?

In [66]:
data_DF_Approved.where("YEAR != '2011'").groupBy('EMPLOYER_NAME','YEAR').max("approvedcount").show()


+--------------------+----+------------------+
|       EMPLOYER_NAME|YEAR|max(approvedcount)|
+--------------------+----+------------------+
|ERNST & YOUNG U.S...|2015|               249|
|         YAHOO! INC.|2016|                26|
|UNIVERSITY OF MIC...|2015|                24|
|THE UNIVERSITY OF...|2012|                 9|
|SYSFORE TECHNOLOG...|2015|                 8|
|SCHLUMBERGER TECH...|2016|                 8|
|GENERAL MOTORS CO...|2015|                 8|
|      EXPICIENT, INC|2015|                 8|
|SYSCOM TECHNOLOGI...|2012|                 7|
|      MOURI TECH LLC|2014|                 7|
|STRATEGIC SYSTEMS...|2012|                 7|
|MASSACHUSETTS INS...|2016|                 7|
|VENTURESOFT GLOBA...|2015|                 7|
| SANDISK CORPORATION|2014|                 6|
|ALPHA NET CONSULT...|2012|                 6|
|VISIONET SYSTEMS,...|2014|                 6|
|       WORKDAY, INC.|2016|                 6|
|MANHATTAN ASSOCIA...|2012|                 5|
|JACKSON THER

Question 7: List the approved applications count in the descending order for the JOB_TITLE = "DATA SCIENTIST" and for each employer and year.

In [67]:
dataDF.where("CASE_STATUS = 'CERTIFIED'").where("JOB_TITLE = 'DATA SCIENTIST'").groupBy('EMPLOYER_NAME','YEAR').agg(
    count("*").alias("approvedcount")).orderBy(desc("approvedcount")).show()

+--------------------+----+-------------+
|       EMPLOYER_NAME|YEAR|approvedcount|
+--------------------+----+-------------+
|MICROSOFT CORPORA...|2016|            5|
|      FACEBOOK, INC.|2015|            3|
|UBER TECHNOLOGIES...|2016|            3|
|MICROSOFT CORPORA...|2015|            3|
|LINKEDIN CORPORATION|2014|            2|
|     TRIPADVISOR LLC|2016|            2|
|LINKEDIN CORPORATION|2016|            2|
| CITYGRID MEDIA, LLC|2012|            1|
|PIONEER HI-BRED I...|2015|            1|
|          LYFT, INC.|2016|            1|
|   REVON SYSTEMS LLC|2015|            1|
|     METROMILE, INC.|2016|            1|
|        CELECT, INC.|2016|            1|
|    TEKSYSTEMS, INC.|2016|            1|
|HUMANA INSURANCE ...|2016|            1|
|      FACEBOOK, INC.|2013|            1|
|       BANKRATE INC.|2016|            1|
|QUANTIPLY CORPORA...|2015|            1|
|AMERICAN EXPRESS ...|2016|            1|
|        DAVOCADO LLC|2015|            1|
+--------------------+----+-------

Question 8: Find the null values count in each column

In [7]:
dataDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataDF.columns]).show(truncate=False)

+------+-------------+--------+---------+------------------+---------------+----+--------+----+----+-----------+
|ROW_ID|EMPLOYER_NAME|SOC_NAME|JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|WORKSITE|lon |lat |CASE_STATUS|
+------+-------------+--------+---------+------------------+---------------+----+--------+----+----+-----------+
|0     |5            |1025    |2        |0                 |7              |0   |0       |6374|6374|0          |
+------+-------------+--------+---------+------------------+---------------+----+--------+----+----+-----------+



Question 9: Remove all the rows with null values (in any column/position).

In [7]:
dataDFFinal = dataDF.dropna()

Question 10: Verify the null values count in each column.

In [9]:
#Verify
dataDFFinal.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataDFFinal.columns]).show(truncate=False)

+------+-------------+--------+---------+------------------+---------------+----+--------+---+---+-----------+
|ROW_ID|EMPLOYER_NAME|SOC_NAME|JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|WORKSITE|lon|lat|CASE_STATUS|
+------+-------------+--------+---------+------------------+---------------+----+--------+---+---+-----------+
|0     |0            |0       |0        |0                 |0              |0   |0       |0  |0  |0          |
+------+-------------+--------+---------+------------------+---------------+----+--------+---+---+-----------+



Question 10:List the count of applications in each status (CASE_STATUS) in the descending order of the year.

In [77]:
dataDFFinal.groupBy('YEAR','CASE_STATUS').agg(
    count("*").alias("count")).orderBy(desc("YEAR")).show()

+----+-------------------+-----+
|YEAR|        CASE_STATUS|count|
+----+-------------------+-----+
|2016|          CERTIFIED|32960|
|2016|CERTIFIED-WITHDRAWN| 2686|
|2016|             DENIED|  501|
|2016|          WITHDRAWN| 1258|
|2015|             DENIED|  623|
|2015|CERTIFIED-WITHDRAWN| 2452|
|2015|          WITHDRAWN| 1122|
|2015|          CERTIFIED|31937|
|2014|          CERTIFIED|26195|
|2014|CERTIFIED-WITHDRAWN| 2199|
|2014|             DENIED|  674|
|2014|          WITHDRAWN|  907|
|2013|             DENIED|  630|
|2013|          WITHDRAWN|  651|
|2013|          CERTIFIED|21979|
|2013|CERTIFIED-WITHDRAWN| 2073|
|2012|CERTIFIED-WITHDRAWN| 1729|
|2012|          CERTIFIED|19954|
|2012|          WITHDRAWN|  605|
|2012|             DENIED| 1126|
+----+-------------------+-----+
only showing top 20 rows



Question 12: Find the mean PREVAILING_WAGE for each year for the approved applications.

In [79]:
dataDFFinal.where("CASE_STATUS = 'CERTIFIED'").groupBy('YEAR').agg(
    round(mean("PREVAILING_WAGE"),2).alias("mean_wage")).show()

+----+---------+
|YEAR|mean_wage|
+----+---------+
|2015| 71739.94|
|2013| 69874.55|
|2014| 70691.36|
|2012| 67424.46|
|2016| 74058.22|
|2011| 66243.87|
+----+---------+



Question 13: Find the mean PREVAILING_WAGE for each year for the approved applications for each employer.

In [80]:
dataDFFinal.where("CASE_STATUS = 'CERTIFIED'").groupBy('EMPLOYER_NAME','YEAR').agg(
    round(mean("PREVAILING_WAGE"),2).alias("mean_wage")).show()

+--------------------+----+---------+
|       EMPLOYER_NAME|YEAR|mean_wage|
+--------------------+----+---------+
|ERNST & YOUNG U.S...|2015|  70352.8|
|LILAX TECHNOLOGIE...|2015|  66737.0|
|TRINITY CONSULTIN...|2015|  71989.0|
|UNIVERSITY OF MIC...|2015|  51929.5|
|D&S MACHINE WORKS...|2012|  59093.0|
|   CENTRAPRISE, CORP|2014|  56160.0|
|     PLAYFIRST, INC.|2011| 107099.0|
|DATA EXCHANGE COR...|2011|  39395.0|
|       SAR TECH, LLC|2016|  50523.0|
|RALPH LAUREN CORP...|2016|136122.33|
|BAL SEAL ENGINEER...|2015|  51667.0|
|SYSCOM TECHNOLOGI...|2012| 74440.14|
|ESOLUTIONS AMERIC...|2015|  65738.5|
|      MOURI TECH LLC|2014| 62911.14|
|TRUISI ARCHITECTU...|2016|  38729.6|
|GW COMMUNICATIONS...|2012|  51209.6|
|MANHATTAN ASSOCIA...|2012|  70879.6|
|ACTIONTEC ELECTRO...|2016|  92290.0|
|SYSFORE TECHNOLOG...|2015|  59797.5|
|  AMPHION GLOBAL INC|2013|  45947.5|
+--------------------+----+---------+
only showing top 20 rows



In [110]:
dataDFFinal.select('CASE_STATUS').distinct().show()

+-------------------+
|        CASE_STATUS|
+-------------------+
|          CERTIFIED|
|CERTIFIED-WITHDRAWN|
|          WITHDRAWN|
|             DENIED|
+-------------------+



Question 14: Find the approved applications count in each year for the full time positions in the descending order of the year.

In [81]:
dataDFFinal.where("CASE_STATUS = 'CERTIFIED'").where("FULL_TIME_POSITION = 'Y'").groupBy('YEAR').agg(
    count("*").alias("countfulltime")).orderBy(desc("YEAR")).show()

+----+-------------+
|YEAR|countfulltime|
+----+-------------+
|2016|        15292|
|2015|        31210|
|2014|        25520|
|2013|        21347|
|2012|        19308|
|2011|        16509|
+----+-------------+



Question 15: Identify the different levels/labels/classes/categories in CASE_STATUS field and generate indexes, for each class/label/category starting from 0 in a new column named ‘label’ using an user defined function.

In [8]:
## user defined function
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

Function1 = udf(lambda x: '0' if x == 'CERTIFIED' else '1' if x == 'CERTIFIED-WITHDRAWN' else '2' if x == 'DENIED' else '3', StringType())

In [9]:
data_model = dataDFFinal.withColumn("label",Function1(dataDFFinal["CASE_STATUS"]))
data_model.show(5)

+-------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------------+-----+
| ROW_ID|       EMPLOYER_NAME|            SOC_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|            WORKSITE|         lon|       lat|        CASE_STATUS|label|
+-------+--------------------+--------------------+--------------------+------------------+---------------+----+--------------------+------------+----------+-------------------+-----+
|2803367| VENTUS NETWORKS LLC|Computer Software...|SOFTWARE DEVELOPM...|                 N|        82035.2|2011|SANTA CLARA, CALI...|-121.9552356|37.3541079|          CERTIFIED|    0|
|2512758|ELI LILLY AND COM...|Biochemists and B...|SENIOR RESEARCH S...|                 Y|        77230.0|2012|INDIANAPOLIS, IND...|  -86.158068| 39.768403|          CERTIFIED|    0|
|2657095|   ERNST & YOUNG LLP|            Auditors|    ASSURANCE SENIOR|        

In [10]:
#drop column CASE_STATUS as now we have label columns
data_model = data_model.drop("CASE_STATUS")

In [11]:
data_model = data_model.drop("ROW_ID","WORKSITE","SOC_NAME")

In [12]:
data_model.show(5)

+--------------------+--------------------+------------------+---------------+----+------------+----------+-----+
|       EMPLOYER_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|         lon|       lat|label|
+--------------------+--------------------+------------------+---------------+----+------------+----------+-----+
| VENTUS NETWORKS LLC|SOFTWARE DEVELOPM...|                 N|        82035.2|2011|-121.9552356|37.3541079|    0|
|ELI LILLY AND COM...|SENIOR RESEARCH S...|                 Y|        77230.0|2012|  -86.158068| 39.768403|    0|
|   ERNST & YOUNG LLP|    ASSURANCE SENIOR|                 Y|        51800.0|2011| -84.3879824|33.7489954|    0|
|MOTOROLA MOBILITY...|   RESEARCH ENGINEER|                 Y|        91380.0|2013| -87.9531303|42.2830786|    0|
|      UST GLOBAL INC|SENIOR SYSTEMS AN...|                 Y|        55744.0|2015| -77.4360481|37.5407246|    1|
+--------------------+--------------------+------------------+---------------+----+-----

In [13]:
##renaming columns
data_model = data_model.withColumnRenamed("label", "class")
data_model.show(5)

+--------------------+--------------------+------------------+---------------+----+------------+----------+-----+
|       EMPLOYER_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|         lon|       lat|class|
+--------------------+--------------------+------------------+---------------+----+------------+----------+-----+
| VENTUS NETWORKS LLC|SOFTWARE DEVELOPM...|                 N|        82035.2|2011|-121.9552356|37.3541079|    0|
|ELI LILLY AND COM...|SENIOR RESEARCH S...|                 Y|        77230.0|2012|  -86.158068| 39.768403|    0|
|   ERNST & YOUNG LLP|    ASSURANCE SENIOR|                 Y|        51800.0|2011| -84.3879824|33.7489954|    0|
|MOTOROLA MOBILITY...|   RESEARCH ENGINEER|                 Y|        91380.0|2013| -87.9531303|42.2830786|    0|
|      UST GLOBAL INC|SENIOR SYSTEMS AN...|                 Y|        55744.0|2015| -77.4360481|37.5407246|    1|
+--------------------+--------------------+------------------+---------------+----+-----

In [14]:
cat_Var_Names = [ 'EMPLOYER_NAME', 'JOB_TITLE', 'FULL_TIME_POSITION', 'YEAR', 'class']

num_Var_Names = ['PREVAILING_WAGE','lon','lat']

In [15]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [16]:
## for numeric features vectorize it
assembler_Num = VectorAssembler(inputCols=num_Var_Names, outputCol="num_features")
#output = assembler_Num.transform(data_model)
#print("Assembled columns to vector column 'num_features'")
#output.select(num_Var_Names).show(truncate=False)

In [17]:
from pyspark.ml.feature import MinMaxScaler

min_Max_Scalar = MinMaxScaler(inputCol="num_features", outputCol="scaled_num_features")

Question 16: Dummify categorical variables (using String Indexer and One Hot Encoder).

In [18]:
indexers_Cat = [StringIndexer(inputCol=cat_Var_Name, outputCol="{0}_index".format(cat_Var_Name), handleInvalid="skip") for cat_Var_Name in cat_Var_Names ]
encoders_Cat = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_vec".format(indexer.getInputCol())) for indexer in indexers_Cat]
assembler_Cat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders_Cat], outputCol="cat_features")

Question 17: Create a new column ‘features’ (feature vector for all the columns used for the model building) by combining the vectors created for the categorical variables and numerical features.

In [19]:
assembler = VectorAssembler(inputCols=["scaled_num_features", "cat_features"], outputCol="features")

In [20]:
indexer_Label = StringIndexer(inputCol="class", outputCol="label")

Question 18: Perform train and test splits

In [21]:
splitDF = data_model.randomSplit([0.7, 0.3], seed=1234)
print(splitDF[0].count())
print(splitDF[1].count())
(trainingData, testData) = splitDF[0], splitDF[1]

120632
51710


In [22]:
trainingData.show(2)
testData.show(2)

+--------------------+--------------------+------------------+---------------+----+-----------+----------+-----+
|       EMPLOYER_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|        lon|       lat|class|
+--------------------+--------------------+------------------+---------------+----+-----------+----------+-----+
|101224 ENTERPRISE...|FINANCIAL & ADMIN...|                 Y|        67787.0|2012|-80.2331036|26.1275862|    0|
| 108 INFO SYSTEM LLC|COMPUTER SYSTEMS ...|                 N|        63814.0|2016|-77.3570028|38.9586307|    0|
+--------------------+--------------------+------------------+---------------+----+-----------+----------+-----+
only showing top 2 rows

+--------------------+--------------------+------------------+---------------+----+------------+----------+-----+
|       EMPLOYER_NAME|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|YEAR|         lon|       lat|class|
+--------------------+--------------------+------------------+-------

In [22]:
preprocessiong_Stages = [assembler_Num]+[min_Max_Scalar]+indexers_Cat+encoders_Cat+[assembler_Cat]+[assembler]+[indexer_Label]

Question 19: Build 3 models ML models,
a. logistic regression
b. logistic regression with cross validation
c. one other model of your choice.

Question 19 a: logistic regression

In [23]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=20, labelCol="label", featuresCol="features")

In [31]:
from pyspark.ml import Pipeline

lr_Pipeline = Pipeline(stages=preprocessiong_Stages+[lr]) 

In [177]:
lr_Pipeline_model = lr_Pipeline.fit(trainingData)

train_predictions_lr = lr_Pipeline_model.transform(trainingData)
test_predictions_lr = lr_Pipeline_model.transform(testData)

Question 20: Derive train and test accuracies for logistic regression model without cross validation

In [179]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

predictionAndLabels_train_lr = train_predictions_lr.select("prediction", "label")
train_accuracy_lr = evaluator.evaluate(predictionAndLabels_train_lr)

print("Train accuracy  = " + str(train_accuracy_lr))

predictionAndLabels_test_lr = test_predictions_lr.select("prediction", "label")
test_accuracy_lr = evaluator.evaluate(predictionAndLabels_test_lr)

print("Test accuracy = " + str(test_accuracy_lr))

Train accuracy  = 1.0
Test accuracy = 0.994150737455


Question 19: logistic regression with cross validation

In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [43]:
## using numFolds = 2
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1]) \
    .addGrid(lr.elasticNetParam, [0.5])\
    .build()
    
lr_crossval = CrossValidator(estimator=lr_Pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=2)     

In [44]:
# Run cross-validation, and choose the best set of parameters.
lr_crossval_Model = lr_crossval.fit(trainingData)

In [45]:
train_predictions_lrcv = lr_crossval_Model.transform(trainingData)
test_predictions_lrcv = lr_crossval_Model.transform(testData)

Question 20: Derive train and test accuracies for Logistic Regression Cross Validation

In [46]:
predictionAndLabels_train_lrcv = train_predictions_lrcv.select("prediction", "label")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_lrcv)
print("Train set accuracy  = " + str(train_accuracycv))

predictionAndLabels_test_lrcv = test_predictions_lrcv.select("prediction", "label")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_lrcv)
print("Test set accuracy = " + str(test_accuracycv))

Train set accuracy  = 0.970281517342
Test set accuracy = 0.972768744228


Question 19c: Decision Tree Classifier

In [24]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [25]:
from pyspark.ml import Pipeline

In [26]:
dt_Pipeline = Pipeline(stages=preprocessiong_Stages+[dt]) 

In [27]:
## build the model
dt_Pipeline_model = dt_Pipeline.fit(trainingData)

Py4JJavaError: An error occurred while calling o118.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 30.0 failed 1 times, most recent failure: Lost task 3.0 in stage 30.0 (TID 144, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.TreePoint$.org$apache$spark$ml$tree$impl$TreePoint$$labeledPointToTreePoint(TreePoint.scala:89)
	at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:73)
	at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:72)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:743)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1101)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1099)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:68)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:743)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:742)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:742)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:563)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:198)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:116)
	at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:45)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.ml.tree.impl.TreePoint$.org$apache$spark$ml$tree$impl$TreePoint$$labeledPointToTreePoint(TreePoint.scala:89)
	at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:73)
	at org.apache.spark.ml.tree.impl.TreePoint$$anonfun$convertToTreeRDD$2.apply(TreePoint.scala:72)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.storage.memory.PartiallyUnrolledIterator.next(MemoryStore.scala:743)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:174)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1101)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1$$anonfun$apply$7.apply(BlockManager.scala:1099)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:68)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 33874)
----------------------------------------


Traceback (most recent call last):
  File "/usr/lib64/python2.7/SocketServer.py", line 295, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 321, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib64/python2.7/SocketServer.py", line 334, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib64/python2.7/SocketServer.py", line 649, in __init__
    self.handle()
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/accumulators.py", line 235, in handle
    num_updates = read_int(self.rfile)
  File "/usr/hdp/current/spark2-client/python/lib/pyspark.zip/pyspark/serializers.py", line 685, in read_int
    raise EOFError
EOFError


In [None]:
train_predictions_dt = dt_Pipeline_model.transform(trainingData)
test_predictions_dt = dt_Pipeline_model.transform(testData)

Question 20: Derive train and test accuracies for Decision Tree

In [None]:
predictionAndLabels_train_dt = train_predictions_dt.select("prediction", "label")
train_accuracy_dt = evaluator.evaluate(predictionAndLabels_train_dt)

print("Train accuracy  = " + str(train_accuracy_dt))

predictionAndLabels_test_dt = test_predictions_dt.select("prediction", "label")
test_accuracy_dt = evaluator.evaluate(predictionAndLabels_test_dt)

print("Test accuracy = " + str(test_accuracy_dt))

Tried 5 times, could not run Decision Tree, it is giving OutOfMemory or Connection Refused error, so final try with SVM

In [24]:
from pyspark.ml.classification import OneVsRest

In [25]:
from pyspark.ml import Pipeline

In [26]:
ovr = OneVsRest(classifier=lr)
ovr_Pipeline = Pipeline(stages=preprocessiong_Stages+[ovr]) 

In [27]:
## build the model
ovr_Pipeline_model = ovr_Pipeline.fit(trainingData)

In [28]:
train_predictions_ovr = ovr_Pipeline_model.transform(trainingData)
test_predictions_ovr = ovr_Pipeline_model.transform(testData)

Question 20: Derive train and test accuracies for SVM

In [29]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

In [30]:
predictionAndLabels_train_ovr = train_predictions_ovr.select("prediction", "label")
train_accuracycv = evaluator.evaluate(predictionAndLabels_train_ovr)
print("Train set accuracy  = " + str(train_accuracycv))

predictionAndLabels_test_ovr = test_predictions_ovr.select("prediction", "label")
test_accuracycv = evaluator.evaluate(predictionAndLabels_test_ovr)
print("Test set accuracy = " + str(test_accuracycv))

Train set accuracy  = 1.0
Test set accuracy = 0.995046318323
