In [None]:
!pip install pandas statsmodels pymysql sqlalchemy ipython-sql --quiet

In [None]:
# Import libraries
import pandas as pd
import numpy as np
import mysql.connector
from mysql.connector import Error
from datetime import datetime, timezone
import warnings
import re
from pathlib import Path
import time
import json
import random
import logging
import requests
from sqlalchemy import text
from typing import Optional, Dict
warnings.filterwarnings('ignore')

# Display settings
pd.set_option('display.max_columns', 20)
pd.set_option('display.max_rows', 50)
pd.set_option('display.width', None)

print("Libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")

In [None]:
# Database connection configuration
# Update these values if your setup is different

DB_CONFIG = {
    "host": "localhost",      
    "port": 3306,
    "user": "mfre521d_user",
    "password": "mfre521d_user_pw",
    "database": "mfre521d",
}

def get_connection():
    """Create and return a database connection."""
    return mysql.connector.connect(**DB_CONFIG)

def read_table(query):
    """Execute a SQL query and return results as DataFrame."""
    conn = get_connection()
    try:
        df = pd.read_sql(query, conn)
        return df
    finally:
        conn.close()

# Test connection
try:
    conn = get_connection()
    print("Connected to MySQL successfully!")
    conn.close()
except Error as e:
    print(f"Error: {e}")
    print("Make sure your Docker container is running.")


---
### Part A

```
┌─────────────────────────────────────────────────────────────┐
│                     SOURCE FILES / APIs                     │
│             country_centroids.csv source file               │
│     https://open-meteo.com/en/docs/historical-weather-api   │
└─────────────────────────┬───────────────────────────────────┘
                          │ Extract (no changes)
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                      RAW LAYER                              │
│   - Exact copy of source                                    │
│   - All data as strings                                     │
│   - Add: meta data, extraction timestamp, row number        │
│   - Never modify this layer                                 │
└─────────────────────────┬───────────────────────────────────┘
                          │ Transform
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                    CLEANED LAYER                            │
│   - Correct data types                                      │
│   - Standardized formats                                    │
│   - Nulls handled consistently                              │
│   - Validated                                               │
└─────────────────────────────────────────────────────────────┘
                          │ Load 
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                    CLEANED LAYER                            │
│   - weather_daily                                           │
│   - weather_daily_clean                                     │
│   - Inserts, commits, errors                                │
└─────────────────────────────────────────────────────────────┘
```

In [None]:
# Read countries table
df_countries = read_table("SELECT * FROM countries")
print(f"Countries: {len(df_countries)} rows")

# Read crop_production table
df_crops_clean = read_table("SELECT * FROM crop_production")
print(f"Crop Production: {len(df_crops_clean)} rows")

# Read temperature_anomalies table
df_temp_clean = read_table("SELECT * FROM temperature_anomalies")
print(f"Temperature Anomalies: {len(df_temp_clean)} rows")

print("\nData loaded successfully!")

In [None]:
# View sample of each table
print("=" * 60)
print("COUNTRIES TABLE")
print("=" * 60)
df_countries.head()


In [None]:
print("=" * 60)
print("CROP PRODUCTION TABLE")
print("=" * 60)
df_crops_clean.head()


In [None]:
print("=" * 60)
print("TEMPERATURE ANOMALIES TABLE")
print("=" * 60)
df_temp_clean.head()

In [None]:
# Print the data types
print(df_crops_clean.dtypes)
# Print info (includes non-null counts)
df_crops_clean.info()

In [None]:
# Print the data types
print(df_temp_clean.dtypes)

# Print info
df_temp_clean.info()

In [None]:
# Rename 'december' to 'dec' 
df_temp = df_temp_clean.rename(columns={'december': 'dec'})
df_temp.info()

df_temp.to_csv("temperature_anomalies.csv", index=False)

In [None]:
from pathlib import Path
from datetime import datetime

# ---- Part E: Data Lineage ----

def create_lineage_readme(source_file, raw_file, cleaned_file, transformations):
    readme = f"""# Data Lineage Documentation

Generated: {datetime.now().isoformat()}

## Source
- File: {source_file}

## Raw Layer
- File: {raw_file}
- Contains exact copy of source with metadata columns added

## Cleaned Layer
- File: {cleaned_file}

## Transformations Applied
"""
    for t in transformations:
        readme += f"- {t}\n"
    return readme


# Transformations Applied for crop production data
transformations_crop = [
    "Converted all column names to lowercase",
    "Stripped whitespace from text columns",
    "Converted Year from string to integer",
    "Replaced missing codes (NA, .., -, N/A) with NULL",
    "Created a country_id column by mapping country names to IDs",
    "Inserted cleaned data into MySQL crop_production table",
]

readme_crop = create_lineage_readme(
    "crop_production_1990_2023.csv",
    "Not saved, dataframe only",
    "Not saved, dataframe only",
    transformations_crop
)

# Transformations Applied for temperature anomalies data
transformations_temp = [
    "Converted all column names to lowercase",
    "Stripped whitespace from text columns",
    "Created country_std for mapping to countries table",
    "Converted numeric columns to float (handled European decimals)",
    "Converted Year from string to integer",
    "Converted monthly anomaly columns to float (handled European decimals)",
    "Renamed month columns to match DB schema (e.g., 'january' to 'jan')",
    "Replaced missing codes (NA, .., -, N/A) with NULL",
    "Inserted cleaned data into MySQL temperature_anomalies table",
]

readme_temp = create_lineage_readme(
    "temperature_anomalies_1990_2023.csv",
    "Not saved, dataframe only",
    "Not saved, dataframe only",
    transformations_temp
)

# Create directory FIRST
Path("data/cleaned").mkdir(parents=True, exist_ok=True)

#  Write files ONCE
with open("data/cleaned/README_CROP.md", "w", encoding="utf-8") as f:
    f.write(readme_crop)

with open("data/cleaned/README_TEMP.md", "w", encoding="utf-8") as f:
    f.write(readme_temp)

print("Wrote:")
print("- data/cleaned/README_CROP.md")
print("- data/cleaned/README_TEMP.md")


In [None]:
from pathlib import Path
print("CWD:", Path.cwd())
print("data/ exists?", (Path.cwd() / "data").exists())
print("Datasets/ exists?", (Path.cwd() / "Datasets").exists())

In [None]:
# Task Two: Implement a Python-based ETL pipeline that extracts weather data from the Open-Meteo API and loads it into your MySQL database.
# API Configuration
API_BASE_URL = "https://archive-api.open-meteo.com/v1/archive"
RATE_LIMIT_DELAY = 5  # seconds between requests (minimum per assignment)
MAX_RETRIES = 3
BACKOFF_FACTOR = 2  # exponential backoff multiplier

# Data extraction parameters
START_DATE = "2015-01-01"
END_DATE = "2023-12-31"

# Weather variables to extract 
WEATHER_VARIABLES = [
    "temperature_2m_mean",
    "temperature_2m_max",
    "temperature_2m_min",
    "precipitation_sum",
    "rain_sum",
    "et0_fao_evapotranspiration"
]

print("Configuration loaded.")

In [None]:
# Part F: Country centroids for API requests
# These are approximate geographic centers of each country

