In [1]:
pip install requests python-dotenv

Note: you may need to restart the kernel to use updated packages.




In [7]:
import requests
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
import time
import json
import pandas as pd
import psycopg2

# EXTRACT



In [12]:
%%writefile encodings_setup.py


import os
import json
import pandas as pd
import requests
from dotenv import load_dotenv
from scripts.logger import logger
from scripts.config import ENV_PATH, CITIES_CONFIG_PATH  # Import paths


# loading environment variables (API key)
def load_env_api():
    
    try:
        # load envrionment variables from the .env file
        load_dotenv(ENV_PATH)

        # get the API key from the .env file
        api_key = os.getenv("API_KEY")
        if not api_key:
            raise ValueError("API_KEY is missing in the environment variables")
        logger.info("Successfully loaded API_KEY")
        return api_key
    except Exception as e:
        logger.error(f"Error loading API_KEY: {e}")
        raise

# load list of cities to query the API about
def load_env_cities():
    
    try:
        # load envrionment variables from the .env file
        load_dotenv(ENV_PATH)

        # get the list of cities from the .env file
        cities_str = os.getenv("CITIES")
        if not cities_str:
            raise ValueError("Cities variable is missing in the env file")
        cities = cities_str.split(";")
        logger.info(f"Successfully loaded {len(cities)} cities.")
        return cities
    except Exception as e:
        logger.error(f"Error loading cities: {e}")
        raise

# getting latitude and longitude encodings for cities (list of cities in config file)
## function for getting lat & long encoding of cities
def encoding(api_key, cities):
    
    # Geocoding API endpoint
    geocoding_url = "http://api.openweathermap.org/data/2.5/weather"

    # create dataframe to store encodings
    encodings = pd.DataFrame(columns=['name', 'latitude', 'longitude'])
    
    # Loop through the cities and get their lat, lon
    for city in cities:
        try:
            # Send GET request to the OpenWeatherMap Geocoding API
            response = requests.get(geocoding_url, params={
                'q': city,
                'appid': api_key
            })

            # Check if the request was successful
            if response.status_code == 200:
                data = response.json()
                lat = data['coord']['lat']
                lon = data['coord']['lon']
                logger.info(f"Retrieved coordinate for {city}: ({lat}, {lon})")
                print(f"City: {city} - Latitude: {lat}, Longitude: {lon}")
                new_row = pd.DataFrame({"name": [city], "latitude": [lat], "longitude": [lon]})
                encodings = pd.concat([encodings, new_row], ignore_index=True)
            else:
                print(f"Failed to get data for {city}")
                logger.warning(f"Failed to fetch for data {city}. Status code:{response.status_code}")
        except requets.exceptions.RequestException as e:
            logger.error(f"Request error for {city}: {e}")
            
    return encodings;

## write the city encodings to the config file
def encodings_to_config(encodings):
    try:
        # Convert the DataFrame to the desired dictionary format
        config_data = {
            "cities": encodings.to_dict(orient="records")  # Convert rows to list of dictionaries
        }

        # Write the dictionary to a JSON file
        with open(CITIES_CONFIG_PATH, 'w') as json_file:
            json.dump(config_data, json_file, indent=4)
        logger.info("Successfully wrote city encodings to cities_config.json")
        print("Data has been written to cities_config.json")
    except:
        logger.error(f"Error writing to config file: {e}")
        raise

try:
    # executing the defined functions above 
    api_key = load_env_api()
    cities = load_env_cities()
    encodings = encoding(api_key, cities)
    encodings_to_config(encodings)
except Exception as e:
    logger.critical(f"Pipeline execution failed at encodings: {e}")

Writing encodings_setup.py


# API Weather Extract

In [15]:
%%writefile extract.py

import os
import json
import requests
from dotenv import load_dotenv
from scripts.logger import logger
from scripts.config import ENV_PATH, CITIES_CONFIG_PATH, RAW_DATA_PATH, RAW_COMPILED_PATH  # Import paths


# loading environment variables (API key)
def load_env_api():
    try:
        # load envrionment variables from the .env file
        load_dotenv(ENV_PATH)
        # get the API key from the .env file
        api_key = os.getenv("API_KEY")
        if not api_key:
            raise ValueError("API_KEY is missing in the env variables")
        logger.info("Successfully loaded API key")
        return api_key
    except Exception as e:
        logger.error(f"Error loading API key: {e}")
        raise

