# Analysing police data for Northern Ireland

## Description

The input data is expected in a `data` folder that contains only the date ranges to be analysed. The notebook reads every `*-street.csv` and `*-outcomes.csv` file one level below that folder.
The data was downloaded from https://data.police.uk/data/.

# Import

In [10]:
import java.io.File
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{collect_list, collect_set, count, countDistinct, input_file_name, udf, when}
import spark.implicits._ // enables the $"column" syntax (usually pre-imported in Spark notebooks)


# Ingest

In [11]:
var outcomesDf = spark
                    .read
                    .option("header", "true")
                    .csv("./data/*/*-outcomes.csv")
                    .withColumn("districtName", input_file_name())
var streetDf = spark
                    .read
                    .option("header", "true")
                    .csv("./data/*/*-street.csv")
                    .withColumn("districtName", input_file_name())

outcomesDf: org.apache.spark.sql.DataFrame = [Crime ID: string, Month: string ... 9 more fields]
streetDf: org.apache.spark.sql.DataFrame = [Crime ID: string, Month: string ... 11 more fields]


# Analyse data

In [12]:
streetDf.select($"month").distinct.orderBy($"month").show()

println("**** The street dataframe:")
streetDf.schema.names.foreach(println)

println("\n**** The outcomes dataframe:")
outcomesDf.schema.names.foreach(println)

+-------+
|  month|
+-------+
|2018-01|
|2018-02|
|2018-03|
|2018-04|
|2018-05|
|2018-06|
|2018-07|
|2018-08|
|2018-09|
|2018-10|
|2018-11|
|2018-12|
|2019-01|
|2019-02|
|2019-03|
+-------+

**** The street dataframe:
Crime ID
Month
Reported by
Falls within
Longitude
Latitude
Location
LSOA code
LSOA name
Crime type
Last outcome category
Context
districtName

**** The outcomes dataframe:
Crime ID
Month
Reported by
Falls within
Longitude
Latitude
Location
LSOA code
LSOA name
Outcome type
districtName


In [13]:
streetDf.show(3, false)

+----------------------------------------------------------------+-------+---------------------------+---------------------------+---------+---------+--------------------------+---------+-------------------+----------------------------+---------------------------------------------+-------+--------------------------------------------------------------+
|Crime ID                                                        |Month  |Reported by                |Falls within               |Longitude|Latitude |Location                  |LSOA code|LSOA name          |Crime type                  |Last outcome category                        |Context|districtName                                                  |
+----------------------------------------------------------------+-------+---------------------------+---------------------------+---------+---------+--------------------------+---------+-------------------+----------------------------+---------------------------------------------+-------+--

In [14]:
println("total number of street records: " + streetDf.count)
println("total number of distinct crime IDs in street: " + streetDf.select($"Crime ID").distinct.count)
println("total number of outcome records: " + outcomesDf.count)
println("total number of distinct crime IDs in outcomes: " + outcomesDf.select($"Crime ID").distinct.count)

total number of street records: 8261884
total number of distinct crime IDs in street: 6273525
total number of outcome records: 5708337
total number of distinct crime IDs in outcomes: 5063409


### Verify the uniqueness of the Crime ID

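The counts above show that `Crime ID` is not a unique key: in both dataframes the number of distinct crime IDs is lower than the number of records. The next cell counts how many IDs occur more than once, then tests whether combining the ID with other columns yields a unique composite key.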
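Checking uniqueness amounts to grouping rows by a candidate key and counting the groups that contain more than one row, which is what each `groupBy(...).agg(count(...)).where($"count" > 1).count` expression does. A minimal plain-Scala sketch of the same check on in-memory rows, for both a single and a composite key; the `StreetRow` case class and the sample values are hypothetical and only model a few of the street columns:

```scala
// Hypothetical, simplified row type (only three of the street columns).
case class StreetRow(crimeId: String, month: String, longitude: String)

// Number of distinct key values that appear on more than one row:
// the in-memory analogue of groupBy(key).agg(count).where(count > 1).count
def duplicatedKeyCount[A, K](rows: Seq[A])(key: A => K): Int =
  rows.groupBy(key).count { case (_, group) => group.size > 1 }

val rows = Seq(
  StreetRow("a1", "2018-01", "-5.76"),
  StreetRow("a1", "2018-02", "-6.31"), // same ID, different month and location
  StreetRow("b2", "2018-01", "-2.55"),
  StreetRow("b2", "2018-01", "-2.55"), // exact repeat
  StreetRow("c3", "2018-03", "-1.94")
)

// Single-column key: "a1" and "b2" are both duplicated.
val byId = duplicatedKeyCount(rows)(_.crimeId)
// Composite key (ID, month): only ("b2", "2018-01") is still duplicated.
val byIdAndMonth = duplicatedKeyCount(rows)(r => (r.crimeId, r.month))

println(s"duplicated IDs: $byId, duplicated (ID, month) pairs: $byIdAndMonth")
```

Unlike Spark's `count($"Crime ID")`, which ignores nulls, this sketch does not model missing IDs.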

In [15]:
// STREET
println(streetDf.groupBy($"Crime ID")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count
)

// Check whether a composite key (Crime ID plus other columns) is unique
println(streetDf.groupBy($"Crime ID", $"Month")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count
)
println(streetDf.groupBy($"Crime ID", $"Longitude")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count
)
println(streetDf.groupBy($"Crime ID", $"LSOA code")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count
)
println(streetDf.groupBy($"Crime ID", 
                         $"Month", 
                         $"Longitude", 
                         $"Latitude", 
                         $"Reported by", 
                         $"LSOA code", 
                         $"Falls within")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count
)

60736
51716
46830
56233
46526


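The counts printed above are, in order, the number of duplicated keys for `Crime ID` alone (60,736), `Crime ID` + `Month` (51,716), `Crime ID` + `Longitude` (46,830), `Crime ID` + `LSOA code` (56,233) and the seven-column combination (46,526). No candidate composite key is unique, so the duplicated IDs are most likely data errors, and a filter should be put in place to rule them out.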

In [16]:
var streetMultiIdDf = streetDf.groupBy($"Crime ID")
                                .agg(count($"Crime ID").as("count"))
                                .where($"count" > 1)
                                .select($"Crime ID")
                                .withColumnRenamed("Crime ID", "crimeIdMulti")

streetDf.join(streetMultiIdDf, $"Crime ID" === $"crimeIdMulti", "inner")
        .groupBy($"Crime ID")
        .agg(            
             collect_list($"Longitude").as("longitude_list"), 
             collect_set($"Longitude").as("longitude_set"),
             countDistinct($"Longitude").as("count_distinct")
            )
        .where($"count_distinct" > 1)
        .show()

+--------------------+--------------------+--------------------+--------------+
|            Crime ID|      longitude_list|       longitude_set|count_distinct|
+--------------------+--------------------+--------------------+--------------+
|01c4971330ba99d0d...|[-5.761179, -5.84...|[-6.317724, -6.30...|            14|
|05835ad9b99479fc3...|[-5.642241, -5.71...|[-6.665021, -5.94...|            14|
|06ac8d52630a7d582...|[-2.557898, -2.55...|[-2.557898, -2.55...|             2|
|0f16eef9874953881...|[-5.905221, -5.92...|[-5.959611, -7.65...|             9|
|14abaa4c688812a6b...|[-2.350023, -2.35...|[-2.351300, -2.35...|             2|
|180ec98648a24f6b8...|[-5.700116, -5.85...|[-6.688120, -7.62...|            13|
|19b0da5dc82df7534...|[-5.700995, -5.82...|[-5.860830, -7.30...|            15|
|1adffe4a3d31f5a5a...|[-5.535522, -5.90...|[-6.303628, -5.93...|            14|
|2081173e60040dcd2...|[-5.712780, -5.85...|[-5.931754, -6.11...|            15|
|247ff19de8d7a9544...|[-1.949217, -1.95.

streetMultiIdDf: org.apache.spark.sql.DataFrame = [crimeIdMulti: string]
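The cell above counts, for every duplicated Crime ID, how many distinct longitudes it was reported at. Several IDs map to 9 to 15 different longitudes, which is hard to reconcile with a single incident. A plain-Scala sketch of the same aggregation over hypothetical `(crimeId, longitude)` pairs:

```scala
// Hypothetical (crimeId, longitude) pairs; "x" is reported at three different places.
val pairs = Seq(
  ("x", "-5.76"), ("x", "-6.31"), ("x", "-5.76"), ("x", "-7.62"),
  ("y", "-2.55"), ("y", "-2.55"),
  ("z", "-1.94")
)

// Keep IDs that occur more than once, count their distinct longitudes
// (collect_set / countDistinct in Spark), then keep IDs with more than one.
val distinctLongitudes: Map[String, Int] =
  pairs
    .groupBy { case (id, _) => id }
    .filter { case (_, group) => group.size > 1 }
    .map { case (id, group) => id -> group.map(_._2).distinct.size }
    .filter { case (_, n) => n > 1 }

println(distinctLongitudes) // Map(x -> 3)
```

Here "y" is duplicated but always at the same longitude, so, like the Spark `where($"count_distinct" > 1)` filter, the sketch drops it.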


In [17]:
// OUTCOMES
outcomesDf.groupBy($"Crime ID")
        .agg(count($"Crime ID").as("count"))
        .where($"count" > 1)
        .count

res8: Long = 492850




# Transformations

## Ingest

In [19]:
outcomesDf = spark
                    .read
                    .option("header", "true")
                    .csv("./data/*/*-outcomes.csv")
                    .withColumn("districtName", input_file_name())
streetDf = spark
                    .read
                    .option("header", "true")
                    .csv("./data/*/*-street.csv")
                    .withColumn("districtName", input_file_name())

outcomesDf: org.apache.spark.sql.DataFrame = [Crime ID: string, Month: string ... 9 more fields]
streetDf: org.apache.spark.sql.DataFrame = [Crime ID: string, Month: string ... 11 more fields]


## Select the minimum required fields

In [20]:
// SELECT
streetDf = streetDf.select($"Crime ID".as("crimeID"), 
                           $"districtName",
                           $"Latitude".as("latitude"),
                           $"Longitude".as("longitude"),
                           $"Crime type".as("crimeType"),
                           $"Last outcome category".as("lastOutcomeCategory")
                          )
outcomesDf = outcomesDf.select($"Crime ID".as("crimeId"),
                               $"Outcome type".as("outcomeType")
                              )

streetDf: org.apache.spark.sql.DataFrame = [crimeID: string, districtName: string ... 4 more fields]
outcomesDf: org.apache.spark.sql.DataFrame = [crimeId: string, outcomeType: string]


## Removing duplicate IDs

In [21]:
// IDs that occur more than once in each dataframe
streetMultiIdDf = streetDf.groupBy($"crimeID")
                                .agg(count($"crimeID").as("count"))
                                .where($"count" > 1)
                                .select($"crimeID")
                                .withColumnRenamed("crimeID", "crimeIdMulti")

var outcomesMultiIdDf = outcomesDf.groupBy($"crimeID")
                                .agg(count($"crimeID").as("count"))
                                .where($"count" > 1)
                                .select($"crimeID")
                                .withColumnRenamed("crimeID", "crimeIdMulti")

streetMultiIdDf: org.apache.spark.sql.DataFrame = [crimeIdMulti: string]
outcomesMultiIdDf: org.apache.spark.sql.DataFrame = [crimeIdMulti: string]


In [22]:
// Drop every row whose ID is duplicated (a left anti join keeps only non-matching rows)
streetDf = streetDf.join(streetMultiIdDf, $"crimeID" === $"crimeIdMulti", "leftanti")
outcomesDf = outcomesDf.join(outcomesMultiIdDf, $"crimeID" === $"crimeIdMulti", "leftanti")

streetDf: org.apache.spark.sql.DataFrame = [crimeID: string, districtName: string ... 4 more fields]
outcomesDf: org.apache.spark.sql.DataFrame = [crimeId: string, outcomeType: string]
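A `leftanti` join keeps only the rows of the left dataframe that have no match on the right, so every occurrence of a duplicated ID is removed, not just the extra copies. Since the duplicated IDs disagree on location, there is no obvious copy to keep, which makes dropping them all a reasonable choice. A plain-Scala sketch of these semantics compared with simple de-duplication, over hypothetical IDs:

```scala
// Hypothetical crime IDs; "a1" and "b2" occur more than once.
val ids = Seq("a1", "a1", "b2", "c3", "b2", "d4")

// IDs appearing more than once (the role of streetMultiIdDf / outcomesMultiIdDf).
val multi: Set[String] =
  ids.groupBy(identity).filter { case (_, g) => g.size > 1 }.keySet

// Left-anti semantics: keep a row only if its ID is NOT in the duplicate set.
val kept = ids.filterNot(multi.contains)

// By contrast, .distinct keeps one copy of each duplicated ID.
val deduplicated = ids.distinct

println(kept)         // List(c3, d4)
println(deduplicated) // List(a1, b2, c3, d4)
```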


In [23]:
// Row counts after removing duplicated IDs
println(streetDf.count)
println(outcomesDf.count)

8029355
4570559


## District name

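The `districtName` column still holds the full path of the source file (from `input_file_name()`). The next cell reduces it to the police force name encoded in the file name.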

In [24]:
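// Extract the force name from a path such as ".../2018-01-merseyside-street.csv":
// drop the year and month, drop the "street.csv" suffix, join the rest with spaces
val extractName = udf((path: String) => new File(path).getName().split("-").drop(2).dropRight(1).mkString(" "))

// Replace the full file path with the extracted name
streetDf = streetDf.withColumn("districtName", extractName($"districtName"))

// Preview a few rows
streetDf.distinct.show(5, false)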

+----------------------------------------------------------------+---------------+---------+---------+----------------------------+---------------------------------------------+
|crimeID                                                         |districtName   |latitude |longitude|crimeType                   |lastOutcomeCategory                          |
+----------------------------------------------------------------+---------------+---------+---------+----------------------------+---------------------------------------------+
|0006c2f699d8672953293d94af50131ad6b9aee0d5a0c129a8cabc194b7aa912|merseyside     |53.425884|-2.952538|Shoplifting                 |Investigation complete; no suspect identified|
|0007023f792cd822fdb06eeb55e8ba44c96404434af0d957654e3b9c9b73ac1b|west midlands  |52.476815|-1.895378|Shoplifting                 |Investigation complete; no suspect identified|
|00070d3e37d069af5b16fe9f30cd33108f635dfc0738ba3d78af5d428cbb1710|bedfordshire   |52.134816|-0.483361|Public o

extractName: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4751/0x0000000840f43040@6aba2420,StringType,List(Some(class[value[0]: string])),None,true,true)
streetDf: org.apache.spark.sql.DataFrame = [crimeID: string, districtName: string ... 4 more fields]
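Like the UDF, this sketch assumes files named `<year>-<month>-<force>-street.csv` (or `-outcomes.csv`). Splitting the file name on `-`, dropping the two date parts and the trailing suffix leaves the force name, and the force's own hyphens become spaces. The logic as a plain function, testable without Spark; `districtFromPath` and the sample paths are hypothetical:

```scala
import java.io.File

// Same logic as the extractName UDF: take the file name, split on "-",
// drop year and month, drop the "street.csv"/"outcomes.csv" part, join with spaces.
def districtFromPath(path: String): String =
  new File(path).getName.split("-").drop(2).dropRight(1).mkString(" ")

val samples = Seq(
  "file:///data/2018-01/2018-01-merseyside-street.csv",
  "file:///data/2019-03/2019-03-west-midlands-street.csv"
)

samples.map(districtFromPath).foreach(println)
// merseyside
// west midlands
```

Spark's built-in `regexp_extract` and `regexp_replace` could do the same without a UDF, which lets the optimizer see the expression.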


## Join to get final outcome

In [25]:
// Prefer the outcome from the outcomes file, else the street file's last outcome category
val finalDf = streetDf.join(outcomesDf, Seq("crimeID"), "inner")
                      .withColumn("lastOutcome", when($"outcomeType".isNotNull, $"outcomeType")
                                                      .otherwise($"lastOutcomeCategory") 
                                 )
                      .select($"crimeID",
                              $"districtName",
                              $"latitude",
                              $"longitude",
                              $"crimeType",
                              $"lastOutcome"                              
                             )

finalDf.show

+--------------------+------------------+---------+---------+--------------------+--------------------+
|             crimeID|      districtName| latitude|longitude|           crimeType|         lastOutcome|
+--------------------+------------------+---------+---------+--------------------+--------------------+
|0006c2f699d867295...|        merseyside|53.425884|-2.952538|         Shoplifting|Investigation com...|
|0007023f792cd822f...|     west midlands|52.476815|-1.895378|         Shoplifting|Investigation com...|
|001cec56bc43aa35a...|            sussex|51.064738|-0.332847|       Bicycle theft|Investigation com...|
|002163e215119dce0...|devon and cornwall|50.351150|-3.600262|Violence and sexu...|Unable to prosecu...|
|0024c795b8692eace...|           norfolk|52.485340| 0.519512|Violence and sexu...|Action to be take...|
|002c5bd49c4a165f9...|     thames valley|51.458122|-1.476708|Violence and sexu...|Investigation com...|
|002e2c23238dbbcd6...|            sussex|50.812559|-0.374688|   

finalDf: org.apache.spark.sql.DataFrame = [crimeID: string, districtName: string ... 4 more fields]
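`when($"outcomeType".isNotNull, $"outcomeType").otherwise($"lastOutcomeCategory")` is equivalent to `coalesce($"outcomeType", $"lastOutcomeCategory")`. Because the join is `inner`, every surviving row has an outcome record, so the fallback only applies when `outcomeType` itself is null. With a `left` join, crimes without an outcome record would also be kept and use the fallback. A plain-Scala sketch of both behaviours, with `Option` standing in for a nullable column; the maps, values and `lastOutcome` helper are hypothetical:

```scala
// Hypothetical street records: crimeId -> lastOutcomeCategory.
val street = Map(
  "a1" -> "Investigation complete; no suspect identified",
  "b2" -> "Under investigation"
)
// Hypothetical outcomes: only "a1" has an outcome record.
val outcomes = Map("a1" -> "Unable to prosecute suspect")

// Prefer the outcome type, otherwise fall back to the street category.
def lastOutcome(outcomeType: Option[String], lastCategory: String): String =
  outcomeType.getOrElse(lastCategory)

// Inner join: only IDs present on both sides survive.
val inner: Map[String, String] =
  street
    .filter { case (id, _) => outcomes.contains(id) }
    .map { case (id, cat) => id -> lastOutcome(outcomes.get(id), cat) }

// Left join: every street crime survives; missing outcomes use the fallback.
val left: Map[String, String] =
  street.map { case (id, cat) => id -> lastOutcome(outcomes.get(id), cat) }

println(inner) // Map(a1 -> Unable to prosecute suspect)
println(left)
```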


# KPI: number of crimes by type

In [26]:
finalDf.groupBy("crimeType")
       .agg(count($"crimeID").as("count"))
       .orderBy($"count".desc)
       .show(false)

+----------------------------+-------+
|crimeType                   |count  |
+----------------------------+-------+
|Violence and sexual offences|1491597|
|Criminal damage and arson   |533891 |
|Other theft                 |468270 |
|Vehicle crime               |400012 |
|Burglary                    |371944 |
|Public order                |361977 |
|Shoplifting                 |302199 |
|Drugs                       |105926 |
|Other crime                 |79373  |
|Bicycle theft               |76815  |
|Theft from the person       |63594  |
|Robbery                     |56783  |
|Possession of weapons       |29818  |
+----------------------------+-------+
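The KPI is a frequency count: group by `crimeType`, count the rows, sort descending. A plain-Scala sketch of the same computation over a small hypothetical list (`groupMapReduce` requires Scala 2.13 or later). Ties are broken by name so that the ordering is deterministic:

```scala
// Hypothetical crime types, one entry per crime.
val crimeTypes = Seq(
  "Shoplifting", "Burglary", "Violence and sexual offences",
  "Shoplifting", "Violence and sexual offences", "Violence and sexual offences"
)

// Count occurrences per type, then sort by count descending and name ascending.
val kpi: Seq[(String, Int)] =
  crimeTypes
    .groupMapReduce(identity)(_ => 1)(_ + _)
    .toSeq
    .sortBy { case (crimeType, n) => (-n, crimeType) }

kpi.foreach { case (crimeType, n) => println(f"$crimeType%-30s $n%d") }
```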