COUNTRY_CENTROIDS = {
    # North America
    'USA': {'name': 'United States', 'lat': 39.8283, 'lon': -98.5795, 'hemisphere': 'Northern'},
    'CAN': {'name': 'Canada', 'lat': 56.1304, 'lon': -106.3468, 'hemisphere': 'Northern'},
    'MEX': {'name': 'Mexico', 'lat': 23.6345, 'lon': -102.5528, 'hemisphere': 'Northern'},
    
    # South America
    'BRA': {'name': 'Brazil', 'lat': -14.2350, 'lon': -51.9253, 'hemisphere': 'Southern'},
    'ARG': {'name': 'Argentina', 'lat': -38.4161, 'lon': -63.6167, 'hemisphere': 'Southern'},
    
    # Europe
    'DEU': {'name': 'Germany', 'lat': 51.1657, 'lon': 10.4515, 'hemisphere': 'Northern'},
    'FRA': {'name': 'France', 'lat': 46.2276, 'lon': 2.2137, 'hemisphere': 'Northern'},
    'GBR': {'name': 'United Kingdom', 'lat': 55.3781, 'lon': -3.4360, 'hemisphere': 'Northern'},
    'UKR': {'name': 'Ukraine', 'lat': 48.3794, 'lon': 31.1656, 'hemisphere': 'Northern'},
    'RUS': {'name': 'Russia', 'lat': 55.7558, 'lon': 37.6173, 'hemisphere': 'Northern'},
    'TUR': {'name': 'Turkey', 'lat': 38.9637, 'lon': 35.2433, 'hemisphere': 'Northern'},
    
    # Asia
    'CHN': {'name': 'China', 'lat': 35.8617, 'lon': 104.1954, 'hemisphere': 'Northern'},
    'IND': {'name': 'India', 'lat': 20.5937, 'lon': 78.9629, 'hemisphere': 'Northern'},
    'JPN': {'name': 'Japan', 'lat': 36.2048, 'lon': 138.2529, 'hemisphere': 'Northern'},
    'KOR': {'name': 'South Korea', 'lat': 35.9078, 'lon': 127.7669, 'hemisphere': 'Northern'},
    'IDN': {'name': 'Indonesia', 'lat': -0.7893, 'lon': 113.9213, 'hemisphere': 'Southern'},
    'THA': {'name': 'Thailand', 'lat': 15.8700, 'lon': 100.9925, 'hemisphere': 'Northern'},
    'VNM': {'name': 'Vietnam', 'lat': 14.0583, 'lon': 108.2772, 'hemisphere': 'Northern'},
    'PAK': {'name': 'Pakistan', 'lat': 30.3753, 'lon': 69.3451, 'hemisphere': 'Northern'},
    'BGD': {'name': 'Bangladesh', 'lat': 23.6850, 'lon': 90.3563, 'hemisphere': 'Northern'},
    'MMR': {'name': 'Myanmar', 'lat': 21.9162, 'lon': 95.9560, 'hemisphere': 'Northern'},
    'PHL': {'name': 'Philippines', 'lat': 12.8797, 'lon': 121.7740, 'hemisphere': 'Northern'},
    'NPL': {'name': 'Nepal', 'lat': 28.3949, 'lon': 84.1240, 'hemisphere': 'Northern'},
    
    # Africa
    'EGY': {'name': 'Egypt', 'lat': 26.8206, 'lon': 30.8025, 'hemisphere': 'Northern'},
    'NGA': {'name': 'Nigeria', 'lat': 9.0820, 'lon': 8.6753, 'hemisphere': 'Northern'},
    'ETH': {'name': 'Ethiopia', 'lat': 9.1450, 'lon': 40.4897, 'hemisphere': 'Northern'},
    'ZAF': {'name': 'South Africa', 'lat': -30.5595, 'lon': 22.9375, 'hemisphere': 'Southern'},
    'KEN': {'name': 'Kenya', 'lat': -0.0236, 'lon': 37.9062, 'hemisphere': 'Southern'},
    'TZA': {'name': 'Tanzania', 'lat': -6.3690, 'lon': 34.8888, 'hemisphere': 'Southern'},
    'MAR': {'name': 'Morocco', 'lat': 31.7917, 'lon': -7.0926, 'hemisphere': 'Northern'},
    'UGA': {'name': 'Uganda', 'lat': 1.3733, 'lon': 32.2903, 'hemisphere': 'Northern'},
    'MWI': {'name': 'Malawi', 'lat': -13.2543, 'lon': 34.3015, 'hemisphere': 'Southern'},
    'MLI': {'name': 'Mali', 'lat': 17.5707, 'lon': -3.9962, 'hemisphere': 'Northern'},
    
    # Oceania
    'AUS': {'name': 'Australia', 'lat': -25.2744, 'lon': 133.7751, 'hemisphere': 'Southern'},
}

print(f"Loaded {len(COUNTRY_CENTROIDS)} country centroids.")

In [None]:
# Save country centroids to CSV for reference
centroids_df = pd.DataFrame([
    {'iso3_code': k, 'country_name': v['name'], 'latitude': v['lat'], 
     'longitude': v['lon'], 'hemisphere': v['hemisphere']}
    for k, v in COUNTRY_CENTROIDS.items()
])

centroids_df.to_csv('country_centroids.csv', index=False)
print("Country centroids saved to country_centroids.csv")
centroids_df.head(10)

In [None]:
# Setup logging
def setup_logging(log_file='etl_pipeline.log'):
    """
    Configure logging for the ETL pipeline.
    Logs to both file and console.
    """
    # Remove existing handlers
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s | %(levelname)-8s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=[
            logging.FileHandler(log_file, mode='a'),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)

logger = setup_logging()
logger.info("Logging initialized")

