In [1]:
import pyspark
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Возможно нужно будет явно указать Spark'у откуда брать интерпретатор Python для worker'ов
os.environ["PYSPARK_PYTHON"]="python3"

In [2]:
spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()

In [14]:
# "Число избирателей, включенных в список избирателей " 
# "Число избирательных бюллетеней, полученных участковой избирательной комиссией" 
# "Число избирательных бюллетеней, выданных избирателям, проголосовавшим досрочно" 
# "Число избирательных бюллетеней, выданных в помещении для голосования в день голосования"
# "Число избирательных бюллетеней, выданных вне помещения для голосования в день голосования"
# Число погашенных избирательных бюллетеней 
# Число избирательных бюллетеней в переносных ящиках для голосования
# Число бюллетеней в стационарных ящиках для голосования
# Число недействительных избирательных бюллетеней
# Число действительных избирательных бюллетеней
# Число утраченных избирательных бюллетеней
# "Число избирательных бюллетеней, не учтенных при получении "
# Бабурин Сергей Николаевич
# Грудинин Павел Николаевич
# Жириновский Владимир Вольфович
# Путин Владимир Владимирович
# Собчак Ксения Анатольевна
# Сурайкин Максим Александрович
# Титов Борис Юрьевич
# Явлинский Григорий Алексеевич

ELECTIONS_FILE = 'UIK_table.csv'

schema= StructType([
            StructField("region",              StringType(), False), #1
            StructField("tik",                 StringType(), False), #2
            StructField("uik",                 StringType(), False), #3
            StructField("voters_count",        IntegerType(), False),#4
            StructField("blanks_received",     IntegerType(), False),#5
            StructField("pre_voted_blanks",    IntegerType(), False),#6
            StructField("elec_day_blanks",     IntegerType(), False),#7
            StructField("not_elec_day_blanks", IntegerType(), False),#8
            StructField("cancel_blanks",       IntegerType(), False),#9
            StructField("tranf_box_blanks",    IntegerType(), False),#10
            StructField("stat_box_blanks",     IntegerType(), False),#11
            StructField("invalid_blanks",      IntegerType(), False),#12
            StructField("valid_blanks",        IntegerType(), False),#13
            StructField("lost_blanks",         IntegerType(), False),#14
            StructField("uncount_blanks",      IntegerType(), False),#15
            StructField("babur",               IntegerType(), False),#16
            StructField("grud",                IntegerType(), False),#17
            StructField("jirin",               IntegerType(), False),#18
            StructField("putin",               IntegerType(), False),#19
            StructField("soob",                IntegerType(), False),#20
            StructField("sur",                 IntegerType(), False),#21
            StructField("titov",               IntegerType(), False),#22
            StructField("yavl",                IntegerType(), False)]#23
        )

df = spark.read.csv(ELECTIONS_FILE, schema=schema)

#### найти явку (%) по всем регионам, результат отсортировать по убыванию

In [19]:
df.createOrReplaceTempView('election')

region_appearance = spark.sql(
    '''SELECT region, (SUM(tranf_box_blanks) + SUM(stat_box_blanks)) / SUM(voters_count) as appearance 
    FROM election
    GROUP BY region
    ORDER BY appearance DESC''')

region_appearance.show()

+--------------------+------------------+
|              region|        appearance|
+--------------------+------------------+
|     Республика Тыва|0.9362542974951742|
|Ямало-Ненецкий ав...|0.9187213306617982|
|Кабардино-Балкарс...| 0.916978754085207|
|Чеченская Республика|0.9150929703298174|
|Республика Северн...|0.8993704831730909|
| Республика Дагестан|0.8744351419820495|
|Карачаево-Черкесс...|0.8734380939009899|
| Кемеровская область|0.8307556958224817|
|Чукотский автоном...|0.8226051697921947|
|Республика Ингушетия|0.8195937165135314|
|    Брянская область| 0.796597680757152|
|   Тюменская область|0.7888296737637491|
| Республика Мордовия|0.7783648793754097|
|  Краснодарский край|0.7783466166202626|
|Республика Татарс...|0.7736526548202729|
|Чувашская Республ...|0.7621061419670118|
|Республика Башкор...|0.7542120065745193|
|  Республика Бурятия|0.7519133382510174|
| Ставропольский край|0.7377007114684981|
|  Пензенская область|0.7370819151521087|
+--------------------+------------