# function to execute the API call
# api call to get the current weather 
def get_weather(api_key, city, lat, lon, exclude='minutely,daily,hourly', units='imperial', lang='en'):
    # Build the base URL for the OneCall API
    api_url = f"https://api.openweathermap.org/data/3.0/onecall"
    
    # Prepare the parameters for the API call
    params = {
        'lat': lat,
        'lon': lon,
        'appid': api_key,
        'units': units,  # 'imperial' for Fahrenheit, 'metric' for Celsius
        'lang': lang      # Language for the response
    }
    
    # Add the 'exclude' parameter if it's provided
    if exclude:
        params['exclude'] = exclude
        
    try:
        # Make the API request
        response = requests.get(api_url, params=params)
        response.raise_for_status()
        # Check if the request was successful
        data = response.json()
        # Print or process the data
        data['City']=city
        logger.info(f"Successfully fetched weather data for {city}")
        print(data)
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error: {response.status_code}, {response.text}")
        logger.error(f"Error fetching data from {api_url}: {e}")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON response for {city}: {e}")
    return None
    

# function to run the API call based upon cities in config file
def city_weather_data_extraction():
    try:
        # Step 1: Load the config file
        with open(CITIES_CONFIG_PATH, 'r') as f:
            config_data = json.load(f)
    except (IOError, json.JSONDecodeError) as e:
        logger.error(f"Error loading cities config file: {e}")
        raise

    weather_data = []

    # Step 2: Loop through each city and use the data for API requests
    for city in config_data['cities']:
        api_key = load_env_api()
        latitude = city['latitude']
        longitude = city['longitude']
        city_name = city['name']
        
        if not all([city_name,latitude,longitude]):
            logger.warning(f"skipping city due to missing data: {city}")
            continue

        weather_data.append(get_weather(api_key, city_name, latitude, longitude))
        
    return weather_data

# writing the extracted data to the raw_weather_data.json file
def write_raw_data(weather_data):

    try:
        # Check if the file exists to decide whether to append or create new
        if os.path.exists(RAW_DATA_PATH):
            # If the file exists, load the existing data, then append new data
            with open(RAW_DATA_PATH, 'r') as json_file:
                existing_data = json.load(json_file)
                existing_data.extend(weather_data)

            # Append to the file
            with open(RAW_DATA_PATH, 'w') as json_file:
                json.dump(existing_data, json_file, indent=4)
        else:
            # If the file doesn't exist, create it and write the new data
            with open(RAW_DATA_PATH, 'w') as json_file:
                json.dump(weather_data, json_file, indent=4)

        print(f"Data saved to {RAW_DATA_PATH}")
        logger.info(f"Weather data successfully saved to {RAW_DATA_PATH}")
    except (IOError, json.JSONDecodeError, ValueError) as e:
        logger.error(f"Error writing to {RAW_FILE_PATH}: {e}")
        raise

    
# writing the extracted data to the raw_compiled_data.json file
def write_compiled_raw_data(weather_data):
    
    try:
        # Check if the file exists to decide whether to append or create new
        if os.path.exists(RAW_COMPILED_PATH):
            # If the file exists, load the existing data, then append new data
            with open(RAW_COMPILED_PATH, 'r') as json_file:
                existing_data = json.load(json_file)
                existing_data.extend(weather_data)

            # Append to the file
            with open(RAW_COMPILED_PATH, 'w') as json_file:
                json.dump(existing_data, json_file, indent=4)
        else:
            # If the file doesn't exist, create it and write the new data
            with open(RAW_COMPILED_PATH, 'w') as json_file:
                json.dump(weather_data, json_file, indent=4)

        print(f"Data saved to {RAW_COMPILED_PATH}")
        logger.info(f"Weather data successfully saved to {RAW_COMPILED_PATH}")
    except (IOError, json.JSONDecodeError, ValueError) as e:
        logger.error(f"Error writing to {RAW_COMPILED_PATH}: {e}")
        raise


