In [1]:
%matplotlib inline
import wrangle
from pydataset import data

import pyspark
import pyspark.ml
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [2]:
df = wrangle.wrangle_311(spark)

[wrangle.py] reading case.csv
[wrangle.py] handling data types
[wrangle.py] parsing dates
[wrangle.py] adding features
[wrangle.py] joining departments


# Use the .randomSplit method to split the 311 data into training and test sets.

In [3]:
train, test = df.randomSplit([0.8, 0.2], seed=123)

In [4]:
def shape(df: pyspark.sql.DataFrame):
    return df.count(), len(df.columns)

In [5]:
shape(train)

(672840, 20)

In [6]:
shape(test)

(168864, 20)

In [7]:
train.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'num_weeks_late',
 'zipcode',
 'case_age',
 'days_to_closed',
 'case_lifetime',
 'department',
 'dept_subject_to_SLA']

# Create a classification model to predict whether a case will be late or not (i.e. predict case_late). Experiment with different combinations of features and different classification algorithms.

In [8]:
rf = pyspark.ml.feature.RFormula(formula="case_late ~ department").fit(train)
rf

rf.transform(train).show(5)

+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+------------------+-----------+---------+--------------------+----------------+-------------------+-------+--------+--------------+-------------+--------------------+-------------------+-------------+-----+
|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|      num_days_late|case_closed|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|     num_weeks_late|zipcode|case_age|days_to_closed|case_lifetime|          department|dept_subject_to_SLA|     features|label|
+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+------------------+-----------+---------+--------------------+----------------+-------------------+-------+--------+--------------+-------------+--------------------+---------------

In [9]:
train_input = rf.transform(train).select('features', 'label')
train_input.show(5)

+-------------+-----+
|     features|label|
+-------------+-----+
|(7,[2],[1.0])|  0.0|
|(7,[2],[1.0])|  1.0|
|(7,[3],[1.0])|  0.0|
|(7,[0],[1.0])|  0.0|
|(7,[0],[1.0])|  0.0|
+-------------+-----+
only showing top 5 rows



In [10]:
lr = pyspark.ml.classification.LogisticRegression()
lr

LogisticRegression_559cfc766544

In [11]:
lr_fit = lr.fit(train_input)
lr_fit.transform(train_input).show(5)

