In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import socket

hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.driver.host", local_ip) \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .appName("HWSparkSQL") \
    .config("spark.jars.packages",
           "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262") \
    .getOrCreate()

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "minio")
hadoop_conf.set("fs.s3a.secret.key", "minio123")
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.path.style.access", "true")

df = spark.read.option("header", True).option("sep", ",").option("inferSchema", True).csv("s3a://my-bucket/covid-data.csv")


In [6]:
#Выберите 15 стран с наибольшим процентом переболевших на 31 марта 
#(в выходящем датасете необходимы колонки: iso_code, страна, процент переболевших)
top15_countries = df.filter(
    (col("date") == "2021-03-31") & 
    (col("location").isNotNull()) &
    (col("iso_code").isNotNull()) &
    (~col("iso_code").startswith("OWID"))
).select(
    col("iso_code"),
    col("location").alias("страна"),
    (round((col("total_cases") - col("total_deaths")) / col("population") * 100, 2)).alias("процент переболевших")
).filter(col("процент переболевших").isNotNull()) \
 .orderBy(desc("процент переболевших")) \
 .limit(15)

top15_countries.show()


+--------+-------------+--------------------+
|iso_code|       страна|процент переболевших|
+--------+-------------+--------------------+
|     AND|      Andorra|                15.4|
|     MNE|   Montenegro|               14.32|
|     CZE|      Czechia|               14.06|
|     SMR|   San Marino|               13.69|
|     SVN|     Slovenia|               10.18|
|     LUX|   Luxembourg|                9.73|
|     ISR|       Israel|                9.55|
|     USA|United States|                9.04|
|     SRB|       Serbia|                8.75|
|     BHR|      Bahrain|                8.46|
|     PAN|       Panama|                8.09|
|     EST|      Estonia|                7.95|
|     PRT|     Portugal|                7.89|
|     SWE|       Sweden|                7.84|
|     LTU|    Lithuania|                7.81|
+--------+-------------+--------------------+



In [8]:
#Top 10 стран с максимальным зафиксированным кол-вом новых случаев за последнюю неделю марта 2021 
#в отсортированном порядке по убыванию
#(в выходящем датасете необходимы колонки: число, страна, кол-во новых случаев)

last_week_march_2021 = df.filter(
    (col("date") >= "2021-03-25") & 
    (col("date") <= "2021-03-31") &
    (col("location").isNotNull()) &
    (~col("iso_code").startswith("OWID")) &
    (col("new_cases").isNotNull()) &
    (col("new_cases") > 0)
)

# Окно для нахождения дня с максимумом для каждой страны
date_max_cases_by_location = Window.partitionBy("location").orderBy(desc("new_cases"), "date")

top10_countries = last_week_march_2021.withColumn("rank", row_number().over(date_max_cases_by_location)) \
    .filter(col("rank") == 1) \
    .select(
        col("date").alias("число"),
        col("location").alias("страна"),
        col("new_cases").alias("количество новых случаев")
    ) \
    .orderBy(desc("количество новых случаев")) \
    .limit(10)


top10_countries.show()

+----------+-------------+------------------------+
|     число|       страна|количество новых случаев|
+----------+-------------+------------------------+
|2021-03-25|       Brazil|                100158.0|
|2021-03-26|United States|                 77321.0|
|2021-03-31|        India|                 72330.0|
|2021-03-31|       France|                 59054.0|
|2021-03-31|       Turkey|                 39302.0|
|2021-03-26|       Poland|                 35145.0|
|2021-03-31|      Germany|                 25014.0|
|2021-03-26|        Italy|                 24076.0|
|2021-03-25|         Peru|                 19206.0|
|2021-03-26|      Ukraine|                 18226.0|
+----------+-------------+------------------------+



In [9]:
#Посчитайте изменение случаев относительно предыдущего дня в России за последнюю неделю марта 2021. 
#(например: в россии вчера было 9150 , сегодня 8763, итог: -387) 
#(в выходящем датасете необходимы колонки: число, кол-во новых случаев вчера, кол-во новых случаев сегодня, дельта)

russia_data_last_week_march_2021 = df.filter(
        (col("location") == "Russia") &
        (col("date") >= "2021-03-25") &
        (col("date") <= "2021-03-31") &
        (col("new_cases").isNotNull())
    ).select(
        "date",
        col("new_cases").cast(LongType()).alias("new_cases")
    ).orderBy("date")
    
window_date_sort = Window.orderBy("date")
    
cases_changes = russia_data_last_week_march_2021.withColumn(
        "prev_day_cases", 
        lag("new_cases").over(window_date_sort)
    ).filter(col("prev_day_cases").isNotNull()) \
     .withColumn(
         "дельта", 
         col("new_cases") - col("prev_day_cases")
     ).select(
         col("date").alias("дата"),
         col("prev_day_cases").alias("кол-во новых случаев вчера"),
         col("new_cases").alias("кол-во новых случаев сегодня"),
         col("дельта")
     )

cases_changes.show()

+----------+--------------------------+----------------------------+------+
|      дата|кол-во новых случаев вчера|кол-во новых случаев сегодня|дельта|
+----------+--------------------------+----------------------------+------+
|2021-03-26|                      9128|                        9073|   -55|
|2021-03-27|                      9073|                        8783|  -290|
|2021-03-28|                      8783|                        8979|   196|
|2021-03-29|                      8979|                        8589|  -390|
|2021-03-30|                      8589|                        8162|  -427|
|2021-03-31|                      8162|                        8156|    -6|
+----------+--------------------------+----------------------------+------+



In [10]:
spark.stop()