In [None]:
%pip install pandas-gbq google-cloud-bigquery requests

In [None]:
import pandas_gbq
import pandas as pd
import os
import time
import requests
from datetime import datetime, timedelta
from google.cloud import bigquery

In [None]:
# --- Configuration -----------------------------------------------------------
# GCP project that owns the BigQuery datasets, and the region passed as the
# BigQuery job location.
LOCATION_ID = "europe-west1"
PROJECT_ID = "adcz-adoki-poc"

print("Project '{}' on a Location '{}'".format(PROJECT_ID, LOCATION_ID))

In [None]:
# Read a small sample of geocoded properties. The query runs in BigQuery; only
# the result set is downloaded into a local pandas DataFrame.
dataset_id = "demo_real_estate"
table_id = "property"
# Sample size: each distinct coordinate triggers one weather-API request later on.
PROPERTY_LIMIT = 10

# ORDER BY makes the LIMIT deterministic: without it BigQuery may return a
# different subset of properties on every run.
# NOTE(review): `address_latitude > 0` also drops NULL latitudes, but it excludes
# latitude 0 and the southern hemisphere — confirm that is intended.
sql = (
    "SELECT property_id, address_latitude, address_longitude "
    f"FROM {dataset_id}.{table_id} "
    "WHERE address_latitude > 0 "
    "ORDER BY property_id "
    f"LIMIT {PROPERTY_LIMIT}"
)
df_property = pandas_gbq.read_gbq(sql, project_id=PROJECT_ID, location=LOCATION_ID)

display(df_property)

In [None]:
# Unique coordinates to query: properties sharing a location share one API call.
coords_df = df_property[['address_latitude', 'address_longitude']].dropna().drop_duplicates().rename(columns={
    'address_latitude': 'lat',
    'address_longitude': 'lon'
})

hourly_params = [
    'temperature_2m',
    'relative_humidity_2m',
    'rain',
    'snowfall',
    'snow_depth',
    'windspeed_10m'
]

BASE_URL = "https://archive-api.open-meteo.com/v1/archive"
target_date = (datetime.now() - timedelta(days=2)).date()


def _fetch_hourly(lat, lon, day):
    """Request the hourly archive weather for one coordinate and one day.

    Returns the ``hourly`` object of the JSON response (``{}`` if absent).
    Raises ``requests.HTTPError`` for a non-2xx status (via ``raise_for_status``)
    and other ``requests`` exceptions for network failures/timeouts.
    """
    params = {
        'latitude': lat,
        'longitude': lon,
        'start_date': day,
        'end_date': day,
        'hourly': ','.join(hourly_params),
        'timezone': 'Europe/Prague'
    }
    response = requests.get(BASE_URL, params=params, timeout=60)
    response.raise_for_status()
    return response.json().get('hourly', {})


def _parse_hourly(lat, lon, hourly):
    """Flatten the column-oriented ``hourly`` payload into one record per hour.

    Parameters
    ----------
    lat, lon : float
        Coordinates the payload was requested for; copied onto every record.
    hourly : dict
        ``time`` list of ``YYYY-MM-DDTHH:MM`` strings plus one value list per
        requested variable.

    Returns
    -------
    list of dict
        One record per parseable timestamp. Timestamps that cannot be split into
        a date and an hour are reported and skipped instead of silently dropped.
    """
    times = hourly.get('time') or []

    def series(name):
        # Pad missing/short variable lists with None so every hour gets a value.
        values = list(hourly.get(name) or [])
        return values + [None] * (len(times) - len(values))

    temperature = series('temperature_2m')
    humidity = series('relative_humidity_2m')
    rain = series('rain')
    wind_speed = series('windspeed_10m')
    snow_depth = series('snow_depth')
    snowfall = series('snowfall')

    records = []
    for i, ts in enumerate(times):
        try:
            date_part, time_part = ts.split('T')
            hour_part = int(time_part.split(':')[0])
        except (AttributeError, ValueError):
            print(f"Skipping malformed timestamp {ts!r} for {lat},{lon}")
            continue
        records.append({
            'lat': lat,
            'lon': lon,
            'date': date_part,
            'hour': hour_part,
            'temperature': temperature[i],
            'relative_humidity': humidity[i],
            'rain': rain[i],
            'wind_speed': wind_speed[i],
            # Prefer snow_depth, fall back to snowfall.
            # NOTE(review): per the Open-Meteo docs snow_depth is in metres while
            # snowfall is an hourly amount in centimetres, so this fallback mixes
            # units and quantities — confirm this is intended.
            'snow_depth': snow_depth[i] if snow_depth[i] is not None else snowfall[i],
        })
    return records