In [None]:
# Part G: Implement rate limiting to avoid exceeding API quotas (minimum 5 second delay between requests)
class RateLimiter:
    """
    Rate limiter to ensure minimum delay between API requests.
    Implements adaptive delay on rate limit errors.
    """
    
    def __init__(self, min_delay: float = 5.0):
        self.min_delay = min_delay
        self.current_delay = min_delay
        self.last_request_time = 0
        self.request_count = 0
    
    def wait(self):
        """Wait if needed to respect rate limit."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.current_delay:
            sleep_time = self.current_delay - elapsed
            time.sleep(sleep_time)
        self.last_request_time = time.time()
        self.request_count += 1
    
    def increase_delay(self):
        """Increase delay on rate limit error."""
        self.current_delay = min(self.current_delay * 2, 30.0)
        logger.warning(f"Rate limit hit, increased delay to {self.current_delay}s")
    
    def reset_delay(self):
        """Reset delay after successful request."""
        self.current_delay = self.min_delay


rate_limiter = RateLimiter(min_delay=RATE_LIMIT_DELAY)
print(f"Rate limiter initialized with {RATE_LIMIT_DELAY}s minimum delay")

In [None]:
# Part H: Handle API errors gracefully with retry logic (at least 3 retries with exponential backoff)
def fetch_weather_data(iso3_code: str, lat: float, lon: float,
                       start_date: str, end_date: str,
                       max_retries: int = MAX_RETRIES) -> Optional[Dict]:
    """
    Fetch weather data from Open-Meteo API with retry logic + exponential backoff.
    Enforces >= 5s delay between requests via RateLimiter.
    """
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ",".join(WEATHER_VARIABLES),
        "timezone": "UTC"
    }

    retryable_codes = {429, 500, 502, 503, 504}

    for attempt in range(1, max_retries + 1):
        try:
            rate_limiter.wait()

            # Make request
            response = requests.get(API_BASE_URL, params=params, timeout=30)

            if response.status_code == 200:
                rate_limiter.reset_delay()
                logger.info(f"{iso3_code}: Success")
                return response.json()

            if response.status_code in retryable_codes:
                if response.status_code == 429:
                    rate_limiter.increase_delay()

                wait_time = (BACKOFF_FACTOR ** (attempt - 1)) + random.random()
                logger.warning(
                    f"{iso3_code}: HTTP {response.status_code}, retry {attempt}/{max_retries} in {wait_time:.1f}s"
                )
                time.sleep(wait_time)
                continue

            logger.error(f"{iso3_code}: HTTP {response.status_code}, not retrying")
            return None

        except requests.exceptions.Timeout:
            wait_time = (BACKOFF_FACTOR ** (attempt - 1)) + random.random()
            logger.warning(f"{iso3_code}: Timeout, retry {attempt}/{max_retries} in {wait_time:.1f}s")
            time.sleep(wait_time)

        except requests.exceptions.RequestException as e:
            logger.error(f"{iso3_code}: Request failed: {e}")
            return None

    logger.error(f"{iso3_code}: All {max_retries} retries exhausted")
    return None
print("Fetch function defined with retry logic and exponential backoff")


In [None]:
#Part J: Logging all extraction activities

def transform_weather_data(json_data, country_name, logger=None):
    """
    Transform Open-Meteo daily JSON into normalized rows for WEATHER_VARIABLES table.
    Returns a list of daily records (dicts).
    """
    try:
        daily = json_data["daily"]
        dates = daily["time"]

        tmax = daily.get("temperature_2m_max", [])
        tmin = daily.get("temperature_2m_min", [])
        tmean = daily.get("temperature_2m_mean", [])
        prcp = daily.get("precipitation_sum", [])
        rain = daily.get("rain_sum", [])
        et0  = daily.get("et0_fao_evapotranspiration", [])

        rows = []
        for i, d in enumerate(dates):
            rows.append({
                "country_name": country_name,
                "date": d,

                "temperature_2m_mean": tmean[i] if i < len(tmean) else None,
                "temperature_2m_max":  tmax[i]  if i < len(tmax)  else None,
                "temperature_2m_min":  tmin[i]  if i < len(tmin)  else None,
                "precipitation_sum":   prcp[i]  if i < len(prcp)  else None,
                "rain_sum":            rain[i]  if i < len(rain)  else None,
                "et0_fao_evapotranspiration": et0[i] if i < len(et0) else None,
            })

        return rows

    except KeyError as e:
        if logger:
            logger.error(f"{country_name}: Missing key in JSON data: {e}")
        return []

In [None]:
# Defining the upsert function

def upsert_daily_weather(rows, logger=None):
    """
    Loads transformed weather records into the daily_weather table (MySQL).
    Uses UPSERT on (country_name, date).
    """
    if not rows:
        return

    sql = """
    INSERT INTO daily_weather (
        country_name, date,
        temperature_2m_mean, temperature_2m_max, temperature_2m_min,
        precipitation_sum, rain_sum, et0_fao_evapotranspiration
    )
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
    ON DUPLICATE KEY UPDATE
        temperature_2m_mean = VALUES(temperature_2m_mean),
        temperature_2m_max  = VALUES(temperature_2m_max),
        temperature_2m_min  = VALUES(temperature_2m_min),
        precipitation_sum   = VALUES(precipitation_sum),
        rain_sum            = VALUES(rain_sum),
        et0_fao_evapotranspiration = VALUES(et0_fao_evapotranspiration),
        updated_at = CURRENT_TIMESTAMP;
    """


    values = [
        (
            r.get("country_name"),
            r.get("date"),
            r.get("temperature_2m_mean"),
            r.get("temperature_2m_max"),
            r.get("temperature_2m_min"),
            r.get("precipitation_sum"),
            r.get("rain_sum"),
            r.get("et0_fao_evapotranspiration"),
        )
        for r in rows
    ]

    conn = get_connection()
    cur = conn.cursor()
    try:
        cur.executemany(sql, values)
        conn.commit()
    except Exception as e:
        if logger:
            logger.error(f"Failed to load data: {e}")
        raise
    finally:
        cur.close()
        conn.close()

In [None]:
import mysql.connector, time
t0 = time.time()
conn = mysql.connector.connect(**DB_CONFIG, connection_timeout=5)
print("connected in", round(time.time()-t0, 2), "seconds")
conn.close()


In [None]:
def execute_sql(query, params=None, fetch=False):
    """
    Execute a single SQL statement in MySQL.
    - If fetch=True, returns fetched results (list of tuples).
    - Otherwise commits the change and returns None.
    NOTE: This helper executes ONE statement at a time.
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(query, params or ())
        if fetch:
            return cursor.fetchall()
        conn.commit()
    finally:
        cursor.close()
        conn.close()


In [None]:
# Dropping the table if it exists
execute_sql("DROP TABLE IF EXISTS daily_weather;")

# Creating the daily_weather table to extract the weather variables
execute_sql("""
CREATE TABLE daily_weather (
    country_name VARCHAR(100) NOT NULL,
    date DATE NOT NULL,

    temperature_2m_mean DOUBLE,
    temperature_2m_max DOUBLE,
    temperature_2m_min DOUBLE,
    precipitation_sum DOUBLE,
    rain_sum DOUBLE,
    et0_fao_evapotranspiration DOUBLE,

    source VARCHAR(50) NOT NULL DEFAULT 'open-meteo-archive',
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    PRIMARY KEY (country_name, date)
);
""")


In [None]:
#Part J: Logging all extraction activities including successes, failures and record counts
for iso3_code, v in COUNTRY_CENTROIDS.items():
    country_name = v["name"]
    lat, lon = v["lat"], v["lon"]

    for year in range(2015, 2024):
        start_date = f"{year}-01-01"
        end_date = f"{year}-12-31"

        logger.info(f"{country_name}: Extraction started for {start_date} to {end_date}")

        json_data = fetch_weather_data(iso3_code, lat, lon, start_date, end_date)

        if json_data is None:
            logger.error(f"{country_name}: Extraction failed (no JSON returned) | {year}")
            continue

        rows = transform_weather_data(json_data, country_name)
        record_count = len(rows)

        if record_count == 0:
            logger.error(f"{country_name}: Extraction failed (0 records) | {year}")
            continue

        upsert_daily_weather(rows, logger=logger)
        logger.info(f"{country_name}: Loaded {record_count} records to database")
        logger.info(f"{country_name}: Extraction successful | year={year} | records_extracted={record_count}")

In [None]:
#checking to see the data saved in the table
progress = execute_sql("""
SELECT country_name, COUNT(*) as rows_saved
FROM daily_weather
GROUP BY country_name
""", fetch=True)

progress

In [None]:
# Making the pipeline idempotent

class DataStore:
    def __init__(self):
        self.data = {}
        self.insert_count = 0
        self.skip_count = 0

    def upsert(self, key, record):
        if key in self.data:
            self.data[key] = record
            self.skip_count += 1
            return 'updated'
        else:
            self.data[key] = record
            self.insert_count += 1
            return 'inserted'

    def get_stats(self):
        return {
            'total_records': len(self.data),
            'inserted': self.insert_count,
            'updated': self.skip_count
        }

# Now it exists
store = DataStore()
print("DataStore initialized")


In [None]:
# Proving idempotency
store = DataStore()

def run_once(country_name, lat, lon, year):
    start_date = f"{year}-01-01"
    end_date   = f"{year}-12-31"

    json_data = fetch_weather_data(country_name, lat, lon, start_date, end_date)
    if json_data is None:
        return 0

    rows = transform_weather_data(json_data, country_name)  # returns list of daily dicts
    for r in rows:
        key = (r["country_name"], r["date"])   # idempotent key
        store.upsert(key, r)

    return len(rows)

# picking one country from the COUNTRY_CENTROIDS dict
v = COUNTRY_CENTROIDS["CAN"]  
country_name, lat, lon = v["name"], v["lat"], v["lon"]

