In [2]:
"""
When starting the Docker container don't forget to mount the local directory to the container using parameter -v <host_directory>:/home/jovyan/work/. 
Otherwise, we will lose the work once the container is closed.
"""
import pandas as pd
import numpy as np
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
# First of all, you need to initialize the SQLContext is not already in initiated yet.

from pyspark.sql import SQLContext
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc = SparkContext()
sc.addFile(url)
sqlContext = SQLContext(sc)

In [4]:
# then, you can read the cvs file with sqlContext.read.csv. You use inferSchema set to True to tell Spark to guess automatically the type of data. 
# By default, it is turn to False.

# df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)

In [5]:
import pandas as pd
url = "https://raw.githubusercontent.com/sadhana1002/PredictingSalaryClass-Classification/master/adult.csv"
df = sqlContext.createDataFrame(pd.read_csv(url, 
                                      names=['Age','workclass',
                                             'fnlwgt','education',
                                             'education_num',
                                             'marital',
                                             'occupation',
                                             'relationship','race',
                                             'sex','capital_gain',
                                             'capital_loss',
                                             'hours_week',
                                             'native_country','label']))
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [6]:
# Let's have a look at the data type

df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [7]:
df.show(5, truncate = False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|workclass        |fnlwgt|education |education_num|marital            |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_week|native_country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40        | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13        | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divorced          | Hand

In [8]:
# # for inferSchema = False


# # Import all from `sql.types`
# from pyspark.sql.types import *

# # Write a custom function to convert the data type of DataFrame columns
# def convertColumn(df, names, newType):
#     for name in names: 
#         df = df.withColumn(name, df[name].cast(newType))
#     return df 
# # List of continuous features
# CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
# # Convert the type
# df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# # Check the dataset
# df_string.printSchema()

In [9]:
# You can select and show the rows with select and the names of the features. Below, age and fnlwgt are selected.

df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
+---+------+
only showing top 5 rows



In [10]:
# If you want to count the number of occurence by group, you can chain:

# groupBy()
# count()
# together. In the PySpark example below, you count the number of rows by the education level.

df.groupBy("education").count().sort("count",ascending=True).show()	

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



In [11]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+-----------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|summary|               Age|   workclass|            fnlwgt|    education|    education_num|  marital|       occupation|relationship|               race|    sex|      capital_gain|     capital_loss|        hours_week|native_country| label|
+-------+------------------+------------+------------------+-------------+-----------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|            32561|    32561|            32561|       32561|              32561|  32561|             32561|            32561|             32561|         32561| 32561|
|   mean| 38.58164675532078|        null

In [None]:
df.crosstab('age', 'label').sort("age_label").show()

In [12]:
# Drop column
# There are two intuitive API to drop columns:

# drop(): Drop a column
# dropna(): Drop NA's
# Below you drop the column education_num

df.drop('education_num').columns

['Age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

In [14]:
# You can use filter() to apply descriptive statistics in a subset of data. For instance, you can count the number of people above 40 year old

df.filter(df.Age > 40).count()

13443

In [15]:
# Finally, you can group data by group and compute statistical operations like the mean.

df.groupby('marital').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



In [16]:
# To add a new feature, you need to:

# Select the column
# Apply the transformation and add it to the DataFrame

from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [17]:
df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

# The feature native_country has only one household coming from Netherland. You exclude it.

df_remove = df.filter(df.native_country!='Holand-Netherlands')			


+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|            Honduras|                   13|
|             Hungary|                   13|
| Outlying-US(Guam...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [None]:
# Similar to scikit-learn, Pyspark has a pipeline API.

# Index the string to numeric
# Create the one hot encoder
# Transform the data

# First of all, you select the string column to index. The inputCol is the name of the column in the dataset. 
# outputCol is the new name given to the transformed column.
StringIndexer(inputCol="workclass", outputCol="workclass_encoded")		

# Fit the data and transform it
model = stringIndexer.fit(df)		
`indexed = model.transform(df)``

# Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.
OneHotEncoder(dropLast=False, inputCol="workclassencoded", outputCol="workclassvec")



### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoded = encoder.transform(indexed)
encoded.show(2)

In [21]:
# You will build a pipeline to convert all the precise features and add them to the final dataset
# Each step is stored in a list named stages. This list will tell the VectorAssembler what operation to perform inside the pipeline.

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [22]:
# Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the list stages

# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

In [29]:
df.columns

['Age',
 'workclass',
 'fnlwgt',
 'education',
 'education_num',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label',
 'age_square']

In [27]:
CATE_FEATURES

['workclass',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'native_country']

In [30]:
CONTI_FEATURES = [
    'Age',
    'fnlwgt',
    'capital_gain',
    'capital_loss',
    'hours_week',
    'label',
    'age_square'
]

In [31]:
# The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below popluate the list with 
# encoded categorical features and the continuous features.

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

In [32]:
# Finally, you pass all the steps in the VectorAssembler

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [33]:
# Now that all the steps are ready, you push the data to the pipeline.

# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

IllegalArgumentException: Data type string of column label is not supported.

In [None]:
# To make the computation faster, you convert model to a DataFrame.

#You need to select newlabel and features from model using map.

from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

In [None]:
# You are ready to create the train data as a DataFrame. You use the sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

In [None]:
# You split the dataset 80/20 with randomSplit.

# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [None]:
# Let's count how many people with income below/above 50k in both training and test set

train_data.groupby('label').agg({'label': 'count'}).show()

# +-----+------------+
# |label|count(label)|
# +-----+------------+
# |  0.0|       19698|
# |  1.0|        6263|
# +-----+------------+

test_data.groupby('label').agg({'label': 'count'}).show()

# +-----+------------+
# |label|count(label)|
# +-----+------------+
# |  0.0|        5021|
# |  1.0|        1578|
# +-----+------------+

In [None]:
# Last but not least, you can build the classifier. Pyspark has an API called LogisticRegression to perform logistic regression.

# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)


#You can see the coefficients from the regression
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

In [None]:
# To generate prediction for your test set,

# You can use linearModel with transform() on test_data

# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)
predictions.printSchema()


# You are interested by the label, prediction and the probability
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

In [None]:
# You create a DataFrame with the label and the `prediction.
cm = predictions.select("label", "prediction")

# You can check the number of class in the label and the prediction
cm.groupby('label').agg({'label': 'count'}).show()

# +-----+------------+
# |label|count(label)|
# +-----+------------+
# |  0.0|        5021|
# |  1.0|        1578|
# +-----+------------+

cm.groupby('prediction').agg({'prediction': 'count'}).show()

# +----------+-----------------+
# |prediction|count(prediction)|
# +----------+-----------------+
# |       0.0|             5982|
# |       1.0|              617|
# +----------+-----------------+

# You can compute the accuracy by computing the count when the label are correctly classified over the total number of rows.
cm.filter(cm.label == cm.prediction).count() / cm.count()

In [None]:
# You can wrap everything together and write a function to compute the accuracy.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

In [None]:
# ROC

### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

print(evaluator.evaluate(predictions))

In [None]:
# Last but not least, you can tune the hyperparameters. Similar to scikit learn you create a parameter grid, and you add the parameters you want to tune.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

# Finally, you evaluate the model with using the cross valiation method with 5 folds. It takes around 16 minutes to train.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [None]:
accuracy_m(model = cvModel)

In [None]:
# You can exctract the recommended parameter by chaining cvModel.bestModel with extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()

# SUMMARY

In [None]:
# To begin with Spark, you need to initiate a Spark Context with:

# `SparkContext()``

# and and SQL context to connect to a data source:

# `SQLContext()``

# In the tutorial, you learn how to train a logistic regression:


# Convert the dataset to a Dataframe with:
input_data = rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

# Create the train/test set
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

# Train the model
lr = LogisticRegression(labelCol="label",featuresCol="features",maxIter=10, regParam=0.3)
linearModel = lr.fit(train_data)

# Make prediction
linearModel.transform(test_data)