In [None]:
sc.install_pypi_package("numpy==1.26.4")
sc.install_pypi_package("pandas")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("altair")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg, stddev, window, year, month
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import random
from datetime import datetime, timedelta

In [None]:
spark = SparkSession.builder.appName("WeatherAnalise").getOrCreate()

In [None]:
s3_bucket = "sparkcalculations"
csvInput = f"s3://{s3_bucket}/data/weather_data.csv"

dataFile = spark.read.csv(csvInput, header = True)
dataFile.show(19)

In [None]:
df_with_time = dataFile.withColumn("year", F.year("date")) \
                          .withColumn("month", F.month("date")) \
                          .withColumn("day", F.day("date")) \
                          .withColumn("hour", F.hour("date")) 

df_with_time.show(10)

In [None]:
df_with_time = df_with_time.withColumn("date", F.to_date("date"))
df_with_time = df_with_time.select(["date", "year", "month", "day", "hour", "temperature_2m"])
df_with_time.show(10)

In [None]:
# Tworzymy okno czasowe oparte na dacie
window_spec = Window.orderBy("date")

# Flaga dla temperatury powyżej 32.2°C
df_hot = df_with_time.withColumn("is_hot", (col("temperature_2m") > 32.2).cast("int"))

# Tworzymy kolumny z przesunięciem (-1, -2 dla poprzednich dni)
df_hot = df_hot.withColumn("prev_1", lag("is_hot", 1).over(window_spec)) \
               .withColumn("prev_2", lag("is_hot", 2).over(window_spec))

# Identyfikacja fal ciepła: 3 dni pod rząd powyżej 32.2°C
df_heatwaves = df_hot.filter((col("is_hot") == 1) & (col("prev_1") == 1) & (col("prev_2") == 1))

df_heatwaves.select("date", "temperature_2m").show(50, truncate=False)


In [None]:
from pyspark.sql.functions import dense_rank

# Tworzymy unikalny identyfikator dla każdej fali ciepła
df_heatwaves = df_heatwaves.withColumn("heatwave_group", dense_rank().over(Window.orderBy("date")))

# Grupowanie po okresach
df_grouped = df_heatwaves.groupBy("heatwave_group").agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
    F.count("*").alias("duration_days")
)

df_grouped.show()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------+----------+-------------+
|heatwave_group|start_date|  end_date|duration_days|
+--------------+----------+----------+-------------+
|             1|2022-07-01|2022-07-01|            2|
|             2|2022-07-21|2022-07-21|            5|
|             3|2022-08-04|2022-08-04|            3|
|             4|2022-08-05|2022-08-05|            5|
|             5|2022-08-16|2022-08-16|            4|
|             6|2022-08-19|2022-08-19|            5|
|             7|2023-08-15|2023-08-15|            1|
|             8|2024-06-30|2024-06-30|            3|
|             9|2024-07-10|2024-07-10|            2|
+--------------+----------+----------+-------------+

In [None]:
df_daily_hot = df_with_time.groupBy("date").agg(
    F.max((col("temperature_2m") > 25.2).cast("int")).alias("is_hot_day")
).filter(col("is_hot_day") == 1)
df_daily_hot.show(30)

In [None]:
window_spec = Window.orderBy("date")

df_heatwaves = df_with_time.withColumn("prev_1", lag("date", 1).over(window_spec)) \
                           .withColumn("prev_2", lag("date", 2).over(window_spec))

# Warunek: czy poprzednie 2 dni również były gorące
df_heatwaves = df_heatwaves.filter(
    (col("prev_1").isNotNull()) & (col("prev_2").isNotNull()) & 
    ((F.datediff(col("date"), col("prev_1")) == 1)) & ((F.datediff(col("prev_1"), col("prev_2")) == 1)
)
)

