# Crime and Date Dimension Scripts

## Preprocessing for Vancouver
#### Crime Dimension

In [None]:
    def preprocessCrimeCategories(dataframe):
        df1 = dataframe.withColumn("crimeCategory", dataframe["TYPE"])
        df = df1.withColumnRenamed("TYPE","crimeDetails")

        #Group crimes into crime categories
        df2 = df.withColumn('crimeCategory', when(df['crimeCategory'].contains("Homicide"), 1) \
                                    .when(df['crimeCategory'].contains("Offence Against a Person"), 2) \
                                    .when(df['crimeCategory'].contains("Theft of Vehicle"), 5) \
                                    .when(df['crimeCategory'].contains("Theft from Vehicle"), 6) \
                                    .when(df['crimeCategory'].contains("Break and Enter Residential/Other"), 7) \
                                    .when(df['crimeCategory'].contains("Break and Enter Commercial"), 7) \
                                    .when(df['crimeCategory'].contains("Theft of Bicycle"), 8) \
                                    .when(df['crimeCategory'].contains("Other Theft"), 7) \
                                    .when(df['crimeCategory'].contains("Mischief"), 9) \
                                    .when(df['crimeCategory'].contains("Vehicle Collision or Pedestrian Struck (with Fatality)"), 10) \
                                    .when(df['crimeCategory'].contains("Vehicle Collision or Pedestrian Struck (with Injury"), 10).otherwise(0))
        #Group crimes into severity rating (5 is most severe, 1 is least severe)
        df2 = df2.withColumn('crimeSeverity', when(df2['crimeCategory'].contains(1), 5) \
                                    .when(df2['crimeCategory'].contains(2), 4) \
                                    .when(df2['crimeCategory'].contains(3), 4) \
                                    .when(df2['crimeCategory'].contains(4), 4) \
                                    .when(df2['crimeCategory'].contains(5), 3) \
                                    .when(df2['crimeCategory'].contains(6), 3) \
                                    .when(df2['crimeCategory'].contains(7), 3) \
                                    .when(df2['crimeCategory'].contains(8), 2) \
                                    .when(df2['crimeCategory'].contains(9), 1) \
                                    .when(df2['crimeCategory'].contains(10), 1).otherwise(0))
        #group hour column into general time of day
        df2 = df2.withColumn('timeOfDay', when(df1['HOUR'].contains(23), 3) \
                                        .when(df1['HOUR'].contains(22), 3) \
                                        .when(df1['HOUR'].contains(21), 3) \
                                        .when(df1['HOUR'].contains(20), 3) \
                                        .when(df1['HOUR'].contains(19), 3) \
                                        .when(df1['HOUR'].contains(18), 3) \
                                        .when(df1['HOUR'].contains(17), 2) \
                                        .when(df1['HOUR'].contains(16), 2) \
                                        .when(df1['HOUR'].contains(15), 2) \
                                        .when(df1['HOUR'].contains(14), 2) \
                                        .when(df1['HOUR'].contains(13), 2) \
                                        .when(df1['HOUR'].contains(12), 2) \
                                        .when(df1['HOUR'].contains(11), 1) \
                                        .when(df1['HOUR'].contains(10), 1) \
                                        .when(df1['HOUR'].contains(9), 1) \
                                        .when(df1['HOUR'].contains(8), 1) \
                                        .when(df1['HOUR'].contains(7), 1) \
                                        .when(df1['HOUR'].contains(6), 1) \
                                        .when(df1['HOUR'].contains(5), 0) \
                                        .when(df1['HOUR'].contains(4), 0) \
                                        .when(df1['HOUR'].contains(3), 0) \
                                        .when(df1['HOUR'].contains(2), 0) \
                                        .when(df1['HOUR'].contains(1), 0) \
                                        .when(df1['HOUR'].contains(0), 0).otherwise(-1))

        #add startTime and endTime with null representation
        df2 = df2.withColumn('CrimeStartTime', lit(-1).cast(IntegerType()))
        df2 = df2.withColumn('CrimeEndTime', lit(-1).cast(IntegerType()))
        return df2

#### Date Dimension

