In [None]:
"""
Часть 1

Нужно написать скрипт, который скачивает все данные прошедших президентских выборов для всех избирательных участков.

Входная точка по ссылке. 
http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100084849066&vrn=100100084849062&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100084849066&type=227
Затем нужно перейти на сайты региональных избирательных комиссий. 
Результаты нужно сохранить в cvs-файл, sqlite базе данных или parquet-файле. 
В итоге должна получиться таблица с полями: 
- название региона 
- название ТИК 
- номер УИК 
- 20 стандартных полей из итогового протокола


Часть 2

Нужно, используя Spark: 
- найти явку (%) по всем регионам, результат отсортировать по убыванию 
- выбрать любимого кандидата и найти тот избиратльный участок, 
на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек) 
- найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна 
- посчитать дисперсию по явке для каждого региона (учитывать УИК) 
- для каждого кандидата посчитать таблицу: результат (%, округленный до целого) 
- количество УИК, на которых кандидат получил данный результат

Результаты принимаются в виде Jupyter Notebook, Spark Notebook или исходных файлов на Scala.

"""

In [30]:
import sqlite3
import lxml.html
import lxml.etree
import requests
import re
import csv
import pyspark
from pyspark.sql.types import StructField

In [23]:
def create_table(session=None):
    if not session:
        conn = sqlite3.connect("elections.db")
        cursor = conn.cursor()
        cursor.execute("create table if not exists elections (region text, tik text, uik text)")
        for i in range(1, 21):
            cursor.execute("alter table elections add v_%i integer" % i)
        for i in range(13, 21):
            cursor.execute("alter table elections add percent_%i real" %i)
        conn.commit()
        conn.close()
    else:
        session.sql("create database if not exists eldb")
        session.sql("create table if not exists eldb.elections (region String, tik String, uik String)")
        for i in range(1, 21):
            session.sql("alter table eldb.elections add v_%i integer" % i)
        for i in range(13, 21):
            session.sql("alter table eldb.elections add percent_%i real" %i)

In [9]:
def get_titles(url):
    response = requests.get(url)
    tree = lxml.html.fromstring(response.text)
    data = tree.xpath('//td[contains(@width, "90%")]//tr[contains(@valign, "top")]//a')
    return [{"Name": x.text, "URL": x.attrib["href"]} for x in data]

In [10]:
def add_data(url, region, tik):
    tree = lxml.html.fromstring(requests.get(url).text)
    url = tree.xpath('//a[contains(text(), "сайт избирательной комиссии субъекта Российской Федерации")]')[0].attrib["href"]
    tree = lxml.html.fromstring(requests.get(url).text)
    data = tree.xpath('//table[contains(@align, "left")]//tr[count(td)=3]')
    uiks = [x.text for x in tree.xpath('//td[contains(@width, "90%")]/div/table//tr[contains(@valign, "top")]/td/nobr')]
    data = tree.xpath(f"//td[contains(@width, '90%')]/div/table//tr[count(td)={len(uiks)} and not(contains(@valign, 'top')) and td[count(nobr)>0]]")
    rows = {}
    conn = sqlite3.connect("elections.db")
    cursor = conn.cursor()
    for uik_index, uik in enumerate(uiks):
        request = f'insert into elections values ("{region}", "{tik}", "{uik}"'
        current = {"val": [], "percent": []}
        for row in range(1, 21):
            cell = data[row-1].getchildren()[uik_index]
            percent = re.findall(r"\w+[.]\w+[%]", cell.text_content())
            if percent:
                current['percent'].append((percent[0][:-1]))
            current['val'].append((cell.xpath("./nobr/b")[0].text))
        for val in current['val']:
            request += f", {val}"
        for percent in current['percent']:
            request += f", {percent}"
        request += ")"
        cursor.execute(request)
        conn.commit()

