In [27]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [18]:
#https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
data_train = '/home/yang/toyData/adult_train.csv'
data_test = '/home/yang/toyData/adult_test.csv'
path = '/home/yang/derby.log'
df.withColumn('c1', when(df.c1.isNotNull(), 1).otherwise(0))
  .withColumn('c2', when(df.c2.isNotNull(), 1).otherwise(0))
  .withColumn('c3', when(df.c3.isNotNull(), 1).otherwise(0))

In [3]:
text_file = sc.textFile(path)

In [60]:
schema = StructType([
    StructField("age",IntegerType()),
    StructField("workclass", StringType()),
    StructField("fnlwgt", DoubleType()),
    StructField("education", StringType()),
    StructField("education-num",IntegerType()),
    StructField("marital-status", StringType()),
    StructField("occupation", StringType()),
    StructField("relationship", StringType()),
    StructField("race", StringType()),
    StructField("sex", StringType()),
    StructField("capital-gain", DoubleType()),
    StructField("capital-loss", DoubleType()),
    StructField("hours-per-week", DoubleType()),
    StructField("native-country", StringType()),
    StructField("class", StringType())
])
df = spark.read.csv(data_train, header=True, mode="DROPMALFORMED", schema=schema)

#na.fill({'age': 50, 'name': 'unknown'}
print df.count()
df = df.dropna()
print df.count()
df.show(1)
cols = df.columns

32561
30162
+---+----------+-------+----------+-------------+--------------+-------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|age| workclass| fnlwgt| education|education-num|marital-status|   occupation|  relationship|  race|  sex|capital-gain|capital-loss|hours-per-week|native-country| class|
+---+----------+-------+----------+-------------+--------------+-------------+--------------+------+-----+------------+------------+--------------+--------------+------+
| 39| State-gov|77516.0| Bachelors|           13| Never-married| Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|          40.0| United-States| <=50K|
+---+----------+-------+----------+-------------+--------------+-------------+--------------+------+-----+------------+------------+--------------+--------------+------+
only showing top 1 row



In [56]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["native-country","sex","race","relationship","occupation","marital-status","education","workclass"]
stages = [] # stages in our Pipeline
for categoricalCol in categoricalColumns:
  # Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
  # Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
  # Add stages.  These are not run here, but will run all at once later on.
  stages += [stringIndexer, encoder]

In [57]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = "class", outputCol = "label")
stages += [label_stringIdx]

In [58]:
numericCols = ["fnlwgt","hours-per-week","capital-loss","capital-gain", "education-num","age"]
assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [63]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
print selectedcols
dataset = dataset.select(selectedcols)
print dataset.show(1)

['label', 'features', 'age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'class']
+-----+--------------------+---+----------+-------+----------+-------------+--------------+-------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|label|            features|age| workclass| fnlwgt| education|education-num|marital-status|   occupation|  relationship|  race|  sex|capital-gain|capital-loss|hours-per-week|native-country| class|
+-----+--------------------+---+----------+-------+----------+-------------+--------------+-------------+--------------+------+-----+------------+------------+--------------+--------------+------+
|  0.0|(96,[0,40,41,46,5...| 39| State-gov|77516.0| Bachelors|           13| Never-married| Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|          40.0| United-State

In [64]:
print dataset.columns

['label', 'features', 'age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country', 'class']


In [65]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print trainingData.count()
print testData.count()

21160
9002


In [66]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [67]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [None]:

model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")

In [74]:
lrModel.summary.show()

AttributeError: 'BinaryLogisticRegressionTrainingSummary' object has no attribute 'show'

In [69]:
# View model's predictions and probabilities of each prediction class
# You can select any columns in the above schema to view as well. For example's sake we will choose age & occupation
selected = predictions.select("label", "prediction", "probability", "age", "occupation")
selected.show(4)

+-----+----------+--------------------+---+---------------+
|label|prediction|         probability|age|     occupation|
+-----+----------+--------------------+---+---------------+
|  0.0|       0.0|[0.66279954980149...| 32| Prof-specialty|
|  0.0|       0.0|[0.69474761254951...| 26| Prof-specialty|
|  0.0|       0.0|[0.66205880188559...| 31| Prof-specialty|
|  0.0|       0.0|[0.54759540543821...| 47| Prof-specialty|
+-----+----------+--------------------+---+---------------+
only showing top 4 rows



In [77]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print evaluator.evaluate(predictions)
evaluator.getMetricName()

0.901204168212


'areaUnderROC'

In [76]:
print lr.explainParams()

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: label)
maxIter: max number of iterations (>= 0). (default: 100, current: 10)
predictionCol: prediction column name. (default: prediction)
probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)
rawPredictionCol: raw p

In [112]:
prediction = predictions.select('label', 'prediction')
resull_row = prediction.rdd.collect()
result_dict = map(lambda c: c.asDict(), resull_row)
#result = map(lambda c: c, result_dict)
result = sc.parallelize([tuple(d.values()) for d in result_dict])


In [113]:
from pyspark.mllib.evaluation import MulticlassMetrics

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print 'Precision of True ', metrics.precision(1)
    print 'Precision of False', metrics.precision(0)
    print 'Recall of True    ', metrics.recall(1)
    print 'Recall of False   ', metrics.recall(0)
    print 'F-1 Score         ', metrics.fMeasure()
    print 'Confusion Matrix\n', metrics.confusionMatrix().toArray()

printMetrics(result)

Precision of True  0.730379071009
Precision of False 0.872773179969
Recall of True     0.601318681319
Recall of False    0.924929389029
F-1 Score          0.843145967563
Confusion Matrix




[[ 6222.   505.]
 [  907.  1368.]]


In [114]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [115]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

0.9007463771548666

In [117]:
print 'Model Intercept: ', cvModel.bestModel.intercept

Model Intercept:  -1.33815361249


In [119]:
weights = cvModel.bestModel.coefficients
# on Spark 2.X weights are available as ceofficients
# weights = cvModel.bestModel.coefficients
weights = map(lambda w: (float(w),), weights)  # convert numpy type to float, and to tuple
print weights
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
weightsDF.show(10)

[(-0.8735867181722526,), (-1.4715388170278123,), (-0.6258818196179033,), (-0.613905948870067,), (-1.4752616506231644,), (-0.8367092445608016,), (-1.8467401412166615,), (-1.1795819016777898,), (-0.646673098855344,), (-0.8692149014946261,), (-1.1090133115006933,), (-1.8887448632479413,), (-1.2865960236273457,), (-0.39384178799897623,), (-2.116383796353025,), (-1.4209921718510272,), (-1.1281140773015748,), (-0.3473857953741246,), (-0.7071060213873145,), (-2.9203423813399834,), (-0.721663868594423,), (-0.8126116326269156,), (-1.1968075526279958,), (-0.6863103410389289,), (-1.0589023976670786,), (-1.4904023523235366,), (-0.47961952122367724,), (-0.12895622389256026,), (-1.5529272078183434,), (-0.5268365742057032,), (-1.0524341838484204,), (-1.4210045644120506,), (-0.8701369403870748,), (-0.5396424866221428,), (-1.3563644337503775,), (-0.48091363482437494,), (-2.4798542614764223,), (-0.8248301683596001,), (-1.3092246838212545,), (-1.5920813357464134,), (0.4106521682256246,), (-0.547891637154

In [120]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)


In [121]:

evaluator.evaluate(predictions)

0.8867171656944173