# Extracting `DAILY` data from the API and loading it into the DB

The purpose of our pipeline is to make weather data available for comparison with flights and airports data. As a first step, we load the weather data in its raw form (JSON) into our database, so that later steps can transform it into meaningful and useful tables.

### General Presteps:

The goal of this notebook is to get the raw JSON data for daily and hourly weather for 3 airport weather stations and load it as it is into our database.
- Find the Station IDs for the **defined** airports
- Define the start and end of the period
- Get the API key from the `.env`

### Imports

We will need the credentials we saved in the `.env` file, as well as SQLAlchemy and its functions.

In [1]:
# we will need the credentials we saved in the .env file
from dotenv import dotenv_values

# We also will need SQLAlchemy and its functions
from sqlalchemy import create_engine, types
from sqlalchemy.dialects.postgresql import JSON as postgres_json

import pandas as pd

# requests library will make the API calls. 
# the json package will parse the JSON string and convert it to Python data structures
import requests
import json

# with 'datetime' we capture the timestamp of the API call, as a reference for how current the data is
# and 'time' lets us slow down a bit between calls
from datetime import datetime
import time

### Defining Airports and finding the Station IDs

For our pipeline we will use weather data from the weather stations at the 3 airports defined in the dictionary below:
- **MIA**: Miami International Airport
- **MSY**: Louis Armstrong New Orleans International Airport
- **MOB**: Mobile Regional Airport

To find the Station IDs for the airports without using up our API call limit, we will use the search option on **https://meteostat.net/**.

We can search for the names of the airports above and find the Station IDs.

Let's add them to the dictionary below.

In [2]:
airport_staids = {
    'MIA': 72202
    ,'MSY': 72231
    ,'MOB': 72223
           }
           

### Defining the period

The weather period should match the period of our flight data. The lecture example uses 2024-01-01 until 2024-03-31; in this notebook the period for the meteostat JSON API is set to 2005-07-01 until 2005-09-30.

In [44]:
period_start = "2005-07-01"
period_end = "2005-09-30"
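
Before spending API calls, it can help to confirm that both date strings parse and are in the right order. The following is a minimal sketch using only the standard library; the helper name `check_period` is hypothetical and not part of the original notebook.

```python
from datetime import date


def check_period(start: str, end: str) -> int:
    """Return the number of days in the period, counting both end points."""
    start_day = date.fromisoformat(start)  # raises ValueError on a malformed date
    end_day = date.fromisoformat(end)
    if end_day < start_day:
        raise ValueError(f"period_end {end} lies before period_start {start}")
    return (end_day - start_day).days + 1


# the same values as period_start and period_end in the notebook
days_in_period = check_period("2005-07-01", "2005-09-30")
print(days_in_period)
```

A swapped or mistyped date then fails right away instead of producing an unexpected API response later.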

### Loading the API Key

In [45]:
# getting API and DB credentials - Alternative 1: dotenv_values()

config = dotenv_values()

api_key = config['x-rapidapi-key'] # align the key label with your .env file
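
If a label in the `.env` file is missing or misspelled, the lookup above fails with a bare `KeyError`. One possible way to get a clearer message is sketched below. The helper `require_keys` is hypothetical, and a plain dictionary with a placeholder value stands in for the result of `dotenv_values()` so that the example runs without a `.env` file.

```python
def require_keys(config: dict, keys: list) -> dict:
    """Return the requested entries, or fail with one message naming every missing or empty key."""
    missing = [key for key in keys if not config.get(key)]
    if missing:
        raise KeyError(f"missing or empty in .env: {', '.join(missing)}")
    return {key: config[key] for key in keys}


# stand-in for dotenv_values(); the value is a placeholder, not a real key
example_config = {"x-rapidapi-key": "dummy-value"}
api_settings = require_keys(example_config, ["x-rapidapi-key"])
print(list(api_settings))
```

In the notebook, the same function could be called with `config` instead of `example_config`.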

# Part 1: Daily Station Data

Each API call will get 3 months of weather data for one Station ID.  

In the [**RapidAPI**](https://rapidapi.com/meteostat/api/meteostat/playground) interface you can find the code syntax we need to make the call. 

For each call we need to create a querystring with required parameters.

### Objectives - Daily Station Data:

- create a for-loop for the 3 airports, generating a **querystring for each API call**
- define an empty dictionary to collect: 
  - time of the call
  - airport code
  - station id
  - related data
- make the API calls using the for-loop and fill the dictionary
- create pandas dataframe from the dictionary
- load the DB credentials from the `.env`
- create the engine
- define data types for the postgresql table columns
- using pandas, write the dataframe to the table in the schema of the DB

### Test: For-loop generating the querystrings

In [46]:
# testing for-loop: querystring for each airport

for airport in airport_staids:
   
    querystring = {
        "station": airport_staids[airport] # corresponding value in 'airport_staids' dictionary
        ,"start": period_start # variable we used to define a period start
        ,"end": period_end # variable we used to define a period end
        ,"model":"true" # what does this parameter do? check meteostat documentation.
    }
    print(airport, "\n", querystring)

MIA 
 {'station': 72202, 'start': '2005-07-01', 'end': '2005-09-30', 'model': 'true'}
MSY 
 {'station': 72231, 'start': '2005-07-01', 'end': '2005-09-30', 'model': 'true'}
MOB 
 {'station': 72223, 'start': '2005-07-01', 'end': '2005-09-30', 'model': 'true'}
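
The loop body can also be wrapped in a small function, which makes the querystring reusable for the real API calls. The sketch below assumes a hypothetical helper name, `build_querystring`. It uses `urllib.parse.urlencode` only to illustrate roughly how such parameters end up in a request URL; `requests` does its own encoding when the dictionary is passed via `params=`.

```python
from urllib.parse import urlencode


def build_querystring(station_id, start, end, model="true"):
    """Collect the parameters for one daily-data call in a dictionary."""
    return {"station": station_id, "start": start, "end": end, "model": model}


# values taken from the notebook: station ID for MIA and the defined period
querystring = build_querystring(72202, "2005-07-01", "2005-09-30")
encoded = urlencode(querystring)
print(encoded)
```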


In [40]:
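# debugging variant of the API loop: prints the status code and parses the JSON safely
# note: 'url', 'headers' and 'weather_dict' must already be defined (see the 'API CALL daily' cell)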
for airport in airport_staids:
   
    querystring = {
        "station": airport_staids[airport],
        "start": period_start,
        "end": period_end,
        "model": "true"
    }
    
    # making one call with the current querystring
    response = requests.get(url, headers=headers, params=querystring)

    # --- Debug + safe parsing ---
    print(f"\nRequest for {airport}: status {response.status_code}")
    try:
        data = response.json()   # safer than json.loads(response.text)
    except ValueError:           # catches JSONDecodeError
        print(f"Invalid JSON for {airport}: {repr(response.text[:200])}...")
        data = None
    # ----------------------------

    # appending data to the dictionary:
    weather_dict['extracted_at'].append(datetime.now())              # timestamp 
    weather_dict['airport_code'].append(airport)                     # airport code    
    weather_dict['station_id'].append(airport_staids[airport])       # weather Station ID
    weather_dict['extracted_data'].append(data)                      # parsed JSON or None



Request for MIA: status 500
Invalid JSON for MIA: 'Internal Server Error'...

Request for MSY: status 500
Invalid JSON for MSY: 'Internal Server Error'...

Request for MOB: status 500
Invalid JSON for MOB: 'Internal Server Error'...
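
The output above does not tell us whether the 500 responses were temporary. If they are, one common approach is to repeat the call a few times with a short pause in between. The following is a minimal sketch under that assumption. The helper `call_with_retry` and the class `FakeResponse` are hypothetical; the helper takes any zero-argument callable so that the example runs without network access.

```python
import time


def call_with_retry(send, max_attempts=3, pause_seconds=1.0):
    """Call send() until the status code is below 500 or the attempts run out.

    `send` is any zero-argument callable that returns an object with a
    `status_code` attribute, for example a wrapped requests.get call.
    """
    response = None
    for attempt in range(1, max_attempts + 1):
        response = send()
        if response.status_code < 500:
            break
        if attempt < max_attempts:
            time.sleep(pause_seconds * attempt)  # wait a little longer each time
    return response


class FakeResponse:
    """Stand-in for a response object, used only for this demonstration."""

    def __init__(self, status_code):
        self.status_code = status_code


# simulated server: fails twice with 500, then answers with 200
outcomes = iter([500, 500, 200])
result = call_with_retry(lambda: FakeResponse(next(outcomes)), pause_seconds=0.01)
print(result.status_code)
```

In the notebook, the callable could be `lambda: requests.get(url, headers=headers, params=querystring)`. Keep in mind that every retry counts against the API call limit.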


### API CALL daily (per station)

In [47]:
#  let's catch each response in a dictionary. create an empty dictionary with the following keys:

weather_dict = {'extracted_at':[], 
                'airport_code':[], 
                'station_id':[], 
                'extracted_data':[]
               }

# API CALL daily (station) - for the syntax: see the rapidapi interface

url = "https://meteostat.p.rapidapi.com/stations/daily"

headers = {
        "X-RapidAPI-Key": api_key,
        "X-RapidAPI-Host": "meteostat.p.rapidapi.com"
}

# for-loop for the querystrings
for airport in airport_staids:
   
    querystring = {
        "station":airport_staids[airport]
        ,"start":period_start
        ,"end":period_end
        ,"model":"true"
    }
    
    # making one call with the current querystring
    response = requests.get(url, headers=headers, params=querystring)
                
    # appending data to the dictionary:
    weather_dict['extracted_at'].append(datetime.now())                # timestamp of the call
    weather_dict['airport_code'].append(airport)                       # airport code
    weather_dict['station_id'].append(airport_staids[airport])         # weather station ID
    weather_dict['extracted_data'].append(json.loads(response.text))   # JSON string parsed into Python objects
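
The loop stores whatever the API returns, so a quick look at the payload shape can be useful before loading. The sketch below assumes the `meta`/`data` layout that is visible in this notebook's output; the helper name `count_daily_records` is hypothetical, and the sample is a shortened copy of two MIA records.

```python
def count_daily_records(payload):
    """Return the number of daily records in one parsed response, or 0 if the shape is unexpected."""
    if not isinstance(payload, dict):
        return 0
    records = payload.get("data")
    return len(records) if isinstance(records, list) else 0


# shortened sample in the layout of the notebook output (two of the MIA records)
sample_payload = {
    "meta": {"generated": "2025-09-12 13:23:00"},
    "data": [
        {"date": "2005-07-01 00:00:00", "tavg": 29.4, "tmin": 25.6, "tmax": 33.3},
        {"date": "2005-07-02 00:00:00", "tavg": 29.4, "tmin": 25.0, "tmax": 33.3},
    ],
}

print(count_daily_records(sample_payload))
print(count_daily_records(None))
```

Applied to each entry of `weather_dict['extracted_data']`, a count of 0 would point to a failed or empty call.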

#### Check out the filled dictionary

In [48]:
weather_dict

{'extracted_at': [datetime.datetime(2025, 9, 12, 15, 23, 0, 922517),
  datetime.datetime(2025, 9, 12, 15, 23, 1, 153948),
  datetime.datetime(2025, 9, 12, 15, 23, 1, 608136)],
 'airport_code': ['MIA', 'MSY', 'MOB'],
 'station_id': [72202, 72231, 72223],
 'extracted_data': [{'meta': {'generated': '2025-09-12 13:23:00'},
   'data': [{'date': '2005-07-01 00:00:00',
     'tavg': 29.4,
     'tmin': 25.6,
     'tmax': 33.3,
     'prcp': 6.6,
     'snow': None,
     'wdir': None,
     'wspd': 8.6,
     'wpgt': None,
     'pres': 1014.7,
     'tsun': None},
    {'date': '2005-07-02 00:00:00',
     'tavg': 29.4,
     'tmin': 25.0,
     'tmax': 33.3,
     'prcp': 1.5,
     'snow': None,
     'wdir': None,
     'wspd': 9.0,
     'wpgt': None,
     'pres': 1014.3,
     'tsun': None},
    {'date': '2005-07-03 00:00:00',
     'tavg': 29.4,
     'tmin': 26.1,
     'tmax': 32.8,
     'prcp': 0.0,
     'snow': None,
     'wdir': None,
     'wspd': 11.2,
     'wpgt': None,
     'pres': 1016.8,
     'tsu

### Make it a dataframe

This is our raw data, which we can now load into the database.

In [49]:
weather_daily_df = pd.DataFrame(weather_dict)
weather_daily_df

| | extracted_at | airport_code | station_id | extracted_data |
|---|---|---|---|---|
| 0 | 2025-09-12 15:23:00.922517 | MIA | 72202 | {'meta': {'generated': '2025-09-12 13:23:00'},... |
| 1 | 2025-09-12 15:23:01.153948 | MSY | 72231 | {'meta': {'generated': '2025-09-12 13:23:01'},... |
| 2 | 2025-09-12 15:23:01.608136 | MOB | 72223 | {'meta': {'generated': '2025-09-12 13:23:01'},... |


### SIDEBAR: For the curious and sceptics...

    In case you can't resist finding out what the data looks like when flattened,
    here is a preview with pandas. BUT we are not transforming before loading in our pipeline just yet.
    We extract and load the raw JSON.
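
One possible way to flatten a raw payload is sketched here in plain Python, so that it runs without the API or the database. The helper name `flatten_extracted` is hypothetical, and the sample is a shortened copy of two MIA records from the output above.

```python
def flatten_extracted(airport_code, station_id, payload):
    """Turn one raw payload into a list of flat rows, one row per day."""
    rows = []
    for record in (payload or {}).get("data") or []:
        # prefix each daily record with the identifiers of its station
        rows.append({"airport_code": airport_code, "station_id": station_id, **record})
    return rows


# shortened sample in the layout of the notebook output (two of the MIA records)
sample_payload = {
    "meta": {"generated": "2025-09-12 13:23:00"},
    "data": [
        {"date": "2005-07-01 00:00:00", "tavg": 29.4, "prcp": 6.6},
        {"date": "2005-07-02 00:00:00", "tavg": 29.4, "prcp": 1.5},
    ],
}

flat_rows = flatten_extracted("MIA", 72202, sample_payload)
for row in flat_rows:
    print(row)
```

Passing `flat_rows` to `pd.DataFrame(flat_rows)` would give the tabular preview. This is for inspection only; the pipeline still loads the raw JSON.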

> #### Note: we only used up 3 API calls per attempt

### Loading the data into the DB

The `weather_daily_df` dataframe now holds everything we need to create a table in your schema in our database.  
We can use pandas' ability to work with SQLAlchemy and "save" the data to the DB using the `.to_sql()` method.

In [50]:
# getting API and DB credentials - Alternative 1: dotenv_values()

config = dotenv_values()
 
pg_user = config['POSTGRES_USER'] # align the key labels with your .env file
pg_host = config['POSTGRES_HOST']
pg_port = config['POSTGRES_PORT']
pg_db = config['POSTGRES_DB']
pg_schema = config['POSTGRES_SCHEMA']
pg_pass = config['POSTGRES_PASS']

In [51]:
# overwriting 'url': it now holds the database URL instead of the API endpoint
url = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}'

# creating the engine
engine = create_engine(url, echo=False)
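
If the password contains characters such as `@` or `/`, the f-string above produces a URL that cannot be parsed correctly. A common remedy is to percent-encode the credentials first. The sketch below uses `urllib.parse.quote_plus` from the standard library; the helper name `build_pg_url` and all values are placeholders, not the real credentials.

```python
from urllib.parse import quote_plus


def build_pg_url(user, password, host, port, database):
    """Build a PostgreSQL connection URL with percent-encoded user and password."""
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


# placeholder values for illustration only
example_url = build_pg_url("demo_user", "p@ss/word", "localhost", 5432, "demo_db")
print(example_url)
```

The resulting string can be passed to `create_engine()` in the same way as the `url` above.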

In [52]:
engine.url # checking the url (pass is hidden)

postgresql://jonasvyshniauskas:***@data-analytics-course-2.c8g8r1deus2v.eu-central-1.rds.amazonaws.com:5432/gaussian_peppers

In [53]:
# defining data types for the DB
dtype_dict = {
    'extracted_at':types.DateTime,
    'airport_code': types.String,
    'station_id': types.Integer,
    'extracted_data':postgres_json
             }

In [54]:
# writing dataframe to DB
weather_daily_df.to_sql(name = 'weather_daily_raw', 
                       con = engine, 
                       schema = pg_schema, # pandas lets us specify the schema in which the table is created
                       if_exists='replace', 
                       dtype=dtype_dict,
                       index=False
                      )

3

The result of the last cell should give you the number of rows you imported to the DB table `weather_daily_raw`. Each row contains data from one API call.
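
The idea of one row per API call with the payload stored unchanged can also be tried out without database credentials. The sketch below swaps PostgreSQL for an in-memory SQLite database from the standard library, stores the JSON as text, and reads it back. It is a stand-in for illustration and not the notebook's actual load step; the payloads are shortened, illustrative samples.

```python
import json
import sqlite3

# one tuple per API call: airport code, station ID, parsed payload (illustrative samples)
calls = [
    ("MIA", 72202, {"data": [{"date": "2005-07-01 00:00:00", "tavg": 29.4}]}),
    ("MSY", 72231, {"data": []}),
]

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE weather_daily_raw (airport_code TEXT, station_id INTEGER, extracted_data TEXT)"
)
connection.executemany(
    "INSERT INTO weather_daily_raw VALUES (?, ?, ?)",
    [(code, station, json.dumps(payload)) for code, station, payload in calls],
)

# the row count should equal the number of API calls
row_count = connection.execute("SELECT COUNT(*) FROM weather_daily_raw").fetchone()[0]

# the stored text can be parsed back into the original structure
stored = connection.execute(
    "SELECT extracted_data FROM weather_daily_raw WHERE airport_code = 'MIA'"
).fetchone()[0]
restored = json.loads(stored)
connection.close()

print(row_count)
print(restored["data"][0]["tavg"])
```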

Check in DBeaver whether you see a new table in your schema. Don't forget to refresh your schema.

## Now continue with the hourly data.