# Run 1
n1 = run_once(country_name, lat, lon, 2019)
stats1 = store.get_stats()
print("Run 1 extracted:", n1, "| stats:", stats1)

# Run 2 (same exact request again)
n2 = run_once(country_name, lat, lon, 2019)
stats2 = store.get_stats()
print("Run 2 extracted:", n2, "| stats:", stats2)

# Sanity checks
assert stats2["total_records"] == stats1["total_records"], "NOT idempotent: total_records increased"
assert stats2["inserted"] == stats1["inserted"], "NOT idempotent: inserted increased on rerun"
assert stats2["updated"] >= stats1["updated"], "Expected updates on rerun"
print("✅ DataStore idempotency test PASSED on real data")


# Sanity Checks

In [None]:
# Running ETL twice and comparing rows
rows_before = execute_sql("SELECT COUNT(*) FROM daily_weather", fetch=True)[0][0]
 
rows_after  = execute_sql("SELECT COUNT(*) FROM daily_weather", fetch=True)[0][0]

print("Before:", rows_before)
print("After:", rows_after)


In [None]:
# Verifying if there are zero duplicates
duplicates = execute_sql("""
SELECT country_name, date, COUNT(*) AS n
FROM daily_weather
GROUP BY country_name, date
HAVING n > 1
""", fetch=True)

print(duplicates)

In [None]:
# Part L: Creating aggregated view of monthly weather summaries by country

execute_sql("DROP VIEW IF EXISTS monthly_weather;")

execute_sql("""
CREATE VIEW monthly_weather AS
SELECT
    country_name,
    YEAR(date)  AS year,
    MONTH(date) AS month,

    -- Basic monthly summaries
    AVG(temperature_2m_mean) AS avg_temp_mean_c,
    AVG(temperature_2m_max)  AS avg_temp_max_c,
    AVG(temperature_2m_min)  AS avg_temp_min_c,

    SUM(precipitation_sum) AS total_precip_mm,
    SUM(rain_sum)          AS total_rain_mm,
    SUM(et0_fao_evapotranspiration) AS total_et0_mm,

    COUNT(*) AS n_days,

    -- Derived: Growing Degree Days (base 10C)
    SUM(CASE
        WHEN temperature_2m_mean IS NULL THEN 0
        WHEN temperature_2m_mean > 10 THEN (temperature_2m_mean - 10)
        ELSE 0
    END) AS gdd_base10,

    -- Derived: Extreme temperature days
    SUM(CASE WHEN temperature_2m_max > 35 THEN 1 ELSE 0 END) AS extreme_hot_days_gt35c,
    SUM(CASE WHEN temperature_2m_min < 0  THEN 1 ELSE 0 END) AS freeze_days_lt0c,

    -- Derived: Precipitation variability (CV^2)
    CASE
        WHEN AVG(precipitation_sum) IS NULL OR AVG(precipitation_sum) = 0 THEN NULL
        ELSE (AVG(precipitation_sum * precipitation_sum) - POW(AVG(precipitation_sum), 2))
             / POW(AVG(precipitation_sum), 2)
    END AS precip_variability_cv2

FROM daily_weather
GROUP BY country_name, YEAR(date), MONTH(date);
""")


In [None]:
# Part M: Creating an aggregated view of yearly variables

execute_sql("DROP VIEW IF EXISTS annual_weather;")

execute_sql("""
CREATE VIEW annual_weather AS
SELECT
    country_name,
    YEAR(date) AS year,

    -- Basic annual summaries
    AVG(temperature_2m_mean) AS avg_temp_mean_c,
    AVG(temperature_2m_max)  AS avg_temp_max_c,
    AVG(temperature_2m_min)  AS avg_temp_min_c,

    SUM(precipitation_sum) AS total_precip_mm,
    SUM(rain_sum)          AS total_rain_mm,
    SUM(et0_fao_evapotranspiration) AS total_et0_mm,

    COUNT(*) AS n_days,

    -- Derived: Growing Degree Days (base 10C)
    SUM(CASE
        WHEN temperature_2m_mean IS NULL THEN 0
        WHEN temperature_2m_mean > 10 THEN (temperature_2m_mean - 10)
        ELSE 0
    END) AS gdd_base10,

    -- Derived: Extreme temperature days
    SUM(CASE WHEN temperature_2m_max > 35 THEN 1 ELSE 0 END) AS extreme_hot_days_gt35c,
    SUM(CASE WHEN temperature_2m_min < 0  THEN 1 ELSE 0 END) AS freeze_days_lt0c,

    -- Derived: Precipitation variability (CV^2)
    CASE
        WHEN AVG(precipitation_sum) IS NULL OR AVG(precipitation_sum) = 0 THEN NULL
        ELSE (AVG(precipitation_sum * precipitation_sum) - POW(AVG(precipitation_sum), 2))
             / POW(AVG(precipitation_sum), 2)
    END AS precip_variability_cv2

FROM daily_weather
GROUP BY country_name, YEAR(date);
""")

In [None]:
# Part N: Creating an integrated view that combines weather data with crop production data (MySQL)

execute_sql("DROP VIEW IF EXISTS country_year_weather_crop;")

execute_sql("""
CREATE VIEW country_year_weather_crop AS
SELECT
    c.country_id,
    c.country_name,
    c.iso3_code,
    c.region,
    c.income_group,

    cp.year,
    cp.crop,
    cp.yield_kg_ha,
    cp.production_tonnes,
    cp.area_harvested_ha,
    cp.fertilizer_use_kg_ha,
    cp.irrigation_pct,

    aw.avg_temp_mean_c,
    aw.avg_temp_max_c,
    aw.avg_temp_min_c,
    aw.total_precip_mm,
    aw.total_rain_mm,
    aw.total_et0_mm,
    aw.gdd_base10,
    aw.extreme_hot_days_gt35c,
    aw.freeze_days_lt0c,
    aw.precip_variability_cv2

FROM crop_production cp
JOIN countries c
    ON c.country_id = cp.country_id
JOIN annual_weather aw
    ON aw.country_name = c.country_name
   AND aw.year = cp.year;
""")


In [None]:
# Checking the integrated view (top 10 rows)
read_table("""
SELECT *
FROM country_year_weather_crop
LIMIT 10;
""")


# Question 1: Indentifying Climate Vulnerable Crops

In [None]:
#Filtering for the major cereals
selected_crops = """
SELECT *
FROM country_year_weather_crop
WHERE crop IN ('Wheat','Rice','Maize','Soybeans');
"""

df_cereals = read_table(selected_crops)

df_cereals = df_cereals.dropna(subset=[
    "yield_kg_ha",
    "avg_temp_mean_c",
    "precip_variability_cv2",
    "fertilizer_use_kg_ha",
    "irrigation_pct"
])

df_cereals["year"] = df_cereals["year"].astype(int)
df_cereals.head()

df_cereals.to_csv("final_analysis_dataset.csv", index=False)

In [None]:
#Using country and year fixed effects to know how much yield falls when temperature rises
import statsmodels.formula.api as smf

results = []

for crop in ["Wheat", "Rice", "Maize", "Soybeans"]:
    df = df_cereals[df_cereals["crop"] == crop].copy()

    # OLS with controls + country FE + year FE
    model = smf.ols(
        "yield_kg_ha ~ avg_temp_mean_c + precip_variability_cv2 + total_precip_mm "
        "+ fertilizer_use_kg_ha + irrigation_pct + C(country_name) + C(year)",
        data=df
    ).fit(cov_type="HC1")

    results.append({
        "crop": crop,
        "n": int(model.nobs),
        "beta_temp": model.params.get("avg_temp_mean_c"),
        "p_temp": model.pvalues.get("avg_temp_mean_c"),
        "beta_precip_var": model.params.get("precip_variability_cv2"),
        "p_precip_var": model.pvalues.get("precip_variability_cv2"),
    })

