In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BDAS-jwu163').getOrCreate()

In [2]:
import pyspark.sql.functions as f
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType)
from pyspark.ml.feature import ChiSqSelector, VectorAssembler, OneHotEncoder,\
    StringIndexer, IndexToString
from pyspark.ml.linalg import Vectors
import gc

Reading the file and infer the data schema:

In [5]:
crime_raw = spark.read.csv('Data/crime1.csv', header = True, inferSchema=True).union(
    spark.read.csv('Data/crime2.csv', header = True, inferSchema=True)).union(
    spark.read.csv('Data/crime3.csv', header = True, inferSchema=True))

In [6]:
crime_desc = crime_raw.describe()

Produce a summary of the raw data frame

In [7]:
crime_desc.select(['summary']+crime_desc.columns[1:7]).show()
crime_desc.select(['summary']+crime_desc.columns[8:14]).show()
crime_desc.select(['summary']+crime_desc.columns[15:21]).show()

+-------+---------------+------------------+------------------+--------------------+--------+-----------------+
|summary|INCIDENT_NUMBER|      OFFENSE_CODE|OFFENSE_CODE_GROUP| OFFENSE_DESCRIPTION|DISTRICT|   REPORTING_AREA|
+-------+---------------+------------------+------------------+--------------------+--------+-----------------+
|  count|         319073|            319073|            319073|              319073|  317308|           319073|
|   mean|    1.4205255E8| 2317.546956339145|              null|                null|    null|383.2111316732648|
| stddev|            NaN|1185.2855429417043|              null|                null|    null|242.2869365644474|
|    min|      142052550|               111|Aggravated Assault|A&B HANDS, FEET, ...|      A1|                 |
|    max|     I182070945|              3831|   Warrant Arrests|WEAPON - OTHER - ...|      E5|               99|
+-------+---------------+------------------+------------------+--------------------+--------+-----------

In [50]:
crime_raw.printSchema()

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- OCCURRED_ON_DATE: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Location: string (nullable = true)



A lot of columns are not useful due to being duplicate information, they are dropped

In [51]:
waste_cols = ['INCIDENT_NUMBER', 'OFFENSE_DESCRIPTION', 'REPORTING_AREA', 
              'OCCURRED_ON_DATE', 'STREET', 'Lat', 'Long', 'Location']

crime_sel = crime_raw.drop(*waste_cols)
print(crime_sel.columns)

['OFFENSE_CODE', 'OFFENSE_CODE_GROUP', 'DISTRICT', 'SHOOTING', 'YEAR', 'MONTH', 'DAY_OF_WEEK', 'HOUR', 'UCR_PART', 'X', 'Y']


To discover the outliers, a Z-score is calculated for all records of X and Y.<br>
The z-score is calculated as <br>
$\frac{X-\mu}{\sigma}$

In [52]:
print('Missing records:')
crime_sel.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in crime_sel.columns]).show()

Missing records:
+------------+------------------+--------+--------+----+-----+-----------+----+--------+-----+-----+
|OFFENSE_CODE|OFFENSE_CODE_GROUP|DISTRICT|SHOOTING|YEAR|MONTH|DAY_OF_WEEK|HOUR|UCR_PART|    X|    Y|
+------------+------------------+--------+--------+----+-----+-----------+----+--------+-----+-----+
|           0|                 0|    1765|  318054|   0|    0|          0|   0|      90|35349|35349|
+------------+------------------+--------+--------+----+-----+-----------+----+--------+-----+-----+



In [53]:
stats = crime_sel.agg(
    f.stddev(f.col("X")).alias("X_sd"), 
    f.mean(f.col("X")).alias("X_avg"),
    f.stddev(f.col("Y")).alias("Y_sd"), 
    f.mean(f.col("Y")).alias("Y_avg")).first().asDict()
stats

{'X_avg': 769756.4109410138,
 'X_sd': 11417.424122806535,
 'Y_avg': 2943563.5425176807,
 'Y_sd': 34031.49817354141}