try:
    # executing the api calls & writing to the file functions
    weather_data = city_weather_data_extraction()
    write_raw_data(weather_data)
    write_compiled_raw_data(weather_data)
except Exception as e:
    logger.critical(f"Pipeline extraction failed at extract.py: {e}")

Writing extract.py


# Transform:
clean the weather data 
write it to a clean_data and db_ready csv files

In [19]:
%%writefile transform.py

import os
import json
import pandas as pd
from datetime import datetime, timedelta
from scripts.logger import logger
from scripts.config import RAW_DATA_PATH, CLEAN_DATA_PATH  # Import paths


# reading the raw data from the raw_weather_data.json file
def read_raw_data():
    
    try:
        with open(RAW_DATA_PATH, 'r') as json_file:
            data = json.load(json_file)
        logger.info(f"Successfully loaded raw weather data from {RAW_DATA_PATH}")
        return data
    except FileNotFoundError:
        print(f"Error: the file '{RAW_DATA_PATH}' does not exist")
        logger.error(f"Error: the file {RAW_DATA_PATH} does not exist")
        return None
    except json.jSONDecodeError as e:
        print(f"Error: failed to decode JSON frim the file '{RAW_DATA_PATH}'")
        logger.error(f"Error: failed to decode JSON from the file {RAW_DATA_PATH}: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error while reading raw data: {e}")
        return None

# Function to convert Unix timestamp to local time
def convert_to_local_time(timestamp, offset):
    try:
        utc_time = datetime.utcfromtimestamp(timestamp)
        return utc_time + timedelta(seconds=offset)
    except Exception as e:
        logger.error(f"Error converting timestamp {timestamp} with offset {offset}: {e}")
        return None

# processing & cleaning the weather data & storing in a dataframe
def transform_data(weather_data):
    if not weather_data:
        logger.error("No data provided for transformation")
        return None
    
    # Initialize an empty list to store records
    data = []

    try:
        # Process each record
        for record in weather_data:
            latitude, longitude = record["lat"], record["lon"]
            timezone = record["timezone"]
            timezone_offset = record["timezone_offset"]
            city = record['City']
            
            if latitude is None or longitude is None:
                logger.warning("skipping record due to missing latitude/longitude")
                continue

            # Convert timestamps
            current_time = convert_to_local_time(record["current"]["dt"], timezone_offset)
            sunrise = convert_to_local_time(record["current"]["sunrise"], timezone_offset)
            sunset = convert_to_local_time(record["current"]["sunset"], timezone_offset)

            # Extract weather details
            temp = record["current"]["temp"]
            feels_like = record["current"]["feels_like"]
            pressure = record["current"]["pressure"]
            humidity = record["current"]["humidity"]
            dew_point = record["current"]["dew_point"]
            uvi = record["current"]["uvi"]
            clouds = record["current"]["clouds"]
            visibility = record["current"]["visibility"]
            wind_speed = record["current"]["wind_speed"]
            wind_deg = record["current"]["wind_deg"]
            wind_gust = record["current"].get("wind_gust", 0)
            weather = record["current"]["weather"][0]
            weather_id = weather["id"]
            weather_main = weather["main"]
            weather_description = weather["description"]

            # Handle alerts (if any)
            alerts = record.get("alerts", [])
            alert_messages = "; ".join([alert["event"] + ": " + alert["description"] for alert in alerts])

            # Add the record to the data list
            data.append({
                "latitude": latitude,
                "longitude": longitude,
                "timezone": timezone,
                "timezone_offset": timezone_offset,
                "city": city,
                "current_time": current_time,
                "sunrise": sunrise,
                "sunset": sunset,
                "temp_F": temp,
                "feels_like_F": feels_like,
                "humidity": humidity,
                "dew_point": dew_point,
                "uvi": uvi,
                "clouds": clouds,
                "visibility": visibility,
                "wind_speed_mph": wind_speed,
                "wind_deg": wind_deg,
                "wind_gust_mph": wind_gust,
                "weather_id": weather_id,
                "weather_main": weather_main,
                "weather_description": weather_description,
                "alerts": alert_messages
            })

        # Create a DataFrame from the data list
        df = pd.DataFrame(data)
        logger.info("Successfully transformed weather data into DataFrame")
        return df
    except KeyError as e:
        logger.error(f"Missing expected key in weather data: {e}")
        return None
    except Exception as e:
        logger.erorr(f"Unexpected error during transformation: {e}")
        return None


