In [0]:
!pip install geopy

In [0]:
def download_city_weather(station_id, station_name, city, start_year=1990, end_year=2025, timeout=30):
    """
    Download daily weather data for one Environment Canada station and append
    it to the ``main.ext.weather`` Delta table.

    One CSV is requested per year in ``[start_year, end_year]``. A year whose
    request fails, or whose CSV has no 'Date/Time' column, is reported and
    skipped. The remaining years are concatenated, sorted by date, renamed to
    table-friendly column names, type-normalised and appended to the table.

    Args:
        station_id: Station identifier sent as the ``stationID`` query parameter.
        station_name: Station name; used in log lines and stored in the
            ``Source_Station_Name`` column.
        city: Label stored in the ``City`` column.
        start_year: First year to download (inclusive).
        end_year: Last year to download (inclusive).
        timeout: Seconds to wait on each HTTP request before it is treated as
            a failed year, so a stalled connection cannot hang the whole run.

    Returns:
        None. The data is written to the table as a side effect; nothing is
        written when no year could be downloaded.
    """

    # URL for Environment Canada's bulk download
    base_url = "https://climate.weather.gc.ca/climate_data/bulk_data_e.html"

    # CSV header -> column name written to the table
    column_names = {
        'Longitude (x)': 'Longitude',
        'Latitude (y)': 'Latitude',
        'Station Name': 'StationName',
        'Climate ID': 'ClimateID',
        'Date/Time': 'DateTime',
        'Data Quality': 'DataQuality',
        'Max Temp (°C)': 'MaxTemp',
        'Max Temp Flag': 'MaxTempFlag',
        'Min Temp (°C)': 'MinTemp',
        'Min Temp Flag': 'MinTempFlag',
        'Mean Temp (°C)': 'MeanTemp',
        'Mean Temp Flag': 'MeanTempFlag',
        'Heat Deg Days (°C)': 'HeatDegDays',
        'Heat Deg Days Flag': 'HeatDegDaysFlag',
        'Cool Deg Days (°C)': 'CoolDegDays',
        'Cool Deg Days Flag': 'CoolDegDaysFlag',
        'Total Rain (mm)': 'TotalRain',
        'Total Rain Flag': 'TotalRainFlag',
        'Total Snow (cm)': 'TotalSnow',
        'Total Snow Flag': 'TotalSnowFlag',
        'Total Precip (mm)': 'TotalPrecip',
        'Total Precip Flag': 'TotalPrecipFlag',
        'Snow on Grnd (cm)': 'SnowonGrnd',
        'Snow on Grnd Flag': 'SnowonGrndFlag',
        'Dir of Max Gust (10s deg)': 'DirofMaxGust',
        'Dir of Max Gust Flag': 'DirofMaxGustFlag',
        'Spd of Max Gust (km/h)': 'SpdofMaxGust',
        'Spd of Max Gust Flag': 'SpdofMaxGustFlag',
    }
    # Columns written as text
    text_columns = [
        'MaxTempFlag', 'MinTempFlag', 'MeanTempFlag', 'TotalRainFlag',
        'HeatDegDaysFlag', 'CoolDegDaysFlag', 'TotalPrecipFlag',
    ]
    # Columns written as floats
    float_columns = ['DataQuality', 'DirofMaxGustFlag', 'TotalSnowFlag']

    # Storage for annual dataframes
    all_data = []

    print(f"Starting download for {station_name} weather data ({start_year}-{end_year})...")
    print("-" * 60)

    for year in range(start_year, end_year + 1):

        params = {
            'format': 'csv',
            'stationID': station_id,
            'Year': year,
            'Month': 1,      # Month/Day required by API but ignored for daily/bulk dl
            'Day': 1,
            'timeframe': 2,  # 2 = Daily data
            'submit': 'Download Data'
        }

        try:
            # Fetch the data; a timeout turns a stalled request into a failed year
            response = requests.get(base_url, params=params, timeout=timeout)
            response.raise_for_status()

            # Read CSV from memory
            content = response.content.decode('utf-8')
            df = pd.read_csv(io.StringIO(content))

            # Basic cleanup: only keep responses that actually contain daily rows
            if 'Date/Time' in df.columns:
                df['Date/Time'] = pd.to_datetime(df['Date/Time'])

                # Track which station / city this came from
                df['Source_Station_ID'] = station_id
                df['Source_Station_Name'] = station_name
                df['City'] = city
                all_data.append(df)
                print(f"✓ {year} downloaded ({station_name})")
            else:
                print(f"⚠ {year} downloaded but format seemed empty or incorrect.")

        except Exception as e:
            # Best effort: one bad year must not abort the remaining years
            print(f"✗ Failed to download {year}: {e}")

        # Be polite to the server
        time.sleep(1)

    # Nothing usable was downloaded
    if not all_data:
        print("No more data to download")
        return

    # Combine all years into one DataFrame, in chronological order
    full_df = pd.concat(all_data, ignore_index=True)
    full_df = full_df.sort_values('Date/Time')

    spark.sql("CREATE TABLE IF NOT EXISTS main.ext.weather")

    full_df = full_df.rename(columns=column_names)

    for column in text_columns:
        full_df[column] = full_df[column].astype(str)

    # These columns are written as floats. A plain astype(float) raises
    # ValueError as soon as one row carries a non-numeric code, which used to
    # discard every downloaded year for the station. Coercing keeps the float
    # column type and turns such codes into NaN.
    # NOTE(review): storing these as text would preserve the codes but would
    # change the column type of the existing table - confirm before changing.
    for column in float_columns:
        full_df[column] = pd.to_numeric(full_df[column], errors='coerce').astype(float)

    full_df['ClimateID'] = pd.to_numeric(full_df['ClimateID'], errors='coerce').astype('Int64')

    spark_df = spark.createDataFrame(full_df)
    spark_df.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable("main.ext.weather")

