In [1]:
import pyspark
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.linalg import DenseVector
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import *

## Basic operation with PySpark

In [2]:
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"

In [3]:
sc = SparkContext()
sc.addFile(url)
sqlContext = SQLContext(sc)

In [4]:
# df with inderShema to True
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

In [5]:
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [6]:
# see the data with show
df.limit(5).toPandas()

Unnamed: 0,x,age,workclass,fnlwgt,education,educational-num,marital-status,occupation,relationship,race,gender,capital-gain,capital-loss,hours-per-week,native-country,income
0,1,25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K
1,2,38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K
2,3,28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K
3,4,44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K
4,5,18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K


In [7]:
# df with inderShema to False
df_string = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=False)

In [8]:
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [9]:
df_string.limit(5).toPandas()

Unnamed: 0,x,age,workclass,fnlwgt,education,educational-num,marital-status,occupation,relationship,race,gender,capital-gain,capital-loss,hours-per-week,native-country,income
0,1,25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K
1,2,38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K
2,3,28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K
3,4,44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K
4,5,18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K


In [10]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']

# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())

# Check the dataset
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [11]:
# Select columns
df_string.select('age','fnlwgt').show(5)

+----+--------+
| age|  fnlwgt|
+----+--------+
|25.0|226802.0|
|38.0| 89814.0|
|28.0|336951.0|
|44.0|160323.0|
|18.0|103497.0|
+----+--------+
only showing top 5 rows



In [12]:
"""
Count by group
If you want to count the number of occurence by group, you can chain:

1.groupBy()
2.count()
together. In the example below, you count the number of rows by the education level.
"""
df_string.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [13]:
"""
Describe the data
To get a summary statistics, of the data, you can use describe(). It will compute the :

1.count
2.mean
3.standarddeviation
4.min
5.max
"""
df_string.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

In [14]:
df_string.describe('capital-gain').show()

+-------+------------------+
|summary|      capital-gain|
+-------+------------------+
|  count|             48842|
|   mean|1079.0676262233324|
| stddev| 7452.019057655418|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



In [15]:
"""
Crosstab computation
In some occasion, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with income below or above 50k by education level. This operation is called a crosstab.
"""
df_string.crosstab('age', 'income').sort("age_income").show()

+----------+-----+----+
|age_income|<=50K|>50K|
+----------+-----+----+
|      17.0|  595|   0|
|      18.0|  862|   0|
|      19.0| 1050|   3|
|      20.0| 1112|   1|
|      21.0| 1090|   6|
|      22.0| 1161|  17|
|      23.0| 1307|  22|
|      24.0| 1162|  44|
|      25.0| 1119|  76|
|      26.0| 1068|  85|
|      27.0| 1117| 115|
|      28.0| 1101| 179|
|      29.0| 1025| 198|
|      30.0| 1031| 247|
|      31.0| 1050| 275|
|      32.0|  957| 296|
|      33.0| 1045| 290|
|      34.0|  949| 354|
|      35.0|  997| 340|
|      36.0|  948| 400|
+----------+-----+----+
only showing top 20 rows



In [16]:
"""
Drop column
There are two intuitive API to drop columns:

drop(): Drop a column
dropna(): Drop NA's
"""
df_string.drop('educational-num').columns

['x',
 'age',
 'workclass',
 'fnlwgt',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender',
 'capital-gain',
 'capital-loss',
 'hours-per-week',
 'native-country',
 'income']

In [17]:
"""
Filter data
You can use filter() to apply descriptive statistics in a subset of data. For instance, you can count the number of people above 40 year old
"""
df_string.filter(df_string.age > 40).count()

20211

In [18]:
df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

+--------------------+------------------+
|      marital-status| avg(capital-gain)|
+--------------------+------------------+
|           Separated| 581.8424836601307|
|       Never-married|  384.382639449029|
|Married-spouse-ab...| 629.0047770700637|
|            Divorced| 793.6755615860094|
|             Widowed| 603.6442687747035|
|   Married-AF-spouse|2971.6216216216217|
|  Married-civ-spouse|1739.7006121810625|
+--------------------+------------------+



## Data preprocessing

In [19]:
# 1 Select the column
age_square = df_string.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df_string = df_string.withColumn("age_square", col("age")**2)

df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



In [20]:
df_string = df_string.withColumnRenamed("native-country", "native_country")
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'educational-num', 'marital-status',
           'occupation', 'relationship', 'race', 'gender', 'capital-gain', 'capital-loss',
           'hours-per-week', 'native_country', 'income']
df_string = df_string.select(COLUMNS)
df_string.first()

Row(age=25.0, age_square=625.0, workclass='Private', fnlwgt=226802.0, education='11th', educational-num=7.0, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0.0, capital-loss=0.0, hours-per-week=40.0, native_country='United-States', income='<=50K')