In [54]:
crime_out = crime_sel.withColumn(
    'X_z', f.abs(crime_sel['X'] - stats['X_avg']) / stats['X_sd']
).withColumn(
    'Y_z', f.abs(crime_sel['Y'] - stats['Y_avg']) / stats['Y_sd']
)

In [55]:
for c in ['X', 'Y']:
    print('Outlier count in '+c+':', crime_out.filter(c+'_z >= 3').count())

Outlier count in X: 34
Outlier count in Y: 34


All outlier are dropped<br>
Missing values in X and Y are imputed with mean<br>
Missing values in UCR_PART is imputed as 'Other'<br>

In [56]:
crime_clean = crime_out.\
    na.fill(0, subset=['X_z', 'Y_z']).\
    filter('X_z < 3').\
    filter('Y_z < 3').\
    na.drop(subset=["DISTRICT"]).\
    na.fill('N', subset=['SHOOTING']).\
    na.fill('Other', subset=['UCR_PART']).\
    na.fill(stats['X_avg'], subset=['X']).\
    na.fill(stats['Y_avg'], subset=['Y'])

for c in ['X', 'Y']:
    print('Outlier count in '+c+':', crime_clean.filter(c+'_z >= 3').count())
crime_clean = crime_clean.drop(*['X_z', 'Y_z'])
print('Missing records:')
crime_clean.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in crime_clean.columns]).show()
print('Number of records:', crime_clean.count())

Outlier count in X: 0
Outlier count in Y: 0
Missing records:
+------------+------------------+--------+--------+----+-----+-----------+----+--------+---+---+
|OFFENSE_CODE|OFFENSE_CODE_GROUP|DISTRICT|SHOOTING|YEAR|MONTH|DAY_OF_WEEK|HOUR|UCR_PART|  X|  Y|
+------------+------------------+--------+--------+----+-----+-----------+----+--------+---+---+
|           0|                 0|       0|       0|   0|    0|          0|   0|       0|  0|  0|
+------------+------------------+--------+--------+----+-----+-----------+----+--------+---+---+

Number of records: 317274


In [57]:
crime_clean = crime_clean.withColumn(
    'IS_NIGHT',
    f.when((f.col('HOUR') < 6) | (f.col('HOUR') > 20), True).otherwise(False)
)

In [58]:
crime_clean.select(['HOUR', 'IS_NIGHT']).show(n=5)

+----+--------+
|HOUR|IS_NIGHT|
+----+--------+
|  13|   false|
|   0|    true|
|  19|   false|
|  21|    true|
|  21|    true|
+----+--------+
only showing top 5 rows



In [59]:
names_raw = spark.read.csv('Data/rmsoffensecodes.csv', header = True, inferSchema=True)
names_raw.sort('CODE').show(n=5)

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
| 111|MURDER NON-NEGLIG...|
| 111|MURDER, NON-NEGLI...|
| 112|KILLING OF FELON ...|
| 112|KILLING OF FELON ...|
| 113|KILLING OF FELON ...|
+----+--------------------+
only showing top 5 rows



In [60]:
names_clean = names_raw.dropDuplicates(['CODE'])
names_clean.sort('CODE').show(n=5)

+----+--------------------+
|CODE|                NAME|
+----+--------------------+
| 111|MURDER, NON-NEGLI...|
| 112|KILLING OF FELON ...|
| 113|KILLING OF FELON ...|
| 114|KILLING OF POLICE...|
| 121|MANSLAUGHTER - VE...|
+----+--------------------+
only showing top 5 rows



In [61]:
df = crime_clean.join(names_clean, 
                               crime_clean.OFFENSE_CODE == names_clean.CODE, 
                               'left').drop('OFFENSE_CODE', 'CODE')
print('Missing records:')
df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns]).show()

