
## UK Crime

For this project, I have selected the UK Police API as the data source. The API provides detailed crime data across various categories for the specified locations and dates. The specific endpoint used is:

https://data.police.uk/api/crimes-street/all-crime?lat=52.629729&lng=-1.131592&date=2023-01

The producer and consumer were implemented in Java with Kafka. The producer fetches data from the API and sends it through Kafka to the consumer, which writes the data to the CSV files used for this analysis and visualization.

The API returns crimes only for a small area around the point given by the `lat` and `lng` parameters in the request URL (the API documentation describes this as a one-mile radius). That area is far too narrow for my goal of analysing the whole of London.

I therefore defined a bounding box covering London, based on the city's latitude and longitude bounds:

        double minLat = 51.286;
        double maxLat = 51.686;
        double minLng = -0.510;
        double maxLng = 0.334;

        double latStep = 0.05;
        double lngStep = 0.05;

The step sizes control how far the producer moves across this virtual box on each iteration, both vertically (latitude) and horizontally (longitude). At each grid point the producer places the coordinates in the API URL, so together the requests scan the whole bounded area of London.
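The scan can be sketched in Python as follows. This is not the author's Java producer. It is a minimal sketch that assumes the producer visits every grid point from the minimum corner up to the maximum bounds, in 0.05-degree steps. The function name `grid_points` is hypothetical.

```python
import math

# Bounding box and step sizes taken from the producer configuration above
MIN_LAT, MAX_LAT = 51.286, 51.686
MIN_LNG, MAX_LNG = -0.510, 0.334
LAT_STEP = LNG_STEP = 0.05

def grid_points(min_lat, max_lat, min_lng, max_lng, lat_step, lng_step):
    """Return the (lat, lng) grid points covering the box, row by row.

    Integer step counts are used instead of repeatedly adding the step,
    so floating-point drift cannot add or drop a row or column.
    """
    n_lat = math.floor((max_lat - min_lat) / lat_step + 1e-9) + 1
    n_lng = math.floor((max_lng - min_lng) / lng_step + 1e-9) + 1
    return [
        (round(min_lat + i * lat_step, 6), round(min_lng + j * lng_step, 6))
        for i in range(n_lat)
        for j in range(n_lng)
    ]

points = grid_points(MIN_LAT, MAX_LAT, MIN_LNG, MAX_LNG, LAT_STEP, LNG_STEP)
print(len(points))  # 153 (9 latitude rows x 17 longitude columns)
print(points[0])    # (51.286, -0.51)
```

With these bounds, one full scan of the box issues 153 requests.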

Because I wanted to analyse crime over time, I also incremented the month in the `date` parameter of the API URL. Each pass scans the whole London box for one month, so repeating the scan month by month covers every month of the year.
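Below is a minimal Python sketch of the monthly URL construction. It assumes the `date` parameter takes the `YYYY-MM` form with a zero-padded month, as in the example endpoint (`date=2023-01`). The helper `crime_url` is hypothetical and is not part of the author's producer.

```python
BASE_URL = "https://data.police.uk/api/crimes-street/all-crime"

def crime_url(lat, lng, year, month):
    """Build the street-level crime URL for one point and one month.

    The month is zero-padded so the date parameter has the YYYY-MM form
    used in the example URL (date=2023-01).
    """
    return f"{BASE_URL}?lat={lat}&lng={lng}&date={year}-{month:02d}"

# Twelve monthly URLs for the example point from the endpoint above
urls = [crime_url(52.629729, -1.131592, 2023, m) for m in range(1, 13)]
print(urls[0])
# https://data.police.uk/api/crimes-street/all-crime?lat=52.629729&lng=-1.131592&date=2023-01
```

Combined with the grid scan, this gives one request per grid point per month.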

The official UK Police API provides the following fields for each crime:

| Column Name      | Description                                                                                 |
|------------------|---------------------------------------------------------------------------------------------|
| `category`       | The category of the crime, providing a clear description of the type of offense committed.   |
| `persistent_id`  | A 64-character unique identifier for each crime. This ID is persistent and remains the same. |
| `month`          | The month when the crime occurred.                                                          |
| `location`       | The approximate location of the crime.                                                      |
| `latitude`       | The latitude of the crime's location.                                                       |
| `longitude`      | The longitude of the crime's location.                                                      |
| `street`         | The approximate street where the crime occurred.                                            |
| `id`             | A unique identifier for the street where the crime occurred.                                |
| `name`           | Name of the location. Note that this is only an approximation of where the crime happened.   |
| `location_type`  | Type of the location (Force or BTP). Force indicates a normal police location; BTP refers to British Transport Police. |
| `location_subtype`| For BTP locations, this specifies the type of location (e.g., train station).               |
| `outcome_status` | The outcome of the crime, including the category and date of the latest recorded outcome.    |


In the Kafka consumer, I split the data into files for three purposes: location-based analysis, streaming analysis and general-purpose analysis. The streaming data is written in batches of roughly 10,000 rows per file. These files are then used to mimic near-real-time data streaming.
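The batching step can be sketched in Python as shown below. This is not the author's Java consumer. The function `write_stream_batches` is hypothetical, and the `stream_N.csv` names only follow the file names visible in the directory listing later on. No header row is written, on the assumption that the streaming reader relies on an explicit schema instead.

```python
import csv
import os
import tempfile

BATCH_SIZE = 10_000  # rows per streaming file, as described above

def write_stream_batches(rows, out_dir, batch_size=BATCH_SIZE):
    """Write (category, month) rows to stream_1.csv, stream_2.csv, ...

    A file is written each time batch_size rows have accumulated; the
    last file holds the remainder. Returns the list of paths written.
    """
    paths, batch = [], []

    def flush():
        path = os.path.join(out_dir, f"stream_{len(paths) + 1}.csv")
        with open(path, "w", newline="") as f:
            csv.writer(f).writerows(batch)
        paths.append(path)
        batch.clear()

    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            flush()
    if batch:
        flush()
    return paths

out_dir = tempfile.mkdtemp()
rows = [("violent-crime", "2023-01")] * 25_000  # synthetic rows for illustration
paths = write_stream_batches(rows, out_dir)
print([os.path.basename(p) for p in paths])
```

With 25,000 synthetic rows, this writes two full files and a third file of 5,000 rows.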

The crime location CSV contains the following columns:

| Column Name   | Description                                                               |
|---------------|---------------------------------------------------------------------------|
| `Latitude`    | The geographical latitude of the location where the crime occurred.        |
| `Longitude`   | The geographical longitude of the location where the crime occurred.       |
| `Category`    | The category of the crime (e.g., violent crime, theft, etc.).              |



The general-purpose analysis CSV contains the following columns:

| Column Name        | Description                                                                            |
|--------------------|----------------------------------------------------------------------------------------|
| `ID`               | Unique identifier for the crime. This ID is related to the API and is not a police identifier. |
| `Category`         | The type or category of crime (e.g., violent crime, theft, etc.).                       |
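| `Location Type`    | Indicates the type of location (Force or BTP). "Force" means a normal police area, while "BTP" refers to British Transport Police areas. |
| `Latitude`         | The latitude of the crime's approximate location.                                       |
| `Longitude`        | The longitude of the crime's approximate location.                                      |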
| `Street ID`        | A unique identifier for the street where the crime occurred.                            |
| `Street Name`      | The name of the street where the crime was approximately recorded.                      |
| `Outcome Status`   | The status and result of the crime investigation (e.g., charged, under investigation, no further action). |
| `Month`            | The month when the crime was recorded.                                                  |

The streaming CSV files contain the following columns:

| Column Name        | Description                                                                            |
|--------------------|----------------------------------------------------------------------------------------|
| `Category`         | The type or category of crime (e.g., violent crime, theft, etc.).                       |
| `Month`            | The month when the crime was recorded.                                                  |





## Analysing the Location Dataset

#### General Purpose Crime Analysis

##### Crime Locations by Category

This code reads a CSV file containing location data into a DataFrame and processes it for further analysis. It converts the Latitude and Longitude columns to DoubleType for accurate numerical operations. A new column with random values is added to shuffle the DataFrame rows, which helps in randomizing the order of records. Finally, the DataFrame is reordered based on these random values, and the temporary random column is dropped.

Shuffling the rows means the order in which the producer collected the records (by grid point and month) does not carry over into order-sensitive steps. Examples are previewing a sample of the data or splitting it for training machine learning models. Aggregate results such as counts are not affected by row order.
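The random-key shuffle used in the cell below (`rand()`, then `orderBy`, then `drop`) can be sketched in plain Python. The function `shuffle_by_random_key` is hypothetical. The three sample rows are taken from the shuffled output shown further down.

```python
import random

def shuffle_by_random_key(rows, seed=None):
    """Shuffle rows by tagging each with a random key and sorting on it.

    This mirrors the Spark pattern of adding a rand() column, ordering
    by it and dropping it again.
    """
    rng = random.Random(seed)
    keyed = [(rng.random(), row) for row in rows]
    keyed.sort(key=lambda pair: pair[0])  # sort on the random key only
    return [row for _, row in keyed]

rows = [
    (51.530566, -0.200361, "burglary"),
    (51.428319, -0.056586, "violent-crime"),
    (51.49321, -0.145896, "public-order"),
]
shuffled = shuffle_by_random_key(rows, seed=42)
print(shuffled)
```

In Spark, `rand()` also accepts a seed (for example `rand(42)`). A seed makes the order repeatable as long as the input partitioning does not change.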

In [0]:
from pyspark.sql.functions import col, rand
from pyspark.sql.types import DoubleType

file_location = "/FileStore/tables/final_location_dataset.csv"

# Load the location dataset (Latitude, Longitude, Category) using its header row
df_loc = spark.read.option("header", "true").csv(file_location)

# CSV columns are read as strings; cast the coordinates to doubles
df_loc = df_loc.withColumn("Latitude", col("Latitude").cast(DoubleType())) \
               .withColumn("Longitude", col("Longitude").cast(DoubleType()))
df_loc.cache()

# Shuffle: tag each row with a random value, sort on it, then drop the helper column
df_loc_shuffled = df_loc.withColumn("random", rand()).orderBy("random").drop("random")

display(df_loc_shuffled)

Latitude,Longitude,Category
51.530566,-0.200361,burglary
51.428319,-0.056586,violent-crime
51.49321,-0.145896,public-order
51.589936,-0.145063,other-theft
51.446419,0.224786,public-order
51.53119,-0.119727,theft-from-the-person
51.47472,-0.122756,violent-crime
51.596023,0.032546,vehicle-crime
51.534751,0.118739,public-order
51.374541,-0.451792,violent-crime



#### Analysing and Reverse Geocoding the Location Data
This code performs reverse geocoding with the reverse_geocode library. It converts each latitude and longitude pair into the name of the nearest known place and adds the result to the DataFrame as a `city` column. It then filters the data to the `violent-crime` category, counts violent crimes per area, and lists the 10 areas with the most violent crimes. Finally, it counts all crime types per area and lists the 10 areas with the most and the fewest recorded crimes. These are raw counts, not rates adjusted for population or area size.

Converts spatial coordinates into meaningful city names, making it easier to analyze crime data by geographic locations.
Helps to identify the areas with the highest incidence of violent crimes, which can inform public safety efforts and resource allocation.
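Provides insights into the areas with the most and the fewest reported crimes, aiding crime prevention strategies and urban planning. Note that the bounding box is a rectangle, so it also takes in places just outside Greater London. The areas with the fewest crimes (e.g. Nutfield, Grays, Gerrards Cross) mostly lie at the edge of the box, where only a small part of the surrounding area was scanned. Their low counts therefore reflect coverage rather than safety.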
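According to its documentation, reverse_geocode labels each coordinate with the nearest place in its reference list of cities. The idea can be sketched as a brute-force nearest-neighbour search using great-circle distance. This is not the library's implementation: the library uses a far larger place list and a k-d tree rather than a linear scan. The three-entry `PLACES` table is hypothetical. The Westminster point comes from the library output shown below, and the Croydon and Barking points are approximate.

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points."""
    r = 6371.0  # mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

# Hypothetical reference list: place name -> approximate centre point
PLACES = {
    "City of Westminster": (51.4975, -0.1357),
    "Croydon": (51.3762, -0.0982),
    "Barking": (51.5365, 0.0809),
}

def nearest_place(lat, lon, places=PLACES):
    """Return the name of the place whose centre is closest to (lat, lon)."""
    return min(places, key=lambda name: haversine_km(lat, lon, *places[name]))

print(nearest_place(51.494720, -0.135278))  # City of Westminster
```

Because every point gets the closest centre, a crime near the edge of the box is labelled with a nearby town even when that town lies outside London.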

In [0]:
%pip install reverse_geocode

Python interpreter will be restarted.


In [0]:
import reverse_geocode
from pyspark.sql.types import *
from pyspark.sql.functions import *


file_location = "/FileStore/tables/final_location_dataset.csv"

df_loc = spark.read.option("header", "true").csv(file_location)
df_loc.cache()

# Sanity check: reverse geocode a known point in Westminster (51.494720, -0.135278)
x = 51.494720
y = -0.135278
coordinates = [(x, y)]
data = reverse_geocode.search(coordinates)
print(data)


# UDF: reverse geocode one (lat, lon) pair to the name of the nearest known place
def get_city(lat, lon):
    if lat is None or lon is None:
        return None
    result = reverse_geocode.search([(lat, lon)])
    return result[0]['city'] if result else None

get_city_udf = udf(get_city, StringType())

# CSV columns are read as strings, so cast the coordinates to doubles first
df_with_city = df_loc.withColumn(
    'city', get_city_udf(col('Latitude').cast('double'), col('Longitude').cast('double'))
)
df_with_city.cache()

# Violent crimes only, counted per area
violent_crimes_df = df_with_city.filter(col('Category') == 'violent-crime')
violent_crimes_df.cache()

violent_crimes_by_area = violent_crimes_df.groupBy('city').agg(count("*").alias("violent_crime_count"))
most_violent_area = violent_crimes_by_area.orderBy(col("violent_crime_count").desc()).limit(10)

# Top 10 areas by number of violent crimes
display(most_violent_area)

# All crime types, counted per area
crimes_by_area = df_with_city.groupBy('city').agg(count("*").alias("crime_count"))
most_dangerous_area = crimes_by_area.orderBy(col("crime_count").desc())

# Top 10 areas with the most recorded crimes
display(most_dangerous_area.limit(10))

# 10 areas with the fewest recorded crimes
safest_areas = crimes_by_area.orderBy(col("crime_count").asc())
display(safest_areas.limit(10))



[{'country_code': 'GB', 'city': 'City of Westminster', 'latitude': 51.4975, 'longitude': -0.1357, 'population': 247614, 'state': 'England', 'county': 'Greater London', 'country': 'United Kingdom'}]


city,violent_crime_count
Croydon,2337
Barking,1623
West Ham,1609
Harlesden,1583
Bow,1509
Bethnal Green,1407
East Ham,1396
Kennington,1354
Walthamstow,1346
Peckham,1265



city,crime_count
Croydon,9000
Bethnal Green,6947
King's Cross,6912
East Ham,6645
Barking,6454
Kennington,5951
West Ham,5857
Walthamstow,5836
Bow,5675
Harlesden,5625



city,crime_count
Nutfield,1
West Horndon,1
Hadley Wood,2
West Clandon,2
Kings Langley,3
New Denham,4
High Ongar,5
Grays,8
Gerrards Cross,8
Bricket Wood,9



##### Central London Crime Analysis

This code filters the crime data to a list of central London areas, such as Soho, Covent Garden and Mayfair, and counts the crimes in each area. It then compares the total for central London with the total for the whole bounding box. The comparison DataFrame shows the two totals side by side.

This analysis shows how much crime is recorded in specific central London areas, which is valuable for policy-making, safety measures and resource distribution.
Comparing the central total with the London-wide total shows what share of recorded crime falls in central areas. Judging whether those areas are more or less crime-prone than the rest of London would additionally require normalising the counts by area or population. This data can support targeted policing and crime prevention strategies.

In [0]:
from pyspark.sql.types import LongType, StructType, StructField
from pyspark.sql.functions import sum as _sum

# List of central London areas for comparison
central_london_areas = [
    'Soho',
    'Covent Garden',
    'Holborn',
    'Mayfair',
    'Marylebone',
    'Fitzrovia',
    'Bloomsbury',
    'St. James\'s',
    'Victoria',
    'King\'s Cross',
    'Paddington',
    'Southbank',
    'Pimlico',
    'Clerkenwell',
    'Knightsbridge',
    'Belgravia',
    'West End',
    'Bayswater',
    'Chelsea',
    'Kensington',
    'Charing Cross',
    'Aldwych',
    'Temple',
    'Lambeth',
    'Vauxhall',
    'Borough',
    'London Bridge',
    'Shoreditch',
    'Whitechapel',
    'Spitalfields',
    'Barbican',
    'Farringdon',
    'South Kensington',
    'Hyde Park',
    'Regent\'s Park',
    'Somers Town',
    'Elephant and Castle',
    'Bermondsey',
    'Notting Hill',
    'Euston'
]
# Keep only crimes whose reverse-geocoded area is in the central London list
central_london_crimes = df_with_city.filter(col('city').isin(central_london_areas))
central_london_crimes.cache()

central_crime_stats = central_london_crimes.groupBy('city').agg(count("*").alias("crime_count"))

total_crime_in_central = central_crime_stats.agg(_sum('crime_count').alias('total_central_crime')).collect()[0][0]

total_crime_in_london = crimes_by_area.agg(_sum('crime_count').alias('total_london_crime')).collect()[0][0]

comparison_data = [(total_crime_in_central, total_crime_in_london)]

schema = StructType([
    StructField("Total Central London Crime", LongType(), True),
    StructField("Total London Crime", LongType(), True)
])

comparison_df = spark.createDataFrame(comparison_data, schema)

display(central_crime_stats)

display(comparison_df)


city,crime_count
Pimlico,3932
Marylebone,861
Regent's Park,830
Clerkenwell,2030
Barbican,1102
Holborn,72
Kensington,27
Chelsea,5252
Lambeth,2545
Whitechapel,655



Total Central London Crime,Total London Crime
26315,383245
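The comparison table reports the two totals but not the ratio between them. The central London share follows directly from the numbers in the output above, as this worked example shows:

```python
total_central = 26315   # Total Central London Crime (output above)
total_london = 383245   # Total London Crime (output above)

share = total_central / total_london
print(f"{share:.2%}")   # 6.87%
```

So about 6.9% of the crimes in the bounding box were geocoded to one of the listed central areas. The share depends on which area names are in the list and on how reverse_geocode labels each point.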


#### General Purpose Dataset Analysis

##### Cleaning and Editing the Data

The dataset is loaded and cleaned in four steps:

- The ID, coordinate and month columns are cast to integer, double and date types.
- Missing street names and outcome statuses are filled with "Unknown".
- Column names are standardised to lowercase snake_case.
- Entries whose street name is only the generic prefix "On or near", with no street after it, are replaced with "Unknown".

The resulting DataFrame is ready for analysis and visualization.
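Below, the same cleaning rules are applied to a single row in plain Python, which makes them easy to check in isolation. This is a sketch, not the Spark code, and the helper `clean_record` is hypothetical. The raw row is consistent with the first row of the cleaned output below. Its empty outcome (standing in for a CSV null) is an assumption.

```python
from datetime import date

# Column renames used in the Spark cell
RENAMES = {
    "Location Type": "location_type",
    "Street ID": "street_id",
    "Street Name": "street_name",
    "Outcome Status": "outcome_status",
    "Month": "month",
    "Category": "category",
    "Latitude": "latitude",
    "Longitude": "longitude",
}

def clean_record(raw):
    """Apply the cleaning rules to one CSV row given as a dict of strings.

    An empty string stands in for a value that Spark would read as null.
    """
    rec = dict(raw)
    # Cast columns to proper types; "2023-01" becomes the date 2023-01-01
    rec["ID"] = int(rec["ID"])
    rec["Latitude"] = float(rec["Latitude"])
    rec["Longitude"] = float(rec["Longitude"])
    rec["Month"] = date.fromisoformat(rec["Month"] + "-01")
    # Fill missing street names and outcomes with a default value
    for key in ("Street Name", "Outcome Status"):
        if not rec.get(key):
            rec[key] = "Unknown"
    # Standardise column names
    rec = {RENAMES.get(k, k): v for k, v in rec.items()}
    # A bare "On or near" carries no street information
    if rec["street_name"].strip() == "On or near":
        rec["street_name"] = "Unknown"
    return rec

raw = {
    "ID": "107810326",
    "Category": "anti-social-behaviour",
    "Location Type": "Force",
    "Latitude": "51.291537",
    "Longitude": "-0.528145",
    "Street ID": "2003082",
    "Street Name": "On or near Tannery Lane",
    "Outcome Status": "",
    "Month": "2023-01",
}
print(clean_record(raw))
```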

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


file_location = "/FileStore/tables/ukYearFull.csv"
file_type = "csv"
df = spark.read.option("header", "true").csv(file_location)

# Data cleanup: cast columns to proper types ("2023-01" casts to the date 2023-01-01)
df_cleaned = df.withColumn("ID", col("ID").cast(IntegerType())) \
               .withColumn("Latitude", col("Latitude").cast(DoubleType())) \
               .withColumn("Longitude", col("Longitude").cast(DoubleType())) \
               .withColumn("Month", col("Month").cast(DateType()))

# Fill missing street names and outcomes with a default value
df_cleaned = df_cleaned.fillna({
    'Street Name': 'Unknown',
    'Outcome Status': 'Unknown'
})

# Standardise column names to lowercase snake_case
df_cleaned = df_cleaned.withColumnRenamed('Location Type', 'location_type') \
                       .withColumnRenamed('Street ID', 'street_id') \
                       .withColumnRenamed('Street Name', 'street_name') \
                       .withColumnRenamed('Outcome Status', 'outcome_status') \
                       .withColumnRenamed('Month', 'month') \
                       .withColumnRenamed('Category', 'category') \
                       .withColumnRenamed('Latitude', 'latitude') \
                       .withColumnRenamed('Longitude', 'longitude')

# A bare "On or near" carries no street information, so mark it as unknown
df_cleaned = df_cleaned.withColumn(
    "street_name",
    when(trim(col("street_name")) == "On or near", "Unknown").otherwise(col("street_name"))
)

display(df_cleaned)

ID,category,location_type,latitude,longitude,street_id,street_name,outcome_status,month
107810326,anti-social-behaviour,Force,51.291537,-0.528145,2003082,On or near Tannery Lane,Unknown,2023-01-01
107810050,anti-social-behaviour,Force,51.27786,-0.526116,2003033,On or near Woodhill,Unknown,2023-01-01
107806339,burglary,Force,51.27786,-0.526116,2003033,On or near Woodhill,Court result unavailable,2023-01-01
107805369,burglary,Force,51.291537,-0.528145,2003082,On or near Tannery Lane,Investigation complete; no suspect identified,2023-01-01
107805381,criminal-damage-arson,Force,51.297943,-0.499382,2003938,On or near Haynes Close,Investigation complete; no suspect identified,2023-01-01
107805526,other-theft,Force,51.287223,-0.503574,2003725,On or near Kiln Lane,Investigation complete; no suspect identified,2023-01-01
107811525,other-theft,Force,51.293127,-0.489886,2004029,Unknown,Investigation complete; no suspect identified,2023-01-01
107808100,other-theft,Force,51.284942,-0.53293,2002809,On or near Send Hill,Status update unavailable,2023-01-01
107806909,other-theft,Force,51.297943,-0.499382,2003938,On or near Haynes Close,Investigation complete; no suspect identified,2023-01-01
107810282,other-theft,Force,51.293127,-0.489886,2004029,Unknown,Investigation complete; no suspect identified,2023-01-01


##### Most Common Outcomes for Each Category
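Retrieves the most frequent outcome for each crime category, based on the number of crimes per category and outcome.
This highlights the most common resolution for each crime type, which is useful for assessing patterns in how crimes are resolved. Anti-social behaviour shows `Unknown` because the API does not record outcomes for anti-social behaviour. Its missing `outcome_status` was therefore filled with "Unknown" during cleaning.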
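The SQL below keeps, for each category, the outcome ranked first by count (`ROW_NUMBER() ... = 1`). The same "top row per group" logic is sketched here in plain Python. The function `most_common_outcome` and the toy counts are hypothetical and are not taken from the dataset.

```python
def most_common_outcome(counts):
    """Given (category, outcome, count) rows, keep the top outcome per category.

    Like ROW_NUMBER() OVER (PARTITION BY category ORDER BY count DESC) = 1,
    exactly one row is kept per category; on a tie, this sketch keeps the
    row seen first.
    """
    best = {}
    for category, outcome, count in counts:
        if category not in best or count > best[category][1]:
            best[category] = (outcome, count)
    return {cat: best[cat] for cat in sorted(best)}

# Hypothetical toy counts, not taken from the dataset
toy = [
    ("burglary", "Under investigation", 3),
    ("burglary", "Investigation complete; no suspect identified", 7),
    ("drugs", "Local resolution", 2),
    ("drugs", "Under investigation", 5),
]
print(most_common_outcome(toy))
```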

In [0]:
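# Count crimes per (category, outcome) pair and register the result as a SQL view
df_cleaned.groupBy("category", "outcome_status").count() \
    .createOrReplaceTempView("category_outcome")

# Most common outcome for each category: rank outcomes by count within a category, keep rank 1
spark.sql("""
SELECT category, outcome_status, `count` AS most_likely_outcome
FROM (
    SELECT category, outcome_status, `count`,
           ROW_NUMBER() OVER (PARTITION BY category ORDER BY `count` DESC) AS row_num
    FROM category_outcome
) tmp
WHERE row_num = 1
""").display()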


category,outcome_status,most_likely_outcome
anti-social-behaviour,Unknown,21702
bicycle-theft,Investigation complete; no suspect identified,1551
burglary,Investigation complete; no suspect identified,4408
criminal-damage-arson,Investigation complete; no suspect identified,5376
drugs,Investigation complete; no suspect identified,1344
other-crime,Investigation complete; no suspect identified,631
other-theft,Investigation complete; no suspect identified,9877
possession-of-weapons,Court result unavailable,221
public-order,Investigation complete; no suspect identified,4783
robbery,Investigation complete; no suspect identified,2245



##### Most Crimes per Relative Location of Crime
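Counts crimes by street name, excluding "Unknown", and ranks the names by frequency.
This helps identify the locations with the most crimes, which is useful for targeting interventions and resource allocation. Note that the top entries (Supermarket, Shopping Area, Parking Area) are not streets. To protect anonymity, the API maps each crime to a nearby anonymised point, and some of these points are named after a type of place rather than a street. Each such name therefore pools crimes from many different locations.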
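The filter, group, count and order steps in the cell below amount to a top-N frequency count. A plain-Python sketch with `collections.Counter` is shown here. The function `top_locations` and the toy input are hypothetical.

```python
from collections import Counter

def top_locations(street_names, n=10, exclude="Unknown"):
    """Count crimes per street name, skipping the placeholder, and return the top n."""
    counts = Counter(name for name in street_names if name != exclude)
    return counts.most_common(n)

# Hypothetical toy input, one entry per crime
names = [
    "On or near Supermarket", "On or near Supermarket", "Unknown",
    "On or near Kiln Lane", "On or near Supermarket", "On or near Kiln Lane",
]
print(top_locations(names, n=2))  # [('On or near Supermarket', 3), ('On or near Kiln Lane', 2)]
```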

In [0]:
# Most Crimes per Relative Location of Crime
most_crimes_per_relative_location = df_cleaned.filter(col("street_name") != "Unknown") \
    .groupBy("street_name") \
    .agg(count("*").alias("crime_count")) \
    .orderBy(col("crime_count").desc())
display(most_crimes_per_relative_location.limit(10))


street_name,crime_count
On or near Supermarket,3237
On or near Shopping Area,2999
On or near Parking Area,2519
On or near Petrol Station,1323
On or near Nightclub,700
On or near Further/higher Educational Building,681
On or near Hospital,649
On or near North End,553
On or near Theatre/concert Hall,542
On or near Sports/recreation Area,526



##### Crimes by Street ID

Aggregates and counts crimes by street ID, ordering by frequency.
Provides insights into crime distribution based on street IDs, allowing for identification of high-crime areas for focused investigation and management.
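Grouping by street ID rather than street name keeps apart distinct locations that happen to share a name. The small sketch below illustrates this. The IDs and crimes in it are hypothetical.

```python
from collections import Counter

# Hypothetical crimes as (street_id, street_name) pairs: two different
# locations (IDs 1001 and 2002) share the name "On or near Supermarket".
crimes = [
    (1001, "On or near Supermarket"),
    (1001, "On or near Supermarket"),
    (2002, "On or near Supermarket"),
    (3003, "On or near Kiln Lane"),
]

by_name = Counter(name for _, name in crimes)
by_id = Counter(street_id for street_id, _ in crimes)

print(by_name.most_common())  # [('On or near Supermarket', 3), ('On or near Kiln Lane', 1)]
print(by_id.most_common())    # [(1001, 2), (2002, 1), (3003, 1)]
```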

In [0]:
# Most Crimes per Street ID
most_crimes_per_street_id = df_cleaned.groupBy("street_id") \
    .agg(count("*").alias("crime_count")) \
    .orderBy(col("crime_count").desc())

# Top 10 Street IDs with Crimes
display(most_crimes_per_street_id.limit(10))


street_id,crime_count
2136892,445
1665456,364
1708460,332
1683607,318
1681353,304
1657768,288
1681290,284
1665604,278
1679222,272
1693220,268



#### Streaming Dataset Analysis

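In the consumer, the data was written in batches to CSV files that are used for near-real-time streaming in Databricks. Each file contains about 10,000 lines, and the files are stored in the `stream_data_dir` directory in Databricks. The listing below shows two series of files (`stream_N.csv` and `stream_N-1.csv`). The streaming source reads every file in the directory, so both series contribute to the counts.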

In [0]:
%fs ls /FileStore/tables/stream_data_dir

path,name,size,modificationTime
dbfs:/FileStore/tables/stream_data_dir/stream_1.csv,stream_1.csv,239705,1725564101000
dbfs:/FileStore/tables/stream_data_dir/stream_10-1.csv,stream_10-1.csv,242137,1725564122000
dbfs:/FileStore/tables/stream_data_dir/stream_10.csv,stream_10.csv,243261,1725553728000
dbfs:/FileStore/tables/stream_data_dir/stream_11-1.csv,stream_11-1.csv,242317,1725564126000
dbfs:/FileStore/tables/stream_data_dir/stream_11.csv,stream_11.csv,243363,1725553734000
dbfs:/FileStore/tables/stream_data_dir/stream_12-1.csv,stream_12-1.csv,241254,1725564129000
dbfs:/FileStore/tables/stream_data_dir/stream_12.csv,stream_12.csv,243551,1725553741000
dbfs:/FileStore/tables/stream_data_dir/stream_13-1.csv,stream_13-1.csv,242230,1725564131000
dbfs:/FileStore/tables/stream_data_dir/stream_13.csv,stream_13.csv,245984,1725553748000
dbfs:/FileStore/tables/stream_data_dir/stream_14-1.csv,stream_14-1.csv,246285,1725564133000


##### Crimes per Category Streaming
This code processes the crime data as a stream in near real time. It reads the CSV files in the stream directory one file per trigger (`maxFilesPerTrigger` set to 1), so the data arrives incrementally, and keeps a running count of crimes per category. The counts are written to an in-memory table (`scounts`) that can be queried for visualization or further analysis.
This setup is useful for tracking crime patterns as data is updated. Each new file adds to the category totals, so the table shows how the counts grow as more data arrives.
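In complete output mode, every micro-batch updates the running aggregate, and the memory sink then holds the whole updated result table. This behaviour can be sketched in plain Python, with each toy batch standing in for one file. The function `run_complete_mode` and the batches are hypothetical. Sorting is applied only to make each snapshot readable, as the `ORDER BY Count DESC` in the display query does.

```python
from collections import Counter

def run_complete_mode(batches):
    """Simulate a streaming category count in complete output mode.

    Each batch plays the role of one file (maxFilesPerTrigger = 1).
    After every batch the running totals are updated and a snapshot of
    the whole result table is kept, sorted by count and then by name.
    """
    totals = Counter()
    snapshots = []
    for batch in batches:
        totals.update(batch)
        snapshots.append(sorted(totals.items(), key=lambda kv: (-kv[1], kv[0])))
    return snapshots

# Hypothetical toy batches of category values
batches = [
    ["violent-crime", "burglary", "violent-crime"],
    ["burglary", "drugs"],
]
for snapshot in run_complete_mode(batches):
    print(snapshot)
```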

In [0]:
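from pyspark.sql.functions import *
from pyspark.sql.types import *

file_location = "/FileStore/tables/stream_data_dir"

# Streaming file sources need an explicit schema. Only the first CSV column
# (Category) is declared; in the default PERMISSIVE mode the extra column is dropped.
schema = StructType([
    StructField("Category", StringType(), True)
])

# Read one new file per micro-batch to mimic data arriving over time
df_stream = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv(file_location)

# Running count of crimes per category
df_counts = df_stream.groupBy("Category").agg(count("Category").alias("Count"))

# A few shuffle partitions are enough for this small aggregation
spark.conf.set("spark.sql.shuffle.partitions", "2")

query = (
  df_counts
    .writeStream
    .format("memory")              # in-memory table, queryable with spark.sql
    .queryName("scounts")          # name of the in-memory table
    .outputMode("complete")        # rewrite the whole aggregate after each micro-batch
    .trigger(availableNow=True)    # process all files present now, then stop
    .start()
)

# Wait up to 10 seconds; returns False if the query has not finished yet
query.awaitTermination(10)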


Out[7]: False

In [0]:
display(spark.sql("SELECT * FROM scounts ORDER BY Count DESC"))



Category,Count
violent-crime,184568
anti-social-behaviour,147988
other-theft,77751
vehicle-crime,71015
public-order,42461
criminal-damage-arson,41202
shoplifting,40441
burglary,35862
theft-from-the-person,31946
drugs,25807



##### Violent Crimes over Time

This code streams and analyzes crime data, specifically focusing on violent crimes. It reads incoming crime data files, filters them for violent crimes, and groups them by month to count occurrences. The results are stored in memory for real-time querying and visualization.

This analysis is useful for real-time monitoring of violent crimes, allowing users to observe how such crimes evolve over time, month by month.
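The in-memory result table is not ordered by month. Because the months are `YYYY-MM` strings, sorting them as text gives chronological order. In Spark this means adding `ORDER BY Month` when querying the table, for example `spark.sql("SELECT * FROM violent_crimes_counts ORDER BY Month")`. The filter, count and sort steps are sketched below in plain Python. The function `violent_by_month` and the toy rows are hypothetical.

```python
from collections import Counter

def violent_by_month(rows):
    """Count 'violent-crime' rows per month and return them in month order.

    rows are (category, month) pairs such as ("violent-crime", "2023-01").
    "YYYY-MM" strings sort chronologically, so a plain sort orders the months.
    """
    counts = Counter(month for category, month in rows if category == "violent-crime")
    return sorted(counts.items())

# Hypothetical toy rows
rows = [
    ("violent-crime", "2023-02"),
    ("burglary", "2023-01"),
    ("violent-crime", "2023-01"),
    ("violent-crime", "2023-02"),
]
print(violent_by_month(rows))  # [('2023-01', 1), ('2023-02', 2)]
```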

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# File location and schema
file_location = "/FileStore/tables/stream_data_dir"

schema = StructType([
    StructField("Category", StringType(), True),
    StructField("Month", StringType(), True)
])

df_stream = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).csv(file_location)

violent_crimes_df = df_stream.filter(col("Category") == "violent-crime")

df_counts = violent_crimes_df.groupBy("Month").agg(count("Category").alias("Count"))

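query = (
  df_counts
    .writeStream
    .format("memory")
    .queryName("violent_crimes_counts")
    .outputMode("complete")
    .trigger(processingTime='10 seconds')  # start a new micro-batch every 10 seconds
    .start()
)

# Wait up to 10 seconds; returns False if the query is still running.
# This query keeps running in the background until query.stop() is called.
query.awaitTermination(10)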


Out[39]: False

In [0]:
display(spark.table("violent_crimes_counts"))


Month,Count
2023-07,10969
2023-06,12173
2023-09,8847
2023-03,10403
2023-02,8841
2023-05,11076
2023-01,6700
2023-08,7931
2023-04,9795