In [0]:
import pandas as pd
import requests
import io
import time
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
from math import radians, cos, sin, asin, sqrt


# Year window used by find_nearest_station() to decide which stations qualify:
# a station is kept when its daily-data years overlap [START_YEAR, END_YEAR].
START_YEAR = 1990
END_YEAR = 2025

# Official EC station inventory (CSV), downloaded by get_station_inventory()
INVENTORY_URL = "https://collaboration.cmc.ec.gc.ca/cmc/climate/Get_More_Data_Plus_de_donnees/Station%20Inventory%20EN.csv"

def haversine(lon1, lat1, lon2, lat2):
    """
    Great-circle distance, in kilometres, between two points given as
    (longitude, latitude) pairs in decimal degrees.
    """
    earth_radius_km = 6371

    # Work in radians from here on
    lam_a, phi_a, lam_b, phi_b = (radians(angle) for angle in (lon1, lat1, lon2, lat2))

    delta_lam = lam_b - lam_a
    delta_phi = phi_b - phi_a

    # Haversine of the central angle between the two points
    hav = sin(delta_phi/2)**2 + cos(phi_a) * cos(phi_b) * sin(delta_lam/2)**2
    central_angle = 2 * asin(sqrt(hav))

    return central_angle * earth_radius_km

def get_station_inventory():
    """
    Download the station inventory CSV from INVENTORY_URL.

    Returns:
        The inventory as a DataFrame (the first three lines of the file are
        skipped), or an empty DataFrame when the download or parsing fails.
    """
    print("Downloading official Station Inventory...")
    try:
        reply = requests.get(INVENTORY_URL, timeout=30)
        reply.raise_for_status()
        # Bytes that are not valid UTF-8 are dropped rather than raising
        csv_text = reply.content.decode('utf-8', errors='ignore')
        # Skip top 3 metadata rows
        inventory = pd.read_csv(io.StringIO(csv_text), skiprows=3)
    except Exception as e:
        print(f"Error downloading inventory: {e}")
        return pd.DataFrame()
    return inventory

def find_nearest_station(target_lat, target_lon, inventory_df):
    """
    Pick the inventory station closest to (target_lat, target_lon) among
    those whose daily-data years overlap [START_YEAR, END_YEAR].

    Returns:
        A ``(station_id, station_name, distance_km)`` tuple. When no station
        can be chosen the tuple is ``(None, <reason text>, None)``.
    """
    if pd.isna(target_lat) or pd.isna(target_lon):
        return None, "Invalid Coordinates", None

    # Work on a copy so the caller's inventory is left untouched
    stations = inventory_df.copy()

    # Unparseable years are pushed outside any real range so they never match
    stations['DLY First Year'] = pd.to_numeric(stations['DLY First Year'], errors='coerce').fillna(9999)
    stations['DLY Last Year'] = pd.to_numeric(stations['DLY Last Year'], errors='coerce').fillna(0)

    # Time overlap: started on or before END_YEAR and ended on or after START_YEAR
    started_in_time = stations['DLY First Year'] <= END_YEAR
    ended_late_enough = stations['DLY Last Year'] >= START_YEAR
    candidates = stations[started_in_time & ended_late_enough]

    if candidates.empty:
        return None, "No active stations in time range", None

    # Coordinate columns are located by name fragments rather than exact headers
    lat_col = [name for name in stations.columns if "Latitude" in name and "Decimal" in name][0]
    lon_col = [name for name in stations.columns if "Longitude" in name and "Decimal" in name][0]

    distances = candidates.apply(
        lambda station: haversine(target_lon, target_lat, station[lon_col], station[lat_col]),
        axis=1,
    )

    # Closest station first
    ranked = candidates.assign(distance_km=distances).sort_values('distance_km')
    best = ranked.iloc[0]

    return best['Station ID'], best['Name'], best['distance_km']

