In [1]:
import os
import warnings
from datetime import datetime

from pyspark.sql.functions import col, lag, lit, round, row_number
from pyspark.sql import SparkSession
from pyspark.sql import Window

warnings.filterwarnings('ignore')

In [2]:
spark = SparkSession.builder.appName('dz_spark_sql').master('local[2]').getOrCreate()
spark

In [3]:
df = (
    spark.read.
    option('header', True).
    option('sep', ',').
    option('inferSchema', True).
    csv('owid-covid-data.csv')
)

In [4]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

#### Задание 1.
Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших).

**Примечание:** в условии задачи не указано на 31 марта какого года необходимо произвести расчёт. Расчёт давался на 31 марта 2021 года

In [5]:
date = '2021-03-31'

Для подсчёта будем использовать признак 'total_cases_per_million', который характеризует общее число заболевших в переводе на 1 млн на указанную дату, из которого делением на 10 000 легко получаем процент заболевших от населения. 

In [6]:
df.select(
    'date',
    'iso_code',
    'location',
    'total_cases_per_million',
).filter(col('date') == date).show()

+----------+--------+-------------------+-----------------------+
|      date|iso_code|           location|total_cases_per_million|
+----------+--------+-------------------+-----------------------+
|2021-03-31|     AFG|        Afghanistan|               1450.203|
|2021-03-31|OWID_AFR|             Africa|               3145.844|
|2021-03-31|     ALB|            Albania|              43490.514|
|2021-03-31|     DZA|            Algeria|               2672.502|
|2021-03-31|     AND|            Andorra|             155439.073|
|2021-03-31|     AGO|             Angola|                678.842|
|2021-03-31|     AIA|           Anguilla|                   NULL|
|2021-03-31|     ATG|Antigua and Barbuda|              11600.359|
|2021-03-31|     ARG|          Argentina|              51969.922|
|2021-03-31|     ARM|            Armenia|              65009.716|
|2021-03-31|OWID_ASI|               Asia|               6143.981|
|2021-03-31|     AUS|          Australia|               1149.888|
|2021-03-3

Проанализируем записи с iso_code содержащими OWID

In [7]:
df.select(
    'date',
    'iso_code',
    'location',
    'total_cases_per_million',
).filter(col('date') == date).filter(col('iso_code').contains('OWID')).show()

+----------+--------+---------------+-----------------------+
|      date|iso_code|       location|total_cases_per_million|
+----------+--------+---------------+-----------------------+
|2021-03-31|OWID_AFR|         Africa|               3145.844|
|2021-03-31|OWID_ASI|           Asia|               6143.981|
|2021-03-31|OWID_EUR|         Europe|              53224.815|
|2021-03-31|OWID_EUN| European Union|              60204.506|
|2021-03-31|OWID_INT|  International|                   NULL|
|2021-03-31|OWID_KOS|         Kosovo|              46141.453|
|2021-03-31|OWID_NAM|  North America|              59345.329|
|2021-03-31|OWID_CYN|Northern Cyprus|                   NULL|
|2021-03-31|OWID_OCE|        Oceania|                888.307|
|2021-03-31|OWID_SAM|  South America|              49098.225|
|2021-03-31|OWID_WRL|          World|              16536.373|
+----------+--------+---------------+-----------------------+



Мы видим, что такие записи не относятся к странам, поэтому отфильтруем их. Косово не является официально признаной страной, поэтому исключим его, так как в списке отсутствуют другие непризнаные страны.

In [8]:
filt_df = df.select(
    'date',
    'iso_code',
    'location',
    'total_cases_per_million',
).filter(col('date') == date).filter(~col('iso_code').contains('OWID'))

In [9]:
result_1 = filt_df.select('iso_code', 'location', (round((col('total_cases_per_million') / 10000), 2)).alias('percent cases, %')).orderBy(col('percent cases, %').desc()).limit(15)

In [10]:
result_1.show()

+--------+-------------+----------------+
|iso_code|     location|percent cases, %|
+--------+-------------+----------------+
|     AND|      Andorra|           15.54|
|     MNE|   Montenegro|           14.52|
|     CZE|      Czechia|           14.31|
|     SMR|   San Marino|           13.94|
|     SVN|     Slovenia|           10.37|
|     LUX|   Luxembourg|            9.85|
|     ISR|       Israel|            9.63|
|     USA|United States|             9.2|
|     SRB|       Serbia|            8.83|
|     BHR|      Bahrain|            8.49|
|     PAN|       Panama|            8.23|
|     PRT|     Portugal|            8.06|
|     EST|      Estonia|            8.02|
|     SWE|       Sweden|            7.97|
|     LTU|    Lithuania|            7.94|
+--------+-------------+----------------+



### Задание 2.
Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию (в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев).