# writing the cleaned data to the compiled clean_weather_data.csv file
def write_to_cleaned_data(df):
    if df is None or df.empty:
        logger.error("No data available to write to CSV")
        return

    try:
        # Check if the file exists to decide whether to append or create new
        if os.path.exists(CLEAN_DATA_PATH):
            # If the file exists, load the existing data, then append new data
            existing_data = pd.read_csv(CLEAN_DATA_PATH)
            updated_data = pd.concat([existing_data, df], ignore_index=True)

            # Append to the file
            updated_data.to_csv(CLEAN_DATA_PATH, index=False)
        else:
            # If the file doesn't exist, create it and write the new data
            df.to_csv(CLEAN_DATA_PATH, index=False)

        print(f"Data saved to {CLEAN_DATA_PATH}")
        logger.info(f"Data successfully saved to {CLEAN_DATA_PATH}")
    except Exception as e:
        logger.error(f"Error writing data to CSV: {e}")


try:
    
    # execution of the transformation functions
    data = read_raw_data()
    df = transform_data(data)
    write_to_cleaned_data(df)
except Exception as e:
    logger.critical(f"Pipeline execution failed at transfrom.py script: {e}")

Writing transform.py


# Load:
write the cleaned data to a postgresql database

In [21]:
%%writefile load.py

import os
import pandas as pd
import psycopg2
from dotenv import load_dotenv
from scripts.logger import logger
from scripts.config import ENV_PATH, RAW_DATA_PATH, CLEAN_DATA_PATH  # Import paths


# function to read the clean_data file and return a dataframe
def read_clean_data():
    try:
        df_db = pd.read_csv(CLEAN_DATA_PATH)
        logger.info(f"Successfully loaded clean data from {CLEAN_DATA_PATH}")
        return df_db
    except FileNotFoundError:
        logger.error(f"Error: the file {CLEAN_DATA_PATH} does not exist")
    except pd.errors.EmptyDataError:
        logger.error(f"Error: the file {CLEAN_DATA_PATH} is empty")
    except Exception as e:
        logger.error(f"Unexpected error while reading clean data: {e}")
    return None

# function to load env variables & establish database connection
def env_db_connection():
    ## load environment variables and establish database connection

    load_dotenv(ENV_PATH)


    try:
           ## get local database credentials
        conn = psycopg2.connect(
            dbname=os.getenv("DB_NAME"),
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            host=os.getenv("DB_HOST"),
            port=os.getenv("DB_PORT")
        )
        
        logger.info("Connected to PostgreSQL database")
        print("connected to postgresql on local host")
        return conn
    except psycopg2.Error as e:
        print("connection error: {e}")
        logger.error(f"Database connection error: {e}")
    return None
        

# function to insert weather data into the location database table
# Insert or get Location_ID
def get_or_insert_location(cursor, lat, lon, city, timezone, tz_offset):
    try:
        cursor.execute(
            "SELECT Location_ID FROM Locations WHERE Lat=%s AND Long=%s;",
            (lat, lon)
        )
        location = cursor.fetchone()
        if location:
            print("location is already stored in the database")
            return location[0]
        else:
            cursor.execute(
                "INSERT INTO Locations (Lat, Long, City, Timezone, Timezone_offset) VALUES (%s, %s, %s, %s, %s) RETURNING Location_ID;",
                (lat, lon, city, timezone, tz_offset)
            )
            return cursor.fetchone()[0]
    except Error as e:
        logger.error(f"Error inserting location: {e}")
    return None

# function to insert weather data into the weather database table
# Insert or get Weather_ID
def get_or_insert_weather(cursor, weather_id, main, description):
    try:
        cursor.execute(
            "SELECT Weather_ID FROM Weather where Weather_ID=%s;",
            (weather_id,)
        )
        weather = cursor.fetchone()
        if weather:
            print("weather is already stored in the database")
            return weather[0]
        else:
            cursor.execute(
                "INSERT INTO Weather (Weather_ID, Main, Description) VALUES (%s, %s, %s) RETURNING Weather_ID;",
                (weather_id, main, description)
            )
            return cursor.fetchone()[0]
    except Error as e:
        logger.error(f"Error inserting weather data: {e}")
    return None

