#### импорт данных из csv-файлов

In [3]:
import $ivy.`org.apache.spark::spark-sql:2.4.0`

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._

val spark = NotebookSparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

val crimeFacts = spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/crime.csv")

val offenseCodes = spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/offense_codes.csv")

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val offenseCodesBroadcast = broadcast(offenseCodes)

Getting spark JARs
Creating SparkSession


[32mimport [39m[36m$ivy.$                                  

[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@677d8c4
[36mcrimeFacts[39m: [32mDataFrame[39m = [INCIDENT_NUMBER: string, OFFENSE_CODE: int ... 15 more fields]
[36moffenseCodes[39m: [32mDataFrame[39m = [CODE: int, NAME: string]
[32mimport [39m[36mspark.implicits._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.expressions.Window

[39m
[36moffenseCodesBroadcast[39m: [32mDataset[39m[[32mRow[39m] = [CODE: int, NAME: string]

#### crimes_total - общее количество преступлений в этом районе

In [4]:
val crimesTotal = crimeFacts
    .withColumn("DISTRICT", when($"DISTRICT".isNull, "n/d").otherwise($"DISTRICT"))
    .groupBy($"DISTRICT")
    .agg(count("*")
    .alias("crimes_total"))

[36mcrimesTotal[39m: [32mDataFrame[39m = [DISTRICT: string, crimes_total: bigint]

#### crimes_monthly - медиана числа преступлений в месяц в этом районе

In [6]:
val crimesMonthlyCounts = crimeFacts
    .groupBy($"DISTRICT", $"YEAR", $"MONTH")
    .agg(count("*").alias("monthly_counts"))
    .orderBy($"DISTRICT")

crimesMonthlyCounts.createOrReplaceTempView("monthly")
val crimesMonthly = spark
    .sql("select coalesce(DISTRICT, 'n/d') as DISTRICT, percentile_approx(monthly_counts, 0.5) as crimes_monthly from monthly group by DISTRICT")


[36mcrimesMonthlyCounts[39m: [32mDataset[39m[[32mRow[39m] = [DISTRICT: string, YEAR: int ... 2 more fields]
[36mcrimesMonthly[39m: [32mDataFrame[39m = [DISTRICT: string, crimes_monthly: bigint]

#### lat - широта координаты района, расчитанная как среднее по всем широтам инцидентов
#### lng - долгота координаты района, расчитанная как среднее по всем долготам инцидентов

In [7]:
crimeFacts.createOrReplaceTempView("crimes")
val crimesLatLng = spark
    .sql("select coalesce(DISTRICT, 'n/d') as DISTRICT, avg(Lat) as lat, avg(Long) as lng from crimes group by DISTRICT")


[36mcrimesLatLng[39m: [32mDataFrame[39m = [DISTRICT: string, lat: double ... 1 more field]

#### frequent_crime_types - три самых частых crime_type за всю историю наблюдений в этом районе, объединенных через запятую с одним пробелом “, ” , расположенных в порядке убывания частоты

In [11]:
val crimesTypeCounts = crimeFacts
    .join(offenseCodesBroadcast, $"CODE" === $"OFFENSE_CODE")
    .groupBy($"DISTRICT", substring_index($"NAME", " -", 1).as("CRIME_TYPE"))
    .count()

val windowSpec = Window.partitionBy($"DISTRICT").orderBy($"DISTRICT".asc, $"count".desc)
val crimesTypeTop3 = crimesTypeCounts
    .withColumn("CRIME_TYPE",  concat($"CRIME_TYPE", lit(" ("), $"count".cast("string"), lit(")")))
    .withColumn("RANK", rank().over(windowSpec))
    .filter($"RANK" <= 3)

val frequentCrimeTypes = crimesTypeTop3
    .withColumn("DISTRICT", when($"DISTRICT".isNull, "n/d").otherwise($"DISTRICT"))
    .groupBy($"DISTRICT")
    .agg(concat_ws(", ", collect_list($"CRIME_TYPE")).as("frequent_crime_types"))
    .orderBy($"DISTRICT".asc)

[36mcrimesTypeCounts[39m: [32mDataFrame[39m = [DISTRICT: string, CRIME_TYPE: string ... 1 more field]
[36mwindowSpec[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@6b80012c
[36mcrimesTypeTop3[39m: [32mDataset[39m[[32mRow[39m] = [DISTRICT: string, CRIME_TYPE: string ... 2 more fields]
[36mfrequentCrimeTypes[39m: [32mDataset[39m[[32mRow[39m] = [DISTRICT: string, frequent_crime_types: string]

### итог

In [10]:
crimesTotal.alias("a")
    .join(crimesMonthly.alias("b"), crimesTotal("DISTRICT") === crimesMonthly("DISTRICT"), "inner")
    .join(frequentCrimeTypes.alias("c"), crimesTotal("DISTRICT") === frequentCrimeTypes("DISTRICT"), "inner")
    .join(crimesLatLng.alias("d"), crimesTotal("DISTRICT") === crimesLatLng("DISTRICT"), "inner")
    .select("a.DISTRICT", "a.crimes_total", "b.crimes_monthly", "c.frequent_crime_types", "d.lat", "d.lng")
    .orderBy($"DISTRICT")
    .show()

+--------+------------+--------------+--------------------+------------------+-------------------+
|DISTRICT|crimes_total|crimes_monthly|frequent_crime_types|               lat|                lng|
+--------+------------+--------------+--------------------+------------------+-------------------+
|      A1|       35717|           904|PROPERTY (5300), ...| 42.33123077259832| -71.01991881362024|
|     A15|        6505|           160|INVESTIGATE PERSO...| 42.17915525091079| -70.74472508958506|
|      A7|       13544|           344|SICK/INJURED/MEDI...| 42.36070260499384| -71.00394833039846|
|      B2|       49945|          1298|VERBAL DISPUTE (6...|42.316003677327735| -71.07569930654353|
|      B3|       35442|           907|VERBAL DISPUTE (5...| 42.28305944520094| -71.07894914185495|
|     C11|       42530|          1115|SICK/INJURED/MEDI...| 42.29263740900062| -71.05125995734369|
|      C6|       23460|           593|SICK/INJURED/MEDI...| 42.21212258445543| -70.85561011772238|
|     D14|