# Extracting `HOURLY` data from API and Loading it to DB 

The purpose of our pipeline is to make weather data available for comparison with flights and airports data. As a first step, we load the weather data in its raw form (JSON) into our database, so that later steps can transform it into meaningful and useful tables.

### General Presteps:

The goal of this notebook is to get raw JSON data for **hourly** weather for 3 airport weather stations and load it as is into our database.
- Find the station IDs for the **defined** airports (we use the same stations as for the daily data)
- Define the start and end of the period
- Get the API key from the `.env` file

### Imports

We will need the credentials we saved in the `.env` file. We will also need SQLAlchemy and its functions.

In [None]:
# we will need the credentials we saved in the .env file
from dotenv import dotenv_values

# We also will need SQLAlchemy and its functions
from sqlalchemy import create_engine, types
from sqlalchemy.dialects.postgresql import JSON as postgres_json

import pandas as pd

# requests library will make the API calls. 
# the json package will parse the JSON string and convert it to Python data structures
import requests
import json

# with 'datetime' we capture the timestamp of the API call, as a reference for how current the data is
# and 'time' for slowing down a bit
from datetime import datetime
import time

### Defining Airports and finding the Station IDs

For our pipeline we will use weather data from the weather stations at 3 highly frequented airports:
- **MIA**: Miami International Airport
- **MCO**: Orlando International Airport
- **TPA**: Tampa International Airport

To find the station IDs for the airports without using up our API call limit, we will use the search option of **https://meteostat.net/**.

Search for the names of the airports above and find the station IDs.

In [35]:
airport_staids = {
    'MIA': 72202,
    'MCO': 72205,
    'TPA': 72211
}

### Defining the period

The weather period has to match the period of our flight data. In this notebook it is set to 2017-08-30 until 2017-09-30 for the Meteostat JSON API.

In [36]:
period_start = "2017-08-30"
period_end = "2017-09-30"

### loading API Key

In [38]:
# getting API and DB credentials - Alternative 1: dotenv_values()

config = dotenv_values()

api_key = config['X-RapidAPI_Key'] # align the key label with your .env file
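
A missing or misspelled key in the `.env` file only shows up as a `KeyError` at the line that reads it. Below is a minimal sketch of an up-front check, assuming `config` behaves like a plain dictionary (as the result of `dotenv_values()` does). The helper name `require_keys` and the sample value are made up for illustration.

```python
def require_keys(config, keys):
    """Return the requested values, or raise one error listing every missing key."""
    missing = [key for key in keys if not config.get(key)]
    if missing:
        raise KeyError(f"missing in .env: {', '.join(missing)}")
    return {key: config[key] for key in keys}


# stand-in for dotenv_values(); the value is a placeholder, not a real key
sample_config = {"X-RapidAPI_Key": "placeholder-key"}

checked = require_keys(sample_config, ["X-RapidAPI_Key"])
print(checked["X-RapidAPI_Key"])
```

The same helper could be reused later for the `POSTGRES_*` entries, so that all missing labels are reported at once.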

# Part 2: Hourly Station Data

During your research of the API you might have noticed that the `Hourly Station API` has an additional restriction:

>#### Hourly data can be queried for a maximum of 30 days per request. 
>https://dev.meteostat.net/api/stations/hourly.html  
> 
>...actually the notation is not exact, we can still get a full month with 31 days. Nobody is perfect.
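
This notebook works with calendar months, which the API accepts. If you would rather stay strictly within the documented 30-day limit, the period can be cut into fixed-size chunks instead. The following is only a sketch of that alternative; `split_period` is a hypothetical helper and not part of the notebook or the Meteostat API.

```python
from datetime import date, timedelta


def split_period(start, end, max_days=30):
    """Split [start, end] into consecutive chunks of at most max_days days each."""
    chunks = []
    chunk_start = start
    while chunk_start <= end:
        # a chunk covers max_days days including its first day, clipped to the period end
        chunk_end = min(chunk_start + timedelta(days=max_days - 1), end)
        chunks.append((chunk_start.isoformat(), chunk_end.isoformat()))
        chunk_start = chunk_end + timedelta(days=1)
    return chunks


chunks = split_period(date(2017, 8, 30), date(2017, 9, 30))
print(chunks)
# [('2017-08-30', '2017-09-28'), ('2017-09-29', '2017-09-30')]
```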

### Objectives -  Hourly Station Data:

- create a for-loop for the 3 airports with a nested for-loop for monthly start-, end-dates
- each nested iteration shall generate a **querystring for an API call**
- define an empty dictionary to collect: 
  - time of the call
  - airport code
  - station id 
  - related data
- make the API calls using the nested for-loops and fill the dictionary
- create pandas dataframe from the dictionary
- load the DB credentials from the `.env`
- create the engine
- define data types for the postgresql table columns
- using pandas import the dataframe to the Table in the Schema of the DB

### Generating start & end days per month

We need a for-loop that iterates over pairs of first and last days of each month.

The period to cover is the one we defined above:

In [39]:
print(f'period: from {period_start} until {period_end}')

period: from 2017-08-30 until 2017-09-30



We can use `pd.date_range()` to get the first days at a monthly frequency.  
Then using `pd.offsets.MonthEnd()` we get the last days of the months.  
The cool thing is, `pd.offsets.MonthEnd()` actually evaluates whether the month is 31, 30, 29 or 28 days long.

#### Test: getting the first day of each month

In [40]:
pd.date_range(start=period_start, end=period_end, freq='MS')

DatetimeIndex(['2017-09-01'], dtype='datetime64[ns]', freq='MS')

#### Test: getting the last day of a month by adding `+ pd.offsets.MonthEnd()`

In [41]:
pd.date_range(start=period_start, end=period_end, freq='MS') + pd.offsets.MonthEnd()

DatetimeIndex(['2017-09-30'], dtype='datetime64[ns]', freq=None)

#### let's save it in variables...

In [42]:
first_days = pd.date_range(start=period_start, end=period_end, freq='MS')
last_days = first_days + pd.offsets.MonthEnd() # see, what we did here? DRY rules! :)

#### ... and make it lists of strings

In [43]:
first_days_list = first_days.strftime('%Y-%m-%d').tolist()
last_days_list = last_days.strftime('%Y-%m-%d').tolist()

In [44]:
print(first_days_list) 
print(last_days_list)

['2017-09-01']
['2017-09-30']


#### building pairs of (start, end) per month in a list

In [45]:
monthly_ranges = []

for start_date, end_date in zip(first_days_list, last_days_list):
    monthly_ranges.append((start_date, end_date))

monthly_ranges

# alternative as list comprehension (less beginner-friendly)
# monthly_ranges = [(start_date, end_date) for start_date, end_date in zip(first_days_list, last_days_list)]

[('2017-09-01', '2017-09-30')]
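
Note that `freq='MS'` only yields month starts that lie inside the period, so for a period starting on 2017-08-30 the last two days of August are not part of `monthly_ranges`. If partial months should be covered as well, the ranges can be built with the standard library and clipped to the period. This is a sketch under that assumption; `month_ranges` is a hypothetical helper, not something the notebook defines.

```python
import calendar
from datetime import date


def month_ranges(start, end):
    """Return (first, last) ISO date pairs per calendar month, clipped to [start, end]."""
    ranges = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        last_day = calendar.monthrange(year, month)[1]  # 28, 29, 30 or 31
        first = max(date(year, month, 1), start)
        last = min(date(year, month, last_day), end)
        ranges.append((first.isoformat(), last.isoformat()))
        # move on to the next month, rolling over the year after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return ranges


clipped_ranges = month_ranges(date(2017, 8, 30), date(2017, 9, 30))
print(clipped_ranges)
# [('2017-08-30', '2017-08-31'), ('2017-09-01', '2017-09-30')]
```

Keep in mind that every additional range costs one more API call per airport.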

### Test: `simulating` nested for-loops, creating querystring for each airport for each month

In order not to overstress the API with too many calls at once, we will use `time.sleep(0.34)` at the end of each inner loop iteration.

In [46]:
import time

for airport in airport_staids:
    print(airport)
    
    for onemonth in monthly_ranges:
    
        querystring = {
            "station":airport_staids[airport]
            ,"start":onemonth[0]
            ,"end":onemonth[1]
            ,"model":"true"
        }
        
        print(querystring)
        
        time.sleep(0.34)
    print()

MIA
{'station': 72202, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}

MCO
{'station': 72205, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}

TPA
{'station': 72211, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}



How many API calls would that cost per attempt?
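
One way to answer this before spending any quota: every combination of airport and monthly range is one call. A small sketch, repeating the values from the cells above so that it runs on its own:

```python
# same values as in the notebook cells above
airport_staids = {'MIA': 72202, 'MCO': 72205, 'TPA': 72211}
monthly_ranges = [('2017-09-01', '2017-09-30')]

# one call per (airport, month) pair
calls_per_run = len(airport_staids) * len(monthly_ranges)
print(f'{calls_per_run} API calls per run')
# 3 API calls per run
```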

>#### If the querystrings look reasonable, we can now run the Hourly Data API calls. Fingers crossed!

### API CALL hourly (per station)

In [47]:
#  let's catch each response in a dictionary. create an empty dictionary with the following keys:
weather_hourly_dict = {'extracted_at':[], 
                       'airport_code':[], 
                       'station_id':[], 
                       'extracted_data':[]}

# API CALL hourly (station) - for the syntax: see the rapidapi interface

url = "https://meteostat.p.rapidapi.com/stations/hourly"

headers = {
        "X-RapidAPI-Key": api_key,
        "X-RapidAPI-Host": "meteostat.p.rapidapi.com"
}


# double for-loop for the querystrings
for airport in airport_staids:
    
    # adding some logs
    print(airport) 
    
    for onemonth in monthly_ranges:
    
        querystring = {
            "station":airport_staids[airport]
            ,"start":onemonth[0]
            ,"end":onemonth[1]
            ,"model":"true"
        }
        
        # making one call with the current querystring
        response = requests.get(url, headers=headers, params=querystring)
        
        # adding some logs to catch errors
        if response.status_code != 200:
            print(f'status code {response.status_code} -> research error')
            print(querystring, end="\n\n")
            # skip failed calls, so that no error message ends up in the raw data
            time.sleep(0.34)
            continue

        print(querystring)

        # appending data to the dictionary:
        weather_hourly_dict['extracted_at'].append(datetime.now())                # timestamp of the call
        weather_hourly_dict['airport_code'].append(airport)                       # airport code
        weather_hourly_dict['station_id'].append(airport_staids[airport])         # weather station ID
        weather_hourly_dict['extracted_data'].append(json.loads(response.text))   # parsed JSON (Python dict)
        
        time.sleep(0.34)
        
    print()

MIA
{'station': 72202, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}

MCO
{'station': 72205, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}

TPA
{'station': 72211, 'start': '2017-09-01', 'end': '2017-09-30', 'model': 'true'}
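
If a call fails, for example because the rate limit was hit, it can be worth trying again after a short pause. The sketch below shows the idea with a stand-in for the real request, so it runs without network access or an API key. `call_with_retry` and `fake_responses` are hypothetical names, and the number of attempts and the wait time are arbitrary assumptions.

```python
import time


def call_with_retry(fetch, retries=3, wait=0.34):
    """Call fetch() until it reports status 200 or the attempts run out.

    fetch is any zero-argument callable returning (status_code, payload).
    Returns the payload on success, otherwise None.
    """
    for attempt in range(1, retries + 1):
        status_code, payload = fetch()
        if status_code == 200:
            return payload
        print(f'attempt {attempt}: status code {status_code}')
        time.sleep(wait * attempt)  # wait a little longer after each failure
    return None


# fake endpoint for demonstration: fails once with 429, then succeeds
fake_responses = iter([(429, None), (200, {'data': []})])
result = call_with_retry(lambda: next(fake_responses), wait=0.01)
print(result)
```

In the loop above, `fetch` could wrap the `requests.get(...)` call and hand back the status code together with the parsed response body. Remember that every retry counts against the API call limit.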



In [48]:
# checking the dictionary
weather_hourly_dict

{'extracted_at': [datetime.datetime(2025, 2, 13, 20, 1, 47, 613777),
  datetime.datetime(2025, 2, 13, 20, 1, 48, 250881),
  datetime.datetime(2025, 2, 13, 20, 1, 48, 951475)],
 'airport_code': ['MIA', 'MCO', 'TPA'],
 'station_id': [72202, 72205, 72211],
 'extracted_data': [{'meta': {'generated': '2025-02-12 20:27:23'},
   'data': [{'time': '2017-09-01 00:00:00',
     'temp': 30.6,
     'dwpt': 23.3,
     'rhum': 65.0,
     'prcp': 0.0,
     'snow': None,
     'wdir': 120.0,
     'wspd': 14.8,
     'wpgt': None,
     'pres': 1015.5,
     'tsun': None,
     'coco': None},
    {'time': '2017-09-01 01:00:00',
     'temp': 30.0,
     'dwpt': 24.4,
     'rhum': 72.0,
     'prcp': 0.0,
     'snow': None,
     'wdir': 140.0,
     'wspd': 11.2,
     'wpgt': None,
     'pres': 1016.6,
     'tsun': None,
     'coco': None},
    {'time': '2017-09-01 02:00:00',
     'temp': 30.0,
     'dwpt': 23.9,
     'rhum': 70.0,
     'prcp': 0.0,
     'snow': None,
     'wdir': 120.0,
     'wspd': 13.0,
     ...

In [49]:
# creating a dataframe

weather_hourly_df = pd.DataFrame(weather_hourly_dict)
weather_hourly_df

                extracted_at airport_code  station_id                                     extracted_data
0 2025-02-13 20:01:47.613777          MIA       72202  {'meta': {'generated': '2025-02-12 20:27:23'},...
1 2025-02-13 20:01:48.250881          MCO       72205  {'meta': {'generated': '2025-02-12 20:27:24'},...
2 2025-02-13 20:01:48.951475          TPA       72211  {'meta': {'generated': '2025-02-12 20:27:25'},...


### SIDEBAR: For the curious and the sceptics...

    In case you can't resist knowing what the data looks like when flattened,
    here is a preview with pandas. BUT we are not transforming before loading in our pipeline just yet.
    We Extract and Load the raw JSON.

In [72]:
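# flattening the raw JSON for a preview

# rebuild the dataframe from the dictionary filled by the API calls
weather_hourly_df = pd.DataFrame(weather_hourly_dict)
# keep only the list of hourly observations of each response
weather_hourly_df['data'] = weather_hourly_df['extracted_data'].apply(lambda x: x['data'])
# one row per hourly observation
exploded_df = weather_hourly_df.explode('data')
# turn each observation dictionary into columns
normalized_data = pd.json_normalize(exploded_df['data'].tolist())
final_df = pd.concat([exploded_df.drop(columns=['data', 'extracted_data']).reset_index(drop=True), normalized_data], axis=1)
print(final_df)
final_df.info()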


                   extracted_at airport_code  station_id                 time  \
0    2025-02-13 20:01:47.613777          MIA       72202  2017-09-01 00:00:00   
1    2025-02-13 20:01:47.613777          MIA       72202  2017-09-01 01:00:00   
2    2025-02-13 20:01:47.613777          MIA       72202  2017-09-01 02:00:00   
3    2025-02-13 20:01:47.613777          MIA       72202  2017-09-01 03:00:00   
4    2025-02-13 20:01:47.613777          MIA       72202  2017-09-01 04:00:00   
...                         ...          ...         ...                  ...   
2155 2025-02-13 20:01:48.951475          TPA       72211  2017-09-30 19:00:00   
2156 2025-02-13 20:01:48.951475          TPA       72211  2017-09-30 20:00:00   
2157 2025-02-13 20:01:48.951475          TPA       72211  2017-09-30 21:00:00   
2158 2025-02-13 20:01:48.951475          TPA       72211  2017-09-30 22:00:00   
2159 2025-02-13 20:01:48.951475          TPA       72211  2017-09-30 23:00:00   

      temp  dwpt  rhum  prc
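
To see what the flattening does without pandas, here is the same idea on a tiny hand-made record: every hourly observation becomes its own row, and the call-level columns are repeated on each row. The record below is a shortened stand-in with only two observations and two measurement fields, not real API output.

```python
# tiny stand-in for one row of the raw dataframe (values shortened)
raw_record = {
    'airport_code': 'MIA',
    'station_id': 72202,
    'extracted_data': {
        'meta': {'generated': '2025-02-12 20:27:23'},
        'data': [
            {'time': '2017-09-01 00:00:00', 'temp': 30.6},
            {'time': '2017-09-01 01:00:00', 'temp': 30.0},
        ],
    },
}

# one output row per hourly observation, with the call-level columns repeated
flat_rows = [
    {'airport_code': raw_record['airport_code'],
     'station_id': raw_record['station_id'],
     **observation}
    for observation in raw_record['extracted_data']['data']
]

for row in flat_rows:
    print(row)
```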

> #### Note: the hourly requests above used only 3 API calls (3 airports × 1 month)

### Loading the data into the DB

In [67]:
# getting API and DB credentials - Alternative 1: dotenv_values()

config = dotenv_values()
 
pg_user = config['POSTGRES_USER'] # align the key labels with your .env file
pg_host = config['POSTGRES_HOST']
pg_port = config['POSTGRES_PORT']
pg_db = config['POSTGRES_DB']
pg_schema = config['POSTGRES_SCHEMA']
pg_pass = config['POSTGRES_PASS']

In [68]:
# updating the url
url = f'postgresql://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}'

# creating the engine
engine = create_engine(url, echo=False)

In [69]:
engine.url # checking the url (pass is hidden)

postgresql://martinvolman:***@data-analytics-course-2.c8g8r1deus2v.eu-central-1.rds.amazonaws.com:5432/hh_analytics_24_4
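
One thing to watch out for: if the password contains characters such as `@`, `/` or `:`, they have to be URL-encoded before they go into the connection string, otherwise the URL is parsed incorrectly. A minimal sketch with made-up placeholder credentials (the `demo_*` names are used so that your real variables are not overwritten):

```python
from urllib.parse import quote_plus

# placeholder credentials, not taken from any real .env file
demo_user = 'example_user'
demo_pass = 'p@ss/word:1'
demo_host = 'localhost'
demo_port = '5432'
demo_db = 'example_db'

# escape characters such as '@', '/' and ':' that would otherwise break the URL
safe_url = f'postgresql://{demo_user}:{quote_plus(demo_pass)}@{demo_host}:{demo_port}/{demo_db}'
print(safe_url)
# postgresql://example_user:p%40ss%2Fword%3A1@localhost:5432/example_db
```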

In [73]:
# defining data types for the DB
dtype_dict = {
    'extracted_at': types.DateTime,
    'airport_code': types.String,
    'station_id': types.Integer,
    'time': types.String,
    'temp': types.Float,
    'dwpt': types.Float,
    'rhum': types.Float,
    'prcp': types.Float,
    'snow': types.String,
    'wdir': types.Float,
    'wspd': types.Float,
    'wpgt': types.String,
    'pres': types.Float,
    'tsun': types.String,
    'coco': types.Float,
}

# dtype_dict = {
#     'extracted_at':types.DateTime,
#     'airport_code': types.String,
#     'station_id': types.Integer,
#     'extracted_data':postgres_json
#              }
#-------
# # trying it out here
# import pandas as pd
# import json
# import psycopg2  # or the connector for your database

# def create_dataframe_from_json(json_string):
#     #"""Creates a pandas DataFrame from a JSON string."""
#     try:
#         data = json.loads(json_string)['data']  # access 'data' directly
#         return pd.DataFrame(data)
#     except (json.JSONDecodeError, KeyError, TypeError) as e:
#         print(f"Error while processing JSON: {e}")
#         return pd.DataFrame()  # return an empty DataFrame in case of error

# def create_table_and_insert(df, table_name, conn):
#     #"""Creates the table (if necessary) and inserts the data."""
#     if df.empty:
#         return  # nothing to do if the DataFrame is empty

#     try:
#         # use pandas to infer types and create/insert
#         df.to_sql(table_name, conn, if_exists='replace', index=False, method='multi')  # use 'multi' for faster inserts
#         print(f"Data inserted into '{table_name}' successfully.")
#     except Exception as e:
#         print(f"Error while inserting data: {e}")
#         conn.rollback()  # important: roll back in case of error

# Example (replace with your data and connection):
# extracted_data = '{"meta": ..., "data": [...]}' # your JSON string
# df = create_dataframe_from_json(extracted_data)

# if not df.empty:
#     conn = psycopg2.connect(...)  # your database connection details
#     create_table_and_insert(df, "your_table_name", conn)
#     conn.close()

    


In [76]:
# writing dataframe to DB
# weather_hourly_df.to_sql(name = 'weather_hourly_raw', 
final_df.to_sql(name = 'weather_hourly_cleaned', 
                       con = engine, 
                       schema = pg_schema, 
                       if_exists='replace', 
                       dtype=dtype_dict,
                       index=False
                      )

160

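If you see a number (here `160`) as the result of the last cell instead of an error, something went right again. :) The number is the row count reported by the database driver and may differ from the total number of rows written.

Check in DBeaver if you see a new table called "weather_hourly_cleaned" in your schema. Don't forget to refresh your schema.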
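
To judge whether the table is complete, it helps to know how many rows to expect before running a `SELECT COUNT(*)` in DBeaver. A rough sketch, assuming the monthly range used above (2017-09-01 until 2017-09-30) and that no station has missing hours:

```python
from datetime import date

stations = 3
period_first = date(2017, 9, 1)
period_last = date(2017, 9, 30)

days = (period_last - period_first).days + 1   # both end days included
expected_rows = stations * days * 24           # assumes no missing hours
print(expected_rows)
# 2160
```

This matches the flattened preview, whose index runs from 0 to 2159.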

## Done. We finished "Loading"! :)