# Logistic Regression on the London Fire Brigade dataset

In this exercise you will apply logistic regression to a real dataset provided by the London Fire Brigade (see https://data.london.gov.uk/dataset/london-fire-brigade-incident-records). The dataset can be downloaded as three CSV files (data from 2009 to 2012, from 2012 to 2016, and from January 2017) and contains information about incidents (mainly fire incidents) in London from 2009 onwards. The goal is to split this dataset into training and test data and to use logistic regression to predict whether a new incident (from the test data) is likely to be a true or a false alarm.
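
As a reminder of what the model computes, here is a minimal plain-Python sketch (not Spark code) of how logistic regression turns a weighted sum of numeric features into a probability and then into a 0.0/1.0 label. The weights, bias and feature values are hypothetical and chosen only for illustration; in the exercise, Spark learns them from the training data.

```python
import math

def sigmoid(z):
    """Map any real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict_probability(weights, bias, x):
    """Probability of label 1.0 for one numeric feature vector x."""
    score = bias + sum(w * xi for w, xi in zip(weights, x))
    return sigmoid(score)

# Hypothetical weights and feature values (assumptions, not learned from the data)
weights = [0.8, -0.5]
bias = 0.1

p = predict_probability(weights, bias, [1.0, 2.0])
# Threshold the probability at 0.5 to obtain a class label
label = 1.0 if p >= 0.5 else 0.0
print("probability:", p, "label:", label)
```

Because the score is a weighted sum, every feature has to be a number, which is why string columns are indexed later in this exercise.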

You will have to choose appropriate features yourself in order to reach a good accuracy (at least 70%).

First, upload the file that we provide to the cluster, then replace the path to it in the code below:
* LFB_Incident_data_from_January_2017.csv

Note: In this exercise we will work only with the smallest of the three files in order to minimize storage requirements on Databricks.

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import functions as F
import hashlib
from pyspark.ml.feature import StringIndexer
from datetime import datetime
from pyspark.sql.functions import UserDefinedFunction, col
from pyspark.sql.types import IntegerType

# TODO: REPLACE PATHS with the ones from your csv upload
df = spark.read.csv("/FileStore/tables/lehl/LFB_Incident_data_from_January_2017.csv", sep=",", header=True)

In [3]:
#How many entries are we going to test?
print("Total number of rows parsed from csv files: ", df.count())

Start by looking at the contents of a few elements in the dataset by using the *display(df)* call:

In [5]:
display(df)

IncidentNumber,DateOfCall,TimeOfCall,IncidentGroup,StopCodeDescription,SpecialServiceType,PropertyCategory,PropertyType,AddressQualifier,Postcode_full,Postcode_district,IncGeo_BoroughCode,IncGeo_BoroughName,IncGeo_WardCode,IncGeo_WardName,Easting_m,Northing_m,Easting_rounded,Northing_rounded,FRS,IncidentStationGround,FirstPumpArriving_AttendanceTime,FirstPumpArriving_DeployedFromStation,SecondPumpArriving_AttendanceTime,SecondPumpArriving_DeployedFromStation,NumStationsWithPumpsAttending,NumPumpsAttending
000003-01012017,01.Jan.17,00:04:27,Special Service,Special Service,Lift Release,Dwelling,Purpose Built Flats/Maisonettes - 10 or more storeys,Within same building,,E9,E09000012,HACKNEY,E05009379,KING'S PARK,,,536650,185450,London,Homerton,,,,,1.0,1.0
000004-01012017,01.Jan.17,00:06:18,False Alarm,AFA,,Non Residential,Engineering manufacturing plant,Nearby address - street not listed in gazetteer,SE2 9SG,SE2,E09000011,GREENWICH,E05000214,ABBEY WOOD,547178.0,179210.0,547150,179250,London,Plumstead,835.0,Plumstead,,,1.0,1.0
000005-01012017,01.Jan.17,00:06:34,Special Service,Special Service,Effecting entry/exit,Dwelling,Purpose Built Flats/Maisonettes - Up to 3 storeys,Correct incident location,,N1,E09000019,ISLINGTON,E05000366,BARNSBURY,,,530750,183050,London,Euston,237.0,Euston,,,1.0,1.0
000006-01012017,01.Jan.17,00:07:57,Fire,Primary Fire,,Non Residential,Restaurant/cafe,Correct incident location,IG11 8TB,IG11,E09000002,BARKING AND DAGENHAM,E05000026,ABBEY,544654.0,184596.0,544650,184550,London,Barking,282.0,Barking,,,1.0,2.0
000007-01012017,01.Jan.17,00:08:59,Special Service,Special Service,Effecting entry/exit,Dwelling,Purpose Built Flats/Maisonettes - 10 or more storeys,Correct incident location,,SW8,E09000022,LAMBETH,E05000426,OVAL,,,530150,177950,London,Lambeth,,,,,1.0,1.0
000008-01012017,01.Jan.17,00:12:44,Fire,Secondary Fire,,Outdoor,Loose refuse,On land associated with building,UB6 8LY,UB6,E09000009,EALING,E05000178,GREENFORD GREEN,514211.0,183917.0,514250,183950,London,Northolt,415.0,Northolt,420.0,Northolt,1.0,2.0
000009-01012017,01.Jan.17,00:13:27,False Alarm,AFA,,Non Residential,Purpose built office,Correct incident location,EC1Y 8LZ,EC1Y,E09000019,ISLINGTON,E05000367,BUNHILL,532603.0,182151.0,532650,182150,London,Shoreditch,230.0,Shoreditch,,,1.0,1.0
000011-01012017,01.Jan.17,00:17:35,False Alarm,AFA,,Dwelling,Purpose Built Flats/Maisonettes - Up to 3 storeys,Within same building,,HA1,E09000005,BRENT,E05000096,NORTHWICK PARK,,,516750,187450,London,Wembley,421.0,Wembley,442.0,Wembley,1.0,2.0
000013-01012017,01.Jan.17,00:20:01,Fire,Primary Fire,,Outdoor Structure,Other outdoor structures,On land associated with building,IG5 0AZ,IG5,E09000026,REDBRIDGE,E05000496,BARKINGSIDE,542787.0,188990.0,542750,188950,London,Ilford,415.0,Ilford,882.0,Woodford,2.0,2.0
000015-01012017,01.Jan.17,00:22:37,Special Service,Special Service,Lift Release,Dwelling,Purpose Built Flats/Maisonettes - 10 or more storeys,Within same building,,E9,E09000012,HACKNEY,E05009379,KING'S PARK,,,536650,185450,London,Homerton,,,,,1.0,1.0


# Feature Selection
Perhaps the most important part of making good predictions is selecting the right features for the regression model.

Which features do you think could be useful for predictions about False Alarms in our case? For example, think about the following:
* location: false alarms might occur more frequently at specific addresses
* time-related properties: some hours in the day might have peaks of false alarms
* categories: outdoor fires might turn out to be false alarms more often than others

Add to the set below the names of the columns from the dataframe that you think are relevant. The following steps will compute the accuracy of your model, so remember to re-run every step from this one on (e.g. by hitting "Run All" at the top of this page) whenever you change the features to improve the model.

In [7]:
features = ["Postcode_full", "TODO", ....]   

One of the first steps required for most machine learning algorithms is pre-processing the data. For our case, we want to filter out rows where feature values are null. For this use the following code:

In [9]:
# drop rows where feature values are null (can't have this for fitting)
print("Filtering out null values...")
filteredDf = df.na.drop(subset=features)
print("Now remaining: ", filteredDf.count())
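
To make the effect of dropping nulls on a subset of columns concrete, here is a plain-Python sketch on toy rows (not Spark code). The rows and the helper name `drop_null_rows` are hypothetical; `None` plays the role of null. Only the listed columns are checked, so a null in any other column does not remove the row.

```python
# Toy rows standing in for DataFrame rows; None plays the role of null
rows = [
    {"Postcode_district": "E9", "PropertyCategory": "Dwelling", "Postcode_full": None},
    {"Postcode_district": "SE2", "PropertyCategory": None, "Postcode_full": "SE2 9SG"},
    {"Postcode_district": None, "PropertyCategory": "Outdoor", "Postcode_full": "UB6 8LY"},
    {"Postcode_district": "N1", "PropertyCategory": "Dwelling", "Postcode_full": "N1 1AA"},
]

def drop_null_rows(rows, subset):
    """Keep only the rows in which every column listed in subset is non-null."""
    return [row for row in rows
            if all(row.get(column) is not None for column in subset)]

subset = ["Postcode_district", "PropertyCategory"]
kept = drop_null_rows(rows, subset)
print("Now remaining:", len(kept))
```

The first row survives even though its Postcode_full is missing, because that column is not in the subset.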

For experimenting, you might want to start by using just a few of the entries in the dataset. You can use the *limit* API for this.

In [11]:
# TODO: uncomment the following lines to limit the number of entries, e.g. to 1000
# (limit filteredDf, since this is the dataframe used in the following steps)
# print("Keeping only first N rows...")
# filteredDf = filteredDf.limit(...)

Then, you need to provide a column that gives the label (True or False alarm) for each row in the dataset. For this, you need to explicitly add to our dataframe a new column called *label*. You can do this using the *withColumn* API as shown below.

For the London Fire dataset, there already exists a column that indicates whether an incident is a true or a false alarm. Try to find out which one it is, and then use the *F.when* API to construct the *label* column. The *label* column should contain only the values 0.0 (false alarm) or 1.0 (true alarm).

In [13]:
# map named labels to numerical
# TODO: find the column name that contains entries of type "False Alarm"
# TODO: fill the result of the condition with the appropriate numbers
filteredDf = filteredDf.withColumn("label", F.when(filteredDf.REPLACE_COLUMN_NAME=="False Alarm", REPLACE_VALUE_HERE).otherwise(REPLACE_VALUE_HERE))

# Logistic Regression with String columns
Logistic Regression can only be used with numerical features, therefore we need to first index columns that are of String type in order to be able to use them. For this purpose, use a StringIndexer for every String feature, as in the example below:

In [15]:
# This line is an example of how to construct a string indexer on the Postcode_district column:
# indexer_ZipCode = StringIndexer(inputCol="Postcode_district", outputCol="Postcode_district_indexed")
# To apply the transformation, you can use the following:
# df = indexer_ZipCode.fit(df).transform(df).drop("Postcode_district")

#here, we give the indexed feature columns some new names:
indexedFeatures = []
for featureCol in features:
    indexedFeatures.append(featureCol + "_indexed")
    
df_withIndexedFeatures = filteredDf
    
#TODO: do this for all features you defined earlier
for featureCol in features:
  indexer = StringIndexer(inputCol=REPLACE_VARIABLE_HERE, outputCol=featureCol + "_indexed")
  df_withIndexedFeatures = TODO
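
To see what string indexing does conceptually, here is a plain-Python sketch (not the Spark implementation). With its default settings, StringIndexer assigns index 0.0 to the most frequent value, 1.0 to the next most frequent, and so on. The helper name `fit_string_index` is hypothetical, and breaking ties alphabetically is an assumption made here for determinism.

```python
from collections import Counter

def fit_string_index(values):
    """Build a value -> index mapping, most frequent value first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: float(index) for index, value in enumerate(ordered)}

# Toy column of postcode districts
districts = ["E9", "SE2", "N1", "E9", "SW8", "E9", "N1"]

mapping = fit_string_index(districts)
indexed = [mapping[d] for d in districts]
print(mapping)
print(indexed)
```

Note that the resulting numbers are only identifiers: the model will still treat them as ordered quantities, which is a known limitation of feeding indexed categories directly into a linear model.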

What does the dataset look like now? Look at the first 2-3 examples. The indexed columns should appear last.

In [17]:
#TODO: display the first rows of the dataset
display(df_withIndexedFeatures)

# Time Features

It might be useful to use time features in the Logistic Regression as well. 

For example, you can look at the day of the week when the incident was received. For this we need to transform the DateOfCall column into the equivalent DayOfWeek index. 

You can write a user-defined function (UDF; see a tutorial here: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-udfs.html), which applies a transformation to each value of a column and returns a new column of the return type you declare. In our case, we want to process the DateOfCall column and return an Integer.

First, define the function that takes a String (in our case, this will be the DateOfCall) and returns the index of the day in the week for that date. This is just a regular Python function that we will use later:

In [19]:
# Takes a string describing a date, e.g. 27.Jan.09 or 27-Jan-09, and returns
# the index of that day in the week (Monday = 0, ..., Sunday = 6), as given by
# Python's datetime.weekday()
def dayOfWeek(text):
    # try each supported date format in turn
    for fmt in ('%d.%b.%y', '%d-%b-%y'):
        try:
            return datetime.strptime(text, fmt).weekday()
        except ValueError:
            pass
    raise ValueError('no valid date format found')
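
Before wrapping the function in a UDF, it can help to check it on a couple of dates in plain Python. The sketch below re-declares the same logic under the hypothetical name `day_of_week` so that it runs on its own. It assumes an English locale for the month abbreviations.

```python
from datetime import datetime

def day_of_week(text):
    """Return the weekday index (Monday = 0, ..., Sunday = 6) of a date string."""
    for fmt in ("%d.%b.%y", "%d-%b-%y"):
        try:
            return datetime.strptime(text, fmt).weekday()
        except ValueError:
            continue
    raise ValueError("no valid date format found")

sunday = day_of_week("01.Jan.17")   # 1 January 2017 was a Sunday
monday = day_of_week("02-Jan-17")   # the following day, in the second format
print(sunday, monday)
```

Both date formats are accepted, and a string in any other format raises a ValueError instead of silently returning a wrong index.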

Then, use it to return a new column, of type Integer. Name this new column DayOfWeek.

In [21]:
#TODO: replace with the correct function name
udf_getDayOfWeek = F.udf(REPLACE_FUNCTION_NAME, IntegerType())

#TODO: name the resulting column as described above
df_withIndexedFeatures = df_withIndexedFeatures.withColumn(TODO, udf_getDayOfWeek(col("DateOfCall")))

Let's also use the HourOfDay. The PySpark functions (see all of them here: https://spark.apache.org/docs/1.5.2/api/python/_modules/pyspark/sql/functions.html) already include a method that computes the hour of day from a time value, so we don't need to write our own UDF for this. Hint: look at the *hour* function in the provided link. You can use it here as *F.hour()*:

In [23]:
#TODO: find the column to use for extracting the hour of day and give the resulting column an appropriate name
df_withIndexedFeatures = df_withIndexedFeatures.withColumn(TODO, F.hour(col(REPLACE_COLUMN_NAME)))

Finally, select only the feature columns we created so far, split the train and test data and run the algorithm.

In [25]:
#TODO: add here all the fields that you want to use and select only those from the dataframe. Don't forget the label column!
finalFeaturesList = []
finalFeaturesList.append("label")
# TODO: extend the features list with all the entries in indexedFeatures. You can also append the time columns we defined above.

# keep only these features in training the model
featureVectorDf = df_withIndexedFeatures.select(finalFeaturesList)

# Split into training and test data
# TODO: define the fraction of training and test data
training, testing = featureVectorDf.randomSplit([TRAINING_FRACTION_TODO, TESTING_FRACTION_TODO], seed=42)

# Configure an ML pipeline, which consists of two stages: feature assembler and lr.
# Transform n feature vectors into one single vector column
assembler = VectorAssembler(inputCols=training.columns[1:], outputCol='features')
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[assembler, lr])
#print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# predict 
model = pipeline.fit(training)
prediction = model.transform(testing)

#prediction.printSchema()
selected = prediction.select("features", "label", "probability", "prediction")

# Calculate prediction accuracy
numRows = selected.count()
print("Total Number of Predictions: " + str(numRows))

# to compute the number of correct predictions, 
# TODO: select all where the prediction matches the label and count
correct = selected.filter(TODO_CONDITION).count()

# the accuracy is between 0 and 1; convert 'correct' to a float so that the division is not an integer division (which would return 0 or 1) under Python 2
accuracy = float(correct) / numRows
print("Accuracy: ", accuracy)
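
For reference, the accuracy computed in the last step is simply the fraction of predictions that match their labels. Here is a plain-Python sketch on toy lists (not Spark code). The helper name `accuracy_score` and the sample values are hypothetical.

```python
def accuracy_score(labels, predictions):
    """Fraction of predictions that equal the corresponding label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("cannot compute accuracy of an empty list")
    correct = sum(1 for label, prediction in zip(labels, predictions)
                  if label == prediction)
    # float() keeps the division a true division under Python 2 as well
    return correct / float(len(labels))

# Toy labels and predictions: 4 of the 5 predictions are correct
labels = [0.0, 1.0, 1.0, 0.0, 1.0]
predictions = [0.0, 1.0, 0.0, 0.0, 1.0]

acc = accuracy_score(labels, predictions)
print("Accuracy:", acc)
```

When judging the 70% target, keep in mind that a model that always predicts the majority class can already reach a high accuracy if the classes are unbalanced, so it is worth comparing your result against that baseline.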