In [None]:
    def preprocessTimeCategories(dataframe):
        df1 = dataframe.withColumn("year", dataframe["Year"])
        df1 = df1.withColumn("month", df1["MONTH"])
        df1 = df1.withColumn("dayOfMonth", df1["DAY"])

        #create timestamp
        df2 = df1.withColumn("timestamp",
            to_timestamp(concat(df1["year"].cast("String"),
            lit("-"),
            df1["month"].cast("String"),
            lit("-"),
            df1["dayOfMonth"].cast("String"),
         ), "yyyy-MM-dd"))

        #calc dayOfWeek based on timestamp
        df2 = df2.withColumn('dayOfWeek', date_format('timestamp', 'u'))
        df2 = df2.withColumn('isWeekend', when(df2['dayOfWeek'].contains(6), 1).when(df2['dayOfWeek'].contains(7), 1).otherwise(0))
        df2 = df2.withColumn('eventOfSignificance', lit(None).cast(StringType()))

        df2 = df2.withColumn("timestamp", substring(col("timestamp"), 1,10))

        return df2

## Preprocessing for Denver
#### Crime Dimension

In [None]:
    def preprocessCrimeCategoriesDen(dataframe):
        df1 = dataframe.withColumn("crimeCategory", dataframe["OFFENSE_CATEGORY_ID"])
        df = df1.withColumnRenamed("OFFENSE_TYPE_ID","crimeDetails")

        #Group crimes into crime categories
        df2 = df.withColumn('crimeCategory', when(df['crimeCategory'].contains("Murder"), 1) \
                                    .when(df['crimeCategory'].contains("other-crimes-against-persons"), 2) \
                                    .when(df['crimeCategory'].contains("aggravated-assault"), 2) \
                                    .when(df['crimeCategory'].contains("sexual-assault"), 3) \
                                    .when(df['crimeCategory'].contains("arson"), 4) \
                                    .when(df['crimeCategory'].contains("auto-theft"), 5) \
                                    .when(df['crimeCategory'].contains("theft-from-motor-vehicle"), 6) \
                                    .when(df['crimeCategory'].contains("burglary"), 7) \
                                    .when(df['crimeCategory'].contains("robbery"), 7) \
                                    .when(df['crimeCategory'].contains("larceny"), 7) \
                                    .when(df['crimeCategory'].contains("white-collar-crime"), 8) \
                                    .when(df['crimeCategory'].contains("all-other-crimes"), 8) \
                                    .when(df['crimeCategory'].contains("drug-alcohol"), 8) \
                                    .when(df['crimeCategory'].contains("public-disorder "), 9) \
                                    .when(df['crimeCategory'].contains("traffic-accident"), 10).otherwise(0))
        #Group crimes into severity rating (5 is most severe, 1 is least severe)
        df2 = df2.withColumn('crimeSeverity', when(df2['crimeCategory'].contains(1), 5) \
                                    .when(df2['crimeCategory'].contains(2), 4) \
                                    .when(df2['crimeCategory'].contains(3), 4) \
                                    .when(df2['crimeCategory'].contains(4), 4) \
                                    .when(df2['crimeCategory'].contains(5), 3) \
                                    .when(df2['crimeCategory'].contains(6), 3) \
                                    .when(df2['crimeCategory'].contains(7), 3) \
                                    .when(df2['crimeCategory'].contains(8), 2) \
                                    .when(df2['crimeCategory'].contains(9), 1) \
                                    .when(df2['crimeCategory'].contains(10), 1).otherwise(0))
        #convert into proper timestamp format
        df2 = df2.withColumn("crimeStartTime", from_unixtime(unix_timestamp("FIRST_OCCURRENCE_DATE",'MM/dd/yyyy hh:mm:ss aa'),'MM/dd/yyyy HH:mm:ss'))
        df2 = df2.withColumn("crimeEndTime", from_unixtime(unix_timestamp("LAST_OCCURRENCE_DATE",'MM/dd/yyyy hh:mm:ss aa'),'MM/dd/yyyy HH:mm:ss'))

        #group hour column into general time of day
        df2 = df2.withColumn("timestamp", from_unixtime(unix_timestamp("REPORTED_DATE",'MM/dd/yyyy hh:mm:ss aa'),'MM/dd/yyyy HH'))
        df2 = df2.withColumn("HOUR", substring(col("timestamp"), -2,2))
        df2 = df2.withColumn('timeOfDay', when(df2['HOUR'].contains(23), 3) \
                                            .when(df2['HOUR'].contains(22), 3) \
                                            .when(df2['HOUR'].contains(21), 3) \
                                            .when(df2['HOUR'].contains(20), 3) \
                                            .when(df2['HOUR'].contains(19), 3) \
                                            .when(df2['HOUR'].contains(18), 3) \
                                            .when(df2['HOUR'].contains(17), 2) \
                                            .when(df2['HOUR'].contains(16), 2) \
                                            .when(df2['HOUR'].contains(15), 2) \
                                            .when(df2['HOUR'].contains(14), 2) \
                                            .when(df2['HOUR'].contains(13), 2) \
                                            .when(df2['HOUR'].contains(12), 2) \
                                            .when(df2['HOUR'].contains(11), 1) \
                                            .when(df2['HOUR'].contains(10), 1) \
                                            .when(df2['HOUR'].contains(9), 1) \
                                            .when(df2['HOUR'].contains(8), 1) \
                                            .when(df2['HOUR'].contains(7), 1) \
                                            .when(df2['HOUR'].contains(6), 1) \
                                            .when(df2['HOUR'].contains(5), 0) \
                                            .when(df2['HOUR'].contains(4), 0) \
                                            .when(df2['HOUR'].contains(3), 0) \
                                            .when(df2['HOUR'].contains(2), 0) \
                                            .when(df2['HOUR'].contains(1), 0) \
                                            .when(df2['HOUR'].contains(0), 0).otherwise(-1))

        return df2

