In [0]:
%pip install openmeteo-requests requests-cache retry-requests numpy pandas
%restart_python

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


**Data Import**

The plot location file path must be updated if this notebook is copied for reuse.


In [0]:
# Load the unit location master file (one row per plant unit).
# No schema is supplied and inferSchema is not enabled, so Spark's CSV reader
# returns every column as a string.
mylocation_sdf = (
    spark.read
    .option("header", "true")
    .csv("/Volumes/workspace/default/cbs_mv/APJ_Location.csv")
)  # Spark DataFrame

# Convert to pandas
mylocation_df = mylocation_sdf.toPandas()

# The unit coordinates are used as API request parameters and are later
# written to the output table; cast them to floats so they are numeric
# everywhere downstream instead of strings. A non-numeric value raises here,
# which surfaces a malformed location file immediately.
coordinate_cols = ["unit_lat", "unit_long"]
mylocation_df[coordinate_cols] = mylocation_df[coordinate_cols].astype("float64")

# Show top rows
display(mylocation_df.head())


PlantID,Plant,Block,Unit,unit_lat,unit_long,Country,Block_Lat,Block_Long
MY0101,MY,Block_1,Unit_1,2.8809,101.352748,Malaysia,2.856769,101.333383
MY0102,MY,Block_1,Unit_2,2.8454,101.32249,Malaysia,2.856769,101.333383
MY0103,MY,Block_1,Unit_3,2.8535,101.352748,Malaysia,2.856769,101.333383
MY0104,MY,Block_1,Unit_4,2.8865,101.382602,Malaysia,2.856769,101.333383
MY0205,MY,Block_2,Unit_5,2.9405,101.399143,Malaysia,2.874095,101.395916


**Fetch API**


In [0]:
import openmeteo_requests
import pandas as pd
import requests_cache
from retry_requests import retry
import warnings
from pyspark.sql.functions import current_date
warnings.filterwarnings("ignore", category=UserWarning)  # for general warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)  # for deprecation

# Set up Open-Meteo client with cache and retries
cache_path = "/tmp/openmeteo_cache"
# NOTE(review): expire_after=-1 is understood to disable cache expiry in
# requests-cache — confirm this is intended for a rolling end_date.
cache_session = requests_cache.CachedSession(cache_path, expire_after=-1)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
openmeteo = openmeteo_requests.Client(session=retry_session)

# API endpoint and common request parameters
url = "https://archive-api.open-meteo.com/v1/archive"
start_date = "2025-03-01"
# The request window ends two days before the Spark session's current date.
# NOTE(review): a 5-day lag (current_date()-5) was used previously; confirm
# which lag matches the archive's data availability.
end_date = spark.sql("select string(current_date()-2)").collect()[0][0]
# Order matters: the response exposes hourly variables by position, in the
# same order they are requested here.
hourly_vars = [
    "temperature_2m", "precipitation", "rain", "wind_speed_10m",
    "soil_temperature_0_to_7cm", "soil_moisture_0_to_7cm"
]

# This will store weather data for all units
all_weather_data = []
# (PlantID, Unit) pairs whose request or parsing failed
failed_units = []

# Loop through each unit in DataFrame
for _, row in mylocation_df.iterrows():
    latitude = row["unit_lat"]
    longitude = row["unit_long"]
    plant_id = row["PlantID"]
    unit_id = row["Unit"]

    # Prepare the API params
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "hourly": hourly_vars
    }

    try:
        responses = openmeteo.weather_api(url, params=params)
        response = responses[0]

        hourly = response.Hourly()

        # Rebuild the hourly timestamps from the response's start, end and
        # interval (all expressed in seconds); the end bound is exclusive.
        timestamps = pd.date_range(
            start=pd.to_datetime(hourly.Time(), unit="s", utc=True),
            end=pd.to_datetime(hourly.TimeEnd(), unit="s", utc=True),
            freq=pd.Timedelta(seconds=hourly.Interval()),
            inclusive="left"
        )

        # Derive the value columns from hourly_vars instead of hard-coding
        # indices, so column names can never drift from the requested order.
        weather_columns = {
            name: hourly.Variables(position).ValuesAsNumpy()
            for position, name in enumerate(hourly_vars)
        }

        df = pd.DataFrame({
            "datetime": timestamps,
            **weather_columns,
            "PlantID": plant_id,
            "Unit": unit_id
        })

        all_weather_data.append(df)

    except Exception as e:
        # Best effort: one failing unit must not abort the remaining units.
        failed_units.append((plant_id, unit_id))
        print(f"Failed for {plant_id} - {unit_id}: {e}")