In [11]:
def add_data_spark(url, region, tik, session):
    tree = lxml.html.fromstring(requests.get(url).text)
    url = tree.xpath('//a[contains(text(), "сайт избирательной комиссии субъекта Российской Федерации")]')[0].attrib["href"]
    tree = lxml.html.fromstring(requests.get(url).text)
    data = tree.xpath('//table[contains(@align, "left")]//tr[count(td)=3]')
    uiks = [x.text for x in tree.xpath('//td[contains(@width, "90%")]/div/table//tr[contains(@valign, "top")]/td/nobr')]
    data = tree.xpath(f"//td[contains(@width, '90%')]/div/table//tr[count(td)={len(uiks)} and not(contains(@valign, 'top')) and td[count(nobr)>0]]")
    rows = {}
    for uik_index, uik in enumerate(uiks):
        request = f'insert into elections values ("{region}", "{tik}", "{uik}"'
        current = {"val": [], "percent": []}
        for row in range(1, 21):
            cell = data[row-1].getchildren()[uik_index]
            percent = re.findall(r"\w+[.]\w+[%]", cell.text_content())
            if percent:
                current['percent'].append((percent[0][:-1]))
            current['val'].append((cell.xpath("./nobr/b")[0].text))
        for val in current['val']:
            request += f", {val}"
        for percent in current['percent']:
            request += f", {percent}"
        request += ")"
        session.sql(request)

In [16]:
def fill_db(session=None):
    create_table(session)
    url = 'http://www.vybory.izbirkom.ru/region/region/izbirkom?action=show&root=1&tvd=100100084849066&vrn=100100084849062&region=0&global=1&sub_region=0&prver=0&pronetvd=null&vibid=100100084849066&type=227'
    regions = get_titles(url)
    for region in regions:
        print(region["Name"])
        tiks = get_titles(region["URL"])
        for tik in tiks:
            if session:
                add_data_spark(tik["URL"], region["Name"], tik["Name"], session)
            else:
                add_data(tik["URL"], region["Name"], tik["Name"])

In [6]:
fill_db()

Республика Адыгея (Адыгея)
Республика Алтай
Республика Башкортостан
Республика Бурятия
Республика Дагестан
Республика Ингушетия
Кабардино-Балкарская Республика
Республика Калмыкия
Карачаево-Черкесская Республика
Республика Карелия
Республика Коми
Республика Крым
Республика Марий Эл
Республика Мордовия
Республика Саха (Якутия)
Республика Северная Осетия - Алания
Республика Татарстан (Татарстан)
Республика Тыва
Удмуртская Республика
Республика Хакасия
Чеченская Республика
Чувашская Республика - Чувашия
Алтайский край
Забайкальский край
Камчатский край
Краснодарский край
Красноярский край
Пермский край
Приморский край
Ставропольский край
Хабаровский край
Амурская область
Архангельская область
Астраханская область
Белгородская область
Брянская область
Владимирская область
Волгоградская область
Вологодская область
Воронежская область
Ивановская область
Иркутская область
Калининградская область
Калужская область
Кемеровская область
Кировская область
Костромская область
Курганская область
Кур

In [32]:
def transfer_data_from_sql_to_csv(sql_db_filename, csv_filename)
    conn = sqlite3.connect(sql_db_filename)
    cursor = conn.cursor()
    cursor.execute("select * from elections")
    result = cursor.fetchall()
    with open(csv_filename, "w") as csv_file:
        writer = csv.writer(csv_file)
        for line in result:
            writer.writerow(line)
    conn.close()