res = pd.DataFrame(results)
res


Country and year fixed effects are included to isolate the impact of climate variability on crop yields from confounding factors. Country fixed effects control for time-invariant characteristics such as soil quality, geography, agricultural institutions, and baseline technology, ensuring that identification comes from changes in climate conditions within a country over time rather than differences across countries. Year fixed effects account for global shocks and trends that affect all countries simultaneously, including technological progress, input price fluctuations, and worldwide climate phenomena.

In [None]:
#Creating the ranked vulnerability assessment

# Strongest negative response to temperature increases
temp_rank = res.sort_values("beta_temp", ascending=True)[["crop","n","beta_temp","p_temp"]]

# Most sensitive to precipitation variability
precip_rank = res.sort_values("beta_precip_var", ascending=True)[["crop","n","beta_precip_var","p_precip_var"]]

temp_rank, precip_rank


Using country- and year-fixed effects regressions that control for fertilizer intensity and irrigation, we find substantial heterogeneity in crop yield responses to climate conditions. Among the four major cereals examined, soybeans exhibit the strongest negative response to temperature increases, with estimated yields declining by approximately 78 kg/ha for a one-unit increase in average temperature. Rice also shows a negative temperature response (−57 kg/ha), though the magnitude is smaller than for soybeans. In contrast, wheat and maize do not display strong negative temperature sensitivity in this specification, suggesting greater tolerance to gradual warming. With respect to precipitation variability, maize is the most climate-vulnerable crop, experiencing yield reductions of roughly 5.7 kg/ha as rainfall becomes more volatile, followed closely by wheat. Rice and soybeans appear comparatively resilient to precipitation variability, consistent with the prevalence of irrigation and adaptive water management in their production systems. While coefficient estimates are not statistically precise, the relative magnitudes provide a clear ranking of climate vulnerability across crops.

## Extension: Do vulnerabilities differ by income groups?

In [None]:
ext_results = []

income_groups = sorted(df_cereals["income_group"].dropna().unique())

for income in income_groups:
    for crop in ["Wheat", "Rice", "Maize", "Soybeans"]:
        df = df_cereals[(df_cereals["income_group"] == income) & (df_cereals["crop"] == crop)].copy()

        # Skip tiny samples (prevents unstable results / errors)
        if df.shape[0] < 30:
            continue

        model = smf.ols(
            "yield_kg_ha ~ avg_temp_mean_c + precip_variability_cv2 + total_precip_mm "
            "+ fertilizer_use_kg_ha + irrigation_pct + C(country_name) + C(year)",
            data=df
        ).fit(cov_type="HC1")

        ext_results.append({
            "income_group": income,
            "crop": crop,
            "n": int(model.nobs),
            "beta_temp": model.params.get("avg_temp_mean_c"),
            "p_temp": model.pvalues.get("avg_temp_mean_c"),
            "beta_precip_var": model.params.get("precip_variability_cv2"),
            "p_precip_var": model.pvalues.get("precip_variability_cv2"),
        })

ext_res = pd.DataFrame(ext_results)
ext_res


Climate vulnerability varies substantially across income groups, underscoring the role of adaptive capacity in mediating climate impacts. In high-income countries, maize yields exhibit a positive and statistically significant response to temperature increases, suggesting that technological inputs, irrigation, and management practices can offset or even reverse warming effects. In contrast, maize displays large and negative temperature responses in lower-middle-income and upper-middle-income countries, indicating heightened vulnerability in these settings. Precipitation variability further exacerbates yield losses for maize across income categories, with particularly strong negative effects in upper-middle-income countries. Rice shows comparatively greater resilience to rainfall variability but remains sensitive to temperature increases in low-income countries. Overall, these results suggest that income-related differences in adaptation capacity play a critical role in shaping crop-level climate vulnerability, and that maize-focused adaptation strategies are especially urgent in lower- and upper-middle-income countries.

# Question 3

In [None]:
# Load integrated dataset from MySQL view

import numpy as np
import pandas as pd

# Pull the integrated view created earlier (country + crop + yearly weather + inputs)
df = read_table("SELECT * FROM country_year_weather_crop;")

# Basic cleaning
df = df.copy()
df["year"] = df["year"].astype(int)

# Keep only rows where key outcomes exist
df = df.dropna(subset=[
    "country_name", "crop", "year",
    "yield_kg_ha",
    "avg_temp_mean_c",
    "total_precip_mm"
])


df.head(), df.shape


In [None]:
# Check duplicates at the key level
dup = df.duplicated(subset=["country_name", "year", "crop"]).sum()
print("Duplicate country-year-crop rows:", dup)

# If duplicates exist, keep one row (or average)
df = df.groupby(["country_name", "year", "crop"], as_index=False).mean(numeric_only=True)\
       .merge(df[["country_name","year","crop","iso3_code","region","income_group"]].drop_duplicates(),
              on=["country_name","year","crop"], how="left")


In [None]:
# Country-level climate variability (across years)

import numpy as np
import pandas as pd

def safe_cv(x):
    x = pd.Series(x).dropna()
    if len(x) < 3:
        return np.nan
    m = x.mean()
    if m == 0:
        return np.nan
    return x.std(ddof=1) / m

country_climate = (
    df.groupby("country_name")
      .agg(
          temp_sd=("avg_temp_mean_c", lambda s: s.std(ddof=1)),
          temp_cv=("avg_temp_mean_c", safe_cv),
          precip_sd=("total_precip_mm", lambda s: s.std(ddof=1)),
          precip_cv=("total_precip_mm", safe_cv),
          hot_days_mean=("extreme_hot_days_gt35c", "mean"),
          freeze_days_mean=("freeze_days_lt0c", "mean"),
          precip_intra_cv=("precip_variability_cv2", "mean"),
          n_years=("year", "nunique")
      )
      .reset_index()
)

#  Use a smaller threshold so we don't drop all countries
country_climate = country_climate[country_climate["n_years"] >= 5].copy()

# Quick checks
country_climate.shape, country_climate["n_years"].describe()


In [None]:
# Yield stability (country–crop yield variability)

import numpy as np
import pandas as pd

def safe_cv(x):
    x = pd.Series(x).dropna()
    if len(x) < 3:
        return np.nan
    m = x.mean()
    if m == 0:
        return np.nan
    return x.std(ddof=1) / m

yield_stability = (
    df.groupby(["country_name", "crop"])
      .agg(
          yield_mean=("yield_kg_ha", "mean"),
          yield_sd=("yield_kg_ha", lambda s: s.std(ddof=1)),
          yield_cv=("yield_kg_ha", safe_cv),
          n_years=("year", "nunique"),
          region=("region", lambda s: s.dropna().mode().iloc[0] if len(s.dropna()) else np.nan),
          income_group=("income_group", lambda s: s.dropna().mode().iloc[0] if len(s.dropna()) else np.nan),
      )
      .reset_index()
)

# Keep only country–crop pairs with enough years
yield_stability = yield_stability[yield_stability["n_years"] >= 5].copy()

yield_stability.sort_values("yield_cv").head(10)


In [None]:

# Merge climate exposure with yield stability


q3 = (
    yield_stability
      .merge(country_climate, on="country_name", how="inner")
)

q3.shape


In [None]:
# Define resilience (median-based classification)