# function to insert weather data into the record database table
# Insert Record
def insert_record(cursor, location_id, weather_id, row):
    try:
        if not isinstance(location_id, int) or not isinstance(weather_id, int):
            logger.error("Invalid location_id or weather_id: skipping weather insertion")
            return None
        
        cursor.execute(
        """
        INSERT INTO Records (Location_ID, Weather_ID, Local_time, Sunrise, Sunset, Temp_F, Feels_like_F, 
                            Humidity, Dew_Point, UVI, Clouds, Visibility, Wind_speed_mph, Wind_deg, Wind_gust_mph)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING Record_ID;
        """,
        (location_id, weather_id, row['current_time'], row['sunrise'], row['sunset'], row['temp_F'],
            row['feels_like_F'], row['humidity'], row['dew_point'], row['uvi'], row['clouds'], row['visibility'], 
            row['wind_speed_mph'], row['wind_deg'], row['wind_gust_mph'])
        )
        print("record has been inserted into the table")
        logger.info("Record has been inserted into the table")
        return cursor.fetchone()[0]
    except Error as e:
        logger.error(f"Error inserting record: {e}")
    return None

# function to insert weather data into the alert database table
# Insert Alert
def insert_alert(cursor, record_id, alert_description):
    try:
        cursor.execute(
            "INSERT INTO Alerts (Record_ID, Description) VALUES (%s, %s);",
            (record_id, alert_description)
        )
        print("alert has been inserted into the table")
        logger.info("alert has been inserted into the table")
    except Error as e:
        logger.erorr(f"Error inserting alert: {e}")

# function to delete the clean_data csv file once the data has been uploaded to avoid duplication of data loading/records
def delete_clean_data():
    try:
        if os.path.exists(CLEAN_DATA_PATH):
            os.remove(CLEAN_DATA_PATH)
            print("File deleted successfully")
            logger.info(f"File {CLEAN_DATA_PATH} deleted successfully")
        else:
            print("File does not exist")
            logger.warning(f"File {CLEAN_DATA_PATH} does not exist")
    except Exception as e:
        logger.error(f"Error deleting file {CLEAN_DATA_PATH}: {e}")
        
# function to delete the raw_data csv file once the data has been uploaded to avoid duplication of data loading/records
def delete_raw_data():
    try:
        if os.path.exists(RAW_DATA_PATH):
            os.remove(RAW_DATA_PATH)
            print("File deleted successfully")
            logger.info(f"File {RAW_DATA_PATH} deleted successfully")
        else:
            print("File does not exist")
            logger.info(f"File {RAW_DATA_PATH} does not exisit")
    except Exception as e:
        logger.error(f"Error deleting file {RAW_DATA_PATH}: {e}")


# execution of functions above and the load process

df = read_clean_data()
if df is not None:
    conn = env_db_connection()
    if conn:
        try:
            with conn.cursor() as cursor:
                for _, row in df.iterrows():
                    location_id = get_or_insert_location(cursor, row['latitude'], row['longitude'], row['city'], row['timezone'], row['timezone_offset'])
                    weather_id = get_or_insert_weather(cursor, row['weather_id'], row['weather_main'], row['weather_description'])
                    record_id = insert_record(cursor, location_id, weather_id, row)

                    if pd.notna(row['alerts']) and row['alerts'].strip():
                        insert_alert(cursor, record_id, row['alerts'])
                conn.commit()
                logger.info("Data successfully inserted into database")
        except Exception as e:
            logger.error(f"Error during data insertion {e}")
        finally:
            conn.close()
            logger.info("database connection closed")
            
    delete_clean_data()
    delete_raw_data()

Writing load.py


# Config file

In [4]:
%%writefile config.py

## using this for maintaining the directories
import os

# Define the base directory explicitly
BASE_DIR = r"C:\Users\matth\OneDrive\Desktop\DataEngineering\WeatherETLPipeline"

# Log file path
LOG_Path = os.path.join(BASE_DIR,"logs","pipeline.log")