Missing records:
+------------------+--------+--------+----+-----+-----------+----+--------+---+---+--------+----+
|OFFENSE_CODE_GROUP|DISTRICT|SHOOTING|YEAR|MONTH|DAY_OF_WEEK|HOUR|UCR_PART|  X|  Y|IS_NIGHT|NAME|
+------------------+--------+--------+----+-----+-----------+----+--------+---+---+--------+----+
|                 0|       0|       0|   0|    0|          0|   0|       0|  0|  0|       0|   0|
+------------------+--------+--------+----+-----+-----------+----+--------+---+---+--------+----+



In [62]:
df = df.drop('NAME')
df = df.withColumn('IF_SHOOT', f.when(f.col('SHOOTING') == 'Y', 1).otherwise(0)).drop('SHOOTING')
print(*df.columns, sep='\n')

OFFENSE_CODE_GROUP
DISTRICT
YEAR
MONTH
DAY_OF_WEEK
HOUR
UCR_PART
X
Y
IS_NIGHT
IF_SHOOT


In [19]:
scol = ['OFFENSE_CODE_GROUP','DISTRICT','DAY_OF_WEEK','UCR_PART']
strInd = {
    c: {'enc': StringIndexer(inputCol=c, outputCol=c+'_ind')}
    for c in scol
}

In [20]:
del names_clean
del crime_clean
del crime_sel
del stats
del crime_out
del crime_raw
del crime_desc
del names_raw
gc.collect()

554

In [21]:
idf = df.drop('?')
for c in scol:
    mod = strInd[c]['enc'].fit(idf)
    idf = mod.transform(idf)
    strInd[c]['lab'] = mod.labels
    idf = OneHotEncoder(inputCol = c+'_ind', outputCol = c+'_ohe').transform(idf)
print(*idf.columns, sep='\n')

OFFENSE_CODE_GROUP
DISTRICT
YEAR
MONTH
DAY_OF_WEEK
HOUR
UCR_PART
X
Y
IS_NIGHT
IF_SHOOT
OFFENSE_CODE_GROUP_ind
OFFENSE_CODE_GROUP_ohe
DISTRICT_ind
DISTRICT_ohe
DAY_OF_WEEK_ind
DAY_OF_WEEK_ohe
UCR_PART_ind
UCR_PART_ohe


In [22]:
idf = idf.drop(*[c+'_ind' for c in scol])
print(*idf.columns, sep='\n')

OFFENSE_CODE_GROUP
DISTRICT
YEAR
MONTH
DAY_OF_WEEK
HOUR
UCR_PART
X
Y
IS_NIGHT
IF_SHOOT
OFFENSE_CODE_GROUP_ohe
DISTRICT_ohe
DAY_OF_WEEK_ohe
UCR_PART_ohe


In [23]:
f_inputs = ['YEAR','MONTH','HOUR','X','Y','IS_NIGHT']+[c+'_ohe' for c in scol]

In [24]:
assembler = VectorAssembler(
  inputCols=f_inputs,
              outputCol="features")

# Let's transform the data. 
vec_data = assembler.transform(idf)

In [25]:
vec_data.select([c+'_ohe' for c in scol]).show(1)

+----------------------+--------------+---------------+-------------+
|OFFENSE_CODE_GROUP_ohe|  DISTRICT_ohe|DAY_OF_WEEK_ohe| UCR_PART_ohe|
+----------------------+--------------+---------------+-------------+
|        (66,[1],[1.0])|(11,[6],[1.0])|      (6,[],[])|(3,[2],[1.0])|
+----------------------+--------------+---------------+-------------+
only showing top 1 row



In [26]:
selector = ChiSqSelector(numTopFeatures=20, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="IF_SHOOT")

selector_model = selector.fit(vec_data)

fs_result = selector_model.transform(vec_data)

In [27]:
f_used = selector_model.selectedFeatures
print(selector_model.selectedFeatures)
print(f_inputs)

