In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
import pandas as pd
spark = SparkSession.builder\
.master("local[4]")\
.appName("DataExplore")\
.config("spak.executer.memory", "4g")\
.config("spark.driver.memory", "2g")\
.getOrCreate()

In [2]:
adultTrain = spark.read\
.option("header", "True")\
.option("inferSchema", "True")\
.option("sep", ",")\
.csv("adult_data")

In [3]:
adultTest = spark.read\
.option("header", "True")\
.option("inferSchema", "True")\
.option("sep", ",")\
.csv("adult_test")

In [4]:
adultTrain.limit(5).toPandas().head()

Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,output
0,39,State-gov,77516.0,Bachelors,13.0,Never-married,Adm-clerical,Not-in-family,White,Male,2174.0,0.0,40.0,United-States,<=50K
1,50,Self-emp-not-inc,83311.0,Bachelors,13.0,Married-civ-spouse,Exec-managerial,Husband,White,Male,0.0,0.0,13.0,United-States,<=50K
2,38,Private,215646.0,HS-grad,9.0,Divorced,Handlers-cleaners,Not-in-family,White,Male,0.0,0.0,40.0,United-States,<=50K
3,53,Private,234721.0,11th,7.0,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0.0,0.0,40.0,United-States,<=50K
4,28,Private,338409.0,Bachelors,13.0,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0.0,0.0,40.0,Cuba,<=50K


In [5]:
adultTest.limit(5).toPandas().head()

Unnamed: 0,age,workclass,fnlwgt,education,education_num,marital_status,occupation,relationship,race,sex,capital_gain,capital_loss,hours_per_week,native_country,output
0,25,Private,226802.0,11th,7.0,Never-married,Machine-op-inspct,Own-child,Black,Male,0.0,0.0,40.0,United-States,<=50K.
1,38,Private,89814.0,HS-grad,9.0,Married-civ-spouse,Farming-fishing,Husband,White,Male,0.0,0.0,50.0,United-States,<=50K.
2,28,Local-gov,336951.0,Assoc-acdm,12.0,Married-civ-spouse,Protective-serv,Husband,White,Male,0.0,0.0,40.0,United-States,>50K.
3,44,Private,160323.0,Some-college,10.0,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688.0,0.0,40.0,United-States,>50K.
4,18,?,103497.0,Some-college,10.0,Never-married,?,Own-child,White,Female,0.0,0.0,30.0,United-States,<=50K.