In [40]:
spark = pyspark.sql.SparkSession.builder.master("local[6]").appName("HW3").getOrCreate()
schema= pyspark.sql.types.StructType([
            StructField("region", pyspark.sql.types.StringType(), False),             
            StructField("tik", pyspark.sql.types.StringType(), False),             
            StructField("uik", pyspark.sql.types.StringType(), False),             
            StructField("v_1", pyspark.sql.types.IntegerType(), False),            
            StructField("v_2", pyspark.sql.types.IntegerType(), False),            
            StructField("v_3", pyspark.sql.types.IntegerType(), False),            
            StructField("v_4", pyspark.sql.types.IntegerType(), False),            
            StructField("v_5", pyspark.sql.types.IntegerType(), False),            
            StructField("v_6", pyspark.sql.types.IntegerType(), False),            
            StructField("v_7", pyspark.sql.types.IntegerType(), False),
            StructField("v_8", pyspark.sql.types.IntegerType(), False),
            StructField("v_9", pyspark.sql.types.IntegerType(), False),
            StructField("v_10", pyspark.sql.types.IntegerType(), False),
            StructField("v_11", pyspark.sql.types.IntegerType(), False),
            StructField("v_12", pyspark.sql.types.IntegerType(), False),
            StructField("v_13", pyspark.sql.types.IntegerType(), False),
            StructField("v_14", pyspark.sql.types.IntegerType(), False),
            StructField("v_15", pyspark.sql.types.IntegerType(), False),
            StructField("v_16", pyspark.sql.types.IntegerType(), False),
            StructField("v_17", pyspark.sql.types.IntegerType(), False),
            StructField("v_18", pyspark.sql.types.IntegerType(), False),
            StructField("v_19", pyspark.sql.types.IntegerType(), False),
            StructField("v_20", pyspark.sql.types.IntegerType(), False),
            StructField("percent_1", pyspark.sql.types.FloatType(), False),
            StructField("percent_2", pyspark.sql.types.FloatType(), False),
            StructField("percent_3", pyspark.sql.types.FloatType(), False),
            StructField("percent_4", pyspark.sql.types.FloatType(), False),
            StructField("percent_5", pyspark.sql.types.FloatType(), False),
            StructField("percent_6", pyspark.sql.types.FloatType(), False),
            StructField("percent_7", pyspark.sql.types.FloatType(), False),
            StructField("percent_8", pyspark.sql.types.FloatType(), False)]
        )
data = spark.read.csv("elections.csv", schema=schema)
data.createOrReplaceTempView('elections')

In [87]:
# найти явку (%) по всем регионам, результат отсортировать по убыванию 
def get_percent_by_region(session):
    return session.sql("select region, (100*(sum(v_9)+sum(v_10))/sum(v_1)) as percent from elections group by region order by percent desc")

In [63]:
get_percent_by_region(spark).show()

+--------------------+-----------------+
|              region|          percent|
+--------------------+-----------------+
|     Республика Тыва|93.62542974951742|
|Ямало-Ненецкий ав...|91.87213306617983|
|Кабардино-Балкарс...| 91.6978754085207|
|Чеченская Республика|91.50929703298175|
|Республика Северн...|89.93704831730909|
| Республика Дагестан|87.44351419820495|
|Карачаево-Черкесс...|  87.343809390099|
| Кемеровская область|83.07556958224816|
|Чукотский автоном...|82.26051697921946|
|Республика Ингушетия|81.95937165135314|
|    Брянская область|79.65976807571519|
|   Тюменская область| 78.8829673763749|
| Республика Мордовия|77.83648793754097|
|  Краснодарский край|77.83466166202626|
|Республика Татарс...| 77.3652654820273|
|Чувашская Республ...|76.21061419670117|
|Республика Башкор...|75.42120065745192|
|  Республика Бурятия|75.19133382510174|
|Республика Адыгея...|74.30595153243051|
| Ставропольский край|73.77007114684982|
+--------------------+-----------------+
only showing top

In [88]:
""" выбрать любимого кандидата и найти тот избиратльный участок, 
на котором он получил наибольший результат (учитывать участки на которых проголосовало больше 300 человек) 
"""
def find_best_uik(session, candidate):
    return session.sql("select %s, region, tik, uik, 100*%s/(v_9+v_10) as percent from elections where (v_9+v_10)>300 order by percent desc" % (candidate, candidate))

