## Spark implementation of binary classification
The non-Spark version of this project, which runs on a local machine, is available here:

https://github.com/prakritidev/DataScience-Projects/tree/master/Udacity%20Machine%20Learning%20Projects/Supervised%20Learning/finding_donors

https://archive.ics.uci.edu/ml/datasets/Adult

The same dataset is used for this task; you can download it from the link above.

Databricks ships a copy of this dataset under /databricks-datasets, so we use that copy:

In [3]:
%fs ls /databricks-datasets/adult/adult.data

In [4]:
%sql
-- Drop the table if it already exists
DROP TABLE IF EXISTS adult

In [5]:
%sql

CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
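USING com.databricks.spark.csv
OPTIONS (path "/databricks-datasets/adult/adult.data", header "false")

-- Creates a SQL table over the CSV file. adult.data has no header row,
-- so header must be "false"; "true" would silently drop the first record.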

In [6]:
dataset = spark.table("adult")
cols = dataset.columns
print(cols)

In [7]:
display(dataset)  # display() is only available on the Databricks platform

# Transforming the data into a form ML algorithms can use, with the Spark Pipeline API

In [9]:
# One-hot encoding
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

The code below only defines the pipeline stages; no computation is performed on the data yet.

In [11]:

categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
stages = [] # stages in our Pipeline or transformations in dataset
for categoricalCol in categoricalColumns:
  # Category indexing with StringIndexer, similar to LabelEncoder in sklearn.
  # The index goes into the <col>Index column, like categoryIndex below;
  # by default the most frequent category gets index 0.0:
  #  id | category | categoryIndex
  # ----|----------|---------------
  #  0  | a        | 0.0
  #  1  | b        | 2.0
  #  2  | c        | 1.0
  #  3  | a        | 0.0
  #  4  | a        | 0.0
  #  5  | c        | 1.0
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")

  # Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]
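
To make these two stages concrete, here is a plain-Python sketch (no Spark required) of what they compute for one column. The names `string_index` and `one_hot` are hypothetical. The sketch assumes StringIndexer's default frequency-descending ordering (ties broken alphabetically here) and OneHotEncoder's default `dropLast=True`, under which the last category becomes the all-zeros vector. Spark's real stages also handle distributed data, unseen labels and column metadata, which this ignores.

```python
from collections import Counter


def string_index(values):
    """Map each distinct value to a float index, most frequent first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    order = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(order)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a sparse vector (size, {position: 1.0}).

    With drop_last=True the vector has num_categories - 1 slots and the
    last category maps to the all-zeros vector.
    """
    size = num_categories - 1 if drop_last else num_categories
    idx = int(index)
    return (size, {idx: 1.0} if idx < size else {})


mapping = string_index(["a", "b", "c", "a", "a", "c"])
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}
print([one_hot(mapping[v], len(mapping)) for v in ["a", "b", "c"]])
# [(2, {0: 1.0}), (2, {}), (2, {1: 1.0})]
```

Dropping the last slot avoids a redundant column: if every other slot is 0, the category is already determined.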

In [12]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "income", outputCol = "label")
stages += [label_stringIdx]

In [13]:
# Transform all features into a vector using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
# Use a list comprehension: in Python 3, map() returns an iterator that cannot be added to a list
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
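
VectorAssembler concatenates its input columns, in order, into a single `features` vector. The plain-Python sketch below shows that concatenation under a simplified representation: a sparse vector is a hypothetical `(size, {index: value})` pair, and each numeric column contributes one slot. It is an illustration only; Spark decides on its own whether to store the result densely or sparsely.

```python
def assemble(parts):
    """Concatenate numbers and sparse vectors into one sparse vector.

    A sparse vector is (size, {index: value}); zeros are not stored.
    """
    size, entries = 0, {}
    for part in parts:
        if isinstance(part, tuple):
            # Sparse vector (e.g. a one-hot column): shift its indices by
            # the number of slots already filled.
            part_size, values = part
            for i, v in values.items():
                entries[size + i] = v
            size += part_size
        else:
            # Plain numeric column: occupies exactly one slot.
            if part != 0.0:
                entries[size] = float(part)
            size += 1
    return size, entries


# A 2-slot one-hot column, age=39, a 3-slot all-zeros one-hot column, capital_loss=0
print(assemble([(2, {0: 1.0}), 39.0, (3, {}), 0.0]))
# (7, {0: 1.0, 2: 39.0})
```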

In [14]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)

In [15]:
# fit() makes the data actually flow through Spark, so this cell can take a few seconds to complete.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
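
The fit/transform split above follows a general pattern: an estimator (StringIndexer, for example, which must see the data to learn the category ordering) is fitted into a model, while a transformer (VectorAssembler, for example) is applied as-is, and each stage's output feeds the next. The toy classes below (`MeanCenterer`, `Doubler`, `ToyPipeline`, all hypothetical) sketch that flow on lists of dicts. This is a simplified illustration, not Spark's implementation.

```python
class MeanCenterer:
    """Estimator: fit() learns the column mean and returns a model."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, mean)


class MeanCentererModel:
    """Transformer produced by MeanCenterer.fit(): subtracts the learned mean."""

    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]


class Doubler:
    """Transformer with nothing to learn: a fixed row-wise operation."""

    def __init__(self, col):
        self.col = col

    def transform(self, rows):
        return [{**r, self.col + "_x2": 2 * r[self.col]} for r in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the data as it looks at this stage;
            # transformers are used unchanged.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # output feeds the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for m in self.models:
            rows = m.transform(rows)
        return rows


model = ToyPipeline([MeanCenterer("age"), Doubler("age_centered")]).fit(
    [{"age": 20.0}, {"age": 40.0}])
print(model.transform([{"age": 50.0}]))
# [{'age': 50.0, 'age_centered': 20.0, 'age_centered_x2': 40.0}]
```

The fitted model reuses what was learned (the mean 30.0) on new rows, just as `pipelineModel` reuses the learned category indices.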

In [16]:
# Keep the relevant columns:
# "label" is the indexed version of income;
# "features" is the numerical representation of all the data (numerical + categorical).
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)

In [17]:
# Randomly split the data into training and test sets; set a seed for reproducibility.
# Similar to sklearn's train_test_split.
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())
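
randomSplit does not promise exact 70/30 counts. Roughly, each row is assigned to a split by a seeded random draw compared against the cumulative, normalized weights, so the two printed counts only approximate the weights. The plain-Python sketch below illustrates that idea; `random_split` is a hypothetical name, and Spark actually samples per partition, so the same seed here will not reproduce Spark's split.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split by comparing a seeded uniform draw
    with cumulative, normalized weight boundaries."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches floating-point rounding at 1.0.
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=100)
print(len(train), len(test))  # close to 700 / 300, but not exact
```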

As you may have noticed, in Spark you first define placeholders and a graph of transformations; the data is only processed when you call an action such as count() (or fit(), which needs to see the data), and that is when Spark jobs run.
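
The lazy-evaluation idea can be illustrated with a toy class in plain Python. `LazyDataset` is hypothetical and far simpler than a Spark DataFrame: `map` and `filter` only append a step to a recorded plan, and nothing runs until an action (`count` or `collect`) executes the whole plan as one "job".

```python
class LazyDataset:
    """Toy stand-in for a DataFrame: transformations only record a plan."""

    def __init__(self, source, plan=(), log=None):
        self.source = source
        self.plan = plan                          # tuple of ("map"|"filter", fn)
        self.log = log if log is not None else []  # shared record of executed jobs

    def map(self, fn):
        # Transformation: returns a new dataset, runs nothing.
        return LazyDataset(self.source, self.plan + (("map", fn),), self.log)

    def filter(self, pred):
        # Transformation: returns a new dataset, runs nothing.
        return LazyDataset(self.source, self.plan + (("filter", pred),), self.log)

    def collect(self):
        # Action: executes the whole recorded plan over the source.
        self.log.append("job")
        out = []
        for row in self.source:
            keep = True
            for kind, fn in self.plan:
                if kind == "map":
                    row = fn(row)
                elif not fn(row):
                    keep = False
                    break
            if keep:
                out.append(row)
        return out

    def count(self):
        # Action: also triggers a job.
        return len(self.collect())


ds = LazyDataset(range(10))
evens = ds.filter(lambda x: x % 2 == 0).map(lambda x: x * 10)
print(ds.log)        # [] : nothing has run yet
print(evens.count())  # 5 : the plan runs now
```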

## Now we can do some machine learning on the data we transformed.

### Logistic Regression

In [21]:
from pyspark.ml.classification import LogisticRegression
regressor = LogisticRegression(labelCol="label", featuresCol="features", maxIter=50)
regressorModel = regressor.fit(trainingData)

In [22]:
predictions = regressorModel.transform(testData)  # transform() plays the role of scikit-learn's predict()

In [23]:
predictions.printSchema()
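
The schema includes `rawPrediction`, `probability` and `prediction` columns. For binary logistic regression they can be understood as follows: the margin is the intercept plus the weighted sum of the features, `rawPrediction` holds one score per class, `probability` applies the sigmoid, and `prediction` compares P(label = 1) with a threshold (0.5 by default). The sketch below (`lr_predict` is a hypothetical helper, not Spark code) computes these for one row with made-up coefficients.

```python
import math


def lr_predict(features, coefficients, intercept, threshold=0.5):
    """Binary logistic-regression outputs for one row, laid out like
    Spark's prediction columns (a sketch, not Spark's code)."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    p1 = 1.0 / (1.0 + math.exp(-margin))  # sigmoid = P(label = 1)
    raw_prediction = [-margin, margin]     # one score per class
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction


# Made-up coefficients for two features, purely for illustration
raw, prob, pred = lr_predict([2.0, 0.0], [1.0, 0.5], 0.0)
print(raw, prob, pred)  # margin is 2.0, so pred is 1.0
```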

In [24]:
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)

## Model Evaluation

In [26]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [27]:
evaluator.getMetricName()
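
getMetricName() reports `areaUnderROC`, the default metric of BinaryClassificationEvaluator. The area under the ROC curve equals the probability that a randomly chosen positive row is scored higher than a randomly chosen negative one (ties counting half), so any score that preserves the ranking, such as the margin in `rawPrediction`, gives the same AUC. The quadratic-time sketch below (`roc_auc` is a hypothetical helper) computes it that way; Spark instead builds the curve from score thresholds, which can differ slightly when it bins scores on large data.

```python
def roc_auc(scores, labels):
    """AUC as P(random positive outscores random negative); ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```

An AUC of 0.5 corresponds to random ranking and 1.0 to perfect separation.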

In [28]:
print(regressor.explainParams())

## Hyperparameter selection

In [30]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create a ParamGrid for cross-validation. In scikit-learn, GridSearchCV serves this purpose.
paramGrid = (ParamGridBuilder()
             .addGrid(regressor.regParam, [0.01, 0.5, 2.0])
             .addGrid(regressor.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(regressor.maxIter, [1, 5, 10])
             .build())
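
The grid is the Cartesian product of the listed values, so three parameters with three values each give 3 × 3 × 3 = 27 parameter maps. The plain-Python sketch below reproduces that count with `itertools.product`; `build_grid` is a hypothetical helper, and the dictionaries stand in for Spark's ParamMaps.

```python
from itertools import product


def build_grid(grid):
    """Return one dict per combination of the candidate values."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


param_grid = build_grid({
    "regParam": [0.01, 0.5, 2.0],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [1, 5, 10],
})
print(len(param_grid))  # 27
print(param_grid[0])    # {'regParam': 0.01, 'elasticNetParam': 0.0, 'maxIter': 1}
```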

In [31]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=regressor, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# This will take a while: 27 parameter combinations x 5 folds = 135 models,
# plus a final refit of the best combination on the full training set
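
The k-fold procedure behind CrossValidator can be sketched in plain Python: split the rows into k folds, and for each parameter map train on k - 1 folds, score the held-out fold, average the k scores, pick the best average (larger is better here, as with AUC), and refit the winner on all the data. `cross_validate`, `fit_threshold` and `accuracy` are hypothetical names, and the threshold "model" is a toy that learns nothing, chosen only so the example runs without Spark.

```python
import random


def cross_validate(rows, param_maps, fit, evaluate, num_folds=5, seed=42):
    """Return (refit best model, best params, average metric per param map)."""
    idx = list(range(len(rows)))
    random.Random(seed).shuffle(idx)
    folds = [idx[i::num_folds] for i in range(num_folds)]
    avg_metrics = []
    for params in param_maps:
        total = 0.0
        for k in range(num_folds):
            held_out = set(folds[k])
            train = [rows[i] for i in idx if i not in held_out]
            valid = [rows[i] for i in folds[k]]
            total += evaluate(fit(train, params), valid)
        avg_metrics.append(total / num_folds)
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    # Refit the winning parameters on all the data.
    return fit(rows, param_maps[best]), param_maps[best], avg_metrics


def fit_threshold(train, params):
    # Toy "model": predicts 1 when x exceeds a fixed threshold (ignores train).
    t = params["t"]
    return lambda x: 1 if x > t else 0


def accuracy(model, rows):
    return sum(model(x) == y for x, y in rows) / len(rows)


data = [(float(x), 1 if x >= 10 else 0) for x in range(20)]
model, best_params, metrics = cross_validate(
    data, [{"t": 2.0}, {"t": 9.5}, {"t": 15.0}], fit_threshold, accuracy)
print(best_params)  # {'t': 9.5}
```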

In [32]:
# Score the held-out test set so we can measure how well the model performs on new data
predictions = cvModel.transform(testData)

In [33]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [34]:
print('Model Intercept:', cvModel.bestModel.intercept)

In [36]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
display(selected)