# SparkSession

In [None]:
import os
import findspark

# Runs before the pyspark import below; presumably this is what makes the
# pyspark package importable in this environment — TODO confirm
findspark.init()

from pyspark.sql import SparkSession

In [None]:
# Build (or reuse) a session on the standalone master with the Delta Lake
# package, SQL extension and catalog configured
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("DeltaLake")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.warehouse.dir", "/app/data/output/spark-warehouse")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.mlflow.trackingUri", "http://mlflow:5000")
    .getOrCreate()
)

In [None]:
# Display the session object as the cell output
spark

# Extract

In [None]:
# Load every weather CSV export into a single DataFrame
from pyspark.sql import functions as F

# Directory that holds the input CSV files
data_path = "/app/data/input/csv/weather_data"

# The first line of each file is a header; column types are inferred from the data
weather_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(f"{data_path}/*.csv")
)

# Transform

In [None]:
# Print the total number of rows loaded from the CSV files
print("Number of records: ", weather_df.count())

In [None]:
# Print the column names and the types inferred while reading the CSV files
weather_df.printSchema()

In [None]:
# Count the distinct raw 'datetime' values before the column is converted to a date
print("Number of unique dates: ", weather_df.select("datetime").distinct().count())

In [None]:
# Replace 'datetime' with a proper date column parsed as yyyy-MM-dd
weather_df = weather_df.withColumn(
    "datetime",
    F.to_date(F.col("datetime"), "yyyy-MM-dd"),
)

# Print the schema again to see the new column type
weather_df.printSchema()

In [None]:
# Check if all datetime fields parsed correctly: compare this distinct count
# with the one printed before the conversion
print("Number of unique dates: ", weather_df.select("datetime").distinct().count())

In [None]:
# Count the rows that remain once exact duplicate rows are removed; compare
# with the total record count to see whether duplicates exist. The result is
# only displayed — weather_df itself is not modified here.
weather_df.dropDuplicates().count()

In [None]:
# For every column, sum a 1/0 "is null" flag to get its number of nulls,
# then show all the counts in a single row
weather_df.select(
    [
        F.sum(F.col(column).isNull().cast("int")).alias(column)
        for column in weather_df.columns
    ]
).show()

In [None]:
# Columns to keep (named as the ones without nulls)
columns_without_nulls = [
    "name",
    "datetime",
    "tempmax",
    "tempmin",
    "temp",
    "feelslike",
    "dew",
    "humidity",
    "precip",
    "windgust",
    "winddir",
    "sealevelpressure",
    "cloudcover",
    "visibility",
    "sunrise",
    "sunset",
    "moonphase",
    "conditions",
    "description",
    "icon",
    "stations",
]

# Narrow the DataFrame down to those columns only
weather_df = weather_df.select(columns_without_nulls)

In [None]:
# Show basic summary statistics for the selected columns
weather_df.describe().show()

In [None]:
# Preview the first two rows
weather_df.show(2)

In [None]:
# Show the earliest and latest dates present in the dataset
weather_df.agg(F.min("datetime"), F.max("datetime")).show()

In [None]:
# Keep only the observations dated before 2022
weather_df = weather_df.where(F.year(F.col("datetime")) < 2022)

In [None]:
# Show the date range again, now that the year filter has been applied
weather_df.agg(F.min("datetime"), F.max("datetime")).show()

# Save to Delta

In [None]:
# Base directory under which the Delta data is written and later read back
delta_path = "/app/data/output/delta"

In [None]:
# Write the cleaned data in Delta format under delta_path, replacing any previous output
(
    weather_df.write
    .mode("overwrite")
    .format("delta")
    .save(f"{delta_path}/weather_data")
)

In [None]:
# Also register the data as the Delta table "weather_data", replacing any previous version
(
    weather_df.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("weather_data")
)

# Basic Stats in Spark

In [None]:
# Load the Delta data back from the path it was saved to
weather_data = spark.read.load(f"{delta_path}/weather_data", format="delta")

In [None]:
# Mean of 'temp' for each 'name' (country), highest average first
average_temp = (
    weather_data
    .groupBy("name")
    .agg(F.avg("temp").alias("average_temp"))
    .orderBy(F.col("average_temp").desc())
)

In [None]:
# Display the average temperature per country, highest first
average_temp.show()

In [None]:
# Add a 'year' column derived from 'datetime'
weather_data = weather_data.withColumn("year", F.year("datetime"))

# Mean of 'temp' per country and year, sorted by country then year
average_annual_temp = (
    weather_data
    .groupBy("name", "year")
    .agg(F.avg("temp").alias("average_temp"))
    .orderBy("name", "year")
)

# Display the first 20 rows
average_annual_temp.show(n=20)

In [None]:
# Total precipitation per country and calendar day ('date' is 'datetime'
# formatted as yyyy-MM-dd)
daily_precip = (
    weather_data
    .groupBy("name", F.date_format("datetime", "yyyy-MM-dd").alias("date"))
    .agg(F.sum("precip").alias("daily_precip"))
)

# Largest daily total for each country, wettest country first
max_daily_precip = (
    daily_precip
    .groupBy("name")
    .agg(F.max("daily_precip").alias("max_daily_precip"))
    .orderBy(F.col("max_daily_precip").desc())
)