# Climate exposure measure
q3["climate_exposure"] = (
    (q3["temp_sd"] - q3["temp_sd"].mean()) / q3["temp_sd"].std(ddof=1) +
    (q3["precip_cv"] - q3["precip_cv"].mean()) / q3["precip_cv"].std(ddof=1)
)

# Cutoffs
climate_cut = q3["climate_exposure"].median()
yield_cut   = q3["yield_cv"].median()

# Classification
q3["resilience_class"] = "Other"

q3.loc[
    (q3["climate_exposure"] >= climate_cut) &
    (q3["yield_cv"] <= yield_cut),
    "resilience_class"
] = "RESILIENT"

q3.loc[
    (q3["climate_exposure"] >= climate_cut) &
    (q3["yield_cv"] > yield_cut),
    "resilience_class"
] = "VULNERABLE"

q3["resilience_class"].value_counts()


In [None]:
# Visualize resilience vs vulnerability

import matplotlib.pyplot as plt

plt.figure(figsize=(9,6))

for label, dsub in q3.groupby("resilience_class"):
    plt.scatter(
        dsub["climate_exposure"],
        dsub["yield_cv"],
        alpha=0.65,
        label=label
    )

plt.axvline(climate_cut, linestyle="--", linewidth=1)
plt.axhline(yield_cut, linestyle="--", linewidth=1)

plt.xlabel("Climate Exposure Index (higher = more variable climate)")
plt.ylabel("Yield Variability (CV of yield)")
plt.title("Q3: Resilient vs Vulnerable Agricultural Systems")
plt.legend()

plt.savefig("resilient_vs_vulnerable.png", dpi=250, bbox_inches="tight")

plt.show()


This figure plots yield stability against climate exposure. Systems in the bottom-right quadrant are classified as resilient because they maintain stable yields despite experiencing high climate variability. In contrast, systems in the top-right quadrant face similar climate stress but show high yield volatility, indicating vulnerability.

In [None]:
# Compare drivers of resilience


drivers_summary = (
    q3[q3["resilience_class"].isin(["RESILIENT", "VULNERABLE"])]
      .groupby("resilience_class")
      .agg(
          hot_days_mean=("hot_days_mean", "mean"),
          freeze_days_mean=("freeze_days_mean", "mean"),
          precip_intra_cv=("precip_intra_cv", "mean"),
          avg_yield_cv=("yield_cv", "mean")
      )
)

drivers_summary


Resilient and vulnerable systems experience similar levels of heat exposure and precipitation variability. However, vulnerable systems face substantially higher exposure to freezing temperatures, which is associated with significantly greater yield volatility. This suggests that resilience is driven less by reduced climate exposure and more by the ability to manage specific climate stresses, particularly cold extremes.

In [None]:
# Case studies (top examples)


resilient_cases = (
    q3[q3["resilience_class"]=="RESILIENT"]
      .sort_values(["climate_exposure","yield_cv"], ascending=[False, True])
      [["country_name","crop","region","income_group",
        "climate_exposure","yield_cv",
        "hot_days_mean","freeze_days_mean","precip_intra_cv"]]
      .head(5)
)

vulnerable_cases = (
    q3[q3["resilience_class"]=="VULNERABLE"]
      .sort_values(["climate_exposure","yield_cv"], ascending=[False, False])
      [["country_name","crop","region","income_group",
        "climate_exposure","yield_cv",
        "hot_days_mean","freeze_days_mean","precip_intra_cv"]]
      .head(5)
)

resilient_cases, vulnerable_cases

This table presents representative case studies of resilient and vulnerable agricultural systems identified in the analysis. The selected systems experience high levels of climate exposure, as reflected by elevated values of the climate exposure index, but differ markedly in their yield stability.

Resilient systems, such as rice and wheat production in Morocco and wheat production in Australia, maintain relatively low yield variability despite substantial climate stress. These systems face frequent extreme heat events and, in some cases, notable precipitation variability, yet their yields remain stable. This suggests the presence of effective adaptation mechanisms, including appropriate crop selection, management practices, and technological capacity.

In contrast, vulnerable systems—such as maize and soybean production in Morocco and maize and wheat production in Egypt—exhibit significantly higher yield variability under similar levels of climate exposure. These systems are more sensitive to climate extremes, indicating limited adaptive capacity or greater crop-specific vulnerability.

Overall, the comparison highlights that resilience is not driven by reduced exposure to climate variability, but rather by the ability of agricultural systems to manage climate stress and stabilize production outcomes.

# Question 4: Production Trends and Future Outlook

In [None]:
# Create dataframe for Q4
df_4=df

In [None]:
# Select baseline period and create subset based on those years

BASE_START, BASE_END = 2015, 2023

baseline = df_4[(df_4["year"] >= BASE_START) & (df_4["year"] <= BASE_END)].copy()

use_fallback = baseline.empty

# Define grouping levels
group_keys = ["country_id", "crop"]

# Take mean of yearly to establish baseline for each country_crop combination
if not use_fallback:
    base_stats = baseline.groupby(group_keys).agg(
        base_temp=("avg_temp_mean_c","mean"),
        base_precip=("total_precip_mm","mean"),
        base_gdd=("gdd_base10","mean"),
        base_hot=("extreme_hot_days_gt35c","mean"),
        base_freeze=("freeze_days_lt0c","mean"),
    ).reset_index()
else:
    base_stats = df_4.groupby(group_keys).agg(
        base_temp=("avg_temp_mean_c","mean"),
        base_precip=("total_precip_mm","mean"),
        base_gdd=("gdd_base10","mean"),
        base_hot=("extreme_hot_days_gt35c","mean"),
        base_freeze=("freeze_days_lt0c","mean"),
    ).reset_index()

# Merge new baseline values into original dataframe
df_4 = df_4.merge(base_stats, on=group_keys, how="left")

# Create anomoly columns by subtracting baseline values from observed values
df_4["temp_anom"]   = df_4["avg_temp_mean_c"] - df_4["base_temp"]
df_4["precip_anom"] = df_4["total_precip_mm"] - df_4["base_precip"]
df_4["gdd_anom"]    = df_4["gdd_base10"] - df_4["base_gdd"]
df_4["hot_anom"]    = df_4["extreme_hot_days_gt35c"] - df_4["base_hot"]
df_4["freeze_anom"] = df_4["freeze_days_lt0c"] - df_4["base_freeze"]

In [None]:
#Create log yield and associated regression
df_4["ln_yield"] = np.log(df_4["yield_kg_ha"])

def run_crop_model(data):
    d = data.dropna(subset=[
        "ln_yield",
        "temp_anom",
        "precip_anom",
        "irrigation_pct"
    ])
    
    if d["country_id"].nunique() < 10:
        return None
    
    formula = """
    ln_yield ~ temp_anom
             + precip_anom
             + irrigation_pct
             + temp_anom:irrigation_pct
             + C(country_id)
             + C(year)
    """
    
    model_4 = smf.ols(
        formula,
        data=d
    ).fit(
        cov_type="cluster",
        cov_kwds={"groups": d["country_id"]}
    )
    
    return model_4


In [None]:
crop_models = {}

for crop, g in df_4.groupby("crop"):
    m = run_crop_model(g)
    if m is not None:
        crop_models[crop] = m

len(crop_models)

In [None]:
rows = []

for crop, m in crop_models.items():
    rows.append({
        "crop": crop,
        "beta_temp": m.params["temp_anom"],
        "beta_precip": m.params["precip_anom"],
        "beta_irrig": m.params["irrigation_pct"],
        "beta_temp_x_irrig": m.params["temp_anom:irrigation_pct"],
        "n_obs": int(m.nobs)
    })