[2, 3, 4, 5, 6, 7, 19, 20, 34, 42, 44, 57, 61, 72, 75, 76, 89, 90, 91, 88]
['YEAR', 'MONTH', 'HOUR', 'X', 'Y', 'IS_NIGHT', 'OFFENSE_CODE_GROUP_ohe', 'DISTRICT_ohe', 'DAY_OF_WEEK_ohe', 'UCR_PART_ohe']


In [28]:
selected_lab = {}
for ind in f_used:
    if ind < 6:
        selected_lab[ind] = f_inputs[ind]
    elif ind < 72:
        selected_lab[ind] = 'OFFENSE_CODE_GROUP: ' +  strInd['OFFENSE_CODE_GROUP']['lab'][ind - 6]
    elif ind < 83:
        selected_lab[ind] = 'DISTRICT: ' + strInd['DISTRICT']['lab'][ind - 72]
    elif ind < 89:
        selected_lab[ind] = 'DAY_OF_WEEK: ' + strInd['DAY_OF_WEEK']['lab'][ind - 83]
    else:
        selected_lab[ind] = 'UCR_PART: ' + strInd['UCR_PART']['lab'][ind - 89]

In [29]:
full_df = fs_result.select(['selectedFeatures', 
                            'IF_SHOOT']).withColumnRenamed('selectedFeatures', 
                                                           'features')
full_df.show(3)

