In [13]:
import pandas as pd
pd.set_option("display.max_columns", 999)

In [14]:
# This data will initially come from data extractor script.
meteo_data = pd.read_parquet("/Users/trentino/Work/OFM/droplet/2026 Forecast/data/open_meteo_weather.parquet")

In [15]:
def find_missing_dates(df: pd.DataFrame) -> pd.DatetimeIndex:

    if 'date' not in df.columns:
        raise ValueError("DataFrame must have a 'date' column.")

    try:
        df_dates = pd.to_datetime(df['date']).dt.date
    except Exception as e:
        print(f"Error converting 'date' column: {e}")
        return pd.DatetimeIndex([])

    min_date = df_dates.min()
    max_date = df_dates.max()

    if pd.isna(min_date) or pd.isna(max_date):
        print("No valid dates found in the 'date' column.")
        return pd.DatetimeIndex([])

    full_date_range = pd.date_range(start=min_date, end=max_date, freq='D')

    full_range_set = set(full_date_range.date)
    df_dates_set = set(df_dates)

    missing_dates = full_range_set - df_dates_set

    return pd.DatetimeIndex(sorted(list(missing_dates)))


In [16]:
missing = find_missing_dates(meteo_data)

print(f"Min date: {meteo_data['date'].min()}, Max date: {meteo_data['date'].max()}\n")
print(f"Missing dates:\n{missing}")

Min date: 2018-01-01 00:00:00, Max date: 2025-11-03 00:00:00

Missing dates:
DatetimeIndex([], dtype='datetime64[ns]', freq=None)


In [17]:
min(missing), max(missing)

ValueError: min() arg is an empty sequence

In [None]:
# Run below code only if there are missing dates in the above code
# Also, in the below code you have to update the missing dates by using the start and end dates from above.

In [12]:
import time
import logging
import pandas as pd
import numpy as np
from datetime import datetime
from pymongo import MongoClient

# --- OpenMeteo Specific Imports ---
import openmeteo_requests
import requests_cache
from retry_requests import retry

# --- CONFIGURATION ---
MONGO_COLLECTION_NAME = "OpenMeteoWeatherData"
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
# --- ---


def convert_data(data):
    """
    Converts numpy/pandas types to Python-native types for MongoDB.
    """
    for document in data:
        for key, value in document.items():
            if isinstance(value, (np.float32, np.float64, np.float_)):
                document[key] = float(value)
            elif isinstance(value, (np.int32, np.int64, np.int_)):
                document[key] = int(value)
            elif isinstance(value, (pd.Timestamp, np.datetime64)):
                document[key] = pd.to_datetime(value).to_pydatetime()
            elif pd.isna(value):
                document[key] = None
            elif key in ("date", "message_date") and isinstance(value, str):
                document[key] = datetime.strptime(value, "%Y-%m-%d")
    return data


def get_mongo_collection(collection_name):
    client = MongoClient(
        "mongodb+srv://mosqapp:40M631IeaF78vSX5@closio-mongodb-77838290.mongo.ondigitalocean.com/mosqapp-staging?replicaSet=closio-mongodb&tls=true&authSource=admin"
    )
    db = client["datamart"]
    return db[collection_name]


def store_to_mongo(data, collection_name):
    """
    Converts and stores the fetched data into MongoDB.
    """
    data = convert_data(data)
    collection = get_mongo_collection(collection_name)
    if data:
        collection.insert_many(data)
        logging.info(f"Inserted {len(data)} records into {collection_name}.")
    else:
        logging.info(f"No new data to insert for this batch.")


# --- Open-Meteo Client Setup ---
cache_session = requests_cache.CachedSession(".cache", expire_after=-1)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
openmeteo = openmeteo_requests.Client(session=retry_session)

# --- Locations ---
locations = {
    "Alkmaar": (52.6324, 4.7534),
    "Almere": (52.3508, 5.2647),
    "Beek": (50.9396, 5.7950),
    "Deventer": (52.2661, 6.1552),
    "Doesburg": (52.0136, 6.1317),
    "Druten": (51.8886, 5.6064),
    "Ede": (52.0403, 5.6641),
    "Geldermalsen": (51.8797, 5.2872),
    "Goes": (51.5048, 3.8887),
    "Helmond": (51.4793, 5.6570),
    "Lelystad": (52.5185, 5.4714),
    "Naaldwijk": (51.9942, 4.2092),
    "Nieuwegein": (52.0290, 5.0913),
    "Oosterhout": (51.6455, 4.8597),
    "Reusel": (51.3647, 5.1686),
    "Rosmalen": (51.7247, 5.3486),
    "Sliedrecht": (51.8242, 4.7764),
    "Sluis": (51.3083, 3.3861),
    "Tiel": (51.8860, 5.4293),
    "Utrecht": (52.0907, 5.1214),
    "Veghel": (51.6167, 5.5486),
    "Veldhoven": (51.4186, 5.4025),
    "Voorburg": (52.0747, 4.3594),
    "Waalwijk": (51.6879, 5.0575),
    "Weert": (51.2517, 5.7061),
    "Woerden": (52.0850, 4.8833),
}