#### выбрать любимого кандидата и найти тот избиратльный участок, на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек)

In [93]:
fav_cand = 'babur'
fav_cand_results = spark.sql(f'''
SELECT region, tik, uik, {fav_cand}/voters_count as percent 
FROM election 
ORDER BY percent DESC
''')
fav_cand_results.show(1)

+---------------+--------------------+---------+------------------+
|         region|                 tik|      uik|           percent|
+---------------+--------------------+---------+------------------+
|Приморский край|Владивосток, Фрун...|УИК №5863|0.4444444444444444|
+---------------+--------------------+---------+------------------+
only showing top 1 row



#### найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна

In [85]:
tik_appearance = spark.sql(f'''
SELECT region, MAX(appearance) - MIN(appearance) as diff
FROM
    (SELECT region, tik, (SUM(tranf_box_blanks) + SUM(stat_box_blanks)) / SUM(voters_count) as appearance
    FROM election
    GROUP BY region, tik)
GROUP BY region
ORDER BY diff DESC
''')
tik_appearance.show(1)

+--------------------+-------------------+
|              region|               diff|
+--------------------+-------------------+
|Архангельская обл...|0.49859164840684894|
+--------------------+-------------------+
only showing top 1 row



#### посчитать дисперсию по явке для каждого региона (учитывать УИК)

In [86]:
# Отсортировал для наглядности. Чтобы выполнить запрос, требуемый в задании, последнюю строку нужно убрать
appearance_dispersion = spark.sql(f'''
SELECT region, STDDEV((tranf_box_blanks + stat_box_blanks) / voters_count) as sd
FROM election 
GROUP BY region
ORDER BY sd DESC
''')
appearance_dispersion.show()

+--------------------+-------------------+
|              region|                 sd|
+--------------------+-------------------+
| Сахалинская область|0.20188798623588156|
|     Камчатский край|0.16873475777512123|
|  Мурманская область|0.16742131453895345|
|     Приморский край|0.16639015831512335|
|Республика Адыгея...|0.16278424507727504|
|   Самарская область| 0.1562883627711599|
| Саратовская область|0.15426531267153373|
| Республика Калмыкия|0.15350844523090657|
| Воронежская область|0.14975754071870498|
| Магаданская область|0.14387651017723133|
|Республика Татарс...|0.14172353736111934|
|    Липецкая область|0.14096365252591136|
|Архангельская обл...|0.14094552956128326|
|  Краснодарский край|0.13946578225528095|
|    Хабаровский край|0.13811595897262202|
|Белгородская область| 0.1375828241478792|
| Ставропольский край|0.13411190491014102|
|  Республика Бурятия|0.13308253052737126|
|  Ростовская область|0.13138651268427387|
|Чувашская Республ...|0.13041627601227343|
+----------

#### для каждого кандидата посчитать таблицу: результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат

In [105]:
cand_tables = []

candidates = ["babur", "grud", "jirin", "putin", "soob", "sur", "titov", "yavl"]

for cand in candidates:
    cand_table = spark.sql(f'''
    SELECT result, COUNT(result) as count_res FROM
        (SELECT uik, int(100 * {cand} / voters_count) as result
        FROM election)
    GROUP BY result
    ORDER BY count_res DESC
    ''')
    cand_tables.append((cand, cand_table))

In [108]:
print(cand_tables[0][0])
cand_tables[0][1].show()

babur
+------+---------+
|result|count_res|
+------+---------+
|     0|    92019|
|     1|     4431|
|     2|      513|
|     3|      152|
|     4|       70|
|     5|       30|
|     6|       21|
|     7|       16|
|    10|        8|
|     8|        8|
|     9|        5|
|    14|        3|
|    16|        2|
|    11|        2|
|    20|        2|
|    12|        1|
|    17|        1|
|    44|        1|
|    18|        1|
|    21|        1|
+------+---------+
only showing top 20 rows



In [109]:
print(cand_tables[1][0])
cand_tables[1][1].show()