if failed_units:
    print(f"{len(failed_units)} unit(s) failed and are missing from the result: {failed_units}")

# --- Combine All Data ---
# Stop here when nothing was fetched: otherwise later cells would either fail
# on an undefined combined_weather_df or silently reuse one left over from a
# previous run of the notebook.
if not all_weather_data:
    raise RuntimeError("No data fetched.")

combined_weather_df = pd.concat(all_weather_data, ignore_index=True)
display(combined_weather_df.head())




datetime,temperature_2m,precipitation,rain,wind_speed_10m,soil_temperature_0_to_7cm,soil_moisture_0_to_7cm,PlantID,Unit
2025-03-01T00:00:00.000Z,24.369501,0.0,0.0,3.9763298,24.1695,0.454,MY0101,Unit_1
2025-03-01T01:00:00.000Z,26.219501,0.0,0.0,5.232399,25.969501,0.437,MY0101,Unit_1
2025-03-01T02:00:00.000Z,27.469501,0.0,0.0,5.020677,26.5695,0.436,MY0101,Unit_1
2025-03-01T03:00:00.000Z,28.869501,0.0,0.0,5.3305535,27.5195,0.434,MY0101,Unit_1
2025-03-01T04:00:00.000Z,30.2695,0.1,0.1,4.9785542,28.619501,0.432,MY0101,Unit_1


In [0]:
# Attach each unit's coordinates to its weather rows, keyed on PlantID and Unit.
# The coordinate columns are renamed before merging so the result carries
# "latitude" / "longitude" directly.
unit_coordinates = mylocation_df[["PlantID", "Unit", "unit_lat", "unit_long"]].rename(
    columns={
        "unit_lat": "latitude",
        "unit_long": "longitude"
    }
)

combined_weather_df = (
    combined_weather_df
    # Drop coordinates added by a previous run of this cell; without this,
    # re-running the cell would produce duplicate latitude/longitude columns.
    .drop(columns=["latitude", "longitude"], errors="ignore")
    .merge(
        unit_coordinates,
        on=["PlantID", "Unit"],
        how="left",
        # Each (PlantID, Unit) must appear once in the location data;
        # otherwise the merge would silently multiply weather rows.
        validate="many_to_one"
    )
)

In [0]:
# Convert the combined pandas frame into a Spark DataFrame for export.
export_df = spark.createDataFrame(combined_weather_df)

# Replace the Delta table's contents, allowing its schema to be overwritten too.
delta_writer = (
    export_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
delta_writer.saveAsTable("default.weatherHistory")

**Output Table**

In [0]:
%sql
-- Inspect the exported weather table, most recent readings first.
-- The table name is schema-qualified to match the name used when writing it,
-- so the query does not depend on the session's current schema.
SELECT *
FROM default.weatherHistory
ORDER BY datetime DESC

datetime,temperature_2m,precipitation,rain,wind_speed_10m,soil_temperature_0_to_7cm,soil_moisture_0_to_7cm,PlantID,Unit,latitude,longitude
2025-09-08T23:00:00.000Z,24.0195,0.0,0.0,4.445672,24.5195,0.314,IDN04016,Unit_16,-2.866134,104.816345
2025-09-08T23:00:00.000Z,24.0195,0.0,0.0,4.445672,24.5195,0.314,IDN04013,Unit_13,-2.833569,104.829788
2025-09-08T23:00:00.000Z,24.8195,0.4,0.4,3.9966483,25.619501,0.46,MY0206,Unit_6,2.874,101.318859
2025-09-08T23:00:00.000Z,24.513,0.1,0.1,3.362677,25.413,0.46,MY0205,Unit_5,2.9405,101.399143
2025-09-08T23:00:00.000Z,24.0195,0.0,0.0,4.445672,24.5195,0.314,IDN04015,Unit_15,-2.853213,104.836627
2025-09-08T23:00:00.000Z,24.8585,0.4,0.4,3.9966483,25.6585,0.46,MY0102,Unit_2,2.8454,101.32249
2025-09-08T23:00:00.000Z,24.8195,0.4,0.4,3.9966483,25.619501,0.46,MY0101,Unit_1,2.8809,101.352748
2025-09-08T23:00:00.000Z,24.013,0.0,0.0,4.445672,24.513,0.314,IDN0309,Unit_9,-2.829389,104.818639
2025-09-08T23:00:00.000Z,23.9935,0.0,0.0,5.1826253,24.793499,0.313,IDN03011,Unit_11,-2.810772,104.806566
2025-09-08T23:00:00.000Z,24.4805,0.1,0.1,3.362677,25.380499,0.46,MY0104,Unit_4,2.8865,101.382602


**1 Minute**