climate_results = pd.DataFrame(rows)
climate_results.sort_values("beta_temp")

In [None]:
RECENT_START, RECENT_END = 2015, 2023

recent = df_4[(df_4["year"] >= RECENT_START) & 
              (df_4["year"] <= RECENT_END)].copy()

In [None]:
# Define function to estimate slope
def trend_slope(group, ycol):
    g = group.dropna(subset=[ycol, "year"])
    if len(g) < 8:
        return np.nan
    m = smf.ols(f"{ycol} ~ year", data=g).fit()
    return m.params["year"]

In [None]:
# Compute slope for each country_crop
rows = []

for (cid, crop), g in recent.groupby(["country_id","crop"]):
    slope = trend_slope(g, "ln_yield")
    rows.append({
        "country_id": cid,
        "country_name": g["country_name"].iloc[0],
        "iso3_code": g["iso3_code"].iloc[0],
        "region": g["region"].iloc[0],
        "income_group": g["income_group"].iloc[0],
        "crop": crop,
        "yield_growth_rate": slope
    })

yield_trends = pd.DataFrame(rows)
yield_trends["yield_pct_per_year"] = 100 * yield_trends["yield_growth_rate"]
yield_trends.head()

In [None]:
# Define criteria for stagnation and decline
STAGNATION = 2
DECLINE    = 0.0

yield_trends["yield_flag"] = np.select(
    [
        yield_trends["yield_pct_per_year"] < DECLINE,
        yield_trends["yield_pct_per_year"] < STAGNATION
    ],
    ["declining","stagnating"],
    default="growing"
)

In [None]:
# View worst performers
yield_trends.sort_values("yield_pct_per_year").head(20)

In [None]:
# Summarize results by region
region_summary = (
    yield_trends
      .groupby(["region","crop","yield_flag"])
      .size()
      .reset_index(name="n")
)

display(region_summary)

In [None]:
# Create log values for area harvested and production
df_4["ln_area"] = np.log(df_4["area_harvested_ha"])
df_4["ln_prod"] = np.log(df_4["production_tonnes"])

In [None]:
RECENT_START, RECENT_END = 2015, 2023
recent = df_4[(df_4["year"] >= RECENT_START) & (df_4["year"] <= RECENT_END)].copy()

In [None]:
def endpoint_change(g, col):
    g = g.sort_values("year")
    return g.iloc[-1][col] - g.iloc[0][col]

rows = []

for (cid, crop), g in recent.groupby(["country_id","crop"]):
    g = g.dropna(subset=["ln_yield","ln_area","ln_prod"])
    if g["year"].nunique() < 2:
        continue
    
    dln_y = endpoint_change(g, "ln_yield")
    dln_a = endpoint_change(g, "ln_area")
    dln_p = endpoint_change(g, "ln_prod")
    
    rows.append({
        "country_id": cid,
        "country_name": g["country_name"].iloc[0],
        "iso3_code": g["iso3_code"].iloc[0],
        "region": g["region"].iloc[0],
        "income_group": g["income_group"].iloc[0],
        "crop": crop,
        "dln_yield": 100 * dln_y,
        "dln_area":  100 * dln_a,
        "dln_prod":  100 * dln_p,
        "identity_gap": 100 * (dln_p - (dln_y + dln_a)),
    })

decomp = pd.DataFrame(rows)
decomp.head()

In [None]:
# Summarize by region
region_decomp = (
    decomp.groupby(["region","crop"], as_index=False)
          .agg(
              avg_dln_yield=("dln_yield","mean"),
              avg_dln_area=("dln_area","mean"),
              avg_dln_prod=("dln_prod","mean"),
              n=("country_id","nunique")
          )
)

display(region_decomp.sort_values(["crop","avg_dln_prod"]))

In [None]:
# Identify stagnating yields and growing harvest areas
decomp_with_flags = decomp.merge(
    yield_trends[["country_id","crop","yield_flag","yield_pct_per_year"]],
    on=["country_id","crop"],
    how="left"
)

area_driven = decomp_with_flags[
    (decomp_with_flags["yield_flag"].isin(["stagnating","declining"])) &
    (decomp_with_flags["dln_prod"] > 0) &
    (decomp_with_flags["dln_area"] > 0)
].sort_values("dln_area", ascending=False)

display(area_driven.head(25))

In [None]:
# Calculate recent production growth rates
def prod_trend_slope(group):
    g = group.dropna(subset=["ln_prod","year"])
    if len(g) < 5:
        return np.nan
    m = smf.ols("ln_prod ~ year", data=g).fit()
    return m.params["year"]

In [None]:
# Production growth by country_crop
rows = []

for (cid, crop), g in recent.groupby(["country_id","crop"]):
    slope = prod_trend_slope(g)
    rows.append({
        "country_id": cid,
        "country_name": g["country_name"].iloc[0],
        "iso3_code": g["iso3_code"].iloc[0],
        "region": g["region"].iloc[0],
        "income_group": g["income_group"].iloc[0],
        "crop": crop,
        "prod_growth_rate": slope
    })

prod_trends = pd.DataFrame(rows)
prod_trends["prod_pct_per_year"] = 100 * prod_trends["prod_growth_rate"]
prod_trends.head()

In [None]:
# Identify future shortfall risk
prod_trends["prod_flag"] = np.select(
    [
        prod_trends["prod_pct_per_year"] < 0,
        prod_trends["prod_pct_per_year"] < 2
    ],
    ["declining","stagnating"],
    default="growing"
)

In [None]:
# Identify risks by region_crop

region_risk = (
    prod_trends
      .groupby(["region","crop","prod_flag"])
      .size()
      .reset_index(name="n")
)

display(region_risk)

In [None]:
# Combine yield production/yield/harvest analysis with climate sensitivity dataframe
high_risk = prod_trends.merge(
    yield_trends[["country_id","crop","yield_flag"]],
    on=["country_id","crop"]
).merge(
    climate_results[["crop","beta_temp"]],
    on="crop"
)

high_risk = high_risk[
    (high_risk["prod_flag"] != "growing") &
    (high_risk["yield_flag"] != "growing") &
    (high_risk["beta_temp"] < 0)
]

high_risk.sort_values("prod_pct_per_year").head(25)

In [None]:
# High risk regions and crops

risk_plot = high_risk.groupby(["region","crop"]).size().reset_index(name="n")

plt.figure(figsize=(10,6))

sns.barplot(
    data=risk_plot,
    x="region",
    y="n",
    hue="crop"
)

plt.ylabel("Number of high-risk country–crop pairs")
plt.title("Regions Facing Production Shortfalls")

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Southeast Asia and Sub-Saharan Africa face the highest production shortfalls at present, with lower levels of risk in Central Ameriaca, East Asia, Europe, North Africa, North America, South America, and South Asia.  The Middle East and Oceania do not currently have any high-risk country_crop pairs.

In [None]:
# Plot distribution of yield growth across regions

import seaborn as sns

plt.figure(figsize=(10,6))

sns.boxplot(
    data=yield_trends,
    x="region",
    y="yield_pct_per_year"
)

plt.axhline(0, color="red", linestyle="--", label="Zero growth")
plt.axhline(2, color="gray", linestyle=":", label="Stagnation threshold")

plt.ylabel("Yield growth (% per year)")
plt.xlabel("Region")
plt.title("Distribution of Yield Growth Rates by Region (2015–2023)")
plt.legend()

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Except for N America,the Middle East and Oceania, all regions have at least one countriey with zero growth or stagnating growth.

Only Central America has a mean yield growth that is below zero growth.  The 25th percentile of South Asia, Southeast Asia, Sub-Saharan Africa all lie at or below zero growth.

