## OTUS-DE - HW4 Сборка витрины на PySpark

В этом задании предлагается собрать статистику по криминогенной обстановке в разных районах Бостона. 
В качестве исходных данных используется датасет **[Kaggle - Crimes in Boston](https://www.kaggle.com/AnalyzeBoston/crimes-in-boston)**


### Цель
    * Разработать программу построения витрины.

### Подсказки
* Функция `percentile_approx` может посчитать медиану.
* В справочнике кодов есть дубликаты. Нужно выбрать уникальные коды, взяв любое из названий.

### Шаги
1. Загрузить данные.
    * Проверить данные на корректность, наличие дубликатов. Очистить.
2. Собрать витрину (агрегат по районам (поле **district**) со следующими метриками:
    * **crimes_total** - общее количество преступлений в этом районе
    * **crimes_monthly** - медиана числа преступлений в месяц в этом районе
    * **frequent_crime_types** - три самых частых crime_type за всю историю наблюдений в этом районе, объединенных через запятую с одним пробелом “, ” , расположенных в порядке убывания частоты
        * **crime_type** - первая часть NAME из таблицы offense_codes, разбитого по разделителю “-” (например, если NAME “BURGLARY - COMMERICAL - ATTEMPT”, то crime_type “BURGLARY”)
    * **lat** - широта координаты района, рассчитанная как среднее по всем широтам инцидентов
    * **lng** - долгота координаты района, рассчитанная как среднее по всем долготам инцидентов
3. Сохранить витрину в один файл в формате .parquet в папке path/to/output_folder.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Window

### Переменные - пути к файлам и к итоговой папке для parquet-файла

In [2]:
codes_file = ".\data\offense_codes.csv"
crime_file = ".\data\crime.csv"
parquet_folder = ".\output"

In [3]:
spark = SparkSession.builder.master("local[*]").appName("crimes_in_boston_analysis").getOrCreate()

### Произведем анализ и предварительную подготовку в наборе данных файла - offense_codes.csv

In [4]:
df_codes_src = spark.read.format("csv").option("mode", "FAILFAST").option("inferSchema", "true") \
    .option("header","true").option("encoding", "latin1").option("path", codes_file ) \
    .load()

df_codes_src.show(10, truncate=False)

+----+------------------------------------------+
|CODE|NAME                                      |
+----+------------------------------------------+
|612 |LARCENY PURSE SNATCH - NO FORCE           |
|613 |LARCENY SHOPLIFTING                       |
|615 |LARCENY THEFT OF MV PARTS & ACCESSORIES   |
|1731|INCEST                                    |
|3111|LICENSE PREMISE VIOLATION                 |
|2646|LIQUOR - DRINKING IN PUBLIC               |
|2204|LIQUOR LAW VIOLATION                      |
|3810|M/V ACCIDENT - INVOLVING ÊBICYCLE - INJURY|
|3801|M/V ACCIDENT - OTHER                      |
|3807|M/V ACCIDENT - OTHER CITY VEHICLE         |
+----+------------------------------------------+
only showing top 10 rows



In [5]:
df_codes_src.printSchema()

root
 |-- CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)



In [6]:
df_codes_src.count()

576

#### Проверка нулевых значений

In [7]:
df_codes_src.where("CODE IS NULL OR NAME IS NULL").show(truncate=False)

+----+----+
|CODE|NAME|
+----+----+
+----+----+



#### Проверка на дублирование кодов преступлений и список наименование преступлений в дубликатах через ","

In [8]:
df_codes_src.groupBy(col("CODE")) \
    .agg(count(col("CODE")).alias("CODE_COUNT"), collect_list(col("NAME")).alias("NAME_LIST")) \
    .orderBy(col("CODE_COUNT").desc()) \
    .show(truncate=False)

+----+----------+------------------------------------------------------------------------------------------------+
|CODE|CODE_COUNT|NAME_LIST                                                                                       |
+----+----------+------------------------------------------------------------------------------------------------+
|2608|3         |[CHINS, CHINS, CHINS]                                                                           |
|2610|2         |[TRESPASSING, TRESPASSING]                                                                      |
|2663|2         |[VIOLATION - CITY ORDINANCE CONSTRUCTION PERMIT, VIOLATION - CITY ORDINANCE CONSTRUCTION PERMIT]|
|2006|2         |[VIOLATION - RESTRAINING ORDER, VIOL. OF RESTRAINING ORDER W ARREST]                            |
|2907|2         |[M/V - LEAVING SCENE - PROPERTY DAMAGE, VAL - OPERATING AFTER REV/SUSP.]                        |
|1415|2         |[VANDALISM - GRAFFITI, GRAFFITI]                               

#### Формирование финального набора данных df_codes
* Оставляем уникальные коды, взяв любое из названий в дубликатах.
* Формируем столбец **crime_type** - первая часть NAME из таблицы offense_codes, разбитого по разделителю “-” (например, если NAME “BURGLARY - COMMERICAL - ATTEMPT”, то crime_type “BURGLARY”)

In [9]:
df_codes = df_codes_src \
    .withColumn("row_num", row_number().over( Window.partitionBy("CODE").orderBy(col("NAME").desc()) )) \
    .filter(col("row_num") == 1).drop("row_num") \
    .withColumn('CRIME_TYPE', substring_index("NAME", " -", 1)) \
    .orderBy(col("CODE")) 

df_codes.show(20, truncate=False)

+----+--------------------------------------------------+-----------------------------------+
|CODE|NAME                                              |CRIME_TYPE                         |
+----+--------------------------------------------------+-----------------------------------+
|111 |MURDER, NON-NEGLIGIENT MANSLAUGHTER               |MURDER, NON-NEGLIGIENT MANSLAUGHTER|
|112 |KILLING OF FELON BY POLICE                        |KILLING OF FELON BY POLICE         |
|113 |KILLING OF FELON BY CITIZEN                       |KILLING OF FELON BY CITIZEN        |
|114 |KILLING OF POLICE BY FELON                        |KILLING OF POLICE BY FELON         |
|121 |MANSLAUGHTER - VEHICLE - NEGLIGENCE               |MANSLAUGHTER                       |
|122 |MANSLAUGHTER - TRAIN ETC. VICTIM NON-NEGLIGENCE   |MANSLAUGHTER                       |
|123 |MANSLAUGHTER - NON-VEHICLE - NEGLIGENCE           |MANSLAUGHTER                       |
|124 |MANSLAUGHTER - VEHICLE - NEGLIGENCE OF VICTIM     |MAN

In [10]:
df_codes.count()

425

### Произведем анализ и предварительную подготовку в наборе данных файла - crime.csv
* Сразу оставляем только необходимые столбцы из датасета

In [11]:
df_crime_src = spark.read.format("csv").option("mode", "FAILFAST").option("inferSchema", "true") \
    .option("header","true").option("path", crime_file).option("encoding", "latin1") \
    .load() \
    .select("DISTRICT", "YEAR", "MONTH", "OFFENSE_CODE", "Lat", "Long")

df_crime_src.show(10, truncate=False)

+--------+----+-----+------------+-----------+------------+
|DISTRICT|YEAR|MONTH|OFFENSE_CODE|Lat        |Long        |
+--------+----+-----+------------+-----------+------------+
|D14     |2018|9    |619         |42.35779134|-71.13937053|
|C11     |2018|8    |1402        |42.30682138|-71.06030035|
|D4      |2018|9    |3410        |42.34658879|-71.07242943|
|D4      |2018|9    |3114        |42.33418175|-71.07866441|
|B3      |2018|9    |3114        |42.27536542|-71.09036101|
|C11     |2018|9    |3820        |42.29019621|-71.07159012|
|B2      |2018|9    |724         |42.30607218|-71.0827326 |
|B2      |2018|9    |3301        |42.32701648|-71.10555088|
|C6      |2018|9    |301         |42.33152148|-71.07085307|
|C11     |2018|9    |3301        |42.29514664|-71.05860832|
+--------+----+-----+------------+-----------+------------+
only showing top 10 rows



In [12]:
df_crime_src.printSchema()

root
 |-- DISTRICT: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- OFFENSE_CODE: integer (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)



In [13]:
df_crime_src.count()

319073

#### Проверка нулевых значений

In [14]:
df_crime_src.where("DISTRICT IS NULL").count()

1765

In [15]:
df_crime_src.where("OFFENSE_CODE IS NULL").count()

0

In [16]:
df_crime_src.select('Lat', 'Long').distinct().orderBy("Lat", "Long").show(truncate=False)

+-----------+------------+
|Lat        |Long        |
+-----------+------------+
|null       |null        |
|-1.0       |-1.0        |
|42.2324133 |-71.12971531|
|42.23265556|-71.13069992|
|42.23287025|-71.13004959|
|42.23290729|-71.13167059|
|42.2330858 |-71.12815697|
|42.23312147|-71.13102697|
|42.23315741|-71.13265354|
|42.23334151|-71.13368781|
|42.23337226|-71.13200359|
|42.23346994|-71.13862962|
|42.23357448|-71.12917748|
|42.23358147|-71.13453781|
|42.23362594|-71.13299328|
|42.23371303|-71.13129839|
|42.23375394|-71.13349324|
|42.23382322|-71.13015658|
|42.23396095|-71.13228408|
|42.23405146|-71.13882916|
+-----------+------------+
only showing top 20 rows



In [17]:
df_crime_src.where("(Lat=-1 AND Long=-1) OR Lat IS NULL OR Long IS NULL").count()

20744

* **20744** Строки с нулевыми или неверными значениями координат - при расчете средних значений широты и долготы, такие строки отфильтруем 

#### Формирование финального  набора данных  df_crime
* **1765** строк с пустыми значениями поля DISTRICT. Далее заменим такие значения на **N/A** и проведем расчет витрины

In [18]:
df_crime = df_crime_src \
    .withColumn('DISTRICT', when(df_crime_src.DISTRICT.isNull() , 'N/A') \
    .otherwise(df_crime_src.DISTRICT)) 

df_crime.show(truncate=False)

+--------+----+-----+------------+-----------+------------+
|DISTRICT|YEAR|MONTH|OFFENSE_CODE|Lat        |Long        |
+--------+----+-----+------------+-----------+------------+
|D14     |2018|9    |619         |42.35779134|-71.13937053|
|C11     |2018|8    |1402        |42.30682138|-71.06030035|
|D4      |2018|9    |3410        |42.34658879|-71.07242943|
|D4      |2018|9    |3114        |42.33418175|-71.07866441|
|B3      |2018|9    |3114        |42.27536542|-71.09036101|
|C11     |2018|9    |3820        |42.29019621|-71.07159012|
|B2      |2018|9    |724         |42.30607218|-71.0827326 |
|B2      |2018|9    |3301        |42.32701648|-71.10555088|
|C6      |2018|9    |301         |42.33152148|-71.07085307|
|C11     |2018|9    |3301        |42.29514664|-71.05860832|
|C6      |2018|9    |3301        |42.31957856|-71.04032766|
|C6      |2018|9    |3114        |42.34011469|-71.05339029|
|D4      |2018|9    |3108        |42.3503876 |-71.0878529 |
|B3      |2018|9    |2647        |42.286

In [19]:
df_crime.select('DISTRICT').distinct().show(truncate=False)

+--------+
|DISTRICT|
+--------+
|C6      |
|B2      |
|C11     |
|E13     |
|B3      |
|E5      |
|A15     |
|N/A     |
|A7      |
|D14     |
|D4      |
|E18     |
|A1      |
+--------+



### Объединяем набор данных df_crime с df_codes

In [20]:
df_crime = df_crime.join(df_codes, df_crime['OFFENSE_CODE'] == df_codes['CODE']) \
    .drop("CODE", "NAME") \
    .cache()

In [21]:
df_crime.show(10, truncate=False)

+--------+----+-----+------------+-----------+------------+---------------------------------+
|DISTRICT|YEAR|MONTH|OFFENSE_CODE|Lat        |Long        |CRIME_TYPE                       |
+--------+----+-----+------------+-----------+------------+---------------------------------+
|D14     |2018|9    |619         |42.35779134|-71.13937053|LARCENY OTHER $200 & OVER        |
|C11     |2018|8    |1402        |42.30682138|-71.06030035|VANDALISM                        |
|D4      |2018|9    |3410        |42.34658879|-71.07242943|TOWED MOTOR VEHICLE              |
|D4      |2018|9    |3114        |42.33418175|-71.07866441|INVESTIGATE PROPERTY             |
|B3      |2018|9    |3114        |42.27536542|-71.09036101|INVESTIGATE PROPERTY             |
|C11     |2018|9    |3820        |42.29019621|-71.07159012|M/V ACCIDENT INVOLVING PEDESTRIAN|
|B2      |2018|9    |724         |42.30607218|-71.0827326 |AUTO THEFT                       |
|B2      |2018|9    |3301        |42.32701648|-71.10555088|V

### Формируем промежуточные наборы данных для расчета витрины

#### Набор данных df_crimes_stat
* **crimes_total** - Общее количество преступлений в этом районе

In [22]:
df_crimes_stat = df_crime.groupBy(col("DISTRICT")) \
    .agg(count("*").alias("crimes_total")) \
    .orderBy(col("DISTRICT"))

df_crimes_stat.show(truncate=False)

+--------+------------+
|DISTRICT|crimes_total|
+--------+------------+
|A1      |35717       |
|A15     |6505        |
|A7      |13544       |
|B2      |49945       |
|B3      |35442       |
|C11     |42530       |
|C6      |23460       |
|D14     |20127       |
|D4      |41915       |
|E13     |17536       |
|E18     |17348       |
|E5      |13239       |
|N/A     |1765        |
+--------+------------+



#### Набор данных df_crimes_lat_lng
Сразу фильтруем записи с нулевыми и некорректными значениями координат
* **lat** - Широта координаты района, рассчитанная как среднее по всем широтам инцидентов
* **lng** - Долгота координаты района, рассчитанная как среднее по всем долготам инцидентов

In [23]:
df_crimes_lat_lng = df_crime.where("(Lat<>-1 AND Long<>-1) AND Lat IS NOT NULL AND Long IS NOT NULL") \
    .groupBy(col("DISTRICT")) \
    .agg(avg(col("Lat")).alias("lat"), avg(col("Long")).alias("lng")) \
    .withColumnRenamed("DISTRICT","DISTRICT_ID") \
    .orderBy(col("DISTRICT"))

df_crimes_lat_lng.show(truncate=False)

+-----------+------------------+------------------+
|DISTRICT_ID|lat               |lng               |
+-----------+------------------+------------------+
|A1         |42.356733944170074|-71.06112996712555|
|A15        |42.375999332158834|-71.06267564355906|
|A7         |42.377355459516075|-71.0308336320078 |
|B2         |42.32162901059031 |-71.08479984973175|
|B3         |42.28691276522989 |-71.08518799423271|
|C11        |42.300052708547035|-71.0632585581977 |
|C6         |42.33362804576509 |-71.05203273306215|
|D14        |42.350309783520814|-71.14226135353768|
|D4         |42.34350022705698 |-71.08090067663262|
|E13        |42.31502201870257 |-71.10645084032775|
|E18        |42.262680611226024|-71.11891998757694|
|E5         |42.2824730656643  |-71.14134995577236|
|N/A        |42.32297345803791 |-71.08543463457076|
+-----------+------------------+------------------+



#### Набор данных df_crimes_monthly
* **crimes_monthly** - медиана числа преступлений в месяц в этом районе

In [24]:
df_crimes_monthly = df_crime.groupBy("DISTRICT", "YEAR", "MONTH") \
    .agg(count("*").alias("month_crimes")) \
    .groupBy("DISTRICT") \
    .agg( expr('percentile_approx(month_crimes, 0.5)').alias("crimes_monthly")) \
    .withColumnRenamed("DISTRICT","DISTRICT_ID") \
    .orderBy("DISTRICT_ID")

df_crimes_monthly.show(truncate=False)

+-----------+--------------+
|DISTRICT_ID|crimes_monthly|
+-----------+--------------+
|A1         |904           |
|A15        |160           |
|A7         |344           |
|B2         |1298          |
|B3         |907           |
|C11        |1115          |
|C6         |593           |
|D14        |505           |
|D4         |1084          |
|E13        |445           |
|E18        |435           |
|E5         |337           |
|N/A        |41            |
+-----------+--------------+



##### Проверка расчета с вариантом через SQL к Temporary View

In [25]:
df_crimes_monthly_sql = df_crime.groupBy("DISTRICT", "YEAR", "MONTH") \
    .agg(count("*").alias("month_crimes")) \
    .orderBy("DISTRICT", "YEAR", "MONTH") \
    .createOrReplaceTempView("crimes_monthly")

spark.sql("select DISTRICT AS DISTRICT_ID, percentile_approx(month_crimes, 0.5) as crimes_monthly \
          FROM crimes_monthly GROUP BY DISTRICT ORDER BY DISTRICT_ID") \
    .show(truncate=False)

+-----------+--------------+
|DISTRICT_ID|crimes_monthly|
+-----------+--------------+
|A1         |904           |
|A15        |160           |
|A7         |344           |
|B2         |1298          |
|B3         |907           |
|C11        |1115          |
|C6         |593           |
|D14        |505           |
|D4         |1084          |
|E13        |445           |
|E18        |435           |
|E5         |337           |
|N/A        |41            |
+-----------+--------------+



> ## Расчет данных через SQL Expression происходит быстрее (3 s) чем через запрос к Temporary View (1+5 s) ! 

#### Набор данных df_frequent_crime_types
* **frequent_crime_types** - три самых частых crime_type за всю историю наблюдений в этом районе, объединенных через запятую с одним пробелом “, ” , расположенных в порядке убывания частоты

In [26]:
df_frequent_crime_types = df_crime.groupBy("DISTRICT", "CRIME_TYPE")  \
    .agg(count("*").alias("crimes_by_crime_type")) \
    .withColumn("row_num", row_number().over( Window.partitionBy("DISTRICT").orderBy(col("crimes_by_crime_type").desc()) )) \
    .filter(col("row_num") <= 3).drop("row_num", "crimes_by_crime_type") \
    .groupBy("DISTRICT") \
    .agg(concat_ws(", ",collect_list(col("CRIME_TYPE"))).alias("frequent_crime_types")) \
    .withColumnRenamed("DISTRICT","DISTRICT_ID") \
    .orderBy("DISTRICT_ID")

df_frequent_crime_types.show(20, truncate=False)

+-----------+----------------------------------------------------------------------+
|DISTRICT_ID|frequent_crime_types                                                  |
+-----------+----------------------------------------------------------------------+
|A1         |PROPERTY, ASSAULT SIMPLE, DRUGS                                       |
|A15        |M/V ACCIDENT, INVESTIGATE PERSON, M/V                                 |
|A7         |SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON                       |
|B2         |M/V, M/V ACCIDENT, VERBAL DISPUTE                                     |
|B3         |VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON                    |
|C11        |M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON                         |
|C6         |DRUGS, SICK/INJURED/MEDICAL, INVESTIGATE PERSON                       |
|D14        |TOWED MOTOR VEHICLE, M/V, SICK/INJURED/MEDICAL                        |
|D4         |LARCENY SHOPLIFTING $200 & OVER, PROPERTY, LARCENY T

#### Выборочно проверяем корректность вычислений по frequent_crime_types

In [27]:
df_crime.groupBy("DISTRICT", "CRIME_TYPE")  \
    .agg(count("*").alias("crimes_by_crime_type")) \
    .filter(col("DISTRICT") == "B3") \
    .orderBy(col("DISTRICT"), col("crimes_by_crime_type").desc()) \
    .show(10, truncate=False)

+--------+--------------------+--------------------+
|DISTRICT|CRIME_TYPE          |crimes_by_crime_type|
+--------+--------------------+--------------------+
|B3      |VERBAL DISPUTE      |2957                |
|B3      |INVESTIGATE PERSON  |2460                |
|B3      |MISSING PERSON      |2153                |
|B3      |SICK/INJURED/MEDICAL|2003                |
|B3      |VANDALISM           |1915                |
|B3      |M/V                 |1909                |
|B3      |M/V ACCIDENT        |1797                |
|B3      |ASSAULT SIMPLE      |1722                |
|B3      |DRUGS               |1505                |
|B3      |INVESTIGATE PROPERTY|1431                |
+--------+--------------------+--------------------+
only showing top 10 rows



### Готовим итоговый набор данных df_result
* **DISTRICT**
* **crimes_total**
* **crimes_monthly**
* **frequent_crime_types**
* **lat**
* **lng**

In [28]:
df_result = df_crimes_stat.join(df_crimes_monthly, df_crimes_stat['DISTRICT'] == df_crimes_monthly['DISTRICT_ID']) \
    .drop("DISTRICT_ID") \
    .join(df_frequent_crime_types, df_crimes_stat['DISTRICT'] == df_frequent_crime_types['DISTRICT_ID']) \
    .drop("DISTRICT_ID") \
    .join(df_crimes_lat_lng, df_crimes_stat['DISTRICT'] == df_crimes_lat_lng['DISTRICT_ID']) \
    .drop("DISTRICT_ID") \
    .select("DISTRICT", "crimes_total", "crimes_monthly", "frequent_crime_types", "lat", "lng") \
    .orderBy(col("DISTRICT"))
df_result.show(truncate=True)

+--------+------------+--------------+--------------------+------------------+------------------+
|DISTRICT|crimes_total|crimes_monthly|frequent_crime_types|               lat|               lng|
+--------+------------+--------------+--------------------+------------------+------------------+
|      A1|       35717|           904|PROPERTY, ASSAULT...|42.356733944170074|-71.06112996712555|
|     A15|        6505|           160|M/V ACCIDENT, INV...|42.375999332158834|-71.06267564355906|
|      A7|       13544|           344|SICK/INJURED/MEDI...|42.377355459516075| -71.0308336320078|
|      B2|       49945|          1298|M/V, M/V ACCIDENT...| 42.32162901059031|-71.08479984973175|
|      B3|       35442|           907|VERBAL DISPUTE, I...| 42.28691276522989|-71.08518799423271|
|     C11|       42530|          1115|M/V, SICK/INJURED...|42.300052708547035| -71.0632585581977|
|      C6|       23460|           593|DRUGS, SICK/INJUR...| 42.33362804576509|-71.05203273306215|
|     D14|       201

In [29]:
df_result.select("DISTRICT", "crimes_total", "crimes_monthly", "frequent_crime_types").show(truncate=False)

+--------+------------+--------------+----------------------------------------------------------------------+
|DISTRICT|crimes_total|crimes_monthly|frequent_crime_types                                                  |
+--------+------------+--------------+----------------------------------------------------------------------+
|A1      |35717       |904           |PROPERTY, ASSAULT SIMPLE, DRUGS                                       |
|A15     |6505        |160           |M/V ACCIDENT, INVESTIGATE PERSON, M/V                                 |
|A7      |13544       |344           |SICK/INJURED/MEDICAL, DRUGS, INVESTIGATE PERSON                       |
|B2      |49945       |1298          |M/V, M/V ACCIDENT, VERBAL DISPUTE                                     |
|B3      |35442       |907           |VERBAL DISPUTE, INVESTIGATE PERSON, MISSING PERSON                    |
|C11     |42530       |1115          |M/V, SICK/INJURED/MEDICAL, INVESTIGATE PERSON                         |
|C6      |

### Сохраняем итоговый набор данных df_result в один файл в формате parquet

In [30]:
df_result.coalesce(1).write.mode('overwrite').parquet(parquet_folder) 

In [31]:
spark.stop()