# API ELT Notebook

This notebook demonstrates an Extract, Load, and Transform (ELT) process using data from an API. The following sections provide a step-by-step implementation of the pipeline, including API interaction, data transformation, and database integration.

## Sections Overview

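1. **Imports and Environment Setup**: Import the required libraries and load credentials from the environment file.
2. **Configuration and Database Setup**: Define the API endpoint template and create the database engine.
3. **Extract**: Fetch historical weather data from the API.
4. **Transform**: Parse the nested JSON responses into pandas DataFrames.
5. **Load**: Write the DataFrames to PostgreSQL tables for further use.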

---


## 1. Imports and Environment Setup
> This section imports all required libraries and loads environment variables
> for API authentication and database connectivity.


In [None]:

from pydotenv import Environment
import datetime
from time import sleep
import requests
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import mapped_column, Mapped, declarative_base

# Load environment variables from local.env file
# Contains sensitive credentials like API keys and database connection details
env = Environment("./local.env")
API_KEY = env.get("weather_api_key")

---
## 2. Configuration and Database Setup
> Defines the API endpoint template and creates the SQLAlchemy engine for the database.
> The engine opens a connection only when it is first used.


In [None]:

# API endpoint template with placeholders for dynamic parameters
# Used to construct URLs for weather API requests
base_url = "http://api.weatherapi.com/v1/{method}.json?key={api_key}&q={location}&dt={date}"

# Create database engine for PostgreSQL connection
# Uses credentials from environment variables for security
engine = create_engine(
    f'postgresql://{env.get("db_login")}@{env.get("db_host")}/{env.get("db_name")}', 
    echo=True
)
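
The URL template above inserts values into the query string as-is, so a location containing a space or an ampersand would produce a malformed URL. The following is a minimal sketch of one alternative, assuming the same host and query parameters as `base_url`; `build_url` is a hypothetical helper name, and `DEMO_KEY` is a placeholder rather than a real credential.

```python
from urllib.parse import urlencode

# Same host and path prefix as the base_url template in the notebook
API_ROOT = "http://api.weatherapi.com/v1"


def build_url(method: str, api_key: str, location: str, date: str) -> str:
    """Return a request URL with every query value percent-encoded."""
    query = urlencode({"key": api_key, "q": location, "dt": date})
    return f"{API_ROOT}/{method}.json?{query}"


# "DEMO_KEY" is a placeholder, not a real credential
print(build_url("history", "DEMO_KEY", "New York", "2024-01-15"))
```

`urlencode` encodes the space in `New York` as `+`, which a plain `str.format` call would leave untouched.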

## 3. Extract: Fetch Historical Weather Data from API

This section retrieves historical weather data for Lagos from the Weather API.

**Process Overview:**
- Iterates through the date range (14 days ago through today)
- Constructs API URLs with proper parameters
- Makes HTTP requests with error handling
- Implements rate limiting with 1-second delays
- Stores all API responses for downstream processing

**Key Features:**
- Date formatting and incrementation
- Status code validation
- Exception handling for API errors
- Progress reporting
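
The date formatting and incrementation described above can also be expressed as a small generator over calendar dates. This is only a sketch: `iter_dates` and `lookback_days` are hypothetical names, and working with `datetime.date` instead of `datetime.datetime` is a design choice made here so that the time of day never affects where the range ends.

```python
import datetime
from typing import Iterator


def iter_dates(end: datetime.date, lookback_days: int) -> Iterator[str]:
    """Yield 'YYYY-MM-DD' strings from `end - lookback_days` through `end`, inclusive."""
    start = end - datetime.timedelta(days=lookback_days)
    for offset in range(lookback_days + 1):
        yield (start + datetime.timedelta(days=offset)).strftime("%Y-%m-%d")


# A fixed end date keeps the example reproducible; 2024 is a leap year
dates = list(iter_dates(datetime.date(2024, 3, 2), lookback_days=3))
print(dates)
```

In the notebook, the end date would be `datetime.date.today()` and each yielded string would be passed as the `date` parameter of the request URL.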

In [None]:

# Initialize date range: start 14 days ago, end today
current_datetime = datetime.datetime.today() - datetime.timedelta(days=14)  # Start date: 14 days ago
today = datetime.datetime.today()  # End date: today
responses = []  # List to store all API responses for processing

# Main extraction loop: iterates through each day in the date range
while True: 
    try:
        # Format current date as 'YYYY-MM-DD' for API query
        current_date = current_datetime.strftime('%Y-%m-%d')
        
        # Construct the API URL with formatted parameters
        # method: 'history' for historical data
        # location: 'Lagos' (hardcoded, can be parameterized)
        # date: current iteration date
        api_url = base_url.format(
            method='history', 
            api_key=API_KEY,
            location='Lagos',
            date=current_date
        )

        # Log progress for monitoring
        print("Fetching records for date: ", current_date, end='...')
        
        # Make HTTP GET request to the API
        # The timeout keeps a stalled connection from hanging the loop indefinitely
        response = requests.get(api_url, timeout=10)
        
        # Validate response status code
        if response.status_code != 200:
            raise Exception(f"Error: {response.status_code}")
        
        # Confirm successful request and store response
        print('Success.')
        responses.append(response.json())
        
        # Increment date by 1 day for next iteration
        current_datetime = current_datetime + datetime.timedelta(days=1)
        
        # Stop once the next date to fetch falls after today
        # Comparing calendar dates avoids depending on the time of day
        if current_datetime.date() > today.date():
            print("All records fetched successfully")
            break
        
        # Introduce 1-second delay to respect API rate limits
        sleep(1)
        
    except Exception as e:
        print(f"Error occurred: {e}")
        break
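
The loop above stops at the first error, so a single transient failure ends the whole extraction. One possible refinement, sketched here under stated assumptions, is to retry each request a few times with an increasing delay. `fetch_with_retry` is a hypothetical helper, the request itself is passed in as a callable so the sketch runs without network access, and the attempt count and delays are arbitrary illustrative values.

```python
import time
from typing import Callable, Optional


def fetch_with_retry(
    fetch: Callable[[], dict],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[dict]:
    """Call `fetch` up to `attempts` times, doubling the wait after each failure.

    Returns the payload on success, or None if every attempt raised.
    """
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception as exc:  # broad on purpose, mirroring the loop above
            print(f"Attempt {attempt + 1} failed: {exc}")
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)
    return None


# Simulated request that fails twice before succeeding
calls = {"n": 0}


def flaky() -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated HTTP 503")
    return {"ok": True}


result = fetch_with_retry(flaky, sleep=lambda seconds: None)
print(result)
```

In the notebook, `fetch` could wrap the existing `requests.get` call and status check, and a `None` result could be logged and skipped instead of ending the loop.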

## 4. Transform: Parse and Structure API Response Data

This section transforms the nested JSON responses from the API into structured pandas DataFrames.

**Data Extraction Strategy:**
- **Hourly Data**: Extracts detailed hourly weather metrics from nested forecast arrays
- **Astronomical Data**: Extracts sunrise, sunset, and moon phase information
- **Daily Data**: Extracts daily aggregated weather metrics and conditions

**Transformation Steps:**
1. Flatten nested JSON structures using `json_normalize()`
2. Extract specific forecast data for each day
3. Combine related data (e.g., condition details) into single records
4. Validate dimensions of resulting DataFrames
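
To make the flattening steps above concrete, here is a plain-Python sketch of what they amount to for the daily data, without pandas. It also tags each row with its date so that rows stay identifiable after loading. The helper names are hypothetical, the presence of a `date` field on each `forecastday` entry is an assumption about the API payload, and the sample values are made up.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into one level, joining keys with '.'."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


def daily_rows(responses: list) -> list:
    """Return one flat row per forecast day, tagged with that day's date."""
    rows = []
    for response in responses:
        for forecastday in response["forecast"]["forecastday"]:
            # Assumption: each forecastday entry carries a 'date' field
            row = {"date": forecastday.get("date")}
            row.update(flatten(forecastday["day"]))
            rows.append(row)
    return rows


# Made-up sample shaped like the paths the notebook reads; values are illustrative only
sample = {
    "forecast": {
        "forecastday": [
            {
                "date": "2024-01-15",
                "day": {"maxtemp_c": 31.2, "condition": {"text": "Sunny", "code": 1000}},
            }
        ]
    }
}

print(daily_rows([sample]))
```

Joining nested keys with `.` matches the default separator of `json_normalize()`, so the nested `condition` object becomes `condition.text` and `condition.code`.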

In [None]:

# Extract hourly weather data from all responses
# json_normalize flattens the nested structure (response > forecast > forecastday > hour)
hour_dfs = [
    pd.json_normalize(response, record_path=['forecast', 'forecastday', 'hour']) 
    for response in responses
]
# Concatenate all hourly dataframes into a single dataframe
hour_df = pd.concat(hour_dfs, ignore_index=True, axis=0)

# Extract astronomical data (sunrise, sunset, moon phase)
# Takes the first (and only) forecastday's astro data from each response
astro_df = pd.DataFrame([
    response['forecast']['forecastday'][0]['astro'] 
    for response in responses
])

# Extract daily weather data (temperature, precipitation, conditions)
day_df = pd.DataFrame([
    response['forecast']['forecastday'][0]['day'] 
    for response in responses
])

# Flatten condition nested object into separate columns
# Drop the original 'condition' column and add its normalized components
day_df = pd.concat(
    [
        day_df.drop('condition', axis=1),  # All columns except 'condition'
        pd.json_normalize(day_df['condition'].tolist())  # Normalized 'condition' data, passed as a list of dicts
    ], 
    axis=1
)

# Display dataframe dimensions for verification
print("Hourly data shape:", hour_df.shape)
print("Astronomical data shape:", astro_df.shape)
print("Daily data shape:", day_df.shape)
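
Printing the shapes shows the dimensions but does not check them. A small sketch of an explicit check follows, assuming one astronomical row and one daily row per fetched day and 24 hourly rows per day. The 24-hour figure is an assumption about the API's history responses, and `check_row_counts` is a hypothetical helper.

```python
def check_row_counts(
    n_days: int, hourly: int, astro: int, daily: int, hours_per_day: int = 24
) -> list:
    """Return a list of human-readable problems; an empty list means the counts line up."""
    problems = []
    if astro != n_days:
        problems.append(f"astro rows: expected {n_days}, got {astro}")
    if daily != n_days:
        problems.append(f"daily rows: expected {n_days}, got {daily}")
    if hourly != n_days * hours_per_day:
        problems.append(f"hourly rows: expected {n_days * hours_per_day}, got {hourly}")
    return problems


# Example with consistent counts for 15 fetched days
print(check_row_counts(n_days=15, hourly=360, astro=15, daily=15))
```

In the notebook, the call would be `check_row_counts(len(responses), len(hour_df), len(astro_df), len(day_df))`.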

## 5. Load: Persist Transformed Data to PostgreSQL Database

This final section loads the three transformed dataframes into dedicated PostgreSQL tables.

**Tables Created/Updated:**
- `hourly_weather`: Detailed hourly weather observations
- `astro_weather`: Astronomical phenomena data (sunrise, sunset, moon phase)
- `daily_weather`: Daily aggregated weather summaries

**Configuration:**
- `if_exists='append'`: Adds new records to existing tables without truncating
- `index=False`: Excludes pandas index from being written to database
- Connection uses SQLAlchemy engine configured in Section 2
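
Because `if_exists='append'` adds rows on every run, rerunning the notebook over an overlapping date range stores the same days twice. The sketch below illustrates one common guard, a unique key combined with an insert that skips conflicts. It uses an in-memory SQLite database as a stand-in for PostgreSQL, and the `date` and `location` key columns are hypothetical, since the DataFrames built above do not carry them. The sample values are made up.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL in this sketch
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_weather (date TEXT, location TEXT, maxtemp_c REAL, "
    "UNIQUE (date, location))"
)

# Illustrative rows only
rows = [("2024-01-15", "Lagos", 31.2), ("2024-01-16", "Lagos", 30.8)]


def load(rows):
    """Insert rows, silently skipping any (date, location) pair already stored."""
    conn.executemany("INSERT OR IGNORE INTO daily_weather VALUES (?, ?, ?)", rows)
    conn.commit()


load(rows)
load(rows)  # simulated rerun of the notebook
count = conn.execute("SELECT COUNT(*) FROM daily_weather").fetchone()[0]
print(count)
```

The PostgreSQL counterpart of `INSERT OR IGNORE` is `INSERT ... ON CONFLICT DO NOTHING`.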

In [None]:

# Load hourly weather data
# Table: hourly_weather
# Records: One row per hour per day in the date range
hour_df.to_sql('hourly_weather', con=engine, if_exists='append', index=False)
print("✓ Hourly weather data loaded successfully")

# Load astronomical data
# Table: astro_weather
# Records: One row per day (sunrise, sunset, moon phase information)
astro_df.to_sql('astro_weather', con=engine, if_exists='append', index=False)
print("✓ Astronomical weather data loaded successfully")

# Load daily weather summary data
# Table: daily_weather
# Records: One row per day with aggregated metrics (min/max temps, precipitation, etc.)
day_df.to_sql('daily_weather', con=engine, if_exists='append', index=False)
print("✓ Daily weather data loaded successfully")

print("\nELT Pipeline completed successfully!")

## Summary

This ELT pipeline successfully:
1. **Extracted** historical weather data from the Weather API (14-day lookback)
2. **Transformed** nested JSON responses into three structured dataframes
3. **Loaded** the data into PostgreSQL tables for analysis

**Next Steps:**
- Verify data integrity in database tables
- Build analytical queries on weather patterns
- Create visualizations for weather trends
- Set up scheduled runs using Apache Airflow or similar orchestration tool