results = []
for row in coords_df.itertuples(index=False):
    lat, lon = row.lat, row.lon
    try:
        results.extend(_parse_hourly(lat, lon, _fetch_hourly(lat, lon, target_date)))
    except Exception as e:
        # Best effort: report the failing coordinate and continue with the next one.
        print(f"Error fetching weather for {lat},{lon}: {e}")
    time.sleep(0.15)  # Be gentle to the API, including after a failed request

# Explicit columns and key dtypes keep the merge below working even when every
# request failed: DataFrame([]) would have no 'lat'/'lon' columns at all.
weather_df = pd.DataFrame(
    results,
    columns=['lat', 'lon', 'date', 'hour', 'temperature', 'relative_humidity',
             'rain', 'wind_speed', 'snow_depth'],
).astype({'lat': coords_df['lat'].dtype, 'lon': coords_df['lon'].dtype})

prop_coord_map = df_property[['property_id', 'address_latitude', 'address_longitude']].dropna()

# Fan weather rows back out to every property at those exact coordinates.
merged = weather_df.merge(
    prop_coord_map,
    left_on=['lat', 'lon'],
    right_on=['address_latitude', 'address_longitude'],
    how='left'
)

final_weather_df = merged[['property_id', 'date', 'hour', 'temperature', 'relative_humidity', 'rain', 'snow_depth', 'wind_speed']].copy()

if not final_weather_df.empty:
    final_weather_df['hour'] = final_weather_df['hour'].astype(int)

print(f"Total hourly weather records: {len(final_weather_df)}")
display(final_weather_df.head(30))

In [None]:
# Stamp audit columns: insert/update time, the process that wrote the rows,
# and a soft-delete flag.
process_id = os.getenv("AIP_EXECUTION_ID", "exception")
# Timezone-aware UTC "now": datetime.utcnow() is deprecated (Python 3.12) and
# returns a naive value, leaving the TIMESTAMP columns without an explicit offset.
now = pd.Timestamp.now(tz="UTC")
final_weather_df["ins_dt"] = now
final_weather_df["ins_process_id"] = process_id
final_weather_df["upd_dt"] = now
final_weather_df["upd_process_id"] = process_id
final_weather_df["del_flag"] = False

if "date" in final_weather_df.columns:
    # 'YYYY-MM-DD' strings -> datetime.date objects for the DATE column.
    final_weather_df["date"] = pd.to_datetime(final_weather_df["date"]).dt.date

# Verify the DataFrame before loading
print("--- Verifying DataFrame before loading ---")
final_weather_df.info()
print("\n")
display(final_weather_df.head())
print("----------------------------------------")


# Import to BigQuery (reuse the configured project instead of repeating the literal)
project_id = PROJECT_ID
dataset_id = "demo_real_estate"
table_id = "property_weather"
full_table_id = f"{project_id}.{dataset_id}.{table_id}"

# Run jobs in the same location the source data was read from.
client = bigquery.Client(project=project_id, location=LOCATION_ID)

# Define the schema to ensure data types match the target table
schema = [
    bigquery.SchemaField("property_id", "STRING"),
    bigquery.SchemaField("date", "DATE"),
    bigquery.SchemaField("hour", "INT64"),
    bigquery.SchemaField("temperature", "FLOAT64"),
    bigquery.SchemaField("relative_humidity", "FLOAT64"),
    bigquery.SchemaField("rain", "FLOAT64"),
    bigquery.SchemaField("snow_depth", "FLOAT64"),
    bigquery.SchemaField("wind_speed", "FLOAT64"),
    bigquery.SchemaField("ins_dt", "TIMESTAMP"),
    bigquery.SchemaField("ins_process_id", "STRING"),
    bigquery.SchemaField("upd_dt", "TIMESTAMP"),
    bigquery.SchemaField("upd_process_id", "STRING"),
    bigquery.SchemaField("del_flag", "BOOL"),
]

# NOTE(review): WRITE_APPEND means re-running this cell inserts the same day again.
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND
)

print(f"\nAttempting to load {len(final_weather_df)} rows into {full_table_id}...")

if final_weather_df.empty:
    print("No weather rows to load; skipping the BigQuery load job.")
else:
    try:
        job = client.load_table_from_dataframe(final_weather_df, full_table_id, job_config=job_config)
        job.result()  # Wait for the job to complete; raises if the job failed.

        if job.errors:
            print("\nBigQuery job failed with errors:")
            for error in job.errors:
                print(f"- {error['message']}")
        else:
            table = client.get_table(full_table_id)
            print(f"\nSuccessfully loaded {job.output_rows} rows.")
            print(f"Table '{full_table_id}' now contains {table.num_rows} rows.")

    except Exception as e:
        print(f"An exception occurred during the load job: {e}")
        # Re-raise so an unattended/scheduled execution is reported as failed
        # instead of finishing "successfully" with nothing loaded.
        raise