In [1]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, isnan, when, count, udf, year, month, to_date, mean
import pyspark.sql.functions as F
import seaborn as sns
import matplotlib.pyplot as plt

# Create (or reuse) the SparkSession used by every step below.
spark = SparkSession.builder.getOrCreate()
print(spark)

# Read File A. With header=False Spark names the columns positionally (_c0, _c1, ...).
# The variable is named spark_weather_df_a so the later drop/rename steps can find it
# (it was previously bound to `spark_weather_dfs_a`, which made L34 raise NameError).
spark_weather_df_a = spark.read \
    .option("header", False) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv(path='data/weather/zurich_weather_a')

spark_weather_df_a.printSchema()
spark_weather_df_a.describe().show()

# Read File B with the same options as File A.
spark_weather_df_b = spark.read \
    .option("header", False) \
    .option("sep", ",") \
    .option("inferSchema", True) \
    .csv(path='data/weather/zurich_weather_b')

spark_weather_df_b.printSchema()
spark_weather_df_b.describe().show()

# Drop the unused first column (_c0) from both files.
spark_weather_df_a = spark_weather_df_a.drop("_c0")
spark_weather_df_b = spark_weather_df_b.drop("_c0")

# Rename columns
def rename_multiple_columns(df, columns):
    """Return a copy of *df* with columns renamed according to *columns*.

    Args:
        df: Spark DataFrame whose columns are renamed.
        columns: dict mapping existing column names to new names. Columns that
            are not keys of the dict keep their current name; column order is
            preserved.

    Returns:
        A new DataFrame produced by ``df.select`` with aliased columns.

    Raises:
        ValueError: if *columns* is not a dict.
    """
    if not isinstance(columns, dict):
        raise ValueError("columns need to be in dict format {'existing_name_a':'new_name_a', 'existing_name_b':'new_name_b'}")

    aliased_columns = [
        F.col(existing).alias(columns.get(existing, existing))
        for existing in df.columns
    ]
    return df.select(*aliased_columns)

# Map File A's positional column names (_c1.._c9, produced because the CSV is read
# with header=False) to descriptive names. The date column is called "date2" so it
# does not clash with File B's "date" after the join; it is dropped during cleaning.
dict_columns = {"_c1": "date2",
                "_c2": "avg_temp",
                "_c3": "precip",
                "_c4": "snow",
                "_c5": "wind_dir",
                "_c6": "wind_speed",
                "_c7": "wind_power",
                "_c8": "air_pressure",
                "_c9": "sunny_hours",}

spark_weather_df_a_renamed = rename_multiple_columns(spark_weather_df_a, dict_columns)

# Join the dataframes
# Inner join: only dates present in both File A (date2) and File B (date) are kept.
# NOTE(review): spark_weather_df_b_renamed is not defined anywhere in the visible code
# (File B is only read and has _c0 dropped), so this line raises NameError as written.
# File B presumably needs its own rename dict that yields at least `date`, `min_temp`
# and `max_temp` (all referenced later) — TODO confirm File B's schema.
spark_weather_df = spark_weather_df_a_renamed.join(spark_weather_df_b_renamed, spark_weather_df_a_renamed.date2 == spark_weather_df_b_renamed.date, "inner")

# Define a function for a quick data overview
def quick_overview(dfs):
    """Display a quick data-quality overview of a Spark DataFrame.

    Shows, in order: the two earliest records by ``date``, the number of
    null/NaN values per float/double column, up to five ``date`` values that
    occur more than once, and the schema.

    Args:
        dfs: Spark DataFrame; must contain a ``date`` column.

    Returns:
        None. Output goes through ``display`` (assumes an IPython/Jupyter
        session where ``display`` is available) and ``printSchema``.
    """
    # Sort before limiting; limit-then-sort would show two arbitrary rows.
    display("FIRST RECORDS")
    display(dfs.sort(col("date").asc()).limit(2).toPandas())

    # Count null values. isnan() only applies to float/double columns, so restrict to those.
    # Use the argument's dtypes, not a global DataFrame's.
    display("COUNT NULL VALUES")
    float_columns = [name for name, dtype in dfs.dtypes if dtype in ("double", "float")]
    display(dfs.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in float_columns]
                       ).toPandas())

    # Check for duplicated dates in *this* DataFrame (previously grouped by the global
    # spark_weather_df's column, which breaks for any other argument).
    display("DUPLICATE DATES")
    duplicates = dfs.groupby(col("date")) \
        .count() \
        .where('count > 1') \
        .limit(5).toPandas()
    display(duplicates)

    # printSchema() prints directly and returns None, so call it instead of displaying its result.
    display("PRINT SCHEMA")
    dfs.printSchema()