In [6]:
adultTrain.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: double (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: double (nullable = true)
 |-- capital_loss: double (nullable = true)
 |-- hours_per_week: double (nullable = true)
 |-- native_country: string (nullable = true)
 |-- output: string (nullable = true)



In [7]:
adultTrain.describe(['age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']).toPandas()

Unnamed: 0,summary,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week
0,count,32561.0,32561.0,32561.0,32561.0,32561.0,32561.0
1,mean,38.58164675532078,189778.36651208505,10.0806793403151,1077.6488437087312,87.303829734959,40.437455852093
2,stddev,13.640432553581356,105549.97769702227,2.572720332067397,7385.292084840354,402.960218649002,12.347428681731838
3,min,17.0,12285.0,1.0,0.0,0.0,1.0
4,max,90.0,1484705.0,16.0,99999.0,4356.0,99.0


## <font color='red'>  DATA CLEANING
</font>

In [52]:
def dataCleaning(df):
    df = df \
    .withColumn("workclass", trim(col("workclass"))) \
    .withColumn("education", trim(col("education"))) \
    .withColumn("marital_status", trim(col("marital_status"))) \
    .withColumn("occupation", trim(col("occupation"))) \
    .withColumn("relationship", trim(col("relationship"))) \
    .withColumn("race", trim(col("race"))) \
    .withColumn("sex", trim(col("sex"))) \
    .withColumn("native_country", trim(col("native_country"))) \
    .withColumn("output", trim(col("output")))
    
    df = df \
    .withColumn("output", regexp_replace(col("output"), "<=50K.", "<=50K")) \
    .withColumn("output", regexp_replace(col("output"), ">50K.", ">50K"))
    
    df = df.filter(~(col("workclass").contains("?") | col("occupation").contains("?") | 
                               col("native_country").contains("?")))
    df = df.filter(~(col("workclass").contains("never-worked") | col("workclass").contains("without-pay") | 
                           col("occupation").contains("Armed-Forces") | col("native_country").contains("Holand-Netherlands")))
    df = df.withColumn("educationMerged", 
    when(col("education").isin("1st-4th", "5th-6th", "7th-8th"), "Elementary-School")
    .when(col("education").isin("9th", "11th", "12th", "10th"), "High-School")
    .when(col("education").isin("Masters", "Doctorate"), "postgraduate")
    .when(col("education").isin("Bachelors", "Some-college"), "Undergarduate")
    .otherwise(col("education")))

    
    orderedFeature = ["workclass", "educationMerged", "marital_status", "occupation", "relationship", "race",
                    "sex", "native_country", "age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week",
                    "output"]

    df = df.select(orderedFeature)
        
    return df


In [53]:
adultTrain = dataCleaning(adultTrain)
adultTrain.toPandas().head()

Unnamed: 0,workclass,educationMerged,marital_status,occupation,relationship,race,sex,native_country,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,output
0,State-gov,Undergarduate,Never-married,Adm-clerical,Not-in-family,White,Male,United-States,39,77516.0,13.0,2174.0,0.0,40.0,<=50K
1,Self-emp-not-inc,Undergarduate,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,50,83311.0,13.0,0.0,0.0,13.0,<=50K
2,Private,HS-grad,Divorced,Handlers-cleaners,Not-in-family,White,Male,United-States,38,215646.0,9.0,0.0,0.0,40.0,<=50K
3,Private,High-School,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,United-States,53,234721.0,7.0,0.0,0.0,40.0,<=50K
4,Private,Undergarduate,Married-civ-spouse,Prof-specialty,Wife,Black,Female,Cuba,28,338409.0,13.0,0.0,0.0,40.0,<=50K


In [54]:
adultTest = dataCleaning(adultTest)

In [55]:
adultTest.toPandas().head()

Unnamed: 0,workclass,educationMerged,marital_status,occupation,relationship,race,sex,native_country,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,output
0,Private,High-School,Never-married,Machine-op-inspct,Own-child,Black,Male,United-States,25,226802.0,7.0,0.0,0.0,40.0,<=50K
1,Private,HS-grad,Married-civ-spouse,Farming-fishing,Husband,White,Male,United-States,38,89814.0,9.0,0.0,0.0,50.0,<=50K
2,Local-gov,Assoc-acdm,Married-civ-spouse,Protective-serv,Husband,White,Male,United-States,28,336951.0,12.0,0.0,0.0,40.0,>50K
3,Private,Undergarduate,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,United-States,44,160323.0,10.0,7688.0,0.0,40.0,>50K
4,Private,High-School,Never-married,Other-service,Not-in-family,White,Male,United-States,34,198693.0,6.0,0.0,0.0,30.0,<=50K


In [91]:
Indexer = StringIndexer().setInputCols(["workclass", "educationMerged", "marital_status", "occupation", "relationship", "race", "sex", 
                                        "native_country"])\
.setOutputCols(["workclassIndex", "educationIndexer", "marital_statusIndex", "occupationIndex", "relationshipIndex", "raceIndex", 
                "sexIndex", "native_countryIndex",])

In [92]:
labelIndexer = StringIndexer().setInputCol("output").setOutputCol("label")

In [93]:
encoder = OneHotEncoder().setInputCols(["workclassIndex", "educationIndexer", "marital_statusIndex", "occupationIndex", "relationshipIndex", 
                                        "raceIndex", "sexIndex", "native_countryIndex"])\
.setOutputCols(["workclassEncoded", "EducationEncoded", "marital_statusEncoded", "occupationEncoded", "relationshipEncoded", "raceEncoded", 
                "sexEncoded", "native_countryEncoded"])

In [94]:
assembler = VectorAssembler().setInputCols(["age", "fnlwgt", "capital_gain", "capital_loss", "hours_per_week", "EducationEncoded",
                                       "workclassEncoded", "marital_statusEncoded", "occupationEncoded", "relationshipEncoded", 
                                        "raceEncoded", "sexEncoded", "native_countryEncoded"])\
.setOutputCol("vectorizedFeatures")

In [95]:
scaler = StandardScaler().setInputCol("vectorizedFeatures").setOutputCol("features")

In [96]:
lrObject = LogisticRegression().setFeaturesCol("features").setLabelCol("label").setPredictionCol("predictions")

In [98]:
piplineObject = Pipeline().setStages([Indexer, labelIndexer, encoder, assembler, scaler, lrObject])

In [99]:
adultTrain.toPandas().head()

Unnamed: 0,workclass,educationMerged,marital_status,occupation,relationship,race,sex,native_country,age,fnlwgt,education_num,capital_gain,capital_loss,hours_per_week,output
0,State-gov,Undergarduate,Never-married,Adm-clerical,Not-in-family,White,Male,United-States,39,77516.0,13.0,2174.0,0.0,40.0,<=50K
1,Self-emp-not-inc,Undergarduate,Married-civ-spouse,Exec-managerial,Husband,White,Male,United-States,50,83311.0,13.0,0.0,0.0,13.0,<=50K
2,Private,HS-grad,Divorced,Handlers-cleaners,Not-in-family,White,Male,United-States,38,215646.0,9.0,0.0,0.0,40.0,<=50K
3,Private,High-School,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,United-States,53,234721.0,7.0,0.0,0.0,40.0,<=50K
4,Private,Undergarduate,Married-civ-spouse,Prof-specialty,Wife,Black,Female,Cuba,28,338409.0,13.0,0.0,0.0,40.0,<=50K


In [100]:
piplineModel = piplineObject.fit(adultTrain)

In [106]:
pred = piplineModel.transform(adultTest).select("label", "predictions")
pred.toPandas().head()

Unnamed: 0,label,predictions
0,0.0,0.0
1,0.0,0.0
2,1.0,0.0
3,1.0,1.0
4,0.0,0.0


In [109]:
pred.show()

+-----+-----------+
|label|predictions|
+-----+-----------+
|  0.0|        0.0|
|  0.0|        0.0|
|  1.0|        0.0|
|  1.0|        1.0|
|  0.0|        0.0|
|  1.0|        1.0|
|  0.0|        0.0|
|  0.0|        0.0|
|  1.0|        1.0|
|  0.0|        0.0|
|  0.0|        0.0|
|  1.0|        0.0|
|  1.0|        1.0|
|  0.0|        0.0|
|  0.0|        0.0|
|  0.0|        0.0|
|  1.0|        1.0|
|  0.0|        0.0|
|  0.0|        0.0|
|  0.0|        0.0|
+-----+-----------+
only showing top 20 rows



In [110]:
def metrices(df):
    tp = df.select("label", "predictions").where("predictions == 1 and label == 1").count()
    tn = df.select("label", "predictions").where("predictions == 0 and label == 0").count()
    fp = df.select("label", "predictions").where("predictions == 1 and label == 0").count()
    fn = df.select("label", "predictions").where("predictions == 0 and label == 1").count()
    
    accuracy = float((tp+tn)/(tp+tn+fp+fn))
    recall = float(tp/(tp+fn))
    precision = float(tp/(tp+fp))
    print("accuracy: " , accuracy , "\n" , "recall: " , recall , "\n" , "precision: " , precision)

In [111]:
metrices(pred)

accuracy:  0.844902025905015 
 recall:  0.6002163916689207 
 precision:  0.7213914174252276
