<a href="https://colab.research.google.com/github/werowe/HypatiaAcademy/blob/master/stats/consolidate_weather_data_run_sql_spark_queries.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import pandas as pd

# Define the directory containing the CSV files
directory = 'drive/MyDrive/weather'

# Initialize an empty list to store DataFrames
dataframes = []

# Loop through files in the directory
for filename in os.listdir(directory):
    # Check if the file starts with 'paphos2024' and ends with '.csv'
    if filename.startswith('paphos20') and filename.endswith('.csv'):
        # Read the CSV file into a DataFrame
        filepath = os.path.join(directory, filename)
        df = pd.read_csv(filepath)
        dataframes.append(df)

# Combine all DataFrames into one
df_combined = pd.concat(dataframes, ignore_index=True)

# Drop duplicate rows
df_combined = df_combined.drop_duplicates()

# Save the combined DataFrame to a new CSV file (optional)
df_combined.to_csv('combined_weather.csv', index=False)

# Print a summary of the combined DataFrame
print(f"Combined DataFrame shape: {df_combined.shape}")

Combined DataFrame shape: (18119, 24)


In [3]:
from pyspark.sql import SparkSession


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("weather") \
    .getOrCreate()

df = spark.read.csv(
    "combined_weather.csv",
    header=True,        # Use the first row as column names
    inferSchema=True,   # Automatically infer data types
    sep=",",            # Specify delimiter (default is ',')
    encoding="UTF-8"    # Handle encoding
)



In [4]:
df.columns

['name',
 'datetime',
 'temp',
 'feelslike',
 'dew',
 'humidity',
 'precip',
 'precipprob',
 'preciptype',
 'snow',
 'snowdepth',
 'windgust',
 'windspeed',
 'winddir',
 'sealevelpressure',
 'cloudcover',
 'visibility',
 'solarradiation',
 'solarenergy',
 'uvindex',
 'severerisk',
 'conditions',
 'icon',
 'stations']

In [5]:
df.createOrReplaceTempView("weather")

In [13]:

sql = '''
SELECT round(SUM(precip)) AS total_precip, YEAR(datetime) AS year, MONTH(datetime) AS month
FROM weather
GROUP BY YEAR(datetime), MONTH(datetime)
ORDER BY year, month

'''

result = spark.sql(sql)
result.show()

+------------+----+-----+
|total_precip|year|month|
+------------+----+-----+
|        69.0|2024|    1|
|        35.0|2024|    2|
|         9.0|2024|    3|
|         0.0|2024|    7|
|         0.0|2024|    8|
|         2.0|2024|    9|
|         2.0|2024|   10|
|       166.0|2024|   11|
|       487.0|2024|   12|
|        64.0|2025|    1|
|       108.0|2025|    2|
|        40.0|2025|    3|
+------------+----+-----+



In [14]:
sql = '''
SELECT round(avg(temp),2) AS temp, YEAR(datetime) AS year, MONTH(datetime) AS month
FROM weather
GROUP BY YEAR(datetime), MONTH(datetime)
ORDER BY year, month

'''

result = spark.sql(sql)
result.show()

+-----+----+-----+
| temp|year|month|
+-----+----+-----+
|14.35|2024|    1|
|14.39|2024|    2|
|16.02|2024|    3|
|29.12|2024|    7|
|28.42|2024|    8|
|26.56|2024|    9|
|22.68|2024|   10|
|18.29|2024|   11|
|15.07|2024|   12|
|14.31|2025|    1|
|11.74|2025|    2|
|16.41|2025|    3|
+-----+----+-----+



In [8]:
sql = '''
SELECT avg(temp) AS temp, YEAR(datetime) AS year
FROM weather
GROUP BY YEAR(datetime)


'''

result = spark.sql(sql)
result.show()

+-----------------+----+
|             temp|year|
+-----------------+----+
|14.05378872120734|2025|
|21.03340663058174|2024|
+-----------------+----+



In [9]:
sql = '''
SELECT YEAR(datetime) AS year, MONTH(datetime) AS month, AVG(temp) AS avg_temp
FROM weather
WHERE YEAR(datetime) IN (2025, 2024) AND MONTH(datetime) = 1
GROUP BY YEAR(datetime), MONTH(datetime);

'''

result = spark.sql(sql)
result.show()

+----+-----+------------------+
|year|month|          avg_temp|
+----+-----+------------------+
|2024|    1|14.354924242424257|
|2025|    1|14.306578947368354|
+----+-----+------------------+



In [18]:
# skip sea as runs slow

import sys

seago=False

if seago==False:
  print("🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 not calculating waves this time🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊")
  sys.exit("\nNo sea calc today.")


🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊 not calculating waves this time🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊🌊


SystemExit: 
No sea calc today.

#Sea

In [None]:
import json


ga=[]

for filename in os.listdir(directory):
       if filename.endswith('waves.json'):
        # Read the CSV file into a DataFrame
        filepath = os.path.join(directory, filename)
        fo = open(filepath)
        fs=fo.read()
        jf=json.loads(fs)
        for s in jf['hours']:
          f=[s['time'], s['waterTemperature']['noaa'],s['waveHeight']['noaa'],s['swellHeight']['noaa'],s['windWaveHeight']['noaa']]
          g=pd.DataFrame(f).T
          g.columns=['time','temp','height', 'swell', 'wind']
          g['time'] = pd.to_datetime(g['time'])
          g.set_index("time", inplace=True)
          ga.append(g)
        fo.close()



# Combine all DataFrames into one
sdf = pd.concat(ga, ignore_index=False)

# Drop duplicate rows
sdfc = sdf.drop_duplicates()



In [None]:
f

In [None]:
s.keys()

In [None]:
sdfc = sdfc.reset_index()

spark_df = spark.createDataFrame(sdfc)

In [None]:
spark_df.createOrReplaceTempView("sea")

In [None]:
sql = '''
SELECT
    round(MAX(temp),2) AS max_temp,
    round(MIN(temp),2) AS min_temp,
    round(AVG(temp),2) AS ave_temp,
    YEAR(time) AS year,
    MONTH(time) AS month
FROM sea
WHERE HOUR(time) = 12
GROUP BY YEAR(time), MONTH(time)
ORDER BY max_temp desc


'''

result = spark.sql(sql)
result.show()

In [None]:
sql = '''
SELECT
    round(MAX(height),2) AS max_height,
    round(AVG(height),2) AS avg_height,
    YEAR(time) AS year,
    MONTH(time) AS month
FROM sea
WHERE HOUR(time) = 12
GROUP BY YEAR(time), MONTH(time)
ORDER BY avg_height desc


'''

result = spark.sql(sql)
result.show()

In [None]:
sql = '''
SELECT * from sea
order by YEAR(time) desc, MONTH(time) desc


'''

result = spark.sql(sql)
result.show()