quick_overview(spark_weather_df)

# Filter Null values: rows without an average temperature are dropped entirely.
spark_weather_df = spark_weather_df.filter(spark_weather_df.avg_temp.isNotNull())

# Replace NA values with mean values. Means are computed on the already-filtered
# frame, so no second avg_temp filter is needed here.
avg = spark_weather_df \
    .select(mean(col('min_temp')).alias('mean_min'),
            mean(col('max_temp')).alias('mean_max'),
            mean(col('wind_speed')).alias('mean_wind')).collect()

mean_min = avg[0]['mean_min']
mean_max = avg[0]['mean_max']
mean_wind = avg[0]['mean_wind']

for column_name, column_mean in (("min_temp", mean_min),
                                 ("max_temp", mean_max),
                                 ("wind_speed", mean_wind)):
    # mean() is None when a column has no non-null values, and na.fill rejects a
    # None value with TypeError, so skip the fill in that case.
    if column_mean is not None:
        spark_weather_df = spark_weather_df.na.fill(value=column_mean, subset=[column_name])

# Missing snow values are treated as "no snow".
spark_weather_df = spark_weather_df.na.fill(value=0, subset=["snow"])

# Check for duplicates: show up to five dates that occur more than once
# (a bare expression here would be evaluated and then discarded).
display(spark_weather_df.groupby(spark_weather_df.date)
        .count()
        .where('count > 1')
        .limit(5).toPandas())

# Remove exact duplicate rows, drop column date2, convert date to datatype "date",
# sort by date and keep only the columns used in the analysis.
spark_cleaned_df = spark_weather_df.dropDuplicates() \
    .drop(col("date2")) \
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd")) \
    .orderBy(col("date")) \
    .select(col("date"), col("avg_temp"), col("min_temp"), col("max_temp"), col("wind_speed"), col("snow"),
            col("precip"))

quick_overview(spark_cleaned_df)

# Create bucket column for temperature
def binner(min_temp, max_temp):
    """Classify a day into a temperature bucket.

    The cold buckets depend only on ``min_temp`` (any day with a minimum below 0).
    Otherwise the bucket is chosen from ``max_temp``.

    Args:
        min_temp: daily minimum temperature, or None.
        max_temp: daily maximum temperature, or None.

    Returns:
        One of "unknown", "freezing cold", "very cold", "cold", "normal",
        "warm", "hot", "very hot".
    """
    if min_temp is None or max_temp is None:
        return "unknown"

    # Thresholds are checked in order; the first upper bound the value is below wins.
    cold_thresholds = ((-10, "freezing cold"), (-5, "very cold"), (0, "cold"))
    for upper_bound, label in cold_thresholds:
        if min_temp < upper_bound:
            return label

    warm_thresholds = ((10, "normal"), (20, "warm"), (30, "hot"))
    for upper_bound, label in warm_thresholds:
        if max_temp < upper_bound:
            return label

    if max_temp >= 30:
        return "very hot"
    # Only reached when max_temp compares false both ways (e.g. NaN).
    return "normal"

# Wrap the temperature bucketing function as a Spark UDF returning strings.
udf_binner_temp = udf(binner, StringType())

spark_cleaned_df = spark_cleaned_df.withColumn("temp_buckets", udf_binner_temp(col("min_temp"), col("max_temp")))


def _precip_bucket(precip):
    """Classify precipitation: > 50 "very rainy", > 0 "rainy", otherwise "dry".

    Returns "unknown" for None. precip is not imputed during cleaning, so nulls
    can reach this function, and ``None > 50`` raises TypeError in Python 3.
    """
    if precip is None:
        return "unknown"
    if precip > 50:
        return "very rainy"
    if precip > 0:
        return "rainy"
    return "dry"


