In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.types import StructField,IntegerType, StructType,StringType
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import *
import numpy as np
import os

# Load the party-demographics data lake object from the Salesforce CDP Query Service
# over JDBC, then cast the numeric columns to integers.
url = "jdbc:queryService-jdbc:https://login.salesforce.com"
# Credentials are read from the environment rather than hard-coded in the notebook,
# since notebook source and outputs tend to get shared/committed. The original
# placeholders remain as fallbacks when the variables are not set.
properties = {
    "driver": "com.salesforce.cdp.queryservice.QueryServiceDriver",
    "userName": os.environ.get("CDP_USERNAME", "<USER NAME>"),
    "password": os.environ.get("CDP_PASSWORD", "PASSWORD"),
}
newdataset = sqlContext.read.jdbc(url=url, table="CDPartyDemographics__dlm", properties=properties)

# Numeric features that need an explicit integer cast.
# NOTE(review): presumably the JDBC source returns these as strings — confirm against the schema.
intColumns = ["age__c", "fnlwgt__c", "education_num__c",
              "capital_gain__c", "capital_loss__c", "hours_per_week__c"]
dataset = newdataset
for colName in intColumns:
    dataset = dataset.withColumn(colName, dataset[colName].cast(IntegerType()))

# Original column names, reused later to select them next to label/features.
cols = dataset.columns
print(cols)



In [0]:
# Preview the cast dataset with the notebook-provided `display` (presumably Databricks).
# NOTE(review): this renders record-level rows including individualid__c; consider a
# .limit() and/or dropping identifier columns before sharing the notebook output.
display(dataset)

capital_loss__c,age__c,education_num__c,__datasourceentityname,individualid__c,marital_status__c,workclass__c,education__c,occupation__c,InternalOrganization__c,income__c,native_country__c,relationship__c,capital_gain__c,DataSource__c,sex__c,DataSourceObject__c,fnlwgt__c,hours_per_week__c,race__c
0,36,9,indtypeml__dll,0034V00002ZUolWQAT,Married-civ-spouse,Private,HS-grad,Craft-repair,,<=50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,155537,40,White
0,28,10,indtypeml__dll,0034V00002ZUolXQAT,Divorced,Private,Some-college,Adm-clerical,,<=50K,United-States,Not-in-family,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,183175,40,White
0,53,9,indtypeml__dll,0034V00002ZUolYQAT,Married-civ-spouse,Private,HS-grad,Adm-clerical,,>50K,United-States,Wife,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,169846,40,White
0,29,13,indtypeml__dll,0034V00002ZUoldQAD,Married-civ-spouse,Self-emp-not-inc,Bachelors,Sales,,>50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,162298,70,White
0,23,10,indtypeml__dll,0034V00002ZUoleQAD,Never-married,Private,Some-college,Machine-op-inspct,,<=50K,United-States,Not-in-family,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,211678,40,White
0,79,10,indtypeml__dll,0034V00002ZUolfQAD,Married-civ-spouse,Private,Some-college,Prof-specialty,,<=50K,United-States,Other-relative,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,124744,20,White
0,27,9,indtypeml__dll,0034V00002ZUolgQAD,Never-married,Private,HS-grad,Other-service,,<=50K,Mexico,Own-child,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,213921,40,White
0,40,12,indtypeml__dll,0034V00002ZUolhQAD,Married-civ-spouse,Private,Assoc-acdm,Adm-clerical,,<=50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,32214,40,White
0,67,6,indtypeml__dll,0034V00002ZUoliQAD,Married-civ-spouse,?,10th,?,,<=50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,212759,2,White
0,18,7,indtypeml__dll,0034V00002ZUoljQAD,Never-married,Private,11th,Other-service,,<=50K,United-States,Own-child,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,309634,22,White


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [0]:
# Categorical columns: each one is indexed and then one-hot encoded.
categoricalColumns = [
    "workclass__c",
    "education__c",
    "marital_status__c",
    "occupation__c",
    "relationship__c",
    "race__c",
    "sex__c",
    "native_country__c",
]

# Ordered Pipeline stages. Nothing executes here; everything runs when the pipeline is fit.
stages = []
for categoricalCol in categoricalColumns:
    indexCol = categoricalCol + "Index"
    # StringIndexer (similar to sklearn's LabelEncoder) maps each distinct string to a
    # numeric index, ordered by frequency. For example, values a,b,c,a,a,c become
    # a -> 0.0, c -> 1.0, b -> 2.0.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexCol)
    # OneHotEncoder turns that index into a binary SparseVector.
    encoder = OneHotEncoder(inputCol=indexCol, outputCol=categoricalCol + "classVec")
    stages.extend([stringIndexer, encoder])

In [0]:
# Index the target column (income bucket, e.g. "<=50K" / ">50K") into a numeric "label".
label_stringIdx = StringIndexer(outputCol="label", inputCol="income__c")
stages.append(label_stringIdx)

