In [9]:
pip install --quiet pyspark

Note: you may need to restart the kernel to use updated packages.


In [99]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window, types

spark = SparkSession.builder.appName("Practice").getOrCreate()

In [12]:
df = spark.read.csv('data/covid-data.csv', header=True, inferSchema=True)

In [8]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

.

Задание 1

Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [108]:
df_1 = df.select('iso_code', 'date', (df.total_cases_per_million / 10000).alias("percent_of_patients"))\
    .filter(df.date == '2021-03-31')\
    .sort(F.col('percent_of_patients').desc())\
    .limit(15)

df_1.show()
df_1.write.csv("/content/result_1")

+--------+----------+-------------------+
|iso_code|      date|percent_of_patients|
+--------+----------+-------------------+
|     AND|2021-03-31|         15.5439073|
|     MNE|2021-03-31| 14.523725399999998|
|     CZE|2021-03-31|         14.3088484|
|     SMR|2021-03-31|         13.9371796|
|     SVN|2021-03-31| 10.370805800000001|
|     LUX|2021-03-31|          9.8473424|
|     ISR|2021-03-31|           9.625106|
|     USA|2021-03-31|           9.203011|
|     SRB|2021-03-31|          8.8263286|
|     BHR|2021-03-31|          8.4888601|
|     PAN|2021-03-31|          8.2287391|
|     PRT|2021-03-31|          8.0586997|
|     EST|2021-03-31|          8.0226816|
|     SWE|2021-03-31|          7.9697443|
|     LTU|2021-03-31|          7.9388647|
+--------+----------+-------------------+



.

Задание 2

Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [131]:
df_int = df.select('iso_code', 'date', 'new_cases')\
    .filter((df.date >= '2021-03-25') & (df.date <= '2021-03-31'))\
    .filter(F.substring(df.iso_code, 1, 4) != 'OWID')

df_int.show()

+--------+----------+---------+
|iso_code|      date|new_cases|
+--------+----------+---------+
|     AFG|2021-03-24|     15.0|
|     AFG|2021-03-25|     34.0|
|     AFG|2021-03-26|     28.0|
|     AFG|2021-03-27|     36.0|
|     AFG|2021-03-28|      4.0|
|     AFG|2021-03-29|     28.0|
|     AFG|2021-03-30|     62.0|
|     AFG|2021-03-31|     70.0|
|     ALB|2021-03-24|    448.0|
|     ALB|2021-03-25|    472.0|
|     ALB|2021-03-26|    449.0|
|     ALB|2021-03-27|    425.0|
|     ALB|2021-03-28|    493.0|
|     ALB|2021-03-29|    285.0|
|     ALB|2021-03-30|    304.0|
|     ALB|2021-03-31|    434.0|
|     DZA|2021-03-24|     89.0|
|     DZA|2021-03-25|    105.0|
|     DZA|2021-03-26|    114.0|
|     DZA|2021-03-27|     93.0|
+--------+----------+---------+
only showing top 20 rows



In [132]:
w = Window.partitionBy('iso_code').orderBy(F.desc('new_cases'))
df_int = df_int.withColumn('cases_rank', F.rank().over(w))\
    .filter(F.col('cases_rank') == 1)
    
df_2 = df_int.select('date', 'iso_code', 'new_cases')\
    .sort(F.col('new_cases').desc())\
    .limit(10)
    
df_2.show() 
df_2.write.csv("/content/result_2")

+----------+--------+---------+
|      date|iso_code|new_cases|
+----------+--------+---------+
|2021-03-25|     BRA| 100158.0|
|2021-03-24|     USA|  86960.0|
|2021-03-31|     IND|  72330.0|
|2021-03-24|     FRA|  65392.0|
|2021-03-31|     TUR|  39302.0|
|2021-03-26|     POL|  35145.0|
|2021-03-31|     DEU|  25014.0|
|2021-03-26|     ITA|  24076.0|
|2021-03-25|     PER|  19206.0|
|2021-03-26|     UKR|  18226.0|
+----------+--------+---------+



.

Задание 3

Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

In [146]:
df_int3 = df.select('iso_code', 'date', 'new_cases')\
    .filter((df.date >= '2021-03-24') & (df.date <= '2021-03-31'))\
    .filter(df.iso_code == 'RUS')\
    .sort('date')

df_int3.show()

+--------+----------+---------+
|iso_code|      date|new_cases|
+--------+----------+---------+
|     RUS|2021-03-23|   8369.0|
|     RUS|2021-03-24|   8769.0|
|     RUS|2021-03-25|   9128.0|
|     RUS|2021-03-26|   9073.0|
|     RUS|2021-03-27|   8783.0|
|     RUS|2021-03-28|   8979.0|
|     RUS|2021-03-29|   8589.0|
|     RUS|2021-03-30|   8162.0|
|     RUS|2021-03-31|   8156.0|
+--------+----------+---------+



In [151]:
w2 = Window.partitionBy('iso_code').orderBy(('date'))
df_int3 = df_int3.withColumn('previos', F.lag('new_cases').over(w2))
    
df_3 = df_int3.select('date', 'new_cases', 'previos', (df_int3.new_cases - df_int3.previos).alias('delta'))\
    .filter((df.date >= '2021-03-25') & (df.date <= '2021-03-31'))
    
df_3.show()
df_3.write.csv("/content/result_3")

+----------+---------+-------+------+
|      date|new_cases|previos| delta|
+----------+---------+-------+------+
|2021-03-25|   9128.0| 8769.0| 359.0|
|2021-03-26|   9073.0| 9128.0| -55.0|
|2021-03-27|   8783.0| 9073.0|-290.0|
|2021-03-28|   8979.0| 8783.0| 196.0|
|2021-03-29|   8589.0| 8979.0|-390.0|
|2021-03-30|   8162.0| 8589.0|-427.0|
|2021-03-31|   8156.0| 8162.0|  -6.0|
+----------+---------+-------+------+

