
In [2]:
import kagglehub

path = kagglehub.dataset_download("sobhanmoosavi/us-accidents")
print(path)

Using Colab cache for faster access to the 'us-accidents' dataset.
/kaggle/input/us-accidents


In [3]:
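# kagglehub served the dataset from the Colab cache (/kaggle/input/us-accidents, see the output above),
# so this source path does not exist and the copy fails; the next cell reads the CSV from `path` directly.
!cp -R /root/.cache/kagglehub/datasets/sobhanmoosavi/us-accidents/versions/13/US_Accidents_March23.csv /content/sample_data/US_Accidents_March23.csv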


cp: cannot stat '/root/.cache/kagglehub/datasets/sobhanmoosavi/us-accidents/versions/13/US_Accidents_March23.csv': No such file or directory


In [4]:
from pyspark.sql import SparkSession
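# In local mode the executors run inside the driver JVM, so spark.executor.memory has no effect;
# the available heap is governed by spark.driver.memory.
spark = SparkSession.builder \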
                    .master("local[*]") \
                    .config("spark.executor.memory", "8g") \
                    .config("spark.driver.memory", "4g") \
                    .appName("mlibs") \
                    .getOrCreate()

inDF = spark.read.format("csv") \
  .option("sep", ",") \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .load(path + "/US_Accidents_March23.csv")

inDF.printSchema()

inDF.createOrReplaceTempView("accidents")

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

In [24]:
from datetime import datetime
from pyspark.storagelevel import StorageLevel


## **Hands-on exercises with `cache()` – PySpark**

---

### **Task 1: Filtering and caching weather conditions**

**Goal:** Filter the data to accidents with a non-empty `Weather_Condition` field and cache the result.


Question: What are the 3 most common weather conditions during accidents?

---


In [27]:
subset_weather = inDF.filter(inDF["Weather_Condition"].isNotNull())


In [28]:
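subset_weather.cache()  # lazy: the cache is filled by the first action (show() below), so this timing includes reading the CSV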

start = datetime.now()

subset_weather.groupBy("Weather_Condition").count().orderBy("count", ascending=False).limit(3).show()

print(datetime.now() - start)

+-----------------+-------+
|Weather_Condition|  count|
+-----------------+-------+
|             Fair|2560802|
|    Mostly Cloudy|1016195|
|           Cloudy| 817082|
+-----------------+-------+

0:04:14.349550



### **Task 2: Comparison with and without `cache()`**

**Goal:** Measure the difference in execution time of the same analysis with and without cache.


Question: Did `cache()` speed up the analysis? By how much?

---


In [29]:
start = datetime.now()

subset_weather.groupBy("Weather_Condition").count().orderBy("count", ascending=False).limit(3).show()

print(datetime.now() - start)

+-----------------+-------+
|Weather_Condition|  count|
+-----------------+-------+
|             Fair|2560802|
|    Mostly Cloudy|1016195|
|           Cloudy| 817082|
+-----------------+-------+

0:00:01.876145
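The two printed timings answer the question directly. A small sketch of the arithmetic using those durations; keep in mind that the first run also reads and parses the CSV and fills the cache, so the ratio overstates the benefit of caching alone, and a strict "without cache" baseline would time the query after `subset_weather.unpersist()`.

```python
from datetime import timedelta

# Durations printed by the first run (cache being filled) and the second run (cache hit)
first_run = timedelta(minutes=4, seconds=14, microseconds=349550)
second_run = timedelta(seconds=1, microseconds=876145)

saved = first_run - second_run    # absolute time saved
speedup = first_run / second_run  # timedelta / timedelta gives a float ratio

print(f"saved {saved.total_seconds():.1f} s, speedup ~{speedup:.0f}x")
# prints: saved 252.5 s, speedup ~136x
```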




### **Task 3: Cache and cumulative analysis (window function)**

**Goal:** Compute the cumulative number of accidents in each state (cache the previously filtered data first).


Question: Which 3 states have the highest cumulative number of accidents?

---

In [30]:
start = datetime.now()
subset_weather.groupBy("State").count().orderBy("count", ascending=False).limit(3).show()
print(datetime.now() - start)

+-----+-------+
|State|  count|
+-----+-------+
|   CA|1701655|
|   FL| 870292|
|   TX| 573638|
+-----+-------+

0:00:01.880758




### **Task 4: `persist()` as an alternative to `cache()`**

**Goal:** Instead of `cache()`, use `persist(StorageLevel.DISK_ONLY)` and compare performance.



---

In [31]:
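# Note: subset_weather is already cached from Task 1, and Spark ignores persist() on data that is
# already cached (it only logs a warning), so the timing below still reads the existing cache.
# Call subset_weather.unpersist() first to actually switch to DISK_ONLY.
subset_weather.persist(StorageLevel.DISK_ONLY)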

start = datetime.now()
subset_weather.groupBy("State").count().orderBy("count", ascending=False).limit(3).show()
print(datetime.now() - start)

+-----+-------+
|State|  count|
+-----+-------+
|   CA|1701655|
|   FL| 870292|
|   TX| 573638|
+-----+-------+

0:00:01.621200



> If you are working in PySpark, remember to register the DataFrame as a temporary SQL view first:

```python
df.createOrReplaceTempView("accidents")
```

---


### **Task 5 – Top cities by number of accidents**

**Goal:** Find the 10 cities with the most accidents.


In [8]:
query = """
SELECT City, COUNT(*) AS total_accidents
FROM accidents
GROUP BY City
ORDER BY total_accidents DESC
LIMIT 10
"""

spark.sql(query).show()

+-----------+---------------+
|       City|total_accidents|
+-----------+---------------+
|      Miami|         186917|
|    Houston|         169609|
|Los Angeles|         156491|
|  Charlotte|         138652|
|     Dallas|         130939|
|    Orlando|         109733|
|     Austin|          97359|
|    Raleigh|          86079|
|  Nashville|          72930|
|Baton Rouge|          71588|
+-----------+---------------+




---

### **Task 6 – Most common weather conditions**

**Goal:** List the 5 most common weather conditions during accidents.

---

In [10]:
query = """
SELECT Weather_Condition, COUNT(*) AS total_accidents
FROM accidents
GROUP BY Weather_Condition
ORDER BY total_accidents DESC
LIMIT 5
"""

spark.sql(query).show()

+-----------------+---------------+
|Weather_Condition|total_accidents|
+-----------------+---------------+
|             Fair|        2560802|
|    Mostly Cloudy|        1016195|
|           Cloudy|         817082|
|            Clear|         808743|
|    Partly Cloudy|         698972|
+-----------------+---------------+





### **Task 7 – Hourly analysis**

**Goal:** Extract the hour from the `Start_Time` column and count the accidents for each hour.


---

In [15]:
query = """
SELECT HOUR(Start_Time) AS start_hour, COUNT(*) AS total_accidents
FROM accidents
GROUP BY start_hour
ORDER BY total_accidents DESC
"""

spark.sql(query).show(24)

+----------+---------------+
|start_hour|total_accidents|
+----------+---------------+
|         7|         587472|
|        16|         581969|
|         8|         577576|
|        17|         576015|
|        15|         525855|
|        14|         448846|
|        18|         432042|
|         6|         405837|
|        13|         396445|
|         9|         363034|
|        11|         355040|
|        12|         355001|
|        10|         342706|
|        19|         295121|
|         5|         228182|
|        20|         225226|
|        21|         191452|
|        22|         167645|
|         4|         159852|
|        23|         126539|
|         0|         112378|
|         1|          97071|
|         2|          93227|
|         3|          83863|
+----------+---------------+





### **Task 8 – Day vs. night**

**Goal:** Count the accidents that happened during the day and at night (`Sunrise_Sunset`).

---



In [12]:
query = """
SELECT DISTINCT Sunrise_Sunset
FROM accidents
"""

spark.sql(query).show()

+--------------+
|Sunrise_Sunset|
+--------------+
|         Night|
|           Day|
|          NULL|
+--------------+



In [14]:
query = """
SELECT Sunrise_Sunset, COUNT(*) AS total_accidents
FROM accidents
WHERE Sunrise_Sunset IS NOT NULL
GROUP BY Sunrise_Sunset
"""

spark.sql(query).show()

+--------------+---------------+
|Sunrise_Sunset|total_accidents|
+--------------+---------------+
|         Night|        2370595|
|           Day|        5334553|
+--------------+---------------+





### **Task 9 – Filtering severe accidents in bad weather**

**Goal:** Select only accidents with severity ≥ 4 that occurred in bad weather conditions.

---


In [17]:
query = """
SELECT DISTINCT Weather_Condition
FROM accidents
"""

spark.sql(query).show(50)

+--------------------+
|   Weather_Condition|
+--------------------+
|         Ice Pellets|
|         Shallow Fog|
|        Thunderstorm|
|        Volcanic Ash|
|   N/A Precipitation|
|Showers in the Vi...|
|              Cloudy|
| Light Freezing Rain|
|        Blowing Snow|
|  Heavy Rain / Windy|
|   Low Drifting Snow|
|Heavy Thunderstor...|
|     Widespread Dust|
|         Snow Grains|
|Light Rain with T...|
|             Squalls|
|    Scattered Clouds|
|       Heavy T-Storm|
|      Patches of Fog|
|        Rain Showers|
|Thunderstorms and...|
|             Drizzle|
|      Cloudy / Windy|
|             T-Storm|
|        Snow Showers|
|                 Fog|
|       Partly Cloudy|
|               Clear|
|  Light Freezing Fog|
|                Fair|
|       Heavy Drizzle|
|Heavy Thunderstor...|
|       Mostly Cloudy|
|        Funnel Cloud|
|               Smoke|
|   Light Ice Pellets|
|Light Freezing Dr...|
|          Light Haze|
|Heavy T-Storm / W...|
|          Heavy Rain|
|          

In [43]:
query = """
SELECT ID, Severity, Weather_Condition
FROM accidents
WHERE
  Severity >= 4
  AND Weather_Condition RLIKE '(?i)(rain|snow|fog|ice|storm|hail)'
"""

spark.sql(query).show()

+--------+--------+--------------------+
|      ID|Severity|   Weather_Condition|
+--------+--------+--------------------+
| A-33932|       4|          Light Rain|
| A-58392|       4|                Rain|
|A-135771|       4|Heavy Thunderstor...|
|A-140391|       4|          Light Rain|
|A-141009|       4|          Light Rain|
|A-142111|       4|        Thunderstorm|
|A-142114|       4|        Thunderstorm|
|A-145061|       4|Light Thunderstor...|
|A-145062|       4|Light Thunderstor...|
|A-187900|       4|                Rain|
|A-192709|       4|          Light Rain|
|A-194270|       4|          Light Rain|
|A-216275|       4|          Light Rain|
|A-237861|       4|          Light Rain|
|A-271300|       4|          Light Rain|
|A-289564|       4|          Heavy Rain|
|A-307817|       4|  Light Freezing Fog|
|A-308661|       4|          Light Rain|
|A-328013|       4|                Snow|
|A-361338|       4|          Light Rain|
+--------+--------+--------------------+
only showing top 20 rows


### **Task 10 – Wind directions**

**Goal:** Find the most frequent wind directions during accidents.

---

In [20]:
query = """
SELECT Wind_Direction, COUNT(*) AS total_accidents
FROM accidents
GROUP BY Wind_Direction
ORDER BY total_accidents DESC
"""

spark.sql(query).show()

+--------------+---------------+
|Wind_Direction|total_accidents|
+--------------+---------------+
|          CALM|         961624|
|             S|         419989|
|           SSW|         384840|
|             W|         383913|
|           WNW|         378781|
|            NW|         369352|
|          Calm|         368557|
|            SW|         364470|
|           WSW|         353806|
|           SSE|         349110|
|           NNW|         333427|
|             N|         307151|
|            SE|         294901|
|             E|         278914|
|           ESE|         268311|
|            NE|         258639|
|           ENE|         258474|
|           NNE|         255311|
|           VAR|         250566|
|         South|         177375|
+--------------+---------------+
only showing top 20 rows




---

## Task 11 `groupBy` + `agg`

**Task:**
Compute the average visibility (`Visibility(mi)`) and the average temperature (`Temperature(F)`) for each state (`State`). Sort the results by average temperature in descending order.


In [41]:
query = """
SELECT State, ROUND(AVG(`Visibility(mi)`), 2) AS avg_visibility, ROUND(AVG(`Temperature(F)`), 2) AS avg_temperature
FROM accidents
GROUP BY State
ORDER BY avg_temperature DESC
"""

spark.sql(query).show()

+-----+--------------+---------------+
|State|avg_visibility|avg_temperature|
+-----+--------------+---------------+
|   FL|          9.51|          75.37|
|   AZ|         10.21|          72.65|
|   LA|          9.16|          70.03|
|   TX|          9.25|          69.87|
|   AL|          9.09|          66.49|
|   MS|          8.99|          66.13|
|   SC|          9.08|          64.53|
|   GA|          8.98|          64.29|
|   CA|          9.09|           63.9|
|   OK|           9.4|           63.8|
|   NC|          9.11|          62.85|
|   TN|          9.11|          62.39|
|   AR|          8.83|          59.85|
|   DC|          9.47|          59.38|
|   VA|          9.13|          59.24|
|   DE|          9.16|          58.94|
|   KY|          8.81|          58.72|
|   NM|          9.69|          58.08|
|   NJ|           8.9|          57.25|
|   MD|          8.87|          56.73|
+-----+--------------+---------------+
only showing top 20 rows




---

## Task 12 `orderBy`

**Task:**
Find the 5 accidents with the longest duration (`End_Time - Start_Time`). Display their `ID`, `City`, `State`, and duration in minutes.

---

In [34]:
query = """
SELECT ID, Start_Time, End_Time, City, State, TIMESTAMPDIFF(MINUTE, Start_Time, End_Time) AS duration
FROM accidents
ORDER BY duration DESC
LIMIT 5
"""

spark.sql(query).show()

+---------+-------------------+-------------------+-----------+-----+--------+
|       ID|         Start_Time|           End_Time|       City|State|duration|
+---------+-------------------+-------------------+-----------+-----+--------+
|A-4810425|2016-10-21 07:26:00|2022-02-25 17:45:00| Wilmington|   DE| 2812939|
|A-5053641|2016-10-21 07:26:00|2022-02-25 17:45:00| Wilmington|   DE| 2812939|
|A-5399002|2018-04-19 09:24:00|2022-07-20 10:49:45|Southampton|   NY| 2236405|
|A-4014778|2018-04-19 09:24:00|2022-07-20 09:59:32|Southampton|   NY| 2236355|
|A-5073315|2018-04-19 09:24:00|2022-07-20 09:59:32|Southampton|   NY| 2236355|
+---------+-------------------+-------------------+-----------+-----+--------+





## Task 13 `to_timestamp` and time differences

**Task:**
Compute the average accident duration (in minutes) during the day (`Sunrise_Sunset = 'Day'`) and at night (`Sunrise_Sunset = 'Night'`).

---

In [42]:
query = """
SELECT Sunrise_Sunset, ROUND(AVG(TIMESTAMPDIFF(MINUTE, Start_Time, End_Time)), 2) AS avg_duration
FROM accidents
WHERE Sunrise_Sunset IS NOT NULL
GROUP BY Sunrise_Sunset
ORDER BY avg_duration DESC
"""

spark.sql(query).show()

+--------------+------------+
|Sunrise_Sunset|avg_duration|
+--------------+------------+
|         Night|       561.7|
|           Day|      379.29|
+--------------+------------+





## Task 14 `filter` + `isin`

**Task:**
Find all accidents that occurred in **California (CA)** or **Texas (TX)** **in bad weather conditions** (`Weather_Condition IN ('Rain', 'Snow', 'Fog')`).

---

In [36]:
query = """
SELECT ID, State, Weather_Condition
FROM accidents
WHERE State IN ('CA', 'TX')
  AND Weather_Condition IN ('Rain', 'Snow', 'Fog')
"""

spark.sql(query).show()

+------+-----+-----------------+
|    ID|State|Weather_Condition|
+------+-----+-----------------+
|A-4382|   CA|              Fog|
|A-5598|   CA|              Fog|
|A-5608|   CA|              Fog|
|A-5610|   CA|              Fog|
|A-5795|   CA|              Fog|
|A-5806|   CA|              Fog|
|A-5812|   CA|              Fog|
|A-6065|   CA|             Rain|
|A-6070|   CA|             Rain|
|A-6088|   CA|             Rain|
|A-6102|   CA|             Rain|
|A-6105|   CA|             Rain|
|A-6111|   CA|             Rain|
|A-6115|   CA|             Rain|
|A-6131|   CA|             Rain|
|A-6132|   CA|             Rain|
|A-6133|   CA|             Rain|
|A-6136|   CA|             Rain|
|A-6142|   CA|             Rain|
|A-6147|   CA|             Rain|
+------+-----+-----------------+
only showing top 20 rows





## Task 15 `withColumn`

**Task:**
Add a new column `is_long` that takes the value:

* `1` if the accident lasted longer than 60 minutes,
* `0` otherwise.

---


In [38]:
query = """
SELECT
  ID,
  Start_Time,
  End_Time,
  CASE
    WHEN TIMESTAMPDIFF(MINUTE, Start_Time, End_Time) > 60 THEN 1
    ELSE 0
  END AS is_long
FROM accidents
"""

spark.sql(query).show()

+----+-------------------+-------------------+-------+
|  ID|         Start_Time|           End_Time|is_long|
+----+-------------------+-------------------+-------+
| A-1|2016-02-08 05:46:00|2016-02-08 11:00:00|      1|
| A-2|2016-02-08 06:07:59|2016-02-08 06:37:59|      0|
| A-3|2016-02-08 06:49:27|2016-02-08 07:19:27|      0|
| A-4|2016-02-08 07:23:34|2016-02-08 07:53:34|      0|
| A-5|2016-02-08 07:39:07|2016-02-08 08:09:07|      0|
| A-6|2016-02-08 07:44:26|2016-02-08 08:14:26|      0|
| A-7|2016-02-08 07:59:35|2016-02-08 08:29:35|      0|
| A-8|2016-02-08 07:59:58|2016-02-08 08:29:58|      0|
| A-9|2016-02-08 08:00:40|2016-02-08 08:30:40|      0|
|A-10|2016-02-08 08:10:04|2016-02-08 08:40:04|      0|
|A-11|2016-02-08 08:14:42|2016-02-08 08:44:42|      0|
|A-12|2016-02-08 08:21:27|2016-02-08 08:51:27|      0|
|A-13|2016-02-08 08:36:34|2016-02-08 09:06:34|      0|
|A-14|2016-02-08 08:37:07|2016-02-08 09:07:07|      0|
|A-15|2016-02-08 08:39:43|2016-02-08 09:09:43|      0|
|A-16|2016


## Task 16 `count`

**Task:**
Count the number of distinct cities (`DISTINCT City`) in each state (`State`). Sort the results in descending order by the number of distinct cities.

---

In [40]:
query = """
SELECT State, COUNT(DISTINCT City) AS distinct_cities
FROM accidents
GROUP BY State
ORDER BY distinct_cities DESC
"""

spark.sql(query).show()

+-----+---------------+
|State|distinct_cities|
+-----+---------------+
|   PA|           1422|
|   CA|           1268|
|   NY|           1215|
|   TX|            883|
|   IL|            785|
|   OH|            780|
|   VA|            660|
|   MN|            656|
|   NC|            612|
|   MI|            590|
|   FL|            586|
|   IA|            559|
|   NJ|            558|
|   GA|            525|
|   WI|            520|
|   IN|            493|
|   AL|            458|
|   TN|            430|
|   MO|            410|
|   MA|            406|
+-----+---------------+
only showing top 20 rows