Median yield growth in South Asia, Southeast Asia, Sub-Saharan Africa, South America, and North Africa are all in stagnation (greater than 0 and less than 2%).

In [None]:
# Create 10 year horizon and index yields for analysis

HORIZON = 10

proj = yield_trends.copy()

proj["yield_index_0"] = 100

proj["yield_index_future"] = (
    proj["yield_index_0"]
    * (1 + proj["yield_pct_per_year"] / 100) ** HORIZON
)


In [None]:
# Identify criteria to flag declining and stagnating yields

proj["yield_risk_flag"] = np.select( [ proj["yield_index_future"] < 95, proj["yield_index_future"] < 105 ], ["declining","stagnating"], default="growing" )

In [None]:
region_future_risk = (
    proj
      .groupby(["region","crop","yield_risk_flag"])
      .size()
      .reset_index(name="n")
)

region_future_risk

In [None]:
region_only = (
    region_future_risk
      .groupby(["region","yield_risk_flag"], as_index=False)
      .agg(n=("n","sum"))
)

In [None]:
pivot = region_only.pivot_table(
    index="region",
    columns="yield_risk_flag",
    values="n",
    fill_value=0
)

pivot = pivot[["declining","stagnating"]]
pivot_nonzero = pivot.loc[pivot.sum(axis=1) > 0]

In [None]:
pivot_nonzero.plot(
    kind="bar",
    stacked=True,
    figsize=(10,6)
)

plt.ylabel("Number of country–crop pairs")
plt.xlabel("Region")
plt.title("Projected Yield Risk Under Trend Continuation (10-year horizon)")

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

If trends continue, Sub-Saharan Africa and Southeast Asia both have 9 country_crop pairs that will be in declining or stagnating states. 

On the other end of the spectrum, the Middle East, Oceania and North America would not have any crop_country pairs in decline or stagnating.

In [None]:
# Plot growth decomposed by harvest area expansion

# Compute regional averages
region_decomp_plot = (
    decomp.groupby(["region","crop"], as_index=False)
          .agg(
              yield_contrib=("dln_yield","mean"),
              area_contrib=("dln_area","mean")
          )
)

crop_to_plot = region_decomp_plot["crop"].iloc[0]

plot_df = region_decomp_plot[region_decomp_plot["crop"] == crop_to_plot]

plt.figure(figsize=(10,6))

plt.bar(plot_df["region"], plot_df["yield_contrib"], label="Yield contribution")
plt.bar(
    plot_df["region"],
    plot_df["area_contrib"],
    bottom=plot_df["yield_contrib"],
    label="Area contribution"
)

plt.axhline(0, color="black")
plt.ylabel("Production change (%)")
plt.title(f"Production Growth Decomposition (2015–2023): {crop_to_plot}")
plt.legend()

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

Expanding or contracting the area harvested is a significant driver of production growth or loss, as shown by the orange above.

In [None]:
# Climate sensitivity by crop
plt.figure(figsize=(8,6))

sns.barplot(
    data=climate_results.sort_values("beta_temp"),
    x="beta_temp",
    y="crop"
)

plt.axvline(0, color="black", linestyle="--")

plt.xlabel("Effect of +1°C temperature anomaly on yield (log points)")
plt.ylabel("Crop")
plt.title("Temperature Sensitivity of Crop Yields")

plt.tight_layout()
plt.show()

Rice has the greatest negative yield response to a 1-degree rise in temperature (C), followed by maize and soybeans.  Interestingly, the data suggests wheat has a positive yield response to a 1-degree rise in temp (C).

In [None]:
# Compute average yield growth by income group
income_trends = (
    yield_trends
      .groupby(["income_group","crop"], as_index=False)
      .agg(
          mean_yield_growth=("yield_pct_per_year","mean"),
          median_yield_growth=("yield_pct_per_year","median"),
          n=("country_id","nunique")
      )
)

income_trends.sort_values(["crop","mean_yield_growth"])

In [None]:
# Plot average yield growth by income group
plt.figure(figsize=(10,6))

sns.barplot(
    data=income_trends,
    x="income_group",
    y="mean_yield_growth",
    hue="crop"
)

plt.axhline(0, color="black")
plt.ylabel("Mean yield growth (% per year)")
plt.xlabel("Income group")
plt.title("Average Yield Growth by Development Stage")

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
# Set up data to plot annual yield by income group
income_yields = (
    df_4
      .groupby(["income_group","year"], as_index=False)
      .agg(
          median_yield=("yield_kg_ha","median")
      )
)

In [None]:
income_wide = income_yields.pivot(
    index="year",
    columns="income_group",
    values="median_yield"
)

In [None]:
# Plot yearly yield by income group
plt.figure(figsize=(9,6))

for col in income_wide.columns:
    plt.plot(
        income_wide.index,
        income_wide[col],
        label=col
    )

plt.ylabel("Median yield (kg per hectare)")
plt.xlabel("Year")
plt.title("Yield Trajectories by Income Group")
plt.legend()

plt.tight_layout()
plt.show()

Median yields are higher as income group progresses -- that is, low income countries have the lowest median yield and high income countries have the highest median yield.

For the most part, high, upper-middle, and lower-middle income group yields move in tandem, except from 2022-2023 where the high income group yields dropped and upper-middle and lower-middle group yields rose.

From 2019 to 2020, the gap between low and lower-middle income groups closed to about 0, before reversing trend with 2023's gap returning to approximately 2018 levels.

In [None]:
# Identify heat sensitive crops
heat_sensitive = climate_results[
    climate_results["beta_temp"] < 0
][["crop","beta_temp"]]

In [None]:
# Regress temperature on year and identify trends by country
def temp_trend(group):
    g = group.dropna(subset=["avg_temp_mean_c","year"])
    if len(g) < 8:
        return np.nan
    m = smf.ols("avg_temp_mean_c ~ year", data=g).fit()
    return m.params["year"]

temp_trends = (
    df_4
      .groupby("country_id")
      .apply(temp_trend)
      .reset_index(name="temp_trend_c_per_year")
)

df_4 = df_4.merge(temp_trends, on="country_id", how="left")

In [None]:
# Merge climate-sensitive crop data with country warming trends
climate_risk = yield_trends.merge(
    heat_sensitive,
    on="crop",
    how="inner"
).merge(
    temp_trends,
    on="country_id",
    how="left"
)

In [None]:
# Identify climate-affected country_crop combinations
affected = climate_risk[
    (climate_risk["yield_flag"].isin(["stagnating","declining"])) &
    (climate_risk["temp_trend_c_per_year"] > 0)
]

In [None]:
# Group by region
affected_region = (
    affected
      .groupby(["region","crop"])
      .size()
      .reset_index(name="n_countries")
      .sort_values("n_countries", ascending=False)
)

affected_region.head(20)

In [None]:
#Plot results
plt.figure(figsize=(10,6))

sns.barplot(
    data=affected_region,
    x="region",
    y="n_countries",
    hue="crop"
)

plt.ylabel("Number of affected countries")
plt.title("Countries Showing Evidence of Climate-Related Yield Impacts")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

South Asia, with 5 country_crop combinations affected by Climate Change (defined as an increase in temperature having a negative impact on yeild), leads all regions, with Central America following with 3 affected country_crop combinations.  Otherwise, climate change affects are largely limited, with Maize impacted in one country in East Asia, North Africa, Europe and Sub-Saharan Africa, and Rice affected in one conutry in Southeast Asia.  Consistent with the above results on how a 1-degree increase affects each crop, negative yields in response to temperature rise are not observed for wheat.