# Display the result
max_daily_precip.show()


In [None]:
# Drop the row whose country name is 'Russia'
max_daily_precip = max_daily_precip.where(F.col("name") != "Russia")

# Display the remaining countries
max_daily_precip.show()

In [None]:
# The 10 observations with the highest 'precip' across all countries
top_precip_days = (
    weather_data
    .select("name", "datetime", "precip")
    .orderBy(F.col("precip").desc())
    .limit(10)
)

In [None]:
# Display the 10 highest-precipitation observations
top_precip_days.show()

# Basic Stats with Spark SQL

In [None]:
# Register the DataFrame as a SQL temporary view named "weather" so the
# spark.sql(...) queries in this section can reference it by that name
weather_data.createOrReplaceTempView("weather")

In [None]:
# Calculate the average temperature per country using SQL.
# Reads the "weather" temp view, averages 'temp' per 'name' and sorts the
# result from highest to lowest average.
average_temp_sql = spark.sql("""
SELECT 
    name, 
    AVG(temp) as average_temp
FROM 
    weather 
GROUP BY 
    name 
ORDER BY 
    average_temp DESC
""")

# Display the result
average_temp_sql.show()


In [None]:
# Find the top 10 days with the highest precipitation across all countries using SQL.
# Sorts the "weather" temp view by 'precip' descending and keeps the first 10 rows.
top_precip_days_sql = spark.sql("""
SELECT 
    name, 
    datetime, 
    precip
FROM 
    weather 
ORDER BY 
    precip DESC 
LIMIT 10
""")

# Display the result
top_precip_days_sql.show()

In [None]:
# Calculate the sum of precipitation for each country and date.
# 'date' in the GROUP BY is the alias defined in the SELECT list
# ('datetime' formatted as yyyy-MM-dd).
daily_precip_sql = spark.sql("""
SELECT 
    name, 
    DATE_FORMAT(datetime, 'yyyy-MM-dd') as date,
    SUM(precip) as daily_precip
FROM 
    weather 
GROUP BY 
    name, 
    date
""")

# Register the result DataFrame as a SQL temporary view named "daily_precip"
daily_precip_sql.createOrReplaceTempView("daily_precip")

In [None]:
# Find the maximum daily precipitation total for each country, highest first.
# Only the country and the maximum value are returned, not the day itself.
max_daily_precip_sql = spark.sql("""
SELECT 
    name, 
    MAX(daily_precip) as max_daily_precip
FROM 
    daily_precip 
GROUP BY 
    name
ORDER BY 
    max_daily_precip DESC
""")

# Display the result
max_daily_precip_sql.show()

In [None]:
# Drop the row whose country name is "Russia"
max_daily_precip_sql = max_daily_precip_sql.where(F.col("name") != "Russia")

# Display the remaining countries
max_daily_precip_sql.show()

In [None]:
# Create a temporary view exposing the year of each observation.
# It reads from the "weather" temp view registered for this section, like the
# other SQL queries here. The previous `FROM weather_data` only resolved
# because a table of that name had been saved with saveAsTable, which made
# this cell depend on the metastore table instead of the registered view.
# The saved data is already limited to years before 2022; the WHERE clause is
# kept as an explicit guard.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW weather_with_year AS
SELECT
    name,
    YEAR(datetime) as year,
    temp
FROM
    weather
WHERE
    YEAR(datetime) < 2022
""")

# Average temperature per country and year, sorted by country then year
average_annual_temp_sql = spark.sql("""
SELECT
    name,
    year,
    AVG(temp) as average_temp
FROM
    weather_with_year
GROUP BY
    name,
    year
ORDER BY
    name,
    year
""")

# Display the result
average_annual_temp_sql.show()

# Visualization

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Convert the per-country average temperatures from a Spark DataFrame to a
# pandas DataFrame so they can be plotted
average_temp_pd = average_temp.toPandas()

In [None]:
# Set the seaborn chart style to "whitegrid" for the plots that follow
sns.set(style="whitegrid")

In [None]:
# Bar chart of the average temperature per country on a 12x6 figure
plt.figure(figsize=(12, 6))
ax = sns.barplot(
    data=average_temp_pd,
    x="name",
    y="average_temp",
    palette="coolwarm_r",
)

# Title and axis labels, set on the axes returned by seaborn
ax.set_title("Average Temperature by Country")
ax.set_xlabel("Country")
ax.set_ylabel("Average Temp")

# Tilt the country names on the x axis
plt.xticks(rotation=45)

# Render the chart
plt.show()

In [None]:
# Convert the 10 highest-precipitation observations from a Spark DataFrame to
# a pandas DataFrame so they can be plotted
top_precip_days_pd = top_precip_days.toPandas()

In [None]:
# Bar chart of precipitation per date on a 12x6 figure, coloured by country
plt.figure(figsize=(12, 6))
ax = sns.barplot(
    data=top_precip_days_pd,
    x="datetime",
    y="precip",
    hue="name",
)

# Title and axis labels, set on the axes returned by seaborn
ax.set_title("Top 10 days with the most precipitation for all countries")
ax.set_xlabel("Date")
ax.set_ylabel("Precipitation")

# Tilt the dates on the x axis
plt.xticks(rotation=45)

# Render the chart
plt.show()