In [13]:
####################### get packages #######################

import psycopg2
from sqlalchemy import create_engine, DateTime, Float, String, Integer, Column
from dotenv import load_dotenv
import os
import requests_cache
import pandas as pd
from retry_requests import retry
from datetime import timedelta

# Load login data from .env file
load_dotenv()

DB_NAME = os.getenv('DB_NAME')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')

DB_STRING = f'postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# Create SQLAlchemy engine
engine = create_engine(DB_STRING)

# Create a new connection using psycopg2 for non-pandas operations
conn = psycopg2.connect(
    database=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)

try:
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    record = cursor.fetchone()
    print("You are connected to -", record, "\n")
    
    # Load coordinates of used weather stations from database for fetching data from open-meteo api
    query_string1 = 'SELECT * FROM "02_silver"."dim_weather_stations"'
    weather_stations = pd.read_sql(query_string1, engine)    
    stations_id = weather_stations.stations_id.to_list()    
    stations_latitude = weather_stations.latitude.to_list()
    stations_longitude = weather_stations.longitude.to_list()
    
    # Load current dataset with a window of 7 days in the past
    query_string2 = """
        SELECT *
        FROM "01_bronze".raw_open_meteo_weather_history romwh
        WHERE "timestamp" >= current_date - interval '7 days'
        """
        
    df_hist = pd.read_sql(query_string2, engine)
    
except Exception as error:
    print("Error while connecting to PostgreSQL:", error)
    
finally:
    if conn:
        cursor.close()
        conn.close()
        print("PostgreSQL connection is closed")

You are connected to - ('PostgreSQL 16.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit',) 

PostgreSQL connection is closed


In [14]:
df_hist.timestamp.min()

Timestamp('2024-06-24 00:00:00+0000', tz='UTC')

In [15]:
df_hist.timestamp.max()

Timestamp('2024-06-25 21:00:00+0000', tz='UTC')

In [8]:
# Get the maximum timestamp all weather stations have entries available
max_timestamp = df_hist.groupby('stations_id')['timestamp'].max().min()
cut_off_time = max_timestamp.floor("d") #as timestamp
cut_off_date = cut_off_time.strftime("%Y-%m-%d") #as date
print(f"{cut_off_time}")
print(f"{cut_off_date}")  

2024-06-25
2024-06-25 00:00:00+00:00


In [12]:
# Testing if dataframe has been limited to cut_off_time
df_hist2 = df_hist.query("timestamp <= @cut_off_time")
df_hist2.timestamp.max()

Timestamp('2024-06-25 00:00:00+0000', tz='UTC')

In [16]:
####################### get weather forecast from api and push it into new table #######################

# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)

#Set input for params for API
weather_variables = [
            "temperature_2m",
            "relative_humidity_2m",
            "apparent_temperature",
            "precipitation",
            "cloud_cover",
            "wind_speed_10m",
            "wind_direction_10m",
            "direct_radiation",
            "diffuse_radiation",
            "sunshine_duration"
        ]
# define date at which new data set and old dataset will be merged together without voerlapping
start_date = cut_off_date

# Define limit for pulling data from api. Due to providing data with null values fecthing is restrcited of recent dates for last two days.
end_date = (pd.to_datetime("today") - timedelta(days=2)).strftime("%Y-%m-%d")

# Function to fetch weather data for a specific station
def fetch_weather_data(station_id, latitude, longitude):
    url = "https://archive-api.open-meteo.com/v1/archive"
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": weather_variables,
        "start_date": start_date,
	    "end_date": end_date,
    }
    response = retry_session.get(url, params=params)
    response.raise_for_status()
    data = response.json()
    
    hourly = data['hourly']
    dates = pd.date_range(
        start=pd.to_datetime(hourly['time'][0], utc=True),
        periods=len(hourly['time']),
        freq=pd.Timedelta(hours=1)
    )
    
    timestamp_fetched = pd.to_datetime('today').floor('s')
    
    hourly_data = pd.DataFrame({
        'timestamp': dates,
        'timestamp_fetched': timestamp_fetched,
        'stations_id': station_id,
        'temperature_2m': hourly['temperature_2m'],
        'relative_humidity_2m': hourly['relative_humidity_2m'],
        'apparent_temperature': hourly['apparent_temperature'],
        'precipitation': hourly['precipitation'],
        'cloud_cover': hourly['cloud_cover'],
        'wind_speed_10m': hourly['wind_speed_10m'],
        'wind_direction_10m': hourly['wind_direction_10m'],
        'direct_radiation': hourly['direct_radiation'],
        'diffuse_radiation': hourly['diffuse_radiation'],
        'sunshine_duration': hourly['sunshine_duration']               
    })
    
    return hourly_data

