# Обработка данных с помощью Spark

## Импорты

In [1]:
import os
from functools import reduce
from operator import add

from pyspark.sql import SparkSession

import pyspark.sql.functions as F

In [2]:
os.environ["PYSPARK_PYTHON"] = "python3"

## Вспомогательные функции и константы

In [3]:
csv_path = "elections_results.csv" 

## SparkSQL

Точка входа

In [4]:
spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

Читаем файл

In [5]:
df = spark.read.csv(csv_path, inferSchema=True, header=True).toDF(
    "Region", "TIK", "UIK", "Total voters", "Total papers", "Early papers",
    "Inside papers", "Outside papers", "Pledged papers", 
    "Papers in portable boxes", "Papers in stacionary boxes",
    "Invalid papers", "Valid papers", "Lost papers", "Not counted papers",
    "Baburin", "Grudinin", "Zhirinovsky", "Putin",
    "Sobchak", "Suraikin", "Titov", "Yavlisky")
df.show(5)

+--------------------+----------+------+------------+------------+------------+-------------+--------------+--------------+------------------------+--------------------------+--------------+------------+-----------+------------------+-------+--------+-----------+-----+-------+--------+-----+--------+
|              Region|       TIK|   UIK|Total voters|Total papers|Early papers|Inside papers|Outside papers|Pledged papers|Papers in portable boxes|Papers in stacionary boxes|Invalid papers|Valid papers|Lost papers|Not counted papers|Baburin|Grudinin|Zhirinovsky|Putin|Sobchak|Suraikin|Titov|Yavlisky|
+--------------------+----------+------+------------+------------+------------+-------------+--------------+--------------+------------------------+--------------------------+--------------+------------+-----------+------------------+-------+--------+-----------+-----+-------+--------+-----+--------+
|Республика Адыгея...|Адыгейская|УИК №1|        2256|        2181|           0|         2107| 

### 1. Найти явку (%) по всем регионам, результат отсортировать по убыванию
Отношение выданных бюллетеней к числу избирателей в списке

In [6]:
df_attendance = df.groupBy("Region") \
  .agg(F.sum("Early papers").alias("Sum early papers"), \
       F.sum("Inside papers").alias("Sum inside papers"), \
       F.sum("Outside papers").alias("Sum outside papers"), \
       F.sum("Total voters").alias("Sum voters")) \

df_attendance.show(10)

