In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%matplotlib inline 

In [47]:
from yaml import load as yaml_load
import os
import sys
import json
sys.path.append('../')
import findspark
findspark.init()
import pyspark
from pyspark.sql.types import LongType
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler, VectorIndexer, StandardScaler
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, udf
from pyspark.sql.types import LongType, StringType, FloatType, IntegerType, DoubleType
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
import re
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml import Pipeline
spark = SparkSession.builder.master("local[*]").appName("Data cleaning").getOrCreate()

In [4]:
from src.data.make_dataset import LoadDataframe

In [5]:
def _load_config_file(config_file):
    """
    Load configuration file
    :param config_file: is the configuration file
    :return: configuration
    :rtype: dict
    """
    with open(config_file) as yml_config:
        return yaml_load(yml_config)

def _build_configuration(config_file):
    """
    Build the operation configuration dict
    :param config_file: is the path to the yaml config_file
    :type: string
    :return: config: global configuration
    :rtype dict
    """
    # yaml config
    config = _load_config_file(config_file)
    return config
def visualisation_prediction(y_test, y_pred):
    import matplotlib
    import matplotlib.pyplot as plt
    matplotlib.rc('xtick', labelsize=30) 
    matplotlib.rc('ytick', labelsize=30) 
    fig, ax = plt.subplots(figsize=(50, 40))
    plt.style.use('ggplot')
    plt.plot(y_pred, y_test, 'ro')
    plt.xlabel('Predicted Crime', fontsize = 30)
    plt.ylabel('Actual Crime', fontsize = 30)
    plt.title('Predicted Y (Crimes) to the Actual Y (Crimes)', fontsize = 30)
    ax.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'k--', lw=4)
    
    
def duration_day_func(x):
    """
    :return:
    """
    from astral import Astral
    city_name = 'Chicago'
    a = Astral()
    a.solar_depression = 'civil'
    city = a[city_name]
    sun = city.sun(date=x, local=True)
    return float((sun['sunset'] - sun['sunrise']).total_seconds())

extract_blok = udf(lambda x : re.findall(r"(\w+)$", x)[0], StringType())
isStreet = udf(lambda x :  1 if x in ['ST', 'St', 'st'] else 0, IntegerType())
isAV = udf(lambda x : 1 if x in ['Ave', 'AV', 'AVE'] else 0, IntegerType())
isBLVD = udf(lambda x : 1 if x in ['BLVD'] else 0, IntegerType())
isRD = udf(lambda x : 1 if x in ['RD'] else 0, IntegerType())
isPL = udf(lambda x : 1 if x in ['PL', 'pl'] else 0, IntegerType())
isBROADWAY = udf(lambda x : 1 if x in ['BROADWAY', 'Broadway'] else 0, IntegerType())
isPKWY = udf(lambda x : 1 if x in ['PKWY', 'Pkwy'] else 0, IntegerType())
duration_day_udf = udf(lambda x :   duration_day_func(x), FloatType())

config_file = "/home/ml/Documents/crimes_chigaco/config/config.yml"
config = _build_configuration(config_file)

In [6]:
%%time
obj_df_loaded = LoadDataframe(config, '2013', '2014')

CPU times: user 396 µs, sys: 51 µs, total: 447 µs
Wall time: 452 µs


In [7]:
df_temp = obj_df_loaded.df_temperature()
df_sky  = obj_df_loaded.df_sky()
df_socio = obj_df_loaded.df_socio()

In [8]:
from pyspark.sql.functions import isnan, when, count, col, isnull

In [9]:
features_socio = ['pct_housing_crowded',
 'pct_households_below_poverty','pct_age16_unemployed',
 'pct_age25_no_highschool',
 'pct_not_working_age',
 'per_capita_income',
 'hardship_index']
for f in features_socio:
    df_socio = df_socio.withColumn(f, df_socio[f].cast(FloatType()))

In [10]:
df_crime = obj_df_loaded.df_crime()
df_crime_socio = df_crime.join(df_socio, ['community_area_number'], "inner")



In [12]:
df_crime_socio = df_crime_socio.na.drop()

In [13]:
df_crime_socio = df_crime_socio.withColumn("block_extract", extract_blok(df_crime_socio.block))

In [14]:
df_crime_socio = (df_crime_socio.
                  withColumn("isStreet",isStreet(df_crime_socio.block_extract)).
                  withColumn("isAV", isAV(df_crime_socio.block_extract)).
                  withColumn("isBLVD", isBLVD(df_crime_socio.block_extract)).
                  withColumn("isRD", isRD(df_crime_socio.block_extract)).
                  withColumn("isPL", isPL(df_crime_socio.block_extract)).
                  withColumn("isBROADWAY", isBROADWAY(df_crime_socio.block_extract)).
                  withColumn("isPKWY", isPKWY(df_crime_socio.block_extract)))

