#### Необходимые импорты

In [45]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import IntegerType, StringType

#### Создание сессии spark

In [4]:
spark = SparkSession.builder \
    .appName("Dwarf") \
    .config("spark.driver.memory", "2g") \
    .master("local[*]") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/02 15:19:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/07/02 15:19:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### Прочитаем данные в DataFrame

In [5]:
df = spark.read.csv("covid-data.csv", header=True, inferSchema=True)

                                                                                

#### Посмотрим на данные

In [6]:
df.show()

25/07/02 15:20:12 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+---------+-----------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+-------------------------------------+----------------+-----------+------------------+----------+-------------+-------------+--------------+--

#### Распечатаем схему

In [7]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

### Задание 1: 
Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [8]:
df_task_1 = df \
    .select(
        'iso_code'
        , 'location'
        , ((F.col('total_cases') - F.col('total_deaths'))/F.col('population') * 100).alias('% переболевших')) \
    .where((F.col('date') == '2020-03-31') & (F.length(F.col("iso_code")) == 3)) \
    .withColumn("% переболевших", F.round("% переболевших", 2)) \
    .orderBy(F.col('% переболевших') \
    .desc()).limit(15)

df_task_1.show()



+--------+-----------+--------------+
|iso_code|   location|% переболевших|
+--------+-----------+--------------+
|     SMR| San Marino|          0.62|
|     AND|    Andorra|          0.47|
|     LUX| Luxembourg|          0.34|
|     ISL|    Iceland|          0.33|
|     ESP|      Spain|          0.19|
|     CHE|Switzerland|          0.19|
|     ITA|      Italy|          0.15|
|     MCO|     Monaco|          0.13|
|     AUT|    Austria|          0.11|
|     BEL|    Belgium|           0.1|
|     NOR|     Norway|          0.08|
|     DEU|    Germany|          0.08|
|     NLD|Netherlands|          0.07|
|     ISR|     Israel|          0.07|
|     PRT|   Portugal|          0.07|
+--------+-----------+--------------+



                                                                                

### Задание 2: 
Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [9]:
w = Window.partitionBy("location").orderBy(F.col('new_cases').desc())

df_task_2 = df \
    .select(
        'date'
        , 'location'
        , 'new_cases') \
    .where(F.to_date(F.col('date')).between('2021-03-25', '2021-03-31') & (F.length(F.col("iso_code")) == 3)) \
    .withColumn('rank', F.row_number().over(w)) \
    .filter(F.col("rank") == 1) \
    .drop("rank") \
    .sort(F.desc(F.col('new_cases'))) \
    .limit(10)

df_task_2.show()



+----------+-------------+---------+
|      date|     location|new_cases|
+----------+-------------+---------+
|2021-03-25|       Brazil| 100158.0|
|2021-03-26|United States|  77321.0|
|2021-03-31|        India|  72330.0|
|2021-03-31|       France|  59054.0|
|2021-03-31|       Turkey|  39302.0|
|2021-03-26|       Poland|  35145.0|
|2021-03-31|      Germany|  25014.0|
|2021-03-26|        Italy|  24076.0|
|2021-03-25|         Peru|  19206.0|
|2021-03-26|      Ukraine|  18226.0|
+----------+-------------+---------+



                                                                                

### Задание 3: 
Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [51]:
w = Window.partitionBy("location").orderBy(F.col('date').asc())

df_task_3 = df \
    .filter(F.lower(F.col("location")).contains('russ')) \
    .where(F.to_date(F.col('date')).between('2021-03-25', '2021-03-31')) \
    .withColumn('prev_day', F.lag(F.col('new_cases'), 1, 0).over(w)) \
    .withColumn('diff', (F.col('new_cases') - F.col('prev_day')).cast(IntegerType())) \
    .select(
        F.col('date').alias('Дата')
        , F.col('prev_day').alias('Кол-во новых случаев вчера').cast(IntegerType())
        , F.col('new_cases').alias('Кол-во новых случаев сегодня').cast(IntegerType())
        , F.col('diff').alias('Дельта')
    )

df_task_3.show()

+----------+--------------------------+----------------------------+------+
|      Дата|Кол-во новых случаев вчера|Кол-во новых случаев сегодня|Дельта|
+----------+--------------------------+----------------------------+------+
|2021-03-25|                         0|                        9128|  9128|
|2021-03-26|                      9128|                        9073|   -55|
|2021-03-27|                      9073|                        8783|  -290|
|2021-03-28|                      8783|                        8979|   196|
|2021-03-29|                      8979|                        8589|  -390|
|2021-03-30|                      8589|                        8162|  -427|
|2021-03-31|                      8162|                        8156|    -6|
+----------+--------------------------+----------------------------+------+