+--------------------+----------------+-----------------+------------------+----------+
|              Region|Sum early papers|Sum inside papers|Sum outside papers|Sum voters|
+--------------------+----------------+-----------------+------------------+----------+
|Республика Саха (...|           17281|           418588|             21196|    643786|
|Калининградская о...|               0|           472062|             23183|    796445|
|Новосибирская обл...|               0|          1240320|             64418|   2159912|
|   город Севастополь|             633|           233869|              7674|    339010|
|Свердловская область|            3210|          2003456|             79001|   3345221|
|  Республика Хакасия|             356|           230967|             14043|    372528|
| Магаданская область|            2417|            68365|              3018|    102626|
|Республика Ингушетия|             544|           177335|              1862|    219305|
|Удмуртская Респуб...|          

In [7]:
df_attendance = df_attendance \
    .withColumn("Sum papers", reduce(
        add,
        map(F.col, ["Sum early papers", "Sum inside papers", "Sum outside papers"])
    )) \
    .select("Region", "Sum papers", "Sum voters") \
    
df_attendance.show(10)

+--------------------+----------+----------+
|              Region|Sum papers|Sum voters|
+--------------------+----------+----------+
|Республика Саха (...|    457065|    643786|
|Калининградская о...|    495245|    796445|
|Новосибирская обл...|   1304738|   2159912|
|   город Севастополь|    242176|    339010|
|Свердловская область|   2085667|   3345221|
|  Республика Хакасия|    245366|    372528|
| Магаданская область|     73800|    102626|
|Республика Ингушетия|    179741|    219305|
|Удмуртская Респуб...|    749943|   1185303|
|     Камчатский край|    161930|    239045|
+--------------------+----------+----------+
only showing top 10 rows



In [8]:
df_attendance_perc = df_attendance \
    .withColumn("Attendance, %", F.col("Sum papers") / F.col("Sum voters")) \
    .select("Region", "Attendance, %") \
    .sort("Attendance, %", ascending=False)

df_attendance_perc.show(10)

+--------------------+------------------+
|              Region|     Attendance, %|
+--------------------+------------------+
|Территория за пре...|0.9806986984380843|
|     Республика Тыва|0.9366369316170003|
|Ямало-Ненецкий ав...|0.9190125747324195|
|Кабардино-Балкарс...|0.9180157863562131|
|Чеченская Республика|0.9154001705101918|
|Республика Северн...|0.8998810071492768|
| Республика Дагестан|0.8747795414462081|
|Карачаево-Черкесс...|0.8740544668203033|
| Кемеровская область|0.8322733516003962|
|Чукотский автоном...|0.8228138695924391|
+--------------------+------------------+
only showing top 10 rows



### 2. Выбрать любимого кандидата и найти тот избиратeльный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек)

In [9]:
favorite_candidate = "Grudinin"
favorite_candidate_perc = favorite_candidate + ", %"

df_favorite_candidate = df \
    .filter(F.col(favorite_candidate) >= 300) \
    .withColumn(favorite_candidate_perc,
                F.col(favorite_candidate) / reduce(
                    add,
                    map(F.col, ["Early papers", "Inside papers", "Outside papers"])
                ))\
    .select("Region", "TIK", "UIK", favorite_candidate_perc) \
    .sort(favorite_candidate_perc, ascending=False)
#     .groupBy("Region", "TIK", "UIK") \
#     .agg(F.max(favorite_candidate))

df_favorite_candidate.show(1)

+------------------+---------+---------+------------------+
|            Region|      TIK|      UIK|       Grudinin, %|
+------------------+---------+---------+------------------+
|Московская область|Ленинская|УИК №1306|0.6653061224489796|
+------------------+---------+---------+------------------+
only showing top 1 row



### 3. Найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна

In [10]:
df_TIK_attendance = df.groupBy("Region", "TIK") \
  .agg(F.sum("Early papers").alias("Sum early papers"), \
       F.sum("Inside papers").alias("Sum inside papers"), \
       F.sum("Outside papers").alias("Sum outside papers"), \
       F.sum("Total voters").alias("Sum voters")) \

df_TIK_attendance = df_TIK_attendance \
    .withColumn("Sum papers", reduce(
        add,
        map(F.col, ["Sum early papers", "Sum inside papers", "Sum outside papers"])
    )) \
    .select("Region", "TIK", "Sum papers", "Sum voters") \

df_TIK_attendance = df_TIK_attendance \
    .withColumn("Attendance, %", F.col("Sum papers") / F.col("Sum voters")) \
    .select("Region", "TIK", "Attendance, %") \
    .sort("Attendance, %", ascending=False)

df_TIK_attendance.show(10)

+--------------------+--------------------+------------------+
|              Region|                 TIK|     Attendance, %|
+--------------------+--------------------+------------------+
|Архангельская обл...|         Новая Земля|  0.99795605518651|
|     Республика Тыва|         Тес-Хемская|0.9967462039045553|
| Сахалинская область|  Невельская судовая| 0.995766299745978|
|     Республика Тыва|   Монгун-Тайгинская|  0.99545159194282|
|     Республика Тыва|        Улуг-Хемская|0.9917140379612679|
|Республика Татарс...|         Тюлячинская|0.9912042668662862|
|     Республика Тыва|        Сут-Хольская|0.9911560328490209|
|     Республика Тыва|           Эрзинская| 0.991008991008991|
|     Камчатский край|Петропавловск-Кам...| 0.990578734858681|
|     Республика Тыва|        Чаа-Хольская|0.9894532334165973|
+--------------------+--------------------+------------------+
only showing top 10 rows



In [11]:
df_TIK_attendance_diff = df_TIK_attendance \
    .groupBy("Region") \
    .agg((F.max("Attendance, %") - F.min("Attendance, %")).alias("Difference")) \
    .sort("Difference", ascending=False)

df_TIK_attendance_diff.show(1)

+--------------------+-------------------+
|              Region|         Difference|
+--------------------+-------------------+
|Архангельская обл...|0.49859164840684894|
+--------------------+-------------------+
only showing top 1 row



### 4. Посчитать дисперсию по явке для каждого региона (учитывать УИК)

In [12]:
df_attendance_std = df \
    .withColumn("Attendance", reduce(
        add,
        map(F.col, ["Early papers", "Outside papers", "Inside papers"])
    )) \
    .select("Region", "TIK", "UIK", "Attendance") \
    .groupBy("Region") \
    .agg(F.stddev("Attendance").alias("Attendance, std")) \
    .sort("Attendance, std", ascending=False) 
    
df_attendance_std.show(10)

+--------------------+-----------------+
|              Region|  Attendance, std|
+--------------------+-----------------+
|Территория за пре...|1332.153361037134|
|Чеченская Республика|779.5717067527235|
|Ямало-Ненецкий ав...|739.3290449083053|
|Республика Северн...|677.9089014204244|
| Республика Дагестан|657.2845481650252|
|     Республика Тыва|641.6682099770799|
| Кемеровская область|621.0871610694816|
| Магаданская область|614.8118916285797|
|Кабардино-Балкарс...|614.0460846716691|
|    Брянская область|592.4564291304055|
+--------------------+-----------------+
only showing top 10 rows



### 5. Для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат

In [13]:
candidates = ["Baburin", "Grudinin", "Zhirinovsky", "Putin",
              "Sobchak", "Suraikin", "Titov", "Yavlisky"]

In [14]:
for candidate in candidates:
    candidate_perc = candidate + ", %"
    df_uik_num = df \
        .withColumn(candidate_perc, F.round(100 * F.col(candidate) / reduce(
            add,
            map(F.col, ["Early papers", "Outside papers", "Inside papers"])
        ))).select("Region", "TIK", "UIK", candidate_perc) \
        .groupBy(candidate_perc) \
        .agg(F.count('UIK').alias('UIK_count')) \
        .sort(candidate_perc, ascending=False)

    df_uik_num.show()

+----------+---------+
|Baburin, %|UIK_count|
+----------+---------+
|      44.0|        1|
|      23.0|        1|
|      21.0|        1|
|      20.0|        3|
|      18.0|        2|
|      17.0|        1|
|      15.0|        1|
|      14.0|        3|
|      13.0|        1|
|      12.0|        1|
|      11.0|        5|
|      10.0|        7|
|       9.0|       10|
|       8.0|       22|
|       7.0|       20|
|       6.0|       35|
|       5.0|       89|
|       4.0|      179|
|       3.0|      555|
|       2.0|     3692|
+----------+---------+
only showing top 20 rows

+-----------+---------+
|Grudinin, %|UIK_count|
+-----------+---------+
|       80.0|        1|
|       78.0|        1|
|       75.0|        1|
|       71.0|        1|
|       70.0|        1|
|       69.0|        1|
|       68.0|        1|
|       67.0|        3|
|       61.0|        1|
|       60.0|        1|
|       58.0|        3|
|       57.0|        4|
|       56.0|        4|
|       55.0|        2|
|       54.0| 