Будем считать, что под крайней неделей в марте понимается, крайниие 7 дней марта, т.е. даты с 25.03 по 31.03.

In [11]:
df_last_week_march = df.select(
    'location', 
    'new_cases',
    'date',
).filter((col('date') >= '2021-03-25') & (col('date') <= '2021-03-31')).filter(~col('iso_code').contains('OWID'))

In [12]:
df_last_week_march.show()

+-----------+---------+----------+
|   location|new_cases|      date|
+-----------+---------+----------+
|Afghanistan|     34.0|2021-03-25|
|Afghanistan|     28.0|2021-03-26|
|Afghanistan|     36.0|2021-03-27|
|Afghanistan|      4.0|2021-03-28|
|Afghanistan|     28.0|2021-03-29|
|Afghanistan|     62.0|2021-03-30|
|Afghanistan|     70.0|2021-03-31|
|    Albania|    472.0|2021-03-25|
|    Albania|    449.0|2021-03-26|
|    Albania|    425.0|2021-03-27|
|    Albania|    493.0|2021-03-28|
|    Albania|    285.0|2021-03-29|
|    Albania|    304.0|2021-03-30|
|    Albania|    434.0|2021-03-31|
|    Algeria|    105.0|2021-03-25|
|    Algeria|    114.0|2021-03-26|
|    Algeria|     93.0|2021-03-27|
|    Algeria|     86.0|2021-03-28|
|    Algeria|    110.0|2021-03-29|
|    Algeria|    115.0|2021-03-30|
+-----------+---------+----------+
only showing top 20 rows



Создадим оконную функцию, в которой отсортируем дни по количеству новых заболевших в каждой стране.

In [13]:
window = Window().partitionBy('location').orderBy(col('new_cases').desc())

In [14]:
result_2 = df_last_week_march.withColumn(
    'row_number',
    row_number().over(window)
).filter(col('row_number') == '1').select(
    'location', 
    'new_cases', 
    'date',
).orderBy(col('new_cases').desc()).limit(10)

In [15]:
result_2.show()

+-------------+---------+----------+
|     location|new_cases|      date|
+-------------+---------+----------+
|       Brazil| 100158.0|2021-03-25|
|United States|  77321.0|2021-03-26|
|        India|  72330.0|2021-03-31|
|       France|  59054.0|2021-03-31|
|       Turkey|  39302.0|2021-03-31|
|       Poland|  35145.0|2021-03-26|
|      Germany|  25014.0|2021-03-31|
|        Italy|  24076.0|2021-03-26|
|         Peru|  19206.0|2021-03-25|
|      Ukraine|  18226.0|2021-03-26|
+-------------+---------+----------+



### Задание 3.
Посчитать изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в России вчера было 9150 , сегодня 8763, итог: -387).

In [16]:
df_last_week_march_rus = df.select(
    'date',
    (col('new_cases').alias('today_new_cases')),
).filter((col('date') >= '2021-03-24') & (col('date') <= '2021-03-31')).filter(col('iso_code') == 'RUS')
df_last_week_march_rus.show()

+----------+---------------+
|      date|today_new_cases|
+----------+---------------+
|2021-03-24|         8769.0|
|2021-03-25|         9128.0|
|2021-03-26|         9073.0|
|2021-03-27|         8783.0|
|2021-03-28|         8979.0|
|2021-03-29|         8589.0|
|2021-03-30|         8162.0|
|2021-03-31|         8156.0|
+----------+---------------+



In [17]:
window = Window().partitionBy(lit(0)).orderBy('date')

In [18]:
result_3 = df_last_week_march_rus.withColumn(
    'prev_day_new_cases',
    lag(col('today_new_cases'), 1).over(window)).withColumn(
    'delta_new_cases',
    col('today_new_cases') - col('prev_day_new_cases')).filter(col('date') > '2021-03-24')

In [19]:
result_3.show()

+----------+---------------+------------------+---------------+
|      date|today_new_cases|prev_day_new_cases|delta_new_cases|
+----------+---------------+------------------+---------------+
|2021-03-25|         9128.0|            8769.0|          359.0|
|2021-03-26|         9073.0|            9128.0|          -55.0|
|2021-03-27|         8783.0|            9073.0|         -290.0|
|2021-03-28|         8979.0|            8783.0|          196.0|
|2021-03-29|         8589.0|            8979.0|         -390.0|
|2021-03-30|         8162.0|            8589.0|         -427.0|
|2021-03-31|         8156.0|            8162.0|           -6.0|
+----------+---------------+------------------+---------------+



### Сохранение результатов.

In [20]:
path = f'{os.getcwd()}/results/'

In [23]:
for n, item in enumerate([result_1, result_2, result_3]):
    item.write.option('header', True).csv(f'{path}result_{n + 1}_{datetime.now().date()}')