In [1]:
import os

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from functools import reduce
from operator import add
from pyspark.sql.functions import col, count, stddev, sum as sql_sum, max as sql_max, min as sql_min, round as sql_round

In [2]:
file_name = "parsed/parsed.csv"

In [3]:
spark = SparkSession \
    .builder \
    .appName("MyPySpark") \
    .getOrCreate()

In [4]:
schema = ['name','district','tik','Число включенных в список',\
          'Число бюллетеней, полученных участковой избирательной комиссией',\
          'Число бюллетеней, выданных избирателям, проголосовавшим досрочно', \
          'Число бюллетеней, выданных в помещении для голосования в день голосования', \
          'Число бюллетеней, выданных вне помещения для голосования в день голосования', \
          'Число погашенных бюллетеней', \
          'Число бюллетеней в переносных ящиках для голосования',\
          'Число бюллетеней в стационарных ящиках для голосования', \
          'Число недействительных бюллетеней', \
          'Число действительных бюллетеней', \
          'Число утраченных бюллетеней', \
          'Число бюллетеней, не учтенных при получении ', \
          'Бабурин С. Н.', 'Грудинин П. Н.', 'Жириновский В. В.','Путин В. В.', \
          'Собчак К. А.', 'Сурайкин М. А.', 'Титов Б. Ю.','Явлинский Г. А.']

In [5]:
candidates = ['Baburin', 'Grudinin', 'Zhirinovsky', 'Putin', 'Sobchak', 'Suraikin', 'Titov', 'Yavlisky']
df = spark.read.csv(file_name, inferSchema=True, header=True).toDF(
    'uik','district','tik', 'total_count', 'recieved_count', 'early_count', 'indoors_count', 'outdoors_count', 'cancelled_count', 
    'portable_count', 'static_count', 'invalid_count', 'valid_count', 'lost_count', 'unaccounted_count', 
    *candidates
)

In [6]:
df_presence = df.groupBy('district') \
    .agg(sql_sum('total_count').alias('sum_total_count'), \
        sql_sum('valid_count').alias('sum_valid_count')) \
    .sort('sum_total_count', ascending=False) \
    .withColumn('percentage', col('sum_valid_count') / col('sum_total_count') * 100) \
    .select('district', 'percentage') \
    .sort('percentage', ascending=False) \
    .toPandas().to_csv("elections_percentage.csv") 

In [7]:
favorite = 'Putin'

In [8]:
df_favorite = df.filter(col(favorite) >= 300) \
    .select('district', 'total_count', 'tik','uik', favorite) \
    .withColumn('percentage', col(favorite) / col('total_count') * 100) \
    .groupBy('district', 'tik','uik', 'percentage') \
    .agg(sql_max(favorite).alias('max')) \
    .sort('percentage', ascending=False)

df_favorite.show()

+--------------------+--------------------+---------+-----------------+----+
|            district|                 tik|      uik|       percentage| max|
+--------------------+--------------------+---------+-----------------+----+
|     Республика Тыва|        Улуг-Хемская| УИК №159|            100.0| 358|
|     Республика Тыва|      Бай-Тайгинская|  УИК №43|            100.0| 760|
| Кемеровская область|Прокопьевск, Цент...| УИК №876|            100.0| 732|
|     Республика Тыва|           Эрзинская| УИК №180|            100.0| 433|
|Республика Татарс...|         Апастовская| УИК №909|99.76415094339622| 423|
|     Республика Тыва|         Тес-Хемская| УИК №145|99.73045822102425| 370|
|     Республика Тыва|      Бай-Тайгинская|  УИК №45|99.72714870395635| 731|
|     Республика Тыва|        Улуг-Хемская| УИК №165| 99.6594778660613| 878|
|     Республика Тыва|        Улуг-Хемская| УИК №163|99.63325183374083| 815|
|Республика Башкор...|    Уфа, Калининская|  УИК №98|99.56331877729258| 456|

In [9]:
df_TIK = df.groupBy('district','tik') \
  .agg(sql_sum('total_count').alias('sum_total_count'), \
      sql_sum('valid_count').alias('sum_valid_count')) \
  .withColumn('percentage', col('sum_valid_count') / col('sum_total_count') * 100) \
  .select('district', 'tik', 'percentage') \
  .groupBy('district') \
  .agg((sql_max('percentage') - sql_min('percentage')).alias('Diff')) \
  .sort('Diff', ascending=False) \
  .show()


