In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction, col
from pyspark.sql.types import *

In [None]:
# The four yearly extracts are all read with the same options: the first row is
# treated as the header and Spark infers each column's type from the data.
crime_csv_options = dict(header=True, inferSchema=True)

crime_one = spark.read.csv(
    "wasb:///ChicagoCrimeData/Chicago_Crimes_2001_to_2004.csv", **crime_csv_options
)
crime_two = spark.read.csv(
    "wasb:///ChicagoCrimeData/Chicago_Crimes_2005_to_2007.csv", **crime_csv_options
)
crime_three = spark.read.csv(
    "wasb:///ChicagoCrimeData/Chicago_Crimes_2008_to_2011.csv", **crime_csv_options
)
crime_four = spark.read.csv(
    "wasb:///ChicagoCrimeData/Chicago_Crimes_2012_to_2017.csv", **crime_csv_options
)

In [None]:
# Show the column names and inferred types of the 2001-2004 extract.
crime_one.printSchema()

In [None]:
# Show the column names and inferred types of the 2005-2007 extract.
crime_two.printSchema()

In [None]:
# Show the column names and inferred types of the 2008-2011 extract.
crime_three.printSchema()

In [None]:
# Show the column names and inferred types of the 2012-2017 extract.
crime_four.printSchema()

In [None]:
# Lookup table: column name -> Spark data type, taken from the schema inferred
# for crime_two. It serves as the reference schema for casting crime_one.
crime_schema = dict(
    (schema_field.name, schema_field.dataType)
    for schema_field in crime_two.schema.fields
)

# Column names of crime_one, i.e. the columns that will be cast.
crime_one_cols = crime_one.columns

In [None]:
# Cast every column of crime_one to the type Spark inferred for the column of
# the same name in crime_two (presumably so the extracts share one schema
# before they are combined).
#
# All casts are issued in a single select(): calling withColumn() once per
# column in a loop adds a projection each time and can produce very large
# query plans.
#
# NOTE: a column that exists in crime_one but not in crime_two raises KeyError
# from the crime_schema lookup.
crime_one = crime_one.select(
    [
        crime_one[column_name].cast(crime_schema[column_name]).alias(column_name)
        for column_name in crime_one_cols
    ]
)

In [None]:
# Stack the four extracts into one DataFrame, in chronological order.
# union() matches columns by position, not by name.
ChicagoCrime = crime_one
for later_extract in (crime_two, crime_three, crime_four):
    ChicagoCrime = ChicagoCrime.union(later_extract)

In [None]:
from functools import reduce  # no longer needed here; kept in case other cells rely on it

# Replace the original CSV headers with names that contain no spaces, so the
# columns can be referenced directly in SQL. Names are assigned by position.
OldColumnNames = ChicagoCrime.columns
NewColumnNames = [
    '_c0', 'ID', 'CaseNumber', 'Date', 'Block', 'IUCR', 'PrimaryType', 'Description',
    'LocationDescription', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward',
    'CommunityArea', 'FBICode', 'XCoordinate', 'YCoordinate', 'Year', 'UpdatedOn',
    'Latitude', 'Longitude', 'Location',
]

# Fail loudly if the frame does not have exactly the expected number of columns
# (for example when this cell is re-run after columns were dropped); a positional
# rename would otherwise attach the wrong names to the wrong columns.
if len(OldColumnNames) != len(NewColumnNames):
    raise ValueError(
        "Expected {0} columns to rename but the DataFrame has {1}: {2}".format(
            len(NewColumnNames), len(OldColumnNames), OldColumnNames
        )
    )

# toDF() renames every column in one step instead of chaining one
# withColumnRenamed() call per column.
ChicagoCrime = ChicagoCrime.toDF(*NewColumnNames)
ChicagoCrime.printSchema()

In [None]:
# Total number of rows in the combined DataFrame (triggers a Spark job over all four extracts).
ChicagoCrime.count()

In [None]:
# Columns kept for the rest of the notebook; every column not listed here is dropped.
ChicagoCrime_Cols = [
    'ID', 'CaseNumber', 'Date', 'Block', 'IUCR', 'PrimaryType', 'Description',
    'LocationDescription', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward',
    'CommunityArea', 'FBICode', 'Year', 'UpdatedOn',
]

ChicagoCrime = ChicagoCrime.select(*ChicagoCrime_Cols)
ChicagoCrime.printSchema()