df_heatwaves.select("date").show(50, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df_daily_hot = df_daily.groupBy("date").agg(
    max((col("temperature_2m") > 32.2).cast("int")).alias("is_hot_day")
).filter(col("is_hot_day") == 1)

window_spec = Window.orderBy("date_only")

df_heatwaves = df_daily_hot.withColumn("prev_1", lag("date", 1).over(window_spec)) \
                           .withColumn("prev_2", lag("date", 2).over(window_spec))

df_heatwaves = df_heatwaves.filter(
    (col("prev_1").isNotNull()) & (col("prev_2").isNotNull()) & 
    (datediff(col("date"), col("prev_1")) == 1) & 
    (datediff(col("prev_1"), col("prev_2")) == 1)
)

df_heatwaves.select("date").show(50, truncate=False)

In [None]:
sc.install_pypi_package("numpy==1.26.4")
sc.install_pypi_package("pandas")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("altair")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, avg, stddev, window, year, month
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import random
from datetime import datetime, timedelta

In [None]:
spark = SparkSession.builder.appName("WeatherAnalise").getOrCreate()

In [None]:
s3_bucket = "sparkcalculations"
csvInput = f"s3://{s3_bucket}/data/weather_data.csv"

dataFile = spark.read.csv(csvInput, header = True)
dataFile.show(19)

In [None]:
df_with_time = dataFile.withColumn("year", F.year("date")) \
                          .withColumn("month", F.month("date")) \
                          .withColumn("day", F.day("date")) \
                          .withColumn("hour", F.hour("date")) 

df_with_time.show(10)

In [None]:
df_with_time = df_with_time.withColumn("date", F.to_date("date"))
df_with_time = df_with_time.select(["date", "year", "month", "day", "hour", "temperature_2m"])
df_with_time.show(10)

In [7]:
hot_day = 25
df_hot_days = df_with_time.filter(col("temperature_2m") > hot_day).select("date").distinct()

window_spec = Window.orderBy("date")
df_hot_days = df_hot_days.withColumn("prev_date", F.lag("date").over(window_spec))
df_hot_days.show(10)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+
|      date| prev_date|
+----------+----------+
|2021-06-18|      NULL|
|2021-06-19|2021-06-18|
|2021-06-20|2021-06-19|
|2021-06-21|2021-06-20|
|2021-06-22|2021-06-21|
|2021-06-29|2021-06-22|
|2021-06-30|2021-06-29|
|2021-07-06|2021-06-30|
|2021-07-07|2021-07-06|
|2021-07-09|2021-07-07|
+----------+----------+
only showing top 10 rows

In [8]:
df_hot_days = df_hot_days.withColumn(
    "new_wave", 
    F.when(F.datediff("date", "prev_date") == 1, 0).otherwise(1)
)
df_hot_days = df_hot_days.withColumn(
    "wave_group",
    F.sum("new_wave").over(window_spec)
)
df_hot_days.show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+--------+----------+
|      date| prev_date|new_wave|wave_group|
+----------+----------+--------+----------+
|2021-06-18|      NULL|       1|         1|
|2021-06-19|2021-06-18|       0|         1|
|2021-06-20|2021-06-19|       0|         1|
|2021-06-21|2021-06-20|       0|         1|
|2021-06-22|2021-06-21|       0|         1|
|2021-06-29|2021-06-22|       1|         2|
|2021-06-30|2021-06-29|       0|         2|
|2021-07-06|2021-06-30|       1|         3|
|2021-07-07|2021-07-06|       0|         3|
|2021-07-09|2021-07-07|       1|         4|
+----------+----------+--------+----------+
only showing top 10 rows

In [9]:

df_heatwaves = df_hot_days.groupBy("wave_group").agg(
    F.min("date").alias("start_date"),
    F.max("date").alias("end_date"),
    F.count("*").alias("duration_days")
).filter("duration_days >= 3")


df_heatwaves.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+----------+----------+-------------+
|wave_group|start_date|end_date  |duration_days|
+----------+----------+----------+-------------+
|1         |2021-06-18|2021-06-22|5            |
|5         |2021-07-13|2021-07-16|4            |
|7         |2021-07-25|2021-07-29|5            |
|13        |2022-06-24|2022-06-28|5            |
|16        |2022-07-19|2022-07-23|5            |
|18        |2022-08-03|2022-08-05|3            |
|19        |2022-08-12|2022-08-20|9            |
|20        |2022-08-26|2022-08-28|3            |
|26        |2023-07-15|2023-07-17|3            |
|32        |2023-09-08|2023-09-13|6            |
|39        |2024-06-26|2024-06-28|3            |
|46        |2024-08-14|2024-08-17|4            |
+----------+----------+----------+-------------+