def fetch_and_store_archive():
    """
    Fetches historical weather data for all locations and stores
    it in MongoDB, one city at a time to save memory.
    """


    params = {
        "start_date": "2025-10-15", # Add correct missing dates from above.
        "end_date": "2025-10-16",
        "daily": [
            "weather_code", "temperature_2m_max", "temperature_2m_min", "temperature_2m_mean",
            "apparent_temperature_max", "apparent_temperature_min", "apparent_temperature_mean",
            "sunrise", "sunset", "daylight_duration", "sunshine_duration",
            "precipitation_sum", "rain_sum", "snowfall_sum", "precipitation_hours",
            "wind_speed_10m_max", "wind_gusts_10m_max", "wind_direction_10m_dominant",
            "shortwave_radiation_sum", "et0_fao_evapotranspiration"
        ],
        "timezone": "Europe/Berlin"
    }

    url = "https://archive-api.open-meteo.com/v1/archive"

    logging.info("Starting historical data fetch...")

    for city, (lat, lon) in locations.items():
        params["latitude"] = lat
        params["longitude"] = lon

        logging.info(f"--- Fetching data for {city} ---")

        try:
            responses = openmeteo.weather_api(url, params=params)
            response = responses[0]

            daily = response.Daily()

            daily_data = {
                "date": pd.date_range(
                    start=pd.to_datetime(daily.Time(), unit="s", utc=True),
                    end=pd.to_datetime(daily.TimeEnd(), unit="s", utc=True),
                    freq=pd.Timedelta(seconds=daily.Interval()),
                    inclusive="left"
                ),
                "city": city,
                "latitude": lat,
                "longitude": lon,
                "weather_code": daily.Variables(0).ValuesAsNumpy(),
                "temperature_2m_max": daily.Variables(1).ValuesAsNumpy(),
                "temperature_2m_min": daily.Variables(2).ValuesAsNumpy(),
                "temperature_2m_mean": daily.Variables(3).ValuesAsNumpy(),
                "apparent_temperature_max": daily.Variables(4).ValuesAsNumpy(),
                "apparent_temperature_min": daily.Variables(5).ValuesAsNumpy(),
                "apparent_temperature_mean": daily.Variables(6).ValuesAsNumpy(),
                "sunrise": daily.Variables(7).ValuesAsNumpy(),
                "sunset": daily.Variables(8).ValuesAsNumpy(),
                "daylight_duration": daily.Variables(9).ValuesAsNumpy(),
                "sunshine_duration": daily.Variables(10).ValuesAsNumpy(),
                "precipitation_sum": daily.Variables(11).ValuesAsNumpy(),
                "rain_sum": daily.Variables(12).ValuesAsNumpy(),
                "snowfall_sum": daily.Variables(13).ValuesAsNumpy(),
                "precipitation_hours": daily.Variables(14).ValuesAsNumpy(),
                "wind_speed_10m_max": daily.Variables(15).ValuesAsNumpy(),
                "wind_gusts_10m_max": daily.Variables(16).ValuesAsNumpy(),
                "wind_direction_10m_dominant": daily.Variables(17).ValuesAsNumpy(),
                "shortwave_radiation_sum": daily.Variables(18).ValuesAsNumpy(),
                "et0_fao_evapotranspiration": daily.Variables(19).ValuesAsNumpy(),
                "message_date": datetime.utcnow()
            }

            city_df = pd.DataFrame(data=daily_data)

            records_list = city_df.to_dict('records')

            store_to_mongo(records_list, MONGO_COLLECTION_NAME)

        except Exception as e:
            logging.error(f"Error fetching data for {city}: {e}")

        time.sleep(2)

    logging.info("Historical data pipeline completed successfully.")


if __name__ == "__main__":
    fetch_and_store_archive()

2025-11-03 17:58:19,147 - INFO - Starting historical data fetch...
2025-11-03 17:58:19,147 - INFO - --- Fetching data for Alkmaar ---
2025-11-03 17:58:21,914 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:23,922 - INFO - --- Fetching data for Almere ---
2025-11-03 17:58:25,820 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:27,823 - INFO - --- Fetching data for Beek ---
2025-11-03 17:58:29,661 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:31,665 - INFO - --- Fetching data for Deventer ---
2025-11-03 17:58:33,493 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:35,501 - INFO - --- Fetching data for Doesburg ---
2025-11-03 17:58:37,464 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:39,471 - INFO - --- Fetching data for Druten ---
2025-11-03 17:58:41,457 - INFO - Inserted 2 records into OpenMeteoWeatherData.
2025-11-03 17:58:43,464 - INFO - --- Fetching data for Ede -