### PySpark-LogisticRegression
1. use bank.csv data to do binary classification

In [1]:
# Install pyspark and findspark
!pip install --ignore-install -q pyspark
# Install findspark library
!pip install --ignore-install -q findspark


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m604.9 kB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# Import findspark
import findspark
findspark.init()

### 1.  Set up SparkSession

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
            .builder \
            .appName("Python Spark Logistic Regression example") \
            .getOrCreate()

In [4]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


### 2. Load dataset

In [6]:
df = spark.read.format('com.databricks.spark.csv') \
            .options(header='true', inferschema='true') \
            .load("/content/drive/MyDrive/bank.csv",header=True);
df.drop('day','month','poutcome').show(5)

+---+-----------+-------+---------+-------+-------+-------+----+--------+--------+--------+-----+--------+---+
|age|        job|marital|education|default|balance|housing|loan| contact|duration|campaign|pdays|previous|  y|
+---+-----------+-------+---------+-------+-------+-------+----+--------+--------+--------+-----+--------+---+
| 30| unemployed|married|  primary|     no|   1787|     no|  no|cellular|      79|       1|   -1|       0|  0|
| 33|   services|married|secondary|     no|   4789|    yes| yes|cellular|     220|       1|  339|       4|  0|
| 35| management| single| tertiary|     no|   1350|    yes|  no|cellular|     185|       1|  330|       1|  0|
| 30| management|married| tertiary|     no|   1476|    yes| yes| unknown|     199|       4|   -1|       0|  0|
| 59|blue-collar|married|secondary|     no|      0|    yes|  no| unknown|     226|       1|   -1|       0|  0|
+---+-----------+-------+---------+-------+-------+-------+----+--------+--------+--------+-----+--------+---+
o

In [7]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: integer (nullable = true)



In [8]:
def get_dummy(df,categoricalCols,continuousCols,labelCol):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()))
                 for indexer in indexers ]

    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    data = data.withColumn('label',col(labelCol))

    return data.select('features','label')


In [9]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator



### 3. Deal with categorical data and convert the data to dense vector

In [10]:
catcols = ['job','marital','education','default',
           'housing','loan','contact','poutcome']

num_cols = ['balance', 'duration','campaign','pdays','previous',]
labelCol = 'y'

data = get_dummy(df,catcols,num_cols,labelCol)
data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(29,[8,11,15,16,1...|    0|
|(29,[4,11,13,16,1...|    0|
|(29,[0,12,14,16,1...|    0|
|(29,[0,11,14,16,1...|    0|
|(29,[1,11,13,16,1...|    0|
+--------------------+-----+
only showing top 5 rows



### 4. Deal with categorical label and variable

In [11]:
from pyspark.ml.feature import StringIndexer
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(5, True)

+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|(29,[8,11,15,16,1...|    0|         0.0|
|(29,[4,11,13,16,1...|    0|         0.0|
|(29,[0,12,14,16,1...|    0|         0.0|
|(29,[0,11,14,16,1...|    0|         0.0|
|(29,[1,11,13,16,1...|    0|         0.0|
+--------------------+-----+------------+
only showing top 5 rows



In [12]:
from pyspark.ml.feature import VectorIndexer
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", \
                                  outputCol="indexedFeatures", \
                                  maxCategories=4).fit(data)
featureIndexer.transform(data).show(5, True)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(29,[8,11,15,16,1...|    0|(29,[8,11,15,16,1...|
|(29,[4,11,13,16,1...|    0|(29,[4,11,13,16,1...|
|(29,[0,12,14,16,1...|    0|(29,[0,12,14,16,1...|
|(29,[0,11,14,16,1...|    0|(29,[0,11,14,16,1...|
|(29,[1,11,13,16,1...|    0|(29,[1,11,13,16,1...|
+--------------------+-----+--------------------+
only showing top 5 rows



### 5. Split the data to training and test data sets

In [13]:
# Split the data into training and test sets (40% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])

trainingData.show(5,False)
testData.show(5,False)

+------------------------------------------------------------------------------------------------+-----+
|features                                                                                        |label|
+------------------------------------------------------------------------------------------------+-----+
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-588.0,81.0,4.0,-1.0])|0    |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,-105.0,60.0,2.0,-1.0])|0    |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,117.0,635.0,1.0,-1.0])|0    |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,238.0,808.0,1.0,-1.0])|0    |
|(29,[0,11,13,16,17,18,19,21,24,25,26,27],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,407.0,145.0,2.0,-1.0])|0    |
+------------------------------------------------------------------------------------------------+-----+
only showing top 5 rows

+-----------------------------

### 6. Fit logistic Regression Model

In [14]:
from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol='indexedFeatures', labelCol='indexedLabel')

### 7. Create Pipeline

In [15]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [16]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr,labelConverter])

In [17]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

### 8. Make predictions

In [18]:
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|(29,[0,11,13,16,1...|    0|             0|
|(29,[0,11,13,16,1...|    0|             0|
|(29,[0,11,13,16,1...|    0|             0|
|(29,[0,11,13,16,1...|    0|             0|
|(29,[0,11,13,16,1...|    0|             0|
+--------------------+-----+--------------+
only showing top 5 rows



In [19]:
predictions.select('label').distinct().collect()

[Row(label=1), Row(label=0)]

### 9. Evaluation

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.0941112


In [21]:
lrModel = model.stages[2]
trainingSummary = lrModel.summary

# Obtain the objective per iteration
# objectiveHistory = trainingSummary.objectiveHistory
# print("objectiveHistory:")
# for objective in objectiveHistory:
#     print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show(5)
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head(5)
# bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
#     .select('threshold').head()['threshold']
# lr.setThreshold(bestThreshold)

+--------------------+--------------------+
|                 FPR|                 TPR|
+--------------------+--------------------+
|                 0.0|                 0.0|
|4.187604690117253E-4|0.003164556962025...|
|4.187604690117253E-4| 0.00949367088607595|
|8.375209380234506E-4|0.012658227848101266|
|8.375209380234506E-4|  0.0189873417721519|
+--------------------+--------------------+
only showing top 5 rows

areaUnderROC: 0.8765981807773043


In [22]:
spark.stop()