In [None]:
# Replace nulls with the placeholder text 'Missing Data'.
# DataFrames are immutable: fillna() returns a new DataFrame, so the result has
# to be assigned back or the replacement is silently thrown away.
# With a string fill value, non-string columns in the subset are ignored by Spark.
ChicagoCrime = ChicagoCrime.fillna('Missing Data', subset=ChicagoCrime_Cols)

In [None]:
# Expose the DataFrame to SQL under the name "ChicagoCrime".
# createOrReplaceTempView() is the Spark 2.x replacement for the deprecated
# registerTempTable(); it also replaces an existing view of the same name, so
# the cell can be re-run safely.
ChicagoCrime.createOrReplaceTempView("ChicagoCrime")

In [None]:
%%sql
-- Preview: ten rows from the registered ChicagoCrime view.
SELECT * FROM ChicagoCrime LIMIT 10

In [None]:
%%sql
-- List every distinct PrimaryType value present in the data.
SELECT DISTINCT PrimaryType FROM ChicagoCrime

In [None]:
%%sql
-- Preview of two derived columns, both keyed on the same list of PrimaryType values:
--   SeriousCrime          text label
--   SeriousCrimeIndicator 1 for listed types, 0 otherwise
SELECT
    ID,
    CaseNumber,
    Date,
    Block,
    IUCR,
    PrimaryType,
    Description,
    LocationDescription,
    Arrest,
    Domestic,
    Beat,
    District,
    Ward,
    CommunityArea,
    FBICode,
    Year,
    UpdatedOn,
    CASE
        WHEN PrimaryType IN ('OFFENSE INVOLVING CHILDREN', 'ARSON', 'DOMESTIC VIOLENCE', 'ASSAULT', 'ROBBERY', 'HOMICIDE', 'CRIM SEXUAL ASSAULT', 'SEX OFFENSE', 'BURGLARY') THEN 'SERIOUS CRIME'
        ELSE 'NON-SERIOUS CRIME'
    END AS SeriousCrime,
    CASE
        WHEN PrimaryType IN ('OFFENSE INVOLVING CHILDREN', 'ARSON', 'DOMESTIC VIOLENCE', 'ASSAULT', 'ROBBERY', 'HOMICIDE', 'CRIM SEXUAL ASSAULT', 'SEX OFFENSE', 'BURGLARY') THEN 1
        ELSE 0
    END AS SeriousCrimeIndicator
FROM ChicagoCrime

In [74]:
# PrimaryType values that this notebook labels as serious crime. The list is
# defined once so the text label and the 0/1 indicator are always derived from
# exactly the same set of values.
SERIOUS_CRIME_TYPES = [
    'OFFENSE INVOLVING CHILDREN', 'ARSON', 'DOMESTIC VIOLENCE', 'ASSAULT', 'ROBBERY',
    'HOMICIDE', 'CRIM SEXUAL ASSAULT', 'SEX OFFENSE', 'BURGLARY',
]

# Render the list as a SQL IN-list body: 'A', 'B', ...
# The values are fixed constants from this cell, not user input.
serious_crime_in_list = ", ".join(
    "'{0}'".format(crime_type) for crime_type in SERIOUS_CRIME_TYPES
)

# Rebuild ChicagoCrime from the "ChicagoCrime" view with two extra columns:
#   SeriousCrime          - 'SERIOUS CRIME' or 'NON-SERIOUS CRIME'
#   SeriousCrimeIndicator - 1 or 0 for the same condition
# The query goes through `spark`, the same session object used to load the
# data, rather than the legacy `sqlContext` handle.
ChicagoCrime = spark.sql("""
    SELECT
        ID, CaseNumber, Date, Block, IUCR, PrimaryType, Description, LocationDescription,
        Arrest, Domestic, Beat, District, Ward, CommunityArea, FBICode, Year, UpdatedOn,
        CASE
            WHEN PrimaryType IN ({serious_types}) THEN 'SERIOUS CRIME'
            ELSE 'NON-SERIOUS CRIME'
        END AS SeriousCrime,
        CASE
            WHEN PrimaryType IN ({serious_types}) THEN 1
            ELSE 0
        END AS SeriousCrimeIndicator
    FROM ChicagoCrime
""".format(serious_types=serious_crime_in_list))

In [None]:
# Write ChicagoCrime as CSV to blob storage.
# This line is active: running the cell performs the write. Comment it out to skip it.
# NOTE(review): no save mode or header option is passed, so Spark's defaults apply --
# presumably the write fails if the target path already exists and no header row is
# written; confirm before re-running or before reading the output back.
ChicagoCrime.write.csv('wasb:///ChicagoCrimeData/CompleteCrime.csv')