all_data = []

print("Fetching data from API...")
for i in range(len(stations_id)):
    station_data = fetch_weather_data(stations_id[i], stations_latitude[i], stations_longitude[i])
    all_data.append(station_data)

# Combine all data into a single DataFrame
new_weather_data = pd.concat(all_data, ignore_index=True)



Fetching data from API...


In [17]:
end_date

'2024-06-29'

In [18]:
new_weather_data

Unnamed: 0,timestamp,timestamp_fetched,stations_id,temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,cloud_cover,wind_speed_10m,wind_direction_10m,direct_radiation,diffuse_radiation,sunshine_duration
0,2024-06-25 00:00:00+00:00,2024-07-01 11:26:47,183,15.4,88,16.2,0.0,8,1.9,158,0.0,0.0,0.00
1,2024-06-25 01:00:00+00:00,2024-07-01 11:26:47,183,12.9,94,12.4,0.0,26,8.0,144,0.0,0.0,0.00
2,2024-06-25 02:00:00+00:00,2024-07-01 11:26:47,183,12.9,95,12.4,0.0,16,7.7,139,0.0,0.0,0.00
3,2024-06-25 03:00:00+00:00,2024-07-01 11:26:47,183,12.8,95,12.3,0.0,16,7.7,118,0.0,3.0,0.00
4,2024-06-25 04:00:00+00:00,2024-07-01 11:26:47,183,14.5,93,14.8,0.0,10,5.2,106,23.0,30.0,2604.43
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2875,2024-06-29 19:00:00+00:00,2024-07-01 11:26:48,5792,10.6,55,7.9,0.1,65,5.6,140,0.0,15.0,0.00
2876,2024-06-29 20:00:00+00:00,2024-07-01 11:26:48,5792,8.1,70,5.6,0.0,29,5.6,135,0.0,1.0,0.00
2877,2024-06-29 21:00:00+00:00,2024-07-01 11:26:48,5792,7.0,75,4.5,0.0,80,5.8,176,0.0,0.0,0.00
2878,2024-06-29 22:00:00+00:00,2024-07-01 11:26:48,5792,7.6,71,4.8,0.1,87,7.5,163,0.0,0.0,0.00


In [19]:
# Check for timeframe of fetched data
print(f"{new_weather_data.duplicated().value_counts()}")
print(f"{new_weather_data.timestamp.max()}")
print(f"{new_weather_data.timestamp.min()}")

False    2880
Name: count, dtype: int64
2024-06-29 23:00:00+00:00
2024-06-25 00:00:00+00:00


In [22]:
# Check for timeframe of fetched data
print(f"{df_hist.duplicated().value_counts()}")
print(f"{df_hist.timestamp.max()}")
print(f"{df_hist.timestamp.min()}")

False    1104
Name: count, dtype: int64
2024-06-25 21:00:00+00:00
2024-06-24 00:00:00+00:00


In [25]:
hist_timestamp_max = df_hist.timestamp.max()
new_timestamp_min = new_weather_data.timestamp.min()
time_diff = hist_timestamp_max - new_timestamp_min
time_diff

Timedelta('0 days 21:00:00')

In [28]:
df_hist_cut = df_hist.query("timestamp < @new_timestamp_min")
df_hist_cut.tail()

Unnamed: 0,timestamp,timestamp_fetched,stations_id,temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,cloud_cover,wind_speed_10m,wind_direction_10m,direct_radiation,diffuse_radiation,sunshine_duration
1094,2024-06-24 19:00:00+00:00,2024-06-27 18:17:04+00:00,662,23.7,49,23.2,0.0,60,8.8,71,160.0,143.0,3600.0
1095,2024-06-24 20:00:00+00:00,2024-06-27 18:17:04+00:00,662,22.0,58,21.2,0.0,15,12.3,75,121.0,83.0,3600.0
1096,2024-06-24 21:00:00+00:00,2024-06-27 18:17:04+00:00,662,20.3,64,20.2,0.0,8,8.0,85,41.0,43.0,3600.0
1097,2024-06-24 22:00:00+00:00,2024-06-27 18:17:04+00:00,662,18.6,69,18.5,0.0,1,7.2,96,3.0,7.0,0.0
1098,2024-06-24 23:00:00+00:00,2024-06-27 18:17:04+00:00,662,17.1,75,17.0,0.0,0,6.6,99,0.0,0.0,0.0


