In [2]:
pip --version

pip 24.1.2 from /usr/local/lib/python3.10/dist-packages/pip (python 3.10)


In [3]:
!pip install pyspark py4j

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=f79351c0e5a8013dad063920fbb419de71012d36eba92a158b12f7f4383e8c36
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, count, when, to_date, col, year, month, avg, sum as sum_

# Настройка Spark
spark = SparkSession.builder.appName("Meteo Data").getOrCreate()

# 1. Чтение данных из csv файла в DataFrame
df = spark.read.csv("/content/sample_data/weather_data.csv", header=True, inferSchema=True)
# df.printSchema()
# df.show()
# df.summary().show()

# 2. Преобразование столбца date в формат даты
# df = df.withColumn("date", to_date("date"), "yyyy-MM-dd"))

# 3. Проверить данные на наличие пропущенных значений
print("Проверка на наличие пропущенных значений:")
null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
null_counts.show()

# 4. Вывести топ-5 самых жарких дней за все время наблюдений
print("Топ-5 самых жарких дней за все время наблюдений:")
top5_df = df.orderBy(col("temperature") \
            .desc()).limit(5) \
            .select("date", "temperature")
top5_df.show()

# 5. Найти метеостанцию с наибольшим количеством осадков за последний год
print("Метеостанция с наибольшим количеством осадков за последний год:")
last_year = df.filter(year("date") == 2023)
top_prec_df = last_year.groupBy("station_id") \
                .agg(sum_("precipitation").alias("sum_precipitation")) \
                .orderBy("sum_precipitation", ascending=False) \
                .limit(1)
top_prec_df.show()

# 6. Подсчитать среднюю температуру по месяцам за все время наблюдений
print("Средняя температура по месяцам за все время наблюдений:")
avg_temp_df = df.groupBy(month("date").alias("month")) \
                .agg(avg("temperature").alias("avg_temperature")) \
                .orderBy("month")
avg_temp_df.show()

# Остановка SparkContext
spark.stop()

Проверка на наличие пропущенных значений:
+----------+----+-----------+-------------+----------+
|station_id|date|temperature|precipitation|wind_speed|
+----------+----+-----------+-------------+----------+
|         0|   0|          0|            0|         0|
+----------+----+-----------+-------------+----------+

Топ-5 самых жарких дней за все время наблюдений:
+----------+------------------+
|      date|       temperature|
+----------+------------------+
|2021-08-20|39.982828249354846|
|2023-12-02| 39.96797489293784|
|2022-03-28|  39.8246894248997|
|2019-02-11| 39.76737697836647|
|2020-06-10| 39.69147838355929|
+----------+------------------+

Метеостанция с наибольшим количеством осадков за последний год:
+----------+-----------------+
|station_id|sum_precipitation|
+----------+-----------------+
| station_5|642.9302626767898|
+----------+-----------------+

Средняя температура по месяцам за все время наблюдений:
+-----+------------------+
|month|   avg_temperature|
+-----+-------