In [1]:
### Import libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.context import SparkContext, SparkConf


### Configure Spark
app_name = 'HPC Project'

### PySpark session initialization
# Build the configuration one setting at a time: the application name first,
# then the "spark.executor.cores" setting.
conf = SparkConf()
conf.setAppName(app_name)
conf.set("spark.executor.cores", "8")

# Obtain a SparkContext for this configuration and wrap it in a SparkSession,
# which is the entry point used by the rest of the notebook.
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)
# Debugging aid: print(sc._conf.getAll()) lists all configuration parameters.


### Load the source data
# Comma-separated file with a header row; column types are inferred.
csv = spark.read.csv('bank_1g.csv', sep=',', header=True, inferSchema=True)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
16,application_1548666752303_0022,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
### Select features and label
# Keep every column except the last one as-is and append the "y" column,
# cast to integer, under the name "label".
# NOTE(review): this assumes "y" is the last column of the CSV -- confirm.
data = csv.select(*(csv.columns[:-1] + [col("y").cast("Int").alias("label")]))


### Split the data and rename Y column
# A fixed seed makes the 70/30 split reproducible across runs; without it
# every execution trains and evaluates on a different partition.
split_seed = 42
splits = data.randomSplit([0.7, 0.3], seed=split_seed)
train = splits[0]
# The test label is renamed so it cannot be confused with the "label" column
# used for training.
test = splits[1].withColumnRenamed("label", "trueLabel")


### Define the pipeline
# Every column of `data` except the trailing "label" goes into the feature vector.
assembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features")
# str.format keeps the output identical on Python 2 and Python 3 kernels
# (a multi-argument print() shows a tuple repr on Python 2).
print("Input Columns: {}".format(assembler.getInputCols()))
print("Output Column: {}".format(assembler.getOutputCol()))

algorithm = LogisticRegression(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, algorithm])

('Input Columns: ', ['age', 'default', 'balance', 'housing', 'loan', 'duration', 'campaign', 'pdays', 'previous', 'job_admin', 'job_blue-collar', 'job_entrepreneur', 'job_housemaid', 'job_management', 'job_retired', 'job_self-employed', 'job_services', 'job_student', 'job_technician', 'job_unemployed', 'job_unknown', 'marital_divorced', 'marital_married', 'marital_single', 'education_primary', 'education_secondary', 'education_tertiary', 'education_unknown', 'contact_cellular', 'contact_telephone', 'contact_unknown', 'day_1', 'day_2', 'day_3', 'day_4', 'day_5', 'day_6', 'day_7', 'day_8', 'day_9', 'day_10', 'day_11', 'day_12', 'day_13', 'day_14', 'day_15', 'day_16', 'day_17', 'day_18', 'day_19', 'day_20', 'day_21', 'day_22', 'day_23', 'day_24', 'day_25', 'day_26', 'day_27', 'day_28', 'day_29', 'day_30', 'day_31', 'month_apr', 'month_aug', 'month_dec', 'month_feb', 'month_jan', 'month_jul', 'month_jun', 'month_mar', 'month_may', 'month_nov', 'month_oct', 'month_sep', 'poutcome_failure', 

In [3]:
### Tune Parameters
# Candidate values for each LogisticRegression hyper-parameter.
lr_reg_params = [0.01, 0.5, 2.0]
lr_elasticnet_param = [0.0, 0.5, 1.0]
lr_max_iter = [1, 5, 10]


### CrossValidation
folds = 2
parallelism = 3

# Evaluator left at its default settings.
evaluator = BinaryClassificationEvaluator()

# Full grid over the three lists above: 3 x 3 x 3 = 27 parameter combinations.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(algorithm.regParam, lr_reg_params)
grid_builder.addGrid(algorithm.maxIter, lr_max_iter)
grid_builder.addGrid(algorithm.elasticNetParam, lr_elasticnet_param)
paramGrid = grid_builder.build()

# Cross-validate the whole pipeline over the grid with `folds` folds.
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=folds,
)
cv.setParallelism(parallelism)

# Alternative kept for reference (not enabled):
# cv = DagCrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, parallelism=parallelism)

In [4]:
#### Training
import time

# Wall-clock timing of the complete cross-validated fit on the training split.
tic = time.time()

model = cv.fit(train)

toc = time.time()
# str.format gives the same single-line output on Python 2 and Python 3
# (a multi-argument print() shows a tuple repr on Python 2).
print("Elapsed time {:.2f} s".format(toc - tic))

KeyboardInterrupt: 

In [5]:
### Test the Model
# Score the held-out split with the model returned by cross-validation and
# keep only the columns needed for evaluation.
prediction = model.transform(test)
predicted = prediction.select("features", "prediction", "probability", "trueLabel")
# Debugging aid (collects every row to the driver, use only on small data):
# predicted.select('prediction', 'trueLabel').collect()

# These are the ground-truth class totals of the test split, i.e. actual
# positives/negatives -- not true positives/negatives, which also depend on
# the prediction. A single aggregation yields both counts in one Spark job.
label_counts = {
    row["trueLabel"]: row["count"]
    for row in predicted.groupBy("trueLabel").count().collect()
}
print("actual positives: {}".format(label_counts.get(1, 0)))
print("actual negatives: {}".format(label_counts.get(0, 0)))

An error was encountered:
Session 13 did not reach idle status in time. Current status is busy.


In [6]:
### Compute Confusion Matrix Metrics
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero.

    Precision, recall, accuracy and F1 are undefined when their denominator is
    zero (e.g. the model never predicts the positive class); 0.0 is reported
    in that case instead of raising ZeroDivisionError.
    """
    return numerator / denominator if denominator else 0.0


# One aggregation over (prediction, trueLabel) replaces a separate count() job
# per confusion-matrix cell, so the scoring pipeline is evaluated only once.
outcome_counts = {}
total = 0.0  # every scored row, same value as predicted.count()
for row in predicted.groupBy("prediction", "trueLabel").count().collect():
    outcome_counts[(row["prediction"], row["trueLabel"])] = row["count"]
    total += row["count"]

# Keys are (prediction, trueLabel); missing combinations count as zero.
tp = float(outcome_counts.get((1.0, 1), 0))
fp = float(outcome_counts.get((1.0, 0), 0))
tn = float(outcome_counts.get((0.0, 0), 0))
fn = float(outcome_counts.get((0.0, 1), 0))

precision = _safe_ratio(tp, tp + fp)
recall = _safe_ratio(tp, tp + fn)
correctly_classified = _safe_ratio(tp + tn, total)
F1 = _safe_ratio(2 * (precision * recall), precision + recall)

metrics = spark.createDataFrame(
    [
        ("TP", tp),
        ("FP", fp),
        ("TN", tn),
        ("FN", fn),
        ("Precision", precision),
        ("Recall", recall),
        ("Correctly Classified", correctly_classified),
        ("F1", F1),
    ],
    ["metric", "value"],
)

### Print Results
metrics.show()

An error was encountered:
Session 13 did not reach idle status in time. Current status is busy.