# Common file paths
CITIES_CONFIG_PATH = os.path.join(BASE_DIR,"cities_config.json")
RAW_COMPILED_PATH = os.path.join(BASE_DIR, "data", "raw_compiled_data.json")
RAW_DATA_PATH = os.path.join(BASE_DIR, "data", "raw_weather_data.json")
CLEAN_DATA_PATH = os.path.join(BASE_DIR, "data", "clean_weather_data.csv")
ENV_PATH = os.path.join(BASE_DIR, ".env")  # If you store your env file here
LOG_PATH = os.path.join(BASE_DIR, "logs", "pipeline.log")  # Example log file path

Writing config.py


# Main Pipeline Script

In [23]:
%%writefile etl_pipeline.py

import pandas as pd
from scripts.logger import logger
from scripts.encodings_setup import load_env_api, load_env_cities, encoding, encodings_to_config
from scripts.extract import city_weather_data_extraction, write_raw_data, write_compiled_raw_data
from scripts.transform import read_raw_data, transform_data, write_to_cleaned_data
from scripts.load import read_clean_data, env_db_connection, get_or_insert_location, get_or_insert_weather, insert_record, insert_alert, delete_clean_data, delete_raw_data

def run_pipeline():
    try:
        print("Starting the ETL process \n")
        logger.info("Starting the ETL process")

        print("Encoding cities \n")
        logger.info("Encoding cities")
        #Encoding
        api_key = load_env_api()
        cities = load_env_cities()
        encodings = encoding(api_key, cities)
        encodings_to_config(encodings)
        print("Encoding Complete \n")
        logger.info("Encoding complete")

        #Extract Data
        print("Extracting Data \n")
        logger.info("Extracting data")
        weather_data = city_weather_data_extraction()
        if weather_data is None:
            logger.error("Weather data extraction failed")
            return
        write_raw_data(weather_data)
        print("Data Extraction Complete")
        logger.info("Data extraction complete")


        #Transform Data
        print("Transforming Data \n") 
        logger.info("Transforming Data")
        data = read_raw_data()
        df = transform_data(data)
        if df is None:
            logger.error("Data transformation failed")
            return
        write_to_cleaned_data(df)
        print("Transforming Data Complete \n")
        logger.info("Data Transformation Complete")

        #Load Data
        print("Loading Data \n")
        logger.info("Loading Data")
        df = read_clean_data()
        if df is None or df.empty:
            logger.error("No clean data available for loading")
            return
        
        conn = env_db_connection()
        if conn is None:
            logger.error("Database connection failed")
            return

        try:
            with conn.cursor() as cursor:
                for _, row in df.iterrows():
                    location_id = get_or_insert_location(cursor, row['latitude'], row['longitude'], row['city'], row['timezone'], row['timezone_offset'])
                    weather_id = get_or_insert_weather(cursor, row['weather_id'], row['weather_main'], row['weather_description'])
                    record_id = insert_record(cursor, location_id, weather_id, row)

                    if pd.notna(row['alerts']) and row['alerts'].strip():
                        insert_alert(cursor, record_id, row['alerts'])
                conn.commit()
            logger.info("data inserted successfully")
        except Exception as e:
            logger.error(f"Error while inserting data: {e}")
        finally:
            conn.close()
            print("data inserted successfully & connection closed")
            logger.info("Database connection closed")
            
        delete_clean_data()
        delete_raw_data()
        print("Loading Data Complete \n")
        logger.info("Loading Data Complete")

        print("ETL process completed successfully")
        logger.info("ETL process completed successfully")
        
    except Exception as e:
        logger.critical(f"Pipeline execution failed: {e}", exc_info=True)

run_pipeline()

Writing etl_pipeline.py


# Testing scripts 

In [45]:
%run scripts/encodings_setup.py

City: Denver,CO,USA - Latitude: 39.7392, Longitude: -104.9847
City: Austin,TX,USA - Latitude: 30.2711, Longitude: -97.7437
City: Stuttgart,DE - Latitude: 48.7823, Longitude: 9.177
Data has been written to cities_config.json


  encodings = pd.concat([encodings, new_row], ignore_index=True)


In [46]:
%run scripts/extract.py

