In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import OneHotEncoderEstimator, MinMaxScaler, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Spark configuration for this notebook: local master, fixed application name.
conf = SparkConf().setMaster('local').setAppName('Predict Adult Salary')
# getOrCreate() reuses an already-running context, so re-running this cell in the
# same kernel does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Obtain the SparkSession used for all DataFrame work below
# (getOrCreate returns the active session if one already exists).
spark = (
    SparkSession.builder
    .appName("Predict Adult Salary")
    .getOrCreate()
)

In [4]:
# Column layout of the header-less CSV files, in file order.
# Each entry is (column name, Spark type class); every column is nullable.
column_types = [
    ("age", IntegerType),
    ("workclass", StringType),
    ("fnlwgt", IntegerType),
    ("education", StringType),
    ("education-num", IntegerType),
    ("marital-status", StringType),
    ("occupation", StringType),
    ("relationship", StringType),
    ("race", StringType),
    ("sex", StringType),
    ("capital-gain", IntegerType),
    ("capital-loss", IntegerType),
    ("hours-per-week", IntegerType),
    ("native-country", StringType),
    ("salary", StringType),
]

# Explicit schema passed to the CSV reader instead of relying on type inference.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in column_types
])

In [5]:
# Both files are read the same way: no header row, explicit schema.
csv_options = {'header': False, 'schema': schema}

train_df = spark.read.csv('train.csv', **csv_options)
test_df = spark.read.csv('test.csv', **csv_options)

In [6]:
# Fetch five rows of the raw training data as a list of Row objects.
train_df.take(5)

[Row(age=39, workclass='State-gov', fnlwgt=77516, education='Bachelors', education-num=13, marital-status='Never-married', occupation='Adm-clerical', relationship='Not-in-family', race='White', sex='Male', capital-gain=2174, capital-loss=0, hours-per-week=40, native-country='United-States', salary='<=50K'),
 Row(age=50, workclass='Self-emp-not-inc', fnlwgt=83311, education='Bachelors', education-num=13, marital-status='Married-civ-spouse', occupation='Exec-managerial', relationship='Husband', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=13, native-country='United-States', salary='<=50K'),
 Row(age=38, workclass='Private', fnlwgt=215646, education='HS-grad', education-num=9, marital-status='Divorced', occupation='Handlers-cleaners', relationship='Not-in-family', race='White', sex='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', salary='<=50K'),
 Row(age=53, workclass='Private', fnlwgt=234721, education='11th', educati

In [7]:
# Render five training rows as a pandas table for easier reading.
preview_rows = train_df.limit(5)
preview_rows.toPandas()

Unnamed: 0,age,workclass,fnlwgt,education,education-num,marital-status,occupation,relationship,race,sex,capital-gain,capital-loss,hours-per-week,native-country,salary
0,39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K
1,50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K
2,38,Private,215646,HS-grad,9,Divorced,Handlers-cleaners,Not-in-family,White,Male,0,0,40,United-States,<=50K
3,53,Private,234721,11th,7,Married-civ-spouse,Handlers-cleaners,Husband,Black,Male,0,0,40,United-States,<=50K
4,28,Private,338409,Bachelors,13,Married-civ-spouse,Prof-specialty,Wife,Black,Female,0,0,40,Cuba,<=50K


In [8]:
# Categorical preprocessing: string-index each column, one-hot encode the indices,
# assemble the encoded vectors into one column and min-max scale it.
categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

# One StringIndexer per categorical column, writing to "<column>-index".
indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]

# One-hot encode every index column into "<column>-index-encoded".
encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

# Concatenate all one-hot vectors into a single vector column.
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)

scaler = MinMaxScaler(
    inputCol="categorical-features",
    outputCol="scaled-categorical-features"
)

pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler])

# Fit the preprocessing ONCE, on the training data only, and apply the same fitted
# model to both splits. Re-fitting on the test set would derive category indices,
# one-hot sizes and scaler statistics from test data, so test feature positions
# could disagree with the ones the classifier is trained on.
# NOTE: a category value that appears only in test.csv will now raise during
# transform (StringIndexer's default handleInvalid is 'error'); configure
# handleInvalid='keep' on the indexers if unseen values must be tolerated.
pipeline_model = pipeline.fit(train_df)

train_df = pipeline_model.transform(train_df)

test_df = pipeline_model.transform(test_df)

In [9]:
# Print the column tree of the transformed training DataFrame to confirm the
# index/encoded/feature columns were appended to the original columns.
train_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- sex-index: double (nullable = false)
 |-- native-country-index: double 

In [10]:
# Pull five rows of the transformed training data into pandas for inspection.
sample_rows = train_df.limit(5)
df = sample_rows.toPandas()

In [11]:
# Show the scaled categorical feature vector for the sample row labelled 1.
scaled_column = df['scaled-categorical-features']
scaled_column[1]

DenseVector([0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0])

In [12]:
# Numeric columns that are appended to the feature vector as-is.
continuous_variables = [
    'age',
    'fnlwgt',
    'education-num',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]

# Final model input: the scaled categorical vector followed by the continuous columns.
feature_columns = ['scaled-categorical-features'] + continuous_variables
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# VectorAssembler is a plain transformer (no fitting), so it is applied to both splits directly.
train_df, test_df = assembler.transform(train_df), assembler.transform(test_df)

In [13]:
# Inspect the assembled 'features' vector of the first sampled training row.
features_sample = train_df.limit(5).toPandas()
features_sample['features'][0]

SparseVector(99, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 93: 39.0, 94: 77516.0, 95: 13.0, 96: 2174.0, 98: 40.0})

In [14]:
# Encode the string 'salary' column as a numeric 'label' column.
indexer = StringIndexer(inputCol='salary', outputCol='label')

# Fit the label mapping ONCE on the training data and reuse it for the test data.
# StringIndexer assigns indices by label frequency by default, so fitting a second
# indexer on the test split could map the same salary string to a different number.
# NOTE: if test.csv spells the salary values differently from train.csv, the
# transform below will raise on the unseen label; normalize the strings first.
label_indexer_model = indexer.fit(train_df)

train_df = label_indexer_model.transform(train_df)

test_df = label_indexer_model.transform(test_df)

In [15]:
# Inspect the numeric labels of ten training rows.
label_sample = train_df.limit(10).toPandas()
label_sample['label']

0    0.0
1    0.0
2    0.0
3    0.0
4    0.0
5    0.0
6    0.0
7    1.0
8    1.0
9    1.0
Name: label, dtype: float64

In [16]:
# Logistic regression reading the assembled 'features' vector and the indexed 'label'.
lr = LogisticRegression(
    labelCol='label',
    featuresCol='features',
)

# Train the classifier on the prepared training DataFrame.
model = lr.fit(train_df)

In [17]:
# Apply the fitted logistic regression model to the prepared test DataFrame;
# the result is later read for its 'label' and 'prediction' columns.
pred = model.transform(test_df)

In [21]:
# Compare true labels with predictions for ten test rows.
prediction_sample = pred.limit(10).toPandas()
prediction_sample[['label', 'prediction']]

Unnamed: 0,label,prediction
0,0.0,0.0
1,0.0,0.0
2,1.0,0.0
3,1.0,1.0
4,0.0,0.0
5,0.0,0.0
6,0.0,0.0
7,1.0,1.0
8,0.0,0.0
9,0.0,0.0