In [15]:
df_crime_socio = df_crime_socio.withColumn('duree_day', duration_day_udf('date'))

In [16]:
df_crime_socio.printSchema()

root
 |-- community_area_number: string (nullable = true)
 |-- id: string (nullable = true)
 |-- cas_number: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- block: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: string (nullable = true)
 |-- domestic: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- district: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- x_coordinate: string (nullable = true)
 |-- y_coordinate: string (nullable = true)
 |-- year: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- location: string (nullable = true)
 |-- community_area_name: string (nullable = true)
 |-- pct_housing_crowded: float (nullable = tru

In [17]:
df_crime_socio.limit(1000).toPandas().sample(5)

Unnamed: 0,community_area_number,id,cas_number,date,block,iucr,primary_type,description,location_description,arrest,...,hardship_index,block_extract,isStreet,isAV,isBLVD,isRD,isPL,isBROADWAY,isPKWY,duree_day
955,23,8951086,HW100206,2013-01-01 02:30:00,016XX N KEDZIE AVE,041A,ASSAULT_BATTERY,AGGRAVATED: HANDGUN,VEHICLE NON-COMMERCIAL,False,...,85.0,AVE,0,1,0,0,0,0,0,33111.0
608,75,21027,HW410418,2013-08-16 15:40:00,011XX W 111TH PL,0110,HOMICIDE,FIRST DEGREE MURDER,HOUSE,False,...,30.0,PL,0,0,0,0,1,0,0,49742.0
672,25,21099,HW468901,2013-09-27 01:22:00,011XX N MAYFIELD AVE,0110,HOMICIDE,FIRST DEGREE MURDER,APARTMENT,True,...,73.0,AVE,0,1,0,0,0,0,0,43035.0
980,6,8951120,HW100378,2013-01-01 05:10:00,017XX W ADDISON ST,1310,CRIMINAL DAMAGE,TO PROPERTY,OTHER,True,...,5.0,ST,1,0,0,0,0,0,0,33117.0
604,43,21023,HW405675,2013-08-13 10:20:00,074XX S COLFAX AVE,0110,HOMICIDE,FIRST DEGREE MURDER,STREET,False,...,55.0,AVE,0,1,0,0,0,0,0,50218.0


In [18]:
df_crime_socio = (
df_crime_socio.withColumn("month", func.month(func.col("date"))).
withColumn("year", func.year(func.col("date"))).
withColumn("day", func.dayofmonth(func.col("date"))).
withColumn("hour", func.hour(func.col("date"))).withColumn("minute", func.minute(func.col("date"))).
withColumn("dayofmonth", func.dayofmonth(func.col("date"))).   
withColumn("dayofyear", func.dayofyear(func.col("date"))).
withColumn("dayofweek", func.dayofweek(func.col("date")))
)

In [19]:
df_crime_socio.limit(100).toPandas().sample(5)

Unnamed: 0,community_area_number,id,cas_number,date,block,iucr,primary_type,description,location_description,arrest,...,isBROADWAY,isPKWY,duree_day,month,day,hour,minute,dayofmonth,dayofyear,dayofweek
73,29,10027097,HY216564,2013-06-01 12:00:00,013XX S SPAULDING AVE,1154,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT $300 AND UNDER,APARTMENT,False,...,0,0,54061.0,6,1,12,0,1,152,7
36,26,10007456,HY196681,2013-03-24 09:00:00,040XX W GLADYS AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,APARTMENT,False,...,0,0,44401.0,3,24,9,0,24,83,1
21,32,9251356,HW395403,2013-07-20 17:00:00,004XX E LOWER WACKER DR,820,THEFT_ROBBERY_BURGLARY,$500 AND UNDER,PARKING LOT/GARAGE(NON.RESID.),False,...,0,0,53212.0,7,20,17,0,20,201,7
54,16,10015624,HY204811,2013-12-24 14:00:00,041XX N CALIFORNIA AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,...,0,0,32890.0,12,24,14,0,24,358,3
58,69,10017381,HY206488,2013-04-01 15:00:00,003XX E 73RD ST,890,THEFT_ROBBERY_BURGLARY,FROM BUILDING,RESIDENCE,False,...,0,0,45794.0,4,1,15,0,1,91,2


In [20]:
df_crime_socio.printSchema()

root
 |-- community_area_number: string (nullable = true)
 |-- id: string (nullable = true)
 |-- cas_number: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- block: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: string (nullable = true)
 |-- domestic: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- district: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- x_coordinate: string (nullable = true)
 |-- y_coordinate: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- location: string (nullable = true)
 |-- community_area_name: string (nullable = true)
 |-- pct_housing_crowded: float (nullable = tr

In [21]:
df_temp.show(5)

+-----------+-------------------+-----+----+---+----+
|Temperature|           datetime|month|year|day|hour|
+-----------+-------------------+-----+----+---+----+
|     273.43|2013-01-01 01:00:00|    1|2013|  1|   1|
|     273.48|2013-01-01 02:00:00|    1|2013|  1|   2|
|     273.27|2013-01-01 03:00:00|    1|2013|  1|   3|
|     273.19|2013-01-01 04:00:00|    1|2013|  1|   4|
|     273.11|2013-01-01 05:00:00|    1|2013|  1|   5|
+-----------+-------------------+-----+----+---+----+
only showing top 5 rows



In [22]:
df_temp = df_temp.drop('datetime')

In [23]:
df_total = df_crime_socio.join(df_temp, ['year', 'month','day','hour'], how = "inner").\
                        join(df_sky, ['year', 'month','day','hour'], how = "inner")

In [24]:
df_total = df_total.na.drop()

In [25]:
df_total.limit(1000).toPandas().sample(5).T

Unnamed: 0,11,384,138,394,14
year,2013,2013,2013,2013,2013
month,2,1,4,1,3
day,7,25,12,29,13
hour,15,14,9,14,20
community_area_number,54,68,32,39,54
id,9001193,20783,10092737,20795,9045921
cas_number,HW147978,HW131095,HY281152,HW136256,HW191139
date,2013-02-07 15:45:00,2013-01-25 14:59:00,2013-04-12 09:00:00,2013-01-29 14:22:00,2013-03-13 20:35:00
block,130XX S LANGLEY AVE,058XX S SHIELDS AVE,002XX N COLUMBUS DR,044XX S OAKENWALD AVE,130XX S LANGLEY AVE
iucr,0460,0110,1120,0110,2027


In [26]:
df_total = df_total.withColumn('latitude', df_total['latitude'].cast(FloatType()))
df_total = df_total.withColumn('longitude', df_total['longitude'].cast(FloatType()))


In [27]:
list_of_crimes = config["List_of_crimes_prediction"]["with_merge_pred"]

In [28]:
df = df_total.where(col('primary_type').isin(list_of_crimes))

In [29]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- community_area_number: string (nullable = true)
 |-- id: string (nullable = true)
 |-- cas_number: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- block: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: string (nullable = true)
 |-- domestic: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- district: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- x_coordinate: string (nullable = true)
 |-- y_coordinate: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- location: string (nulla

In [39]:
categoricalColumns = ['domestic']
numericCols = ['year', 'month', 'day', 'hour','minute', 'latitude',
               'longitude', 'isStreet', 'isAV',
               'isBLVD', 'isRD', 'isPL', 'isBROADWAY',
               'isPKWY', 'duree_day', 'minute','dayofmonth',
               'dayofyear','dayofweek','Temperature',
               'pct_housing_crowded',
               'pct_households_below_poverty',
               'pct_age16_unemployed',
               'pct_age25_no_highschool',
               'pct_not_working_age',
               'per_capita_income',
               'hardship_index',
                'Chicago_broken clouds',
                'Chicago_drizzle',
                 'Chicago_few clouds',
                'Chicago_fog',
                 'Chicago_haze',
                 'Chicago_heavy intensity drizzle',
                 'Chicago_heavy intensity rain',
                 'Chicago_heavy snow',
                 'Chicago_light intensity drizzle',
                 'Chicago_light rain',
                 'Chicago_light rain and snow',
                 'Chicago_light snow',
                 'Chicago_mist',
                 'Chicago_moderate rain',
                 'Chicago_overcast clouds',
                 'Chicago_proximity thunderstorm',
                 'Chicago_scattered clouds',
                 'Chicago_sky is clear',
                 'Chicago_snow',
                 'Chicago_thunderstorm',
                 'Chicago_thunderstorm with heavy rain',
                 'Chicago_thunderstorm with light rain',
                 'Chicago_thunderstorm with rain',
                'Chicago_very heavy rain'
]


In [45]:
stages = []
label_stringIdx = StringIndexer(inputCol = 'primary_type', outputCol = 'label')
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages.append(stringIndexer)
    stages.append(encoder)
stages.append(label_stringIdx)
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
rf = RandomForestClassifier(labelCol= "label", featuresCol="features", numTrees=10)
stages.append(assembler)
stages.append(rf)

In [46]:
stages

[StringIndexer_bd8322ac15e9,
 OneHotEncoderEstimator_0a3fd9634c0a,
 StringIndexer_61bd9a16306c,
 VectorAssembler_00dffcc6e574,
 RandomForestClassifier_fa81955defb2]

In [49]:
pipeline_and_model = Pipeline(stages = stages)

In [None]:
pipeline_model = pipeline_and_model.fit(df)

In [None]:
prepared = pipelineModel.transform(df)

In [None]:
selectedCols = ['label', 'features'] + categoricalColumns + numericCols

In [None]:
df = df.select(selectedCols)
df.printSchema()

In [None]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

In [None]:
df.show()

In [None]:
train, test = df.randomSplit([0.7, 0.3])

In [None]:
train.count()

In [None]:
test.count()