+-------------+-----+--------------------+--------------------+----------+
|     features|label|       rawPrediction|         probability|prediction|
+-------------+-----+--------------------+--------------------+----------+
|(7,[2],[1.0])|  0.0|[1.39705845806300...|[0.80171669533291...|       0.0|
|(7,[2],[1.0])|  1.0|[1.39705845806300...|[0.80171669533291...|       0.0|
|(7,[3],[1.0])|  0.0|[2.80807436948043...|[0.94311059144247...|       0.0|
|(7,[0],[1.0])|  0.0|[2.41904045591730...|[0.91826775765355...|       0.0|
|(7,[0],[1.0])|  0.0|[2.41904045591730...|[0.91826775765355...|       0.0|
+-------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



In [12]:
[x for x in dir(lr_fit.summary) if not x.startswith('_')]

['accuracy',
 'areaUnderROC',
 'fMeasureByLabel',
 'fMeasureByThreshold',
 'falsePositiveRateByLabel',
 'featuresCol',
 'labelCol',
 'labels',
 'objectiveHistory',
 'pr',
 'precisionByLabel',
 'precisionByThreshold',
 'predictionCol',
 'predictions',
 'probabilityCol',
 'recallByLabel',
 'recallByThreshold',
 'roc',
 'totalIterations',
 'truePositiveRateByLabel',
 'weightedFMeasure',
 'weightedFalsePositiveRate',
 'weightedPrecision',
 'weightedRecall',
 'weightedTruePositiveRate']

In [13]:
# area under TPR (recall) vs FPR (FP / (FP + TN)) curve
# https://en.wikipedia.org/wiki/Receiver_operating_characteristic
lr_fit.summary.areaUnderROC

0.6225099331762476

In [14]:
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(lr_fit.transform(rf.transform(test)))
test_auc

0.6276238295880469

# Create a regression model to predict how many days late a case will be (i.e. predict num_days_late). Experiment with different combinations of features and different regression algorithms.

In [17]:
rf = pyspark.ml.feature.RFormula(formula="num_days_late ~ department").fit(train)
rf

rf.transform(train).show(5)

+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+------------------+-----------+---------+--------------------+----------------+-------------------+-------+--------+--------------+-------------+--------------------+-------------------+-------------+-------------------+
|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|      num_days_late|case_closed|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|     num_weeks_late|zipcode|case_age|days_to_closed|case_lifetime|          department|dept_subject_to_SLA|     features|              label|
+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+------------------+-----------+---------+--------------------+----------------+-------------------+-------+--------+--------------+-------------+--------

In [18]:
lr = pyspark.ml.regression.LinearRegression()
lr

LinearRegression_971c5431db60

In [20]:
lr_fit = lr.fit(train_input)
lr_fit.transform(train_input).show(5)

+-------------+-----+--------------------+
|     features|label|          prediction|
+-------------+-----+--------------------+
|(7,[2],[1.0])|  0.0| 0.19828325260405505|
|(7,[2],[1.0])|  1.0| 0.19828325260405505|
|(7,[3],[1.0])|  0.0|0.056889420700777786|
|(7,[0],[1.0])|  0.0| 0.08173245392037004|
|(7,[0],[1.0])|  0.0| 0.08173245392037004|
+-------------+-----+--------------------+
only showing top 5 rows



In [21]:
test_input = rf.transform(test)
lr_fit.transform(test_input).show(4)

+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+---------+--------------------+----------------+--------------------+-------+--------+--------------+-------------+--------------------+-------------------+-------------+-------------------+--------------------+
|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|      num_days_late|case_closed|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|      num_weeks_late|zipcode|case_age|days_to_closed|case_lifetime|          department|dept_subject_to_SLA|     features|              label|          prediction|
+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+---------+--------------------+----------------+--------------------+-------+--------+-------------

In [22]:
evaluator = pyspark.ml.evaluation.RegressionEvaluator()
rmse = evaluator.evaluate(lr_fit.transform(test_input))
rmse

Py4JJavaError: An error occurred while calling o634.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 198.0 failed 1 times, most recent failure: Lost task 13.0 in stage 198.0 (TID 1508, localhost, executor driver): scala.MatchError: [-2.974418726197174E-13,null] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.evaluation.RegressionEvaluator$$anonfun$1.apply(RegressionEvaluator.scala:83)
	at org.apache.spark.ml.evaluation.RegressionEvaluator$$anonfun$1.apply(RegressionEvaluator.scala:83)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$26.apply(RDD.scala:1190)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$26.apply(RDD.scala:1190)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$27.apply(RDD.scala:1191)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$27.apply(RDD.scala:1191)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1143)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1206)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1182)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary$lzycompute(RegressionMetrics.scala:57)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.summary(RegressionMetrics.scala:54)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr$lzycompute(RegressionMetrics.scala:65)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.SSerr(RegressionMetrics.scala:65)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.meanSquaredError(RegressionMetrics.scala:100)
	at org.apache.spark.mllib.evaluation.RegressionMetrics.rootMeanSquaredError(RegressionMetrics.scala:109)
	at org.apache.spark.ml.evaluation.RegressionEvaluator.evaluate(RegressionEvaluator.scala:86)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: scala.MatchError: [-2.974418726197174E-13,null] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.evaluation.RegressionEvaluator$$anonfun$1.apply(RegressionEvaluator.scala:83)
	at org.apache.spark.ml.evaluation.RegressionEvaluator$$anonfun$1.apply(RegressionEvaluator.scala:83)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$26.apply(RDD.scala:1190)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$26.apply(RDD.scala:1190)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$27.apply(RDD.scala:1191)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$27.apply(RDD.scala:1191)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
