In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.1`
import $ivy.`sh.almond::almond-spark:0.6.0`
import org.apache.log4j.{Level, Logger}

// Quiet Spark's INFO/WARN chatter in the notebook, but keep ERROR-level messages so a failing
// job still leaves a diagnosable log trace (Level.OFF would suppress those as well).
Logger.getLogger("org").setLevel(Level.ERROR)

import org.apache.spark.sql._

// Local Spark session using all available cores, created through almond's
// NotebookSparkSession builder.
val spark = NotebookSparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()
import spark.implicits._
// NOTE(review): spark.sqlContext.implicits exposes the same SQLImplicits members as
// spark.implicits above; this second import looks redundant.
import spark.sqlContext.implicits._

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


import $ivy.$
import $ivy.$
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
spark: SparkSession = org.apache.spark.sql.SparkSession@1e0733dc
import spark.implicits._
import spark.sqlContext.implicits._

In [2]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

In [3]:
// Input CSVs and the Parquet output directory, relative to the notebook's working directory.
val crimeCsvPath: String = "crimes-in-boston/crime.csv"
val offenceCodesCsvPath: String = "crimes-in-boston/offense_codes.csv"
val outputFolderPath: String = "crimes-in-boston/outputs"

crimeCsvPath: String = "crimes-in-boston/crime.csv"
offenceCodesCsvPath: String = "crimes-in-boston/offense_codes.csv"
outputFolderPath: String = "crimes-in-boston/outputs"

In [16]:
// Incident records. inferSchema costs an extra pass over the file but yields typed columns
// (see the printSchema output below). The frame is cached because the aggregations later in
// the notebook all start from it.
// NOTE(review): no encoding option is set, so Spark reads the CSVs as UTF-8 — confirm the
// source files are UTF-8.
val rawCrimes = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv(crimeCsvPath)
    .cache()

// Offence-code lookup. pretty_name keeps only the part of NAME before the first hyphen, with
// the whitespace around that hyphen dropped (e.g. "A - B - C" -> "A").
val rawOffenceCodes = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv(offenceCodesCsvPath)
    .withColumn("pretty_name", split(col("NAME"), "\\s*-\\s*").getItem(0))

rawCrimes: Dataset[Row] = [INCIDENT_NUMBER: string, OFFENSE_CODE: int ... 15 more fields]
rawOffenceCodes: DataFrame = [CODE: int, NAME: string ... 1 more field]

In [17]:
// Show the column types that inferSchema produced for the incident data.
rawCrimes.printSchema()

root
 |-- INCIDENT_NUMBER: string (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- OFFENSE_CODE_GROUP: string (nullable = true)
 |-- OFFENSE_DESCRIPTION: string (nullable = true)
 |-- DISTRICT: string (nullable = true)
 |-- REPORTING_AREA: string (nullable = true)
 |-- SHOOTING: string (nullable = true)
 |-- OCCURRED_ON_DATE: timestamp (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- UCR_PART: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Location: string (nullable = true)



In [18]:
// Median of the per-month crime counts, evaluated through the SQL aggregate via expr().
// percentile_approx returns a value that actually occurs in the column, so with an even number
// of months this is the lower middle count rather than the mean of the two middle ones.
val percentileFunc = expr("percentile_approx(crimes_monthly, 0.5)")

// For each district: the median number of crimes per (YEAR, MONTH) observed in the data.
val monthlyCrimes = {
    val crimesPerMonth = rawCrimes
        .groupBy(col("DISTRICT"), col("YEAR"), col("MONTH"))
        .agg(count("*").alias("crimes_monthly"))

    crimesPerMonth
        .groupBy(col("DISTRICT"))
        .agg(percentileFunc.as("crimes_monthly"))
}

percentileFunc: Column = percentile_approx(crimes_monthly, 0.5)
monthlyCrimes: DataFrame = [DISTRICT: string, crimes_monthly: bigint]

In [19]:
// Orders crime types inside each district by frequency, most frequent first. The secondary key
// on the type name makes the top-N cut deterministic when counts tie.
val perDistrictWindow = Window
    .partitionBy($"DISTRICT")
    .orderBy(desc("crime_type_count"), asc("pretty_name"))
val topCrimeTypeCount = 3

// For each district, the topCrimeTypeCount most frequent crime types (pretty_name), listed in
// descending order of frequency.
//
// Ranking is done on the crime type rather than on the raw OFFENSE_CODE: several codes can map
// to the same pretty_name, so ranking codes and then de-duplicating names could return fewer
// than topCrimeTypeCount types, and in no particular order.
val topCrimeTypes = {
    // One name per code. This guards against a code appearing more than once in the lookup,
    // which would otherwise duplicate incident rows in the join and inflate the counts.
    // Rows without a usable name are dropped, mirroring collect_set's skipping of nulls.
    // NOTE(review): if a code has conflicting names, dropDuplicates keeps an arbitrary one.
    val codeNames = rawOffenceCodes
        .where($"pretty_name".isNotNull)
        .select($"CODE", $"pretty_name")
        .dropDuplicates("CODE")

    rawCrimes
        .join(broadcast(codeNames), $"OFFENSE_CODE" === codeNames("CODE"))
        .groupBy($"DISTRICT", $"pretty_name")
        .agg(count("*").as("crime_type_count"))
        // row_number (not rank) so ties cannot push the list past topCrimeTypeCount entries.
        .withColumn("offence_rank", row_number().over(perDistrictWindow))
        .where($"offence_rank" <= topCrimeTypeCount)
        .groupBy($"DISTRICT")
        // collect_list gives no ordering guarantee, so gather (rank, name) pairs, sort them by
        // rank and then project out just the names.
        .agg(
            sort_array(collect_list(struct($"offence_rank", $"pretty_name")))
                .getField("pretty_name")
                .alias("frequent_crime_types"))
}

perDistrictWindow: expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@226c4197
topCrimeTypeCount: Int = 3
topCrimeTypes: DataFrame = [DISTRICT: string, frequent_crime_types: array<string>]

In [20]:
// Per-district summary written as Parquet to outputFolderPath, containing:
//   DISTRICT, crimes_total (all incidents), crimes_monthly (median per month),
//   frequent_crime_types, and lat/lng (mean of the incidents' Lat/Long).
// The DISTRICT key is kept in the output; without it the rows cannot be attributed to a
// district. Rows whose DISTRICT is null never match in the equi-joins, so they are absent.
// avg ignores null coordinates; NOTE(review): any sentinel/placeholder coordinates in the data
// would still be averaged in — confirm.
// Overwrite mode lets the cell be re-run; the default ErrorIfExists mode fails once the
// output directory exists.
rawCrimes
    .groupBy($"DISTRICT")
    .agg(
        count("*").alias("crimes_total"),
        avg("Lat").alias("lat"),
        avg("Long").alias("lng"))
    .join(topCrimeTypes, "DISTRICT")
    .join(monthlyCrimes, "DISTRICT")
    .select($"DISTRICT", $"crimes_total", $"crimes_monthly", $"frequent_crime_types", $"lat", $"lng")
    .write
    .mode(SaveMode.Overwrite)
    .parquet(outputFolderPath)