In [None]:
# Imports

import warnings
import itertools

import numpy as np
import pandas as pd

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import count, avg, sum, col, to_date, expr

import matplotlib.pyplot as plt

import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima_model import ARIMA

import seaborn as sns; sns.set_theme()


In [None]:
# Spark initialization

spark = SparkSession.builder.appName('KF7032 Assessment').getOrCreate()

In [None]:
# All crimes
"""
The all_crimes21_hdr.txt.gz file should be in the same path as the ipnb file is.

"""

df_all_crime = spark.read.csv("all_crimes21_hdr.txt.gz", header=True, inferSchema= True)
dates = (("2011-01-01",  "2020-12-01"))
df_all_crime = df_all_crime.select([col(column).alias(column.replace(' ', '_'))
                                    for column in df_all_crime.columns])
df_all_crime = df_all_crime.withColumn('Month', to_date(df_all_crime.Month,"yyyy-MM"))\
.withColumn('city', expr("substring(LSOA_name, 1, length(LSOA_name)-5)")) \
.filter(col('Month').between(*dates))\
.orderBy('Month')

In [None]:
# LSOA

"""
The LSOA_pop_v2.csv file should be in the same path as the ipnb file is.

"""

df_lsoa = spark.read.csv("LSOA_pop_v2.csv", header=True, inferSchema= True)
df_lsoa = df_lsoa.select([col(column).alias(column.replace(' ', '_'))
                                    for column in df_lsoa.columns]) # Renaming the column by replacing the space with '_'

In [None]:
"""Filtering the all crimes to extract violent climes and grouping them with date"""

df_violent_crimes = df_all_crime.filter(col('Crime_type').isin({'Violent crime', 'Violence and sexual offences'}))\
.groupby('Month') \
.count().orderBy('Month')

df_violent_crimes.show()

In [None]:
pdf_violent_crimes = df_violent_crimes.toPandas() # Converting to pandas
pdf_violent_crimes.Month = pdf_violent_crimes.Month.astype('datetime64[ns]')
pdf_violent_crimes = pdf_violent_crimes.set_index(pd.to_datetime(pdf_violent_crimes.Month))

In [None]:
# Ploting the time series month wise count
pdf_violent_crimes.plot(y='count', x='Month',marker='.', linestyle='-', linewidth=0.5,\
         subplots=False,
         label='Violent Crime Rate',
         title='Violent Crime Rate in last 10 year')
plt.ylabel('Count')
plt.show()

In [None]:
# Decompossition
decomposition = sm.tsa.seasonal_decompose(pdf_violent_crimes['count'], model='additive',extrapolate_trend='freq')

# plot the graphs
f, ax = plt.subplots(4, 1, sharex=True)
f.set_size_inches(14,7)
ax[0].plot(pdf_violent_crimes['Month'], results_df['trend'], 'b')
ax[0].set_title('Figure 2')
ax[0].set_xlabel('Trend')
ax[0].set_ylabel('Value')

ax[1].plot(pdf_violent_crimes['Month'], results_df['seasonal'], 'b')
ax[1].set_xlabel('Seasonal')
ax[1].set_ylabel('Value')

ax[2].plot(pdf_violent_crimes['Month'], results_df['resid'], 'b')
ax[2].set_xlabel('Residual')
ax[2].set_ylabel('Value')

ax[3].plot(pdf_violent_crimes['Month'], results_df['observed'], 'b')
ax[3].set_xlabel('Observed')
ax[3].set_ylabel('Value')

plt.show()


In [None]:
# Function to test stationary
def test_stationarity(timeseries, title):
    plt.clf()
    #Determing rolling statistics
    rolmean = pd.Series(timeseries).rolling(window=12).mean()
    rolstd = pd.Series(timeseries).rolling(window=12).std()

    fig, ax = plt.subplots(figsize=(16, 4))
    ax.plot(timeseries, label= title)
    ax.plot(rolmean, label='rolling mean');
    ax.plot(rolstd, label='rolling std (x12)');
    ax.legend()
    plt.show()

In [None]:
pd.options.display.float_format = '{:.8f}'.format
test_stationarity(pdf_violent_crimes['count'],'raw data')

In [None]:
# Augmented Dickey Fuller (ADF) test
no = 'not'
def ADF_test(timeseries, dataDesc):
    print(' > Is the {} stationary ?'.format(dataDesc))
    dftest = adfuller(timeseries.dropna(), autolag='AIC')
    print('Test statistic = {:.3f}'.format(dftest[0]))
    print('P-value = {:.3f}'.format(dftest[1]))
    print('Critical values :')
    for k, v in dftest[4].items():
        print(f"\t{k}: {v} - The data is {no if v < dftest[0] else ''} stationary with {100-int(k[:-1])}% confidence")

In [None]:
ADF_test(pdf_violent_crimes['count'],'raw data')

In [None]:
# Detrending
count = pdf_violent_crimes['count']
count_detrend =  (count - count.rolling(window=12).mean())/count.rolling(window=12).std()

ADF_test(count_detrend,'count')