In [0]:
numericCols = ["age__c", "fnlwgt__c", "education_num__c",
               "capital_gain__c", "capital_loss__c", "hours_per_week__c"]
# Feature vector layout: every one-hot block first, then the raw numeric columns.
assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(outputCol="features", inputCols=assemblerInputs)
stages.append(assembler)

In [0]:
# Chain the indexing, encoding, label and assembler stages into one estimator.
pipeline = Pipeline().setStages(stages)


In [0]:
# Fit every pipeline stage on `dataset`, then apply the fitted model to add the
# *Index, *classVec, label and features columns.
# NOTE(review): this rebinds `dataset` to the transformed frame. Re-running the cell
# without first re-running the loading cell will likely fail, because the stages'
# output columns (e.g. label/features) already exist.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)

In [0]:
# Keep the model inputs (label, features) first, followed by every original column.
selectedcols = ["label", "features", *cols]
dataset = dataset.select(*selectedcols)
display(dataset)

label,features,capital_loss__c,age__c,education_num__c,__datasourceentityname,individualid__c,marital_status__c,workclass__c,education__c,occupation__c,InternalOrganization__c,income__c,native_country__c,relationship__c,capital_gain__c,DataSource__c,sex__c,DataSourceObject__c,fnlwgt__c,hours_per_week__c,race__c
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 6, 18, 29, 35, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 36.0, 155537.0, 9.0, 40.0))",0,36,9,indtypeml__dll,0034V00002ZUolWQAT,Married-civ-spouse,Private,HS-grad,Craft-repair,,<=50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,155537,40,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 7, 20, 27, 36, 40, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 28.0, 183175.0, 10.0, 40.0))",0,28,10,indtypeml__dll,0034V00002ZUolXQAT,Divorced,Private,Some-college,Adm-clerical,,<=50K,United-States,Not-in-family,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,183175,40,White
1.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 6, 18, 27, 39, 40, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 53.0, 169846.0, 9.0, 40.0))",0,53,9,indtypeml__dll,0034V00002ZUolYQAT,Married-civ-spouse,Private,HS-grad,Adm-clerical,,>50K,United-States,Wife,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,169846,40,White
1.0,"Map(vectorType -> sparse, length -> 60, indices -> List(7, 18, 24, 35, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 49.0, 191681.0, 10.0, 50.0))",0,49,10,indtypeml__dll,0034V00002ZUolZQAT,Married-civ-spouse,Self-emp-inc,Some-college,Exec-managerial,,>50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,191681,50,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(4, 7, 19, 31, 37, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 25.0, 200681.0, 10.0, 40.0))",0,25,10,indtypeml__dll,0034V00002ZUolaQAD,Never-married,?,Some-college,?,,<=50K,United-States,Own-child,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,200681,40,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 7, 19, 23, 37, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 19.0, 101509.0, 10.0, 32.0))",0,19,10,indtypeml__dll,0034V00002ZUolbQAD,Never-married,Private,Some-college,Prof-specialty,,<=50K,United-States,Own-child,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,101509,32,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 8, 21, 25, 37, 41, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 31.0, 309974.0, 13.0, 40.0))",0,31,13,indtypeml__dll,0034V00002ZUolcQAD,Separated,Private,Bachelors,Sales,,<=50K,United-States,Own-child,0,s3dtc,Female,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,309974,40,Black
1.0,"Map(vectorType -> sparse, length -> 60, indices -> List(1, 8, 18, 25, 35, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 29.0, 162298.0, 13.0, 70.0))",0,29,13,indtypeml__dll,0034V00002ZUoldQAD,Married-civ-spouse,Self-emp-not-inc,Bachelors,Sales,,>50K,United-States,Husband,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,162298,70,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 7, 19, 28, 36, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 23.0, 211678.0, 10.0, 40.0))",0,23,10,indtypeml__dll,0034V00002ZUoleQAD,Never-married,Private,Some-college,Machine-op-inspct,,<=50K,United-States,Not-in-family,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,211678,40,White
0.0,"Map(vectorType -> sparse, length -> 60, indices -> List(0, 7, 18, 23, 40, 44, 45, 54, 55, 56, 59), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 79.0, 124744.0, 10.0, 20.0))",0,79,10,indtypeml__dll,0034V00002ZUolfQAD,Married-civ-spouse,Private,Some-college,Prof-specialty,,<=50K,United-States,Other-relative,0,s3dtc,Male,storage_direct_20210603-051037_c7390dd6-8b02-4d3a-b3de-43cee98bc936,124744,20,White