In [86]:
find_best_uik(spark, "v_13").show(1)

+----+--------------------+--------------------+---------+------------------+
|v_13|              region|                 tik|      uik|           percent|
+----+--------------------+--------------------+---------+------------------+
| 170|Республика Татарс...|Набережные Челны,...|УИК №2098|10.365853658536585|
+----+--------------------+--------------------+---------+------------------+
only showing top 1 row



In [91]:
# найти регион, где разница между ТИК с наибольшей явкой и наименьшей максимальна
def find_max_voters_number_difference(session):
    return session.sql("""select region, (max(percent) - min(percent)) as difference, max(percent), min(percent) from
    (select region, tik, 100*(sum(v_9)+sum(v_10))/sum(v_1) as percent from elections group by region, tik) 
    group by region order by difference desc""")

In [104]:
find_max_voters_number_difference(spark).show()

+--------------------+------------------+-----------------+------------------+
|              region|        difference|     max(percent)|      min(percent)|
+--------------------+------------------+-----------------+------------------+
|Архангельская обл...|  49.8591648406849|  99.795605518651|49.936440677966104|
| Сахалинская область|48.425080358447666| 99.5766299745978| 51.15154961615013|
| Республика Дагестан| 46.44410241767304|  98.097936058276| 51.65383364060297|
|   Самарская область| 41.71955342820328|98.76494804940208|  57.0453946211988|
|   Красноярский край| 41.60373262210857|90.67096347711121| 49.06723085500264|
| Саратовская область| 39.79730608698219|93.27429609445959|  53.4769900074774|
|Республика Татарс...| 39.56726290580031|99.12042668662862| 59.55316378082831|
|  Краснодарский край|37.372497281522975|96.18320610687023|58.810708825347255|
|Республика Адыгея...| 36.03080253305154|97.70222146854333|61.671418935491786|
| Республика Калмыкия|  35.8004704715642|91.11192392

In [115]:
"""
для каждого кандидата посчитать таблицу: 
результат (%, округленный до целого) - количество УИК, на которых кандидат получил данный результат
"""
def count_results_fro_candidates(session):
    results = []
    for i in range(13, 21):
        candidate = f"v_{i}"
        results.append(session.sql('''
            select percent, count(percent) as uiks_number from 
            (select uik, floor(100*%s/(v_9+v_10)) as percent from elections) 
            group by percent order by uiks_number desc''' % candidate))
    return results

In [118]:
candidates = ["Бабурин Сергей Николаевич",
"Грудинин Павел Николаевич",
"Жириновский Владимир Вольфович",
"Путин Владимир Владимирович",
"Собчак Ксения Анатольевна",
"Сурайкин Максим Александрович",
"Титов Борис Юрьевич",
"Явлинский Григорий Алексеевич"]
for i, result in enumerate(count_results_fro_candidates(spark)):
    print(candidates[i])
    result.show()
    print("\n")

Бабурин Сергей Николаевич
+-------+-----------+
|percent|uiks_number|
+-------+-----------+
|      0|      80093|
|      1|      15385|
|      2|       1264|
|      3|        262|
|      4|        136|
|      5|         65|
|      6|         26|
|      7|         19|
|      8|         12|
|     10|          9|
|      9|          5|
|     11|          4|
|     14|          3|
|     20|          3|
|     18|          1|
|     23|          1|
|     17|          1|
|     21|          1|
|     12|          1|
|     44|          1|
+-------+-----------+
only showing top 20 rows



Грудинин Павел Николаевич
+-------+-----------+
|percent|uiks_number|
+-------+-----------+
|     11|       7794|
|     12|       7733|
|     10|       7635|
|     13|       6802|
|      9|       6491|
|      8|       5671|
|     14|       5422|
|      7|       4763|
|      6|       4237|
|     15|       4236|
|      5|       3897|
|      4|       3672|
|      3|       3501|
|     16|       3372|
|      2|       33