In [None]:
# Differencing
y_12lag =  count - count.shift(12)

test_stationarity(y_12lag,'12 lag differenced data')
ADF_test(y_12lag,'12 lag differenced data')

In [None]:
# Detrending + Differencing

y_12lag_detrend =  count_detrend - count_detrend.shift(12)

test_stationarity(y_12lag_detrend,'12 lag differenced de-trended data')

ADF_test(y_12lag_detrend,'12 lag differenced de-trended data')

In [None]:
y = y_12lag_detrend.dropna()

In [None]:
# Model Trainin and Fitting
y_to_train = y[:'2019-12-01'] # dataset to train
y_to_test = y['2020-01-01': ] # last X months for test
y_to_val = y_to_test

warnings.filterwarnings("ignore") # specify to ignore warning messages

p = d = q = range(0, 2)
seasonal_period = 12
pdq = list(itertools.product(p, d, q))
seasonal_pdq = [(x[0], x[1], x[2],seasonal_period) for x in list(itertools.product(p, d, q))]


for param in pdq:
    for param_seasonal in seasonal_pdq:
        try:
            mod = sm.tsa.statespace.SARIMAX(y_to_train,
                                      order=param,
                                      seasonal_order=param_seasonal,
                                      enforce_invertibility=False)
            results = mod.fit()
        except Exception as ex:
            continue

order = (0, 0, 1) #Using Values from the previous step
seasonal_order = (1, 1, 1, 12)
model = sm.tsa.statespace.SARIMAX(y_to_train,
                            order=order,
                            seasonal_order=seasonal_order,
                            enforce_invertibility=False)
results = model.fit()


In [None]:
# Forcast Ploating
pred_uc = results.get_forecast(steps=24)
pred_ci = pred_uc.conf_int()

ax = y.plot(label='observed', figsize=(14, 7))

pred_uc.predicted_mean.plot(ax=ax, label='Forecast')
ax.fill_between(pred_ci.index,
                pred_ci.iloc[:, 0],
                pred_ci.iloc[:, 1], color='k', alpha=.25)
ax.set_xlabel('Date')
ax.set_ylabel('Count')

plt.legend()
plt.show()


In [None]:
df_firearms = df_all_crime.filter(
    (col('Crime_type') == 'Possession of weapons') & \
    (col('Last_outcome_category') == 'Offender sent to prison')) \
.select('city') \
.groupBy('city').count().withColumnRenamed('count', 'crime_count')

In [None]:
df_firearms.show()

In [None]:
df_lsoa_population = df_lsoa.select('geography_code',
                                    col('Variable:_All_usual_residents;_measures:_Value').alias('Population'),
                                    'geography')\
.withColumn('geography', expr("substring(geography, 1, length(geography)-5)"))\
.groupBy('geography').sum('Population').withColumnRenamed('sum(Population)', 'Population')

In [None]:
df_lsoa_population.show()

In [None]:
per_head_firearms = df_firearms.join(df_lsoa_population, col('city') == col('geography'))\
.select('Population', 'city', 'crime_count')

In [None]:
per_head_firearms.show()

In [None]:
per_head_firearms = per_head_firearms.withColumn('per_head_value',
                                                 (per_head_firearms['crime_count']/per_head_firearms['Population'])*1000)\
.sort(col('per_head_value').desc()).cache()

In [None]:
cities = [str(data.city) for data in per_head_firearms.select('city').collect()]


In [None]:
pdf_per_head_guns = per_head_firearms.toPandas()

In [None]:
pdf_per_head_plot = pdf_per_head_guns.iloc[[0,1,2,3,4, 6, 7, 8, 9, 10, 87],:]

In [None]:
pdf_per_head_plot.plot.bar('city', 'per_head_value')
plt.show()

In [None]:
df_drugs = df_all_crime.filter(
     col('Crime_type') == 'Drugs') \
.select('city') \
.groupBy('city').count().withColumnRenamed('count', 'crime_count')

In [None]:
df_drugs.show()

In [None]:
per_head_drugs = df_drugs.join(df_lsoa_population, col('city') == col('geography'))\
.select('Population', 'city', 'crime_count')

In [None]:
per_head_drugs = per_head_drugs.withColumn('per_head_value',
                                                 (per_head_drugs['crime_count']/per_head_drugs['Population'])*10000)\
.sort(col('per_head_value').desc())

In [None]:
per_head_drugs.show()

In [None]:
drug_firearms = per_head_firearms.join(per_head_drugs, per_head_firearms.city == per_head_drugs.city)\
.select(per_head_firearms['city'], per_head_firearms['per_head_value'].alias('per_head_firearms'),
        per_head_drugs['per_head_value'].alias('per_head_drugs'))

In [None]:
drug_firearms.show()

In [None]:
pdf_drug_firearms= drug_firearms.toPandas()
sns.regplot(pdf_drug_firearms['per_head_firearms'], pdf_drug_firearms['per_head_drugs'])
plt.show()

In [None]:
drug_firearms.stat.corr('per_head_firearms', 'per_head_drugs')