In [0]:
### Randomly split the data into training and test sets; the fixed seed makes the split reproducible.
### Similar to sklearn's train_test_split.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=100)
# Cache both splits. Without this, every later action (model fit, 5-fold cross-validation,
# evaluation) recomputes the full lineage, including the JDBC read.
trainingData = trainingData.cache()
testData = testData.cache()
# Only a cell's last expression is rendered, so show both sizes together.
trainingData.count(), testData.count()

In [0]:
from pyspark.ml.classification import LogisticRegression
# Baseline logistic-regression classifier (at most 50 optimizer iterations),
# trained on the assembled feature vector.
regressor = LogisticRegression(maxIter=50, featuresCol="features", labelCol="label")
regressorModel = regressor.fit(trainingData)

In [0]:
# Score the held-out test set with the baseline model.
predictions = regressorModel.transform(dataset=testData)

In [0]:
# Show the columns the model added (e.g. rawPrediction, probability, prediction) and their types.
predictions.printSchema()

In [0]:
# Compare the true label with the prediction and class probabilities, next to a couple of raw columns.
selected = predictions.select(["label", "prediction", "probability", "age__c", "occupation__c"])
display(selected)

label,prediction,probability,age__c,occupation__c
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(4.2889883441529015E-12, 0.999999999995711))",41,Adm-clerical
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",18,Other-service
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",32,Machine-op-inspct
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",39,Exec-managerial
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",53,Sales
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9947067603985947, 0.005293239601405264))",49,Adm-clerical
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(3.002746657901286E-15, 0.999999999999997))",19,Adm-clerical
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(1.0, 0.0))",23,Machine-op-inspct
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9999999954423282, 4.557671795524243E-9))",37,Craft-repair
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.9999999877995503, 1.2200449650556777E-8))",24,Tech-support


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the baseline model on the test set: area under the ROC curve,
# computed from rawPrediction against label (these are the evaluator's defaults).
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)
evaluator.evaluate(predictions)

In [0]:
# Which metric the evaluator reports.
evaluator.getOrDefault(evaluator.metricName)

In [0]:
# explainParams() returns a single newline-separated string. Print it so each parameter
# appears on its own line instead of as an escaped string repr.
print(regressor.explainParams())


In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyper-parameter grid for cross-validation (the Spark analogue of sklearn's GridSearchCV).
# Every combination of the values below is tried.
# NOTE(review): the maxIter axis overrides the maxIter=50 used by the baseline model,
# and very small values (1, 5) may stop before convergence — confirm this range is intended.
gridAxes = [
    (regressor.regParam, [0.01, 0.5, 2.0]),
    (regressor.elasticNetParam, [0.0, 0.5, 1.0]),
    (regressor.maxIter, [1, 5, 10]),
]
gridBuilder = ParamGridBuilder()
for gridParam, gridValues in gridAxes:
    gridBuilder.addGrid(gridParam, gridValues)
paramGrid = gridBuilder.build()

In [0]:
# 5-fold CrossValidator over every combination in paramGrid, scored by `evaluator`.
# An explicit seed pins the fold assignment. Without it, PySpark derives the default
# seed from hash() of the class name, which can differ between Python processes, so
# results would not be reproducible. 100 matches the train/test split seed.
cv = CrossValidator(estimator=regressor, estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5, seed=100)

# Run cross-validation. This fits len(paramGrid) x 5 models plus a final refit of the
# best parameter combination, so it can take a while.
cvModel = cv.fit(trainingData)

In [0]:
# Score the test set with the best model found by cross-validation
# (this replaces the baseline `predictions`).
predictions = cvModel.transform(dataset=testData)


In [0]:
# Test-set metric for the cross-validated model (same evaluator as the baseline).
evaluator.evaluate(dataset=predictions)

In [0]:
# A bare "label, value" expression renders as a tuple; print a readable line instead.
print("Model Intercept:", cvModel.bestModel.intercept)

In [0]:
# Same comparison columns as for the baseline, now from the cross-validated model.
selected = predictions.select(["label", "prediction", "probability", "age__c", "occupation__c"])


In [0]:
# Render the label / prediction / probability comparison for the cross-validated model.
display(selected)


label,prediction,probability,age__c,occupation__c
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5366920838401095, 0.46330791615989053))",41,Adm-clerical
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6808379287883379, 0.31916207121166207))",18,Other-service
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.664230989495714, 0.335769010504286))",32,Machine-op-inspct
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.5266299808748713, 0.47337001912512866))",39,Exec-managerial
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6257257239558041, 0.37427427604419594))",53,Sales
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6163982388834882, 0.3836017611165118))",49,Adm-clerical
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6161962596166497, 0.3838037403833503))",19,Adm-clerical
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6455311826593393, 0.3544688173406607))",23,Machine-op-inspct
0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.592501973911947, 0.40749802608805297))",37,Craft-repair
0.0,1.0,"Map(vectorType -> dense, length -> 2, values -> List(0.4868144749387218, 0.5131855250612782))",24,Tech-support
