# In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from sqlalchemy import create_engine
import os

# Root of the Environment Agency flood-monitoring "id" API; every request URL
# in this module is built by appending a path to this prefix.
BASE_URL = "https://environment.data.gov.uk/flood-monitoring/id"

def fetch_water_level_stations(max_stations=100, timeout=30):
    """Fetch water-level monitoring stations from the flood-monitoring API.

    Args:
        max_stations: Keep at most this many stations, taken in the order the
            API returns them. ``None`` keeps every station. Defaults to 100.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A list of station dicts (the ``items`` array of the JSON response),
        or an empty list if the request fails. Failures are printed, not
        raised.
    """
    stations_url = f"{BASE_URL}/stations?parameter=level"
    print("Fetching list of water level monitoring stations...")
    try:
        # Without a timeout, requests can block forever on a stalled server.
        response = requests.get(stations_url, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching stations: {e}")
        return []

    stations = data.get('items', [])
    if max_stations is not None:
        # Cap the run size; fetching readings station-by-station is slow.
        stations = stations[:max_stations]
    print(f"Found {len(stations)} stations.")
    return stations

def fetch_readings_since(station_id, since_timestamp, timeout=30):
    """Fetch the readings recorded for one station since a given timestamp.

    Args:
        station_id: Station reference used in the ``/stations/{id}`` path.
            If it is falsy (e.g. a station record with no reference), no
            request is made.
        since_timestamp: Lower bound sent as the ``since`` query parameter,
            e.g. ``'2025-10-15T21:37:15Z'``.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The ``items`` list from the JSON response, or ``[]`` when there is no
        station id or the request fails. Failures are printed rather than
        raised, so one bad station does not abort a multi-station run.
    """
    if not station_id:
        # Avoid requesting ".../stations/None/readings".
        return []

    readings_url = f"{BASE_URL}/stations/{station_id}/readings"
    try:
        # Let requests URL-encode the timestamp: a raw "+00:00" offset in a
        # query string would be decoded by the server as " 00:00".
        response = requests.get(
            readings_url, params={'since': since_timestamp}, timeout=timeout
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching readings for station {station_id}: {e}")
        return []
    return data.get('items', [])

def load_data_to_postgres(df, connection_string, table_name='flood_level_readings'):
    """Append the rows of a DataFrame to a PostgreSQL table.

    Args:
        df: DataFrame to write; its columns become the table's columns. The
            DataFrame index is not written.
        connection_string: SQLAlchemy database URL passed to ``create_engine``.
        table_name: Destination table. Rows are appended
            (``if_exists='append'``); pandas creates the table if it does not
            exist yet.

    Returns:
        None. An empty DataFrame is skipped without connecting. Load errors
        are printed rather than raised, so a failed load does not crash the
        calling script/notebook.
    """
    if df.empty:
        print("DataFrame is empty. No data will be loaded to PostgreSQL.")
        return

    print(f"\nLoading {len(df)} records into table: {table_name}...")
    engine = None
    try:
        engine = create_engine(connection_string)
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists='append',
            index=False,
        )
        print("Successfully loaded data into PostgreSQL.")
    except Exception as e:
        # Top-level boundary of the load step: report the failure and return.
        print(f"Error loading data into PostgreSQL: {e}")
    finally:
        # A new engine is created on every call; dispose of it so its pooled
        # connections are closed now rather than whenever it is collected.
        if engine is not None:
            engine.dispose()

def _transform_readings(station, station_id, readings):
    """Flatten one station's metadata plus each of its readings into row dicts."""
    return [
        {
            'station_id': station_id,
            'station_name': station.get('label'),
            'town': station.get('town'),
            'river_name': station.get('riverName'),
            'latitude': station.get('lat'),
            'longitude': station.get('long'),
            'reading_datetime': reading.get('dateTime'),
            'water_level_metres': reading.get('value'),
        }
        for reading in readings
    ]


def run_etl_pipeline(lookback_hours=2, progress_every=200):
    """Orchestrate the ETL: fetch stations, collect recent readings, load them.

    Args:
        lookback_hours: How far back, in hours, to request readings.
            Defaults to 2.
        progress_every: Print a progress line every this many stations;
            ``0`` or ``None`` disables progress output.

    The database URL is read from the ``NEON_CONNECTION_STRING`` environment
    variable. If it is unset, the pipeline stops before making any HTTP
    requests.

    NOTE(review): rows are appended on every run, so overlapping lookback
    windows between runs will insert duplicate readings. Consider a unique
    key on (station_id, reading_datetime) in the target table.
    """
    # Fail fast: validate required config before the (slow) per-station fetch.
    connection_string = os.getenv('NEON_CONNECTION_STRING')
    if not connection_string:
        print("\nERROR: Connection string not found. Please set the NEON_CONNECTION_STRING environment variable.")
        return

    stations = fetch_water_level_stations()
    if not stations:
        print("No stations found. Aborting pipeline.")
        return

    since = datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
    since_timestamp_iso = since.strftime('%Y-%m-%dT%H:%M:%SZ')
    print(f"\nFetching new readings since: {since_timestamp_iso}")

    final_data_list = []
    total_stations = len(stations)
    print(f"Starting data collection for {total_stations} stations...")

    for i, station in enumerate(stations, start=1):
        station_id = station.get('stationReference')

        if progress_every and i % progress_every == 0:
            print(f"Processing progress: {i}/{total_stations}...")

        if not station_id:
            # Without a reference there is no readings URL to query.
            continue

        readings = fetch_readings_since(station_id, since_timestamp_iso)
        final_data_list.extend(_transform_readings(station, station_id, readings))

    if not final_data_list:
        print(f"\nNo new readings found in the last {lookback_hours} hour(s). ETL process finished.")
        return

    df = pd.DataFrame(final_data_list)
    # utc=True keeps the column uniformly timezone-aware (UTC).
    df['reading_datetime'] = pd.to_datetime(df['reading_datetime'], utc=True)
    df['load_timestamp'] = datetime.now(timezone.utc)

    print(f"\nETL process completed. {len(df)} new readings collected.")
    print("Sample of the transformed data:")
    print(df.head())

    load_data_to_postgres(df, connection_string)


if __name__ == "__main__":
    # Script entry point: execute one full fetch-transform-load cycle.
    run_etl_pipeline()

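# --- Captured notebook output. These log messages are in Portuguese and do not
# match the English messages printed by the code above, so this output came from
# a different version of the cell. The run was interrupted manually. ---
# Buscando lista de estações...
# Encontradas 4177 estações.
#
# Buscando leituras que ocorreram desde: 2025-10-15T21:37:15Z
# Iniciando coleta das leituras para 4177 estações...
#
#
# KeyboardInterrupt: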