#### Date Dimension

In [None]:
    def preprocessTimeCategoriesDen(dataframe):

        #convert to timestamp
        df1 = dataframe.withColumn("timestamp1", from_unixtime(unix_timestamp("REPORTED_DATE",'MM/dd/yyyy hh:mm:ss aa'),'MM/dd/yyyy HH'))

        #create year, month, dayOfMonth, HOUR columns from timestamp
        df2 = df1.withColumn("year", substring(col("timestamp1"), -7,4))
        df2 = df2.withColumn("month", substring(col("timestamp1"), 1,2))
        df2 = df2.withColumn("dayOfMonth", substring(col("timestamp1"), 4,2))
        df2 = df2.withColumn("HOUR", substring(col("timestamp1"), -2,2))

        #create timestamp (with a different format) from existing string columns
        df2 = df2.withColumn("timestamp",
            to_timestamp(concat(df2["year"].cast("String"),
            lit("-"),
            df2["month"].cast("String"),
            lit("-"),
            df2["dayOfMonth"].cast("String"),
         ), "yyyy-MM-dd"))

        #calc dayOfWeek based on timestamp
        df2 = df2.withColumn('dayOfWeek', date_format('timestamp', 'u'))
        df2 = df2.withColumn('isWeekend', when(df2['dayOfWeek'].contains(6), 1).when(df2['dayOfWeek'].contains(7), 1).otherwise(0))
        df2 = df2.withColumn("timestamp", substring(col("timestamp"), 1,10))
        return df2


#### Main Function

In [None]:
if __name__ == '__main__':

    start_time = time.time()

    #Spark session created
    spark = SparkSession.builder.appName('Data processing').getOrCreate()

	#load dataframes
    crimeVan_dataframe = spark.read.csv('/Users/nicgardin/Documents/2020_Classes_Master/DataScience/crime_datasets/crimeVan.csv', header=True)
    crimeDen_dataframe = spark.read.csv('/Users/nicgardin/Documents/2020_Classes_Master/DataScience/crime_datasets/crimeDen.csv', header=True)

    van_crimeDim = preprocessCrimeCategories(crimeVan_dataframe)
    van_crimeDim = van_crimeDim.select("crimeCategory", "crimeSeverity", "timeOfDay", "crimeStartTime", "crimeEndTime", "crimeDetails")
    van_crimeDim.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/Users/nicgardin/Documents/2020_Classes_Master/DataScience/outputCSV/van_crimeDim")

    van_timeDim = preprocessTimeCategories(crimeVan_dataframe)
    van_timeDim = van_timeDim.select("dayOfWeek", "dayOfMonth", "month",  "year", "isWeekend", "timestamp")
    van_timeDim.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/Users/nicgardin/Documents/2020_Classes_Master/DataScience/outputCSV/van_timeDim")

    den_crimeDim = preprocessCrimeCategoriesDen(crimeDen_dataframe)
    den_crimeDim = den_crimeDim.select("crimeCategory", "crimeSeverity", "timeOfDay", "crimeStartTime", "crimeEndTime", "crimeDetails")
    den_crimeDim.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/Users/nicgardin/Documents/2020_Classes_Master/DataScience/outputCSV/den_crimeDim")

    den_timeDim = preprocessTimeCategoriesDen(crimeDen_dataframe)
    den_timeDim = den_timeDim.select("dayOfWeek", "dayOfMonth", "month",  "year", "isWeekend", "timestamp")
    den_timeDim.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("/Users/nicgardin/Documents/2020_Classes_Master/DataScience/outputCSV/den_timeDim")