In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=64e88a33eaebab1786cae311d3f36603830af5b0aeee00a904b4b9b3626a4c2c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, avg, count, countDistinct, lit, sum, when, year, month, dayofmonth
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
import zipfile
import os

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Weather") \
    .getOrCreate()

# Path to the zip file assuming the data.zip is in the same directory as this ipynb file
zip_file_path = 'data.zip'

# Directory where the zip file will be extracted
extract_to_dir = 'data'

# Check if the extraction directory exists, create if it doesn't
if not os.path.exists(extract_to_dir):
    os.makedirs(extract_to_dir)

# Open the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract all the contents into the directory
    zip_ref.extractall(extract_to_dir)

print(f"Extracted all files in {zip_file_path} to {extract_to_dir}")


FileNotFoundError: [Errno 2] No such file or directory: 'data.zip'

In [None]:
# Assuming the data.zip has been extracted into a folder named 'data'
df = spark.read.csv("data/data/*/*.csv", header=True, inferSchema=True)

# Show the DataFrame schema to understand your data
df.printSchema()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max as pyspark_max, when, row_number
from pyspark.sql.window import Window

# Convert DATE column to date type
df = df.withColumn('DATE', col('DATE').cast('date'))

# Extract year from DATE column
df = df.withColumn('YEAR', year('DATE'))

# Filter out missing MAX values
df_filtered = df.filter(df['MAX'] != 9999.9)

# Find the hottest day for each year
window_spec = Window.partitionBy('YEAR')
hottest_df = hottest_days = df_filtered.withColumn('max_temp_rank', row_number().over(window_spec.orderBy(col('MAX').desc()))) \
                          .filter(col('max_temp_rank') == 1) \
                          .select('YEAR', 'STATION', 'NAME', 'DATE', 'MAX')

# Show the hottest day for each year
hottest_days.show(truncate=False)

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = hottest_df.toPandas()



In [None]:
from pyspark.sql.functions import col, min as pyspark_min, month

# Convert DATE column to date type
df = df.withColumn('DATE', col('DATE').cast('date'))

# Extract month from DATE column
df = df.withColumn('MONTH', month('DATE'))

# Filter for January (month = 1)
df_january = df.filter(df['MONTH'] == 1)

# Filter out missing MIN values
df_filtered = df_january.filter(df_january['MIN'] != 9999.9)

# Find the coldest day for the month of January across all years
window_spec = Window.partitionBy('MONTH')
coldest_days = df_filtered.withColumn('min_temp_rank', row_number().over(window_spec.orderBy(col('MIN')))) \
                          .filter(col('min_temp_rank') == 1) \
                          .select('STATION', 'NAME', 'DATE', 'MIN')

# Show the coldest day for the month of January across all years
coldest_days.show(truncate=False)

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd_df = coldest_days.toPandas()



In [None]:
from pyspark.sql.functions import col, year

# Convert DATE column to date type
df = df.withColumn('DATE', col('DATE').cast('date'))

# Extract year from DATE column
df = df.withColumn('YEAR', year('DATE'))

# Filter for the year 2015
df_2015 = df.filter(df['YEAR'] == 2015)

# Filter out invalid precipitation values (99.99)
df_filtered = df_2015.filter((df_2015['PRCP'] != 99.99))

# Find any maximum and minimum precipitation for the year 2015
max_precipitation_data = df_filtered.orderBy(col('PRCP').desc()).select('STATION', 'NAME', 'DATE', 'PRCP').limit(1)
min_precipitation_data = df_filtered.orderBy(col('PRCP')).select('STATION', 'NAME', 'DATE', 'PRCP').limit(1)

# Show the results
print("Maximum precipitation for the year 2015 (excluding invalid values):")
max_precipitation_data.show(truncate=False)

print("Minimum precipitation for the year 2015 (excluding invalid values):")
min_precipitation_data.show(truncate=False)

In [None]:
# Filter for the year 2019
df_2019 = df.filter(year('DATE') == 2019)
# Calculate the percentage of missing gust data for the year 2019
percentage_missing = df_2019.filter(year("DATE") == 2019) \
    .select((count(when(col("GUST") == 999.9, True)) / count("*") * 100).alias("MISSING GUST PERCENTAGE"))

percentage_missing.show()

In [None]:
from pyspark.sql.functions import col, year, month, mean, stddev, expr, desc

# Filter for the year 2020
df_2020 = df.filter(year('DATE') == 2020)

# Filter out missing values (9999.9) for temperature (TEMP) column
df_2020 = df_2020.filter(df_2020['TEMP'] != 9999.9)

# Extract the month from the date column
df_2020 = df_2020.withColumn('MONTH', month('DATE'))

# Group by month and calculate mean, median, mode, and standard deviation for temperature
result_df = df_2020.groupBy('MONTH') \
    .agg(mean('TEMP').alias('MEAN_TEMP'),
         expr('percentile_approx(TEMP, 0.5)').alias('MEDIAN_TEMP'),
         expr('max(TEMP) as MODE_TEMP'),  # Using max as an approximation for mode
         stddev('TEMP').alias('STDDEV_TEMP')) \
    .orderBy('MONTH')

# Display the results
result_df.show(truncate=False)

In [None]:
import pandas as pd
from tabulate import tabulate

# Convert to Pandas DataFrame
hottest_df_pd = hottest_df.toPandas()
coldest_days_pd = coldest_days.toPandas()
max_precipitation_data_pd = max_precipitation_data.toPandas()
min_precipitation_data_pd = min_precipitation_data.toPandas()
percentage_missing_pd = percentage_missing.toPandas()
result_df_pd = result_df.toPandas()

# Define the file path within Google Colab
file_path = "/result.txt"

# Formatting the output file
with open(file_path, "w") as f:
    f.write("TASK 1:\n")
    f.write("The hottest day for each year.\n")
    f.write(tabulate(hottest_df_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 2:\n")
    f.write("The coldest day for the month of January across all years.\n")
    f.write(tabulate(coldest_days_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 3:\n")
    f.write("Maximum Precipitation for 2015.\n")
    f.write(tabulate(max_precipitation_data_pd, headers='keys', tablefmt='psql', showindex=False))
    f.write("\nMinimum Precipitation for 2015.\n")
    f.write(tabulate(min_precipitation_data_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 4:\n")
    f.write("Percentage of missing values for wind gust for the year 2019.\n")
    f.write(tabulate(percentage_missing_pd, headers='keys', tablefmt='psql', showindex=False))

    f.write("\n\nTASK 5:\n")
    f.write("The mean, median, mode and standard deviation of the temperature for each month for the year 2020.\n")
    f.write(tabulate(result_df_pd, headers='keys', tablefmt='psql', showindex=False))