In [21]:
"""
Exclude Holand-Netherlands

When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.

Let's check the origin of the household
"""
df_string.filter(df_string.native_country == 'Holand-Netherlands').count()
df_string.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|             Hungary|                   19|
|            Honduras|                   20|
|            Scotland|                   21|
|          Yugoslavia|                   23|
|Outlying-US(Guam-...|                   23|
|                Laos|                   23|
|     Trinadad&Tobago|                   27|
|            Cambodia|                   28|
|                Hong|                   30|
|            Thailand|                   30|
|             Ireland|                   37|
|              France|                   38|
|             Ecuador|                   45|
|                Peru|                   46|
|           Nicaragua|                   49|
|              Greece|                   49|
|                Iran|                   59|
|              Taiwan|                   65|
|         

In [22]:
df_remove = df_string.filter(df_string.native_country != 'Holand-Netherlands')

## Build a data processing pipeline

In [23]:
# First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column.
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")

# Fit the data and transform it
model = stringIndexer.fit(df_string)
indexed = model.transform(df_string)

# Create the news columns based on the group. 
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)

+----+----------+---------+--------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
| age|age_square|workclass|  fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native_country|income|workclass_encoded|workclass_vec|
+----+----------+---------+--------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+-----------------+-------------+
|25.0|     625.0|  Private|226802.0|     11th|            7.0|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|         0.0|         0.0|          40.0| United-States| <=50K|              0.0|(9,[0],[1.0])|
|38.0|    1444.0|  Private| 89814.0|  HS-grad|            9.0|Married-civ-spouse|  Farming-fishing|     Husband|Whit

In [24]:
# Encode the categorical data
categorical_variables = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native_country']

indexers = [StringIndexer(inputCol=column, outputCol=column+"-index") for column in categorical_variables]

encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=["{0}-encoded".format(indexer.getOutputCol()) for indexer in indexers]
)
assembler = VectorAssembler(
    inputCols=encoder.getOutputCols(),
    outputCol="categorical-features"
)

In [25]:
# # Create a Pipeline.
pipeline = Pipeline(stages=indexers + [encoder, assembler])
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [26]:
model.printSchema()

root
 |-- age: float (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- gender-index: double (nullable = fal

In [27]:
df_data = model.limit(5).toPandas()

In [28]:
continuous_variables = ['age', 'fnlwgt', 'educational-num', 'capital-gain', 'capital-loss', 'hours-per-week']
assembler = VectorAssembler(
    inputCols=['categorical-features', *continuous_variables],
    outputCol='features'
)
train_df = assembler.transform(model)

In [29]:
train_df.printSchema()

root
 |-- age: float (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- gender-index: double (nullable = fal

In [30]:
train_df.limit(5).toPandas()['features'][0]

SparseVector(99, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 93: 25.0, 94: 226802.0, 95: 7.0, 98: 40.0})

In [31]:
indexer = StringIndexer(inputCol='income', outputCol='label')
train_df = indexer.fit(train_df).transform(train_df)

In [32]:
# Split the data into train and test sets
train_data, test_data = train_df.randomSplit([.8,.2],seed=1234)

In [33]:
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train_df)

In [34]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [-1.8311385808489677,-2.3710362639505056,-1.9753291190442144,-1.3421117683592219,-2.147634333059852,-1.6796502769353376,-1.3420132894181955,-2.747241967205135,0.22021819630884176,0.35784348559826124,0.42997365357962464,0.5434319844303361,0.2571551001872803,-0.09469006183201673,0.11640809721020415,0.03230985764138884,0.07405295919629493,0.8247343999583894,0.05877945855466732,0.10173281743346264,0.6613511445138025,0.5564371514229924,0.39714328665086523,-0.5724335599888886,-3.443260430701769,-3.017133638638736,-3.0833445474699004,-2.939140976359601,-2.8689167283829,-0.516030528848523,-0.9782111466195672,-0.280370904340365,-1.0515250805590117,-0.7886770948165618,-1.9192888370772316,-1.348500778876363,-2.3023927211430806,-1.1487165181637462,-1.7286008377592295,-2.007881657276677,-0.4837224197834709,-0.564310352515928,-2.9895711634084683,0.3968132902190918,1.0729087634095062,-0.03575989298094328,0.8949931073402168,1.5330145767401124,0.024867691817727813,-0.220250420435625,0.250

## Train and evaluate the model

In [35]:
predictions = model.transform(test_data)

In [36]:
predictions.printSchema()

root
 |-- age: float (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- workclass-index: double (nullable = false)
 |-- education-index: double (nullable = false)
 |-- marital-status-index: double (nullable = false)
 |-- occupation-index: double (nullable = false)
 |-- relationship-index: double (nullable = false)
 |-- race-index: double (nullable = false)
 |-- gender-index: double (nullable = fal

In [37]:
predictions.limit(10).toPandas()[['label', 'prediction']]

Unnamed: 0,label,prediction
0,0.0,0.0
1,0.0,0.0
2,0.0,0.0
3,0.0,0.0
4,0.0,0.0
5,0.0,0.0
6,0.0,0.0
7,0.0,0.0
8,0.0,0.0
9,0.0,0.0


In [38]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = model)

Model accuracy: 85.252%


In [39]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.9091272202831788
areaUnderROC