# --- Main Execution ---
def save_weather_data():
    """
    Geocode every input location, match it to the nearest weather station and
    download that station's daily weather data.

    Steps:
        1. Load the list of locations through Spark (the 'County' column is
           used as the search text).
        2. Download the station inventory.
        3. For each location: geocode "<location>, Canada" with Nominatim,
           look up the nearest station, and call download_city_weather() for
           the START_YEAR-END_YEAR range.
        4. Display one summary row per location.

    Returns:
        None. Returns early, after printing a message, when the locations or
        the station inventory cannot be loaded.
    """
    # 1. Load Data
    try:
        df = spark.sql("SQL STATEMENT TO PULL DATA THAT STORES A LIST OF CITIES YOU WOULD LIKE TO GET WEATHER INFORMATION FOR").toPandas()
        print(f"Loaded {len(df)} locations.")
    except Exception as e:
        print(f"Could not load input file: {e}")
        # Leave this function only; exit() would end the whole interpreter /
        # notebook session instead.
        return

    # 2. Load Inventory
    inventory = get_station_inventory()
    if inventory.empty:
        print("CRITICAL: Inventory download failed.")
        return

    # 3. Initialize Geocoder
    geolocator = Nominatim(user_agent="weather_matcher_v1")

    results = []
    total = len(df)

    print("Starting Geocoding and Matching (this takes time to be polite to the server)...")

    # enumerate() keeps the progress counter correct whatever the frame's index is
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        location_query = row['County']
        print("City currently being processed")
        print(location_query)
        print(f"Processing {position}/{total}: {location_query}")

        try:
            # Step A: Geocode
            # We append "Canada" to ensure we don't get a "Foothills" in the USA
            query_str = f"{location_query}, Canada"
            location = geolocator.geocode(query_str, timeout=10)

            if location:
                lat, lon = location.latitude, location.longitude

                # Step B: Find Nearest Station
                st_id, st_name, dist = find_nearest_station(lat, lon, inventory)

                if st_id is None:
                    # No usable station: st_name carries the reason text.
                    # Skip the download instead of requesting every year
                    # for a missing station id.
                    print(f"  -> No station matched: {st_name}")
                    results.append({
                        'Original_Query': location_query,
                        'Geocoded_Address': location.address,
                        'Lat': lat,
                        'Lon': lon,
                        'Nearest_Station_ID': None,
                        'Status': st_name
                    })
                else:
                    # Same year window the station filter uses
                    download_city_weather(st_id, st_name, location_query, start_year=START_YEAR, end_year=END_YEAR)
                    results.append({
                        'Original_Query': location_query,
                        'Geocoded_Address': location.address,
                        'Lat': lat,
                        'Lon': lon,
                        'Nearest_Station_ID': st_id,
                        'Nearest_Station_Name': st_name,
                        'Distance_KM': round(dist, 2),
                        'Status': 'Matched'
                    })
                    print(f"  -> Match: {st_name} ({round(dist, 2)} km away)")
            else:
                print("  -> Could not geocode location.")
                results.append({'Original_Query': location_query, 'Nearest_Station_ID': None, 'Status': 'Geocode Failed'})

        except Exception as e:
            # Best effort: one failing location must not stop the others
            print(f"  -> Error: {e}")
            results.append({'Original_Query': location_query, 'Nearest_Station_ID': None, 'Status': 'Error'})

        # Rate limit to respect Nominatim's usage policy (1 second delay)
        time.sleep(1)

    # 4. Show Results
    results_df = pd.DataFrame(results)
    display(results_df)

In [0]:
# Run the pipeline: geocode each input location, match it to the nearest
# station, download that station's weather and display the match summary.
save_weather_data()