In [75]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

In [103]:
spark = SparkSession.builder.appName('test_name').getOrCreate()

In [104]:
df = spark.read.option('inferSchema', 'true')\
.option('header', 'true').csv("/users/dznosok/downloads/owid-covid-data.csv")

In [33]:
df.printSchema()

root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: 

Выберите 15 стран с наибольшим процентом переболевших на 31 марта (в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)

In [105]:
df.select('iso_code', 'location', (F.col('total_cases')/F.col('population')).alias('procent'))\
.where(F.col('date') == '2021-03-31')\
.orderBy(F.col('procent').desc())\
.show(15)

+--------+-------------+-------------------+
|iso_code|     location|            procent|
+--------+-------------+-------------------+
|     AND|      Andorra|0.15543907331909662|
|     MNE|   Montenegro|0.14523725364693293|
|     CZE|      Czechia|0.14308848404077998|
|     SMR|   San Marino| 0.1393717956273204|
|     SVN|     Slovenia|0.10370805779121203|
|     LUX|   Luxembourg|0.09847342390123583|
|     ISR|       Israel|0.09625106044786802|
|     USA|United States|0.09203010995860707|
|     SRB|       Serbia|0.08826328557933491|
|     BHR|      Bahrain|0.08488860079114566|
|     PAN|       Panama|0.08228739065460762|
|     PRT|     Portugal|0.08058699735120368|
|     EST|      Estonia| 0.0802268157965955|
|     SWE|       Sweden|0.07969744347858805|
|     LTU|    Lithuania|0.07938864728274825|
+--------+-------------+-------------------+
only showing top 15 rows



Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 в отсортированном порядке по убыванию
(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

In [108]:
window = W.partitionBy("location").orderBy(F.col("new_cases").desc())

In [110]:
df.withColumn('rn', F.row_number().over(window))\
.select('date', 'location', 'new_cases')\
.where(F.col('date') >= '2021-03-29').where(F.col('date') <= '2021-04-04')\
.where(F.col('continent') != 'null')\
.where(F.col('rn')==1)\
.orderBy(F.col('new_cases').desc()).show(10)


+----------+--------------------+---------+
|      date|            location|new_cases|
+----------+--------------------+---------+
|2021-04-01|              Poland|  35253.0|
|2021-04-03|             Ukraine|  20456.0|
|2021-04-02|         Philippines|  15298.0|
|2021-03-30|              Serbia|   9983.0|
|2021-04-01|            Cameroon|   9668.0|
|2021-03-30|              Greece|   4322.0|
|2021-03-29|              Kosovo|   4257.0|
|2021-03-29|Bosnia and Herzeg...|   3755.0|
|2021-04-01|            Ethiopia|   2372.0|
|2021-04-04|           Venezuela|   1786.0|
+----------+--------------------+---------+
only showing top 10 rows



Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. (например: в россии вчера было 9150 , сегодня 8763, итог: -387) (в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)



In [91]:
window = W.orderBy("date")

In [101]:
df.select( 'date', 'new_cases', F.col('new_cases') - F.col('new_cases'))\
.where(F.col('location') == 'Russia')\
.select('date', F.lag('new_cases').over(window).alias('lag'),'new_cases')\
.where(F.col('date') >= '2021-03-29').where(F.col('date') <= '2021-04-04')\
.select('date', 'lag', 'new_cases', (F.col('new_cases')-F.col('lag')).alias('delta')).show()

+----------+------+---------+------+
|      date|   lag|new_cases| delta|
+----------+------+---------+------+
|2021-03-29|8979.0|   8589.0|-390.0|
|2021-03-30|8589.0|   8162.0|-427.0|
|2021-03-31|8162.0|   8156.0|  -6.0|
|2021-04-01|8156.0|   9057.0| 901.0|
|2021-04-02|9057.0|   8682.0|-375.0|
|2021-04-03|8682.0|   8906.0| 224.0|
|2021-04-04|8906.0|   8697.0|-209.0|
+----------+------+---------+------+



23/10/31 22:47:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/31 22:47:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/31 22:47:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/31 22:47:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/31 22:47:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [102]:
spark.stop()