+--------------------+--------+
|            features|IF_SHOOT|
+--------------------+--------+
|(20,[0,1,2,5,19],...|       0|
|(20,[1,2,3,18],[7...|       0|
|(20,[0,1,2,17],[1...|       0|
+--------------------+--------+
only showing top 3 rows



In [30]:
del selector
del selector_model
del fs_result
del assembler
del vec_data
del idf
gc.collect()

254

In [31]:
neg_split = full_df.filter('IF_SHOOT == 0').randomSplit([0.7, 0.3], 1)
pos_split = full_df.filter('IF_SHOOT == 1').randomSplit([0.7, 0.3], 1)
train_df = neg_split[0].union(pos_split[0])
test_df = neg_split[1].union(pos_split[1])

print('train:', train_df.count())
train_df.show(5)
print('test:', test_df.count())
test_df.show(5)

train: 222391
+--------------------+--------+
|            features|IF_SHOOT|
+--------------------+--------+
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[6.0,...|       0|
+--------------------+--------+
only showing top 5 rows

test: 94883
+--------------------+--------+
|            features|IF_SHOOT|
+--------------------+--------+
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[6.0,...|       0|
|(20,[0,1,2],[7.0,...|       0|
|(20,[0,1,2],[7.0,...|       0|
|(20,[0,1,2],[7.0,...|       0|
+--------------------+--------+
only showing top 5 rows



In [32]:
pos_train = train_df.filter('IF_SHOOT == 1')
neg_train = train_df.filter('IF_SHOOT == 0')
ratio = pos_train.count() / neg_train.count()
pos_train = pos_train.withColumn("_", f.explode(f.array([f.lit(x) for x in range(10)]))).drop('_')
neg_train = neg_train.randomSplit([ratio*10, 1-ratio*10])[0]
balance_train = pos_train.union(neg_train)


print('ratio:', ratio)
print('pos after balance:', pos_train.count())
print('neg after balance:', neg_train.count())

ratio: 0.003225428102275393
pos after balance: 7150
neg after balance: 7112


In [33]:
del train_df
del pos_train
del neg_train
gc.collect()

29

In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier

In [35]:
def custom_eval(pred):
    pos = pred.filter('IF_SHOOT == 1')
    tp = pos.filter('prediction == 1').count()
    fn = pos.filter('prediction == 0').count()
    del pos
    neg = pred.filter('IF_SHOOT == 0')
    fp = neg.filter('prediction == 1').count()
    tn = neg.filter('prediction == 0').count() 
    del neg
    print('True positive rate:', tp/(tp+fn))
    print('False positive rate', fp/(tn+fp))

In [36]:
dtree6 = DecisionTreeClassifier(maxDepth=5, minInstancesPerNode=20, labelCol="IF_SHOOT")
custom_eval(dtree6.fit(balance_train).transform(test_df))
del dtree6
gc.collect()

True positive rate: 0.804635761589404
False positive rate 0.049608272274558315


263

In [37]:
dtree7 = DecisionTreeClassifier(maxDepth=5, minInstancesPerNode=50, labelCol="IF_SHOOT")
custom_eval(dtree7.fit(balance_train).transform(test_df))
del dtree7

True positive rate: 0.804635761589404
False positive rate 0.050369524534525964


In [38]:
dtree8 = DecisionTreeClassifier(maxDepth=5, minInstancesPerNode=80, labelCol="IF_SHOOT")
custom_eval(dtree8.fit(balance_train).transform(test_df))
del dtree8
gc.collect()

True positive rate: 0.8145695364238411
False positive rate 0.057389962043116485


242

In [39]:
dtree_final = DecisionTreeClassifier(maxDepth=5, minInstancesPerNode=50, labelCol="IF_SHOOT")
model_final = dtree_final.fit(balance_train)
custom_eval(model_final.transform(test_df))

True positive rate: 0.804635761589404
False positive rate 0.050369524534525964


In [40]:
print(*sorted([ (i, n) for (i, n) in
               zip(model_final.featureImportances, selected_lab.values())
               if i != 0], reverse = True), sep='\n')

(0.5079763223171156, 'DISTRICT: B3')
(0.19699819885680744, 'OFFENSE_CODE_GROUP: Firearm Violations')
(0.10013788686387753, 'OFFENSE_CODE_GROUP: Ballistics')
(0.09241739101227918, 'DISTRICT: B2')
(0.08631491855681483, 'DISTRICT: A1')
(0.005579221641675884, 'X')
(0.004746632523628654, 'UCR_PART: Part Three')
(0.0026617732900121524, 'OFFENSE_CODE_GROUP: Aggravated Assault')
(0.0015171478560944923, 'HOUR')
(0.0009474747636907397, 'Y')
(0.0007030323180032877, 'IS_NIGHT')


In [41]:
import re
regx = re.compile(r'(feature (\d+))', re.M)
m_desc = regx.sub(lambda m: selected_lab[f_used[int(m.group(2))]], model_final.toDebugString)
print(m_desc.replace('in {1.0}', 'is True').\
      replace('in {0.0}', 'is False').\
      replace('not is True', 'is False').\
      replace('not is False', 'is True'))

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_47e298cb7fafce275cdc) of depth 5 with 39 nodes
  If (OFFENSE_CODE_GROUP: Aggravated Assault is False)
   If (OFFENSE_CODE_GROUP: Homicide is True)
    If (Y <= 2956095.092)
     If (HOUR <= 9.0)
      If (HOUR <= 4.0)
       Predict: 1.0
      Else (HOUR > 4.0)
       Predict: 1.0
     Else (HOUR > 9.0)
      Predict: 1.0
    Else (Y > 2956095.092)
     Predict: 1.0
   Else (OFFENSE_CODE_GROUP: Homicide is False)
    If (OFFENSE_CODE_GROUP: Warrant Arrests is True)
     If (X <= 774205.8892)
      If (DISTRICT: B2 is False)
       Predict: 1.0
      Else (DISTRICT: B2 is True)
       Predict: 1.0
     Else (X > 774205.8892)
      Predict: 0.0
    Else (OFFENSE_CODE_GROUP: Warrant Arrests is False)
     If (OFFENSE_CODE_GROUP: Firearm Violations is True)
      If (DISTRICT: B2 is True)
       Predict: 1.0
      Else (DISTRICT: B2 is False)
       Predict: 1.0
     Else (OFFENSE_CODE_GROUP: Firearm Violations is False)
      If 