grud
+------+---------+
|result|count_res|
+------+---------+
|     7|    12752|
|     6|    12082|
|     8|    10966|
|     5|     9500|
|     9|     7860|
|     4|     6942|
|    10|     5740|
|     3|     5233|
|     2|     4331|
|    11|     3943|
|     1|     3700|
|    12|     3022|
|     0|     2449|
|    13|     2266|
|    14|     1534|
|    15|     1185|
|    16|      906|
|    17|      632|
|    18|      481|
|    19|      322|
+------+---------+
only showing top 20 rows



In [110]:
print(cand_tables[2][0])
cand_tables[2][1].show()

jirin
+------+---------+
|result|count_res|
+------+---------+
|     3|    21136|
|     4|    18152|
|     2|    15316|
|     5|    11525|
|     0|     7203|
|     1|     6968|
|     6|     6669|
|     7|     3782|
|     8|     2282|
|     9|     1390|
|    10|      868|
|    11|      508|
|    12|      328|
|    13|      247|
|    14|      167|
|    15|      129|
|    16|      110|
|    17|       82|
|    18|       62|
|    20|       50|
+------+---------+
only showing top 20 rows



In [111]:
print(cand_tables[3][0])
cand_tables[3][1].show()

putin
+------+---------+
|result|count_res|
+------+---------+
|    45|     3768|
|    44|     3758|
|    43|     3708|
|    42|     3627|
|    46|     3547|
|    47|     3318|
|    41|     3192|
|    48|     3173|
|    40|     2859|
|    50|     2735|
|    49|     2731|
|    39|     2492|
|    51|     2386|
|    52|     2228|
|    53|     2028|
|    38|     1986|
|    54|     1837|
|    55|     1687|
|    37|     1663|
|    56|     1656|
+------+---------+
only showing top 20 rows



In [112]:
print(cand_tables[4][0])
cand_tables[4][1].show()

soob
+------+---------+
|result|count_res|
+------+---------+
|     0|    64111|
|     1|    23575|
|     2|     6500|
|     3|     1866|
|     4|      653|
|     5|      248|
|     6|       80|
|     7|       64|
|     8|       49|
|    10|       34|
|     9|       28|
|    11|       14|
|    12|       14|
|    14|       11|
|    15|       10|
|    13|        8|
|    16|        4|
|    17|        4|
|    18|        4|
|    25|        3|
+------+---------+
only showing top 20 rows



In [113]:
print(cand_tables[5][0])
cand_tables[5][1].show()

sur
+------+---------+
|result|count_res|
+------+---------+
|     0|    90787|
|     1|     5114|
|     2|      794|
|     3|      224|
|     4|      141|
|     5|       63|
|     6|       51|
|     7|       47|
|     8|       44|
|     9|       10|
|    10|        8|
|    16|        2|
|    13|        1|
|    12|        1|
|    11|        1|
|    14|        1|
+------+---------+



In [114]:
print(cand_tables[6][0])
cand_tables[6][1].show()

titov
+------+---------+
|result|count_res|
+------+---------+
|     0|    89910|
|     1|     6642|
|     2|      449|
|     3|      125|
|     4|       65|
|     5|       32|
|     6|       17|
|     7|       13|
|     8|       10|
|    10|        4|
|     9|        4|
|    14|        3|
|    12|        3|
|    20|        2|
|    11|        2|
|    15|        2|
|    17|        1|
|    13|        1|
|    23|        1|
|    29|        1|
+------+---------+
only showing top 20 rows



In [115]:
print(cand_tables[7][0])
cand_tables[7][1].show()

yavl
+------+---------+
|result|count_res|
+------+---------+
|     0|    82348|
|     1|    11158|
|     2|     2823|
|     3|      676|
|     4|      147|
|     5|       52|
|     6|       30|
|     7|       18|
|     8|       13|
|    10|       12|
|    14|        3|
|    18|        2|
|     9|        2|
|    13|        1|
|    21|        1|
|    16|        1|
|    19|        1|
|    11|        1|
+------+---------+



In [118]:
election_res = spark.sql(f'''
SELECT SUM(putin)/SUM(tranf_box_blanks + stat_box_blanks) as res FROM election
''')
election_res.show()

+-----------------+
|              res|
+-----------------+
|0.766374250924615|
+-----------------+