# Create new columns for bucket precipitation and for month and year
udf_binner_precip = udf(_precip_bucket, StringType())

spark_cleaned_df = spark_cleaned_df \
    .withColumn("precip_buckets", udf_binner_precip(col("precip"))) \
    .withColumn("month", month(col("date"))) \
    .withColumn("year", year(col("date")))

# Scatterplot of the daily average temperature, coloured by temperature bucket.
fig, ax = plt.subplots(figsize=(28, 8))
sns.scatterplot(ax=ax, hue="temp_buckets", y="avg_temp", x="date", data=spark_cleaned_df.toPandas(),
                palette="Spectral_r")

# Plot formatting
ax.tick_params(axis="x", rotation=30, labelsize=10, length=0)

# Title formatting: compute both date bounds in a single aggregation job.
date_bounds = spark_cleaned_df.agg(F.min("date").alias("mindate"),
                                   F.max("date").alias("maxdate")).first()
mindate = str(date_bounds["mindate"])
maxdate = str(date_bounds["maxdate"])
ax.set_title("average temperature in Zurich: " + mindate + " - " + maxdate)
ax.set_xlabel("Year")
plt.show()

# Data Analysis using PySpark.SQL

# Register the dataset as a temp view, so we can use SQL
spark_cleaned_df.createOrReplaceTempView("weather_data_temp_view")

# Monthly mean temperatures, computed only over days whose max_temp exceeds 25.
# The result columns keep Spark's generated names ("mean(avg_temp)", ...).
events_over_years_df = spark.sql("""
    SELECT year, month, mean(avg_temp), mean(max_temp), mean(min_temp)
    FROM weather_data_temp_view
    WHERE max_temp > 25
    GROUP BY month, year
    ORDER BY year, month
""")

print(events_over_years_df.limit(5).toPandas())

# Collect once and reuse for all three lines; each toPandas() is a separate Spark job.
events_pdf = events_over_years_df.toPandas()

plt.figure(figsize=(16, 6))
line_ax = sns.lineplot(y="mean(avg_temp)", x="year", data=events_pdf, color="orange", label="mean(avg_temp)")
sns.lineplot(ax=line_ax, y="mean(max_temp)", x="year", data=events_pdf, color="red", label="mean(max_temp)")
sns.lineplot(ax=line_ax, y="mean(min_temp)", x="year", data=events_pdf, color="blue", label="mean(min_temp)")
line_ax.legend()
plt.grid()
plt.show()

# Scatter Plots: min vs. max temperature, coloured by snow, wind speed and month.
fig, axes = plt.subplots(nrows=1, ncols=3, sharey=True, figsize=(30, 10))
fig.subplots_adjust(hspace=0.5, wspace=0.2)

# Collect to pandas once. Each toPandas() call is a separate Spark job that
# re-evaluates the (uncached) DataFrame and ships it to the driver.
cleaned_pdf = spark_cleaned_df.toPandas()

palette = sns.color_palette("ch:start=.2,rot=-.3", as_cmap=True)
sns.scatterplot(ax=axes[0], hue="snow", size="snow", y="max_temp", x="min_temp", data=cleaned_pdf,
                alpha=1.0, palette=palette)
axes[0].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[0].set_title("min - max temperature separated by snow")

sns.scatterplot(ax=axes[1], hue="wind_speed", size="wind_speed", y="max_temp", x="min_temp", data=cleaned_pdf,
                alpha=1.0, palette='rocket_r')
axes[1].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[1].set_title("min - max temperature separated by wind speed")

# hue_norm pins the colour scale to months 1..12.
sns.scatterplot(ax=axes[2], hue="month", y="max_temp", x="min_temp", data=cleaned_pdf,
                alpha=1.0, palette='Spectral_r', hue_norm=(1, 12), legend="full")
axes[2].legend(bbox_to_anchor=(1.01, 1), loc=2, borderaxespad=0.)
axes[2].set_title("min - max temperature separated by month")

plt.show()


FileNotFoundError: [WinError 2] The system cannot find the file specified