# Cleaning, filtering, helper functions

In [16]:
from pyspark.sql.functions import radians, sin, cos, sqrt, atan2, col, lit
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import sum as sql_sum
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark.sql.functions import date_format



# Create SparkSession
spark: SparkSession = SparkSession.builder.appName("ChallengerTemperatureAnalysis").getOrCreate()

# Define haversine distance UDF
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Earth radius in kilometers
    dlon = radians(lon2) - radians(lon1)
    dlat = radians(lat2) - radians(lat1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

# Define inverse distance weighting UDF
def idw(distances, temperatures):
    weights = 1 / distances
    weighted_temps = temperatures * weights
    return sql_sum(weighted_temps) / sql_sum(weights)


# Read station and temperature data
stations = spark.read.csv("../data/stations.csv", header=True, inferSchema=True)
temperatures = spark.read.csv("../data/1986.csv", header=False, inferSchema=True) \
                     .toDF("station_id", "wban_id", "month", "day", "temperature")

# Filter and clean up the data
stations = stations.filter((stations["latitude"].isNotNull()) & (stations["longitude"].isNotNull()))
temperatures = temperatures.filter((temperatures["temperature"].isNotNull()) & (temperatures["month"] == 1) & (temperatures["day"] == 28))

# Join stations and temperatures data
joined = temperatures.join(stations, ["station_id", "wban_id"])

# Compute distances from each station to Cape Canaveral (28.3922° N, 80.6077° W)
lat_cc = lit(28.3922)
lon_cc = lit(-80.6077)
joined = joined.withColumn("distance", haversine(lat_cc, lon_cc, joined["latitude"], joined["longitude"]).cast(DoubleType()))

# Filter for stations within 100 km of Cape Canaveral
joined = joined.filter(joined["distance"] <= 100)

# Compute IDW temperature at Cape Canaveral on January 28, 1986
idw_temp = joined.groupby().agg(idw(col("distance"), col("temperature")).alias("IDW_Temperature")).collect()[0]["IDW_Temperature"]

# Print result
print("The estimated temperature at Cape Canaveral on January 28, 1986, using inverse distance weighting, is {:.2f} degrees F.".format(idw_temp))


23/04/13 23:49:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
The estimated temperature at Cape Canaveral on January 28, 1986, using inverse distance weighting, is 34.46 degrees F.


# Question #1: stations within 100 km

In [28]:
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark: SparkSession = SparkSession.builder.appName("WeatherStationDistance").getOrCreate()

# Cape Canaveral GPS coordinates
cape_canaveral_lat: float = 28.3922
cape_canaveral_lon: float = -80.6077


# UDF function for distance checking
@udf(returnType=BooleanType())
def is_within_100km(lat, lon) -> bool:
    try:
        return haversine(cape_canaveral_lat, cape_canaveral_lon, lat, lon) <= 100
    except:
        return False

# add new column to the stations DF for indicating if a weather station is < 100 km from Cape Canaveral
stations = stations.withColumn("is_within_100km", is_within_100km(col("latitude"), col("longitude")))

# filter to show only close stations
weather_stations_within_100km_df = stations.filter(col("is_within_100km"))

# display top 20 results
weather_stations_within_100km_df.show(20)

+----------+-------+--------+---------+---------------+
|station_id|wban_id|latitude|longitude|is_within_100km|
+----------+-------+--------+---------+---------------+
|    720904|    299|  29.067|  -81.283|           true|
|    720904|   null|  29.067|  -81.284|           true|
|    722011|  92813|   28.29|  -81.437|           true|
|    722011|   null|   28.29|  -81.437|           true|
|    722040|  12838|  28.101|  -80.644|           true|
|    722040|   null|    28.1|   -80.65|           true|
|    722045|  12843|  27.653|  -80.243|           true|
|    722045|   null|   27.65|  -80.417|           true|
|    722046|  12898|  28.517|    -80.8|           true|
|    722046|   null|  28.517|    -80.8|           true|
|    722050|  12815|  28.434|  -81.325|           true|
|    722051|  12841|  28.545|  -81.333|           true|
|    722053|  12841|  28.545|  -81.333|           true|
|    722053|   null|   28.55|  -81.333|           true|
|    722056|  12834|  29.183|  -81.048|         

In [2]:
# Aggregate temperature data by date

daily_temps = joined.filter(joined["temperature"].isNotNull()) \
                   .groupBy(date_format("date", "yyyy-MM-dd").alias("date")) \
                   .agg(sql_sum(col("temperature")).alias("total_temperature"), sql_sum(col("distance")).alias("total_distance")) \
                   .withColumn("average_temperature", col("total_temperature") / col("total_distance")) \
                   .orderBy("date")

# Extract the temperatures and days from the daily_temps DataFrame
temps = daily_temps.select("average_temperature").collect()
days = daily_temps.select(date_format("day", "d").alias("day")).collect()

# Plot the temperatures for each day in January 1986
plt.plot(days, temps)
plt.title("Temperature in January 1986 at Cape Canaveral")
plt.xlabel("Day of the Month")
plt.ylabel("Temperature (F)")
plt.show()




AnalysisException: Column 'date' does not exist. Did you mean one of the following? [day, month, distance, latitude, wban_id, longitude, station_id, temperature];
'Aggregate [date_format('date, yyyy-MM-dd, Some(America/New_York))], [date_format('date, yyyy-MM-dd, Some(America/New_York)) AS date#111, sum(cast(temperature#56 as double)) AS total_temperature#121, sum(distance#71) AS total_distance#123]
+- Filter isnotnull(temperature#56)
   +- Filter (distance#71 <= cast(100 as double))
      +- Project [station_id#52, wban_id#53, month#54, day#55, temperature#56, latitude#19, longitude#20, cast(((ATAN2(SQRT((POWER(SIN(((RADIANS(latitude#19) - RADIANS(28.3922)) / cast(2 as double))), cast(2 as double)) + ((COS(RADIANS(28.3922)) * COS(RADIANS(latitude#19))) * POWER(SIN(((RADIANS(longitude#20) - RADIANS(-80.6077)) / cast(2 as double))), cast(2 as double))))), SQRT((cast(1 as double) - (POWER(SIN(((RADIANS(latitude#19) - RADIANS(28.3922)) / cast(2 as double))), cast(2 as double)) + ((COS(RADIANS(28.3922)) * COS(RADIANS(latitude#19))) * POWER(SIN(((RADIANS(longitude#20) - RADIANS(-80.6077)) / cast(2 as double))), cast(2 as double))))))) * cast(2 as double)) * 6371.0) as double) AS distance#71]
         +- Project [station_id#52, wban_id#53, month#54, day#55, temperature#56, latitude#19, longitude#20]
            +- Join Inner, ((cast(station_id#52 as int) = station_id#17) AND (cast(wban_id#53 as int) = wban_id#18))
               :- Filter ((isnotnull(temperature#56) AND (cast(month#54 as int) = 1)) AND (cast(day#55 as int) = 28))
               :  +- Project [_c0#42 AS station_id#52, _c1#43 AS wban_id#53, _c2#44 AS month#54, _c3#45 AS day#55, _c4#46 AS temperature#56]
               :     +- Relation [_c0#42,_c1#43,_c2#44,_c3#45,_c4#46] csv
               +- Filter (isnotnull(latitude#19) AND isnotnull(longitude#20))
                  +- Relation [station_id#17,wban_id#18,latitude#19,longitude#20] csv


In [22]:
print(daily_temps.count())




1


                                                                                