{'lat': 39.7392, 'lon': -104.9847, 'timezone': 'America/Denver', 'timezone_offset': -25200, 'current': {'dt': 1739896438, 'sunrise': 1739886501, 'sunset': 1739925578, 'temp': 10.53, 'feels_like': 2.32, 'pressure': 1024, 'humidity': 66, 'dew_point': 2.37, 'uvi': 1.67, 'clouds': 51, 'visibility': 10000, 'wind_speed': 4.65, 'wind_deg': 28, 'wind_gust': 2.75, 'weather': [{'id': 803, 'main': 'Clouds', 'description': 'broken clouds', 'icon': '04d'}]}, 'City': 'Denver,CO,USA'}
{'lat': 30.2711, 'lon': -97.7437, 'timezone': 'America/Chicago', 'timezone_offset': -21600, 'current': {'dt': 1739896438, 'sunrise': 1739884097, 'sunset': 1739924506, 'temp': 57.79, 'feels_like': 57.36, 'pressure': 1016, 'humidity': 87, 'dew_point': 53.96, 'uvi': 1.79, 'clouds': 100, 'visibility': 10000, 'wind_speed': 4.61, 'wind_deg': 150, 'weather': [{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds', 'icon': '04d'}]}, 'alerts': [{'sender_name': 'NWS Austin/San Antonio TX', 'event': 'Cold Weather Advisory'

In [47]:
%run scripts/transform.py

Data saved to C:\Users\matth\OneDrive\Desktop\DataEngineering\WeatherETLPipeline\data\clean_weather_data.csv


In [48]:
%run scripts/load.py

connected to postgresql on local host
location is already stored in the database
weather is already stored in the database
record has been inserted into the table
location is already stored in the database
weather is already stored in the database
record has been inserted into the table
alert has been inserted into the table
location is already stored in the database
weather is already stored in the database
record has been inserted into the table
alert has been inserted into the table
data inserted successfully & connection closed
File deleted successfully
File deleted successfully


In [49]:
%run scripts/etl_pipeline.py

Starting the ETL process 

Encoding cities 

City: Denver,CO,USA - Latitude: 39.7392, Longitude: -104.9847
City: Austin,TX,USA - Latitude: 30.2711, Longitude: -97.7437
City: Stuttgart,DE - Latitude: 48.7823, Longitude: 9.177
Data has been written to cities_config.json
Encoding Complete 

Extracting Data 



  encodings = pd.concat([encodings, new_row], ignore_index=True)


{'lat': 39.7392, 'lon': -104.9847, 'timezone': 'America/Denver', 'timezone_offset': -25200, 'current': {'dt': 1739896438, 'sunrise': 1739886501, 'sunset': 1739925578, 'temp': 10.53, 'feels_like': 2.32, 'pressure': 1024, 'humidity': 66, 'dew_point': 2.37, 'uvi': 1.67, 'clouds': 51, 'visibility': 10000, 'wind_speed': 4.65, 'wind_deg': 28, 'wind_gust': 2.75, 'weather': [{'id': 803, 'main': 'Clouds', 'description': 'broken clouds', 'icon': '04d'}]}, 'City': 'Denver,CO,USA'}
{'lat': 30.2711, 'lon': -97.7437, 'timezone': 'America/Chicago', 'timezone_offset': -21600, 'current': {'dt': 1739896438, 'sunrise': 1739884097, 'sunset': 1739924506, 'temp': 57.79, 'feels_like': 57.36, 'pressure': 1016, 'humidity': 87, 'dew_point': 53.96, 'uvi': 1.79, 'clouds': 100, 'visibility': 10000, 'wind_speed': 4.61, 'wind_deg': 150, 'weather': [{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds', 'icon': '04d'}]}, 'alerts': [{'sender_name': 'NWS Austin/San Antonio TX', 'event': 'Cold Weather Advisory'

# next steps

data validation & quality checks
automate scheduling (Apache Airflow/Cron Jobs/AWS Lambda)
performance optimization?

#  Log file

In [5]:
%%writefile logger.py

import logging
from scripts.config import LOG_PATH

# clear existing handlers to prevent duplicates
logger = logging.getLogger(__name__)
logger.handlers.clear()

# Configure Logging w/ the correct log file path
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

logger.info("Logger initialized successfully.")

Writing logger.py