+--------------------+------------------+
|            district|              Diff|
+--------------------+------------------+
|Архангельская обл...| 49.58829163166267|
| Сахалинская область|  47.8670553116025|
| Республика Дагестан| 47.37946734555504|
| Республика Калмыкия|43.211829960607844|
|   Самарская область| 41.87282511134117|
|   Красноярский край| 41.33462602081461|
| Саратовская область| 39.82477785287487|
|Республика Татарс...|39.601354722670806|
|  Краснодарский край| 36.48401530541677|
|Республика Адыгея...| 36.26433374911393|
|    Липецкая область| 36.09544601149141|
|     Камчатский край| 35.81056892397375|
| Воронежская область|35.532292606340185|
|Калининградская о...| 35.13296126554531|
|  Ростовская область|35.022072742281765|
|  Московская область| 34.68295949789706|
|  Пензенская область| 34.39901100348121|
|Нижегородская обл...| 34.38278030289075|
|  Республика Бурятия| 34.23664468861077|
|Республика Саха (...|32.752557598988176|
+--------------------+------------

In [10]:
df_presence_var = df \
    .select('district','tik', 'uik','total_count', 'valid_count') \
    .withColumn('percentage', col('valid_count') / col('total_count')) \
    .groupBy('district') \
    .agg(stddev('percentage').alias('std')) \
    .sort('Std', ascending=False) 

df_presence_var.toPandas().to_csv("std.csv")

df_presence_var.show()

+--------------------+-------------------+
|            district|                std|
+--------------------+-------------------+
| Сахалинская область| 0.2019312853758834|
|     Камчатский край|0.16896820928370695|
|     Приморский край| 0.1671701365752832|
|  Мурманская область|0.16697864277648009|
|Республика Адыгея...| 0.1637267331819067|
| Республика Калмыкия| 0.1603307335199929|
|   Самарская область|  0.155767880799464|
| Саратовская область| 0.1550544694418556|
| Воронежская область|0.15043667103903124|
|Республика Татарс...|0.14434297889714276|
| Магаданская область|0.14323045765055606|
|    Липецкая область| 0.1418398955040446|
|  Краснодарский край|0.14036974881853576|
|Архангельская обл...|0.13919464109359897|
|Белгородская область|0.13818853563236602|
|    Хабаровский край| 0.1363192333409008|
| Ставропольский край|0.13447367436294155|
|  Республика Бурятия|0.13339540717742093|
|  Ростовская область|0.13142685155590125|
| Республика Дагестан|0.13042374447500338|
+----------

In [11]:
for candidate in candidates:  
    print(candidate)
    df.select('district','tik', 'uik', candidate, 'valid_count') \
        .withColumn('percentage', sql_round(col(candidate) / col('valid_count') * 100)) \
        .groupBy('percentage') \
        .agg(count('uik').alias('uik_count')) \
        .sort(col('percentage'), ascending=False) \
        .show()

Baburin
+----------+---------+
|percentage|uik_count|
+----------+---------+
|      44.0|        1|
|      23.0|        1|
|      21.0|        2|
|      20.0|        2|
|      18.0|        2|
|      17.0|        1|
|      15.0|        2|
|      14.0|        2|
|      13.0|        1|
|      12.0|        1|
|      11.0|        6|
|      10.0|        9|
|       9.0|       10|
|       8.0|       21|
|       7.0|       21|
|       6.0|       37|
|       5.0|       86|
|       4.0|      183|
|       3.0|      577|
|       2.0|     3889|
+----------+---------+
only showing top 20 rows

Grudinin
+----------+---------+
|percentage|uik_count|
+----------+---------+
|      80.0|        1|
|      78.0|        1|
|      75.0|        1|
|      71.0|        1|
|      70.0|        1|
|      69.0|        2|
|      67.0|        3|
|      61.0|        1|
|      60.0|        1|
|      58.0|        3|
|      57.0|        6|
|      56.0|        4|
|      55.0|        3|
|      54.0|        5|
|      53.0|  