In [29]:
# Concat old and new dataframe together
updated_weather_data = pd.concat([df_hist_cut, new_weather_data], ignore_index=True)
updated_weather_data.head()

Unnamed: 0,timestamp,timestamp_fetched,stations_id,temperature_2m,relative_humidity_2m,apparent_temperature,precipitation,cloud_cover,wind_speed_10m,wind_direction_10m,direct_radiation,diffuse_radiation,sunshine_duration
0,2024-06-24 00:00:00+00:00,2024-06-27 18:17:04+00:00,691,16.0,77,16.5,0.0,0,1.4,180,0.0,0.0,0.0
1,2024-06-24 01:00:00+00:00,2024-06-27 18:17:04+00:00,691,13.3,89,13.0,0.0,1,5.4,188,0.0,0.0,0.0
2,2024-06-24 02:00:00+00:00,2024-06-27 18:17:04+00:00,691,12.6,92,11.9,0.0,11,7.1,210,0.0,0.0,0.0
3,2024-06-24 03:00:00+00:00,2024-06-27 18:17:04+00:00,691,12.0,92,11.1,0.0,10,7.9,231,0.0,0.0,0.0
4,2024-06-24 04:00:00+00:00,2024-06-27 18:17:04+00:00,691,11.9,94,11.5,0.0,5,5.0,300,0.0,0.0,0.0


In [30]:
# Check for tiemframe of concatenated dataframe
print(f"{updated_weather_data.drop(columns='timestamp_fetched').duplicated().value_counts()}")
print(f"{updated_weather_data.timestamp.max()}")
print(f"{updated_weather_data.timestamp.min()}")

False    3456
Name: count, dtype: int64
2024-06-29 23:00:00+00:00
2024-06-24 00:00:00+00:00


In [32]:
# Load login data from .env file
load_dotenv()

DB_NAME = os.getenv('DB_NAME')
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')

DB_STRING = f'postgresql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

# Create SQLAlchemy engine
engine = create_engine(DB_STRING)

# Create a new connection using psycopg2 for non-pandas operations
conn = psycopg2.connect(
    database=DB_NAME,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    host=DB_HOST,
    port=DB_PORT
)
try:
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    record = cursor.fetchone()
    print("You are connected to -", record, "\n")
    
    # Insert new data into old history table
    query_string1 = """
    INSERT INTO "01_bronze".raw_open_meteo_weather_history (
        timestamp,
        stations_id,
        temperature_2m,
        relative_humidity_2m,
        apparent_temperature,
        precipitation,
        cloud_cover,
        wind_speed_10m,
        wind_direction_10m,
        direct_radiation,
        diffuse_radiation,
        sunshine_duration
    )
    SELECT
        timestamp,
        stations_id,
        temperature_2m,
        relative_humidity_2m,
        apparent_temperature,
        precipitation,
        cloud_cover,
        wind_speed_10m,
        wind_direction_10m,
        direct_radiation,
        diffuse_radiation,
        sunshine_duration
    FROM "01_bronze".raw_open_meteo_weather_history_update_temp
    ON CONFLICT do nothing;
    """
        
    cursor = conn.cursor()
    cursor.execute(query_string1)
    
except Exception as error:
    print("Error while connecting to PostgreSQL:", error)
    
finally:
    if conn:
        cursor.close()
        conn.close()
        print("PostgreSQL connection is closed")

You are connected to - ('PostgreSQL 16.3 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 7.3.1 20180712 (Red Hat 7.3.1-12), 64-bit',) 

PostgreSQL connection is closed


In [36]:
# Check for tiemframe of concatenated dataframe
print(f"{df_history.duplicated().value_counts()}")
print(f"{df_history.timestamp.max()}")
print(f"{df_history.timestamp.min()}")

False    1209552
Name: count, dtype: int64
2024-06-29 21:00:00+00:00
2018-10-01 00:00:00+00:00
