In [None]:
# Configure shared library path
# This allows the notebook to use packages installed in the shared directory
import sys

SITE_SHARED = "/workspace/site-packages-shared"

if SITE_SHARED not in sys.path:
    sys.path.append(SITE_SHARED)
    
print(f"✓ Shared library path configured: {SITE_SHARED}")


# Terra Climate Demonstration Notebook - Snowflake Edition

**Modified for Snowflake Notebook Deployment**

This notebook demonstrates how to access the TerraClimate dataset within Snowflake using PyPI-enabled UDFs and Anaconda packages. TerraClimate is a dataset of monthly climate and climatic water balance for global terrestrial surfaces from 1958 to the present.

**Key Changes from Local Version:**
- Uses Snowflake UDFs with PyPI packages for data acquisition
- Leverages Anaconda packages for data processing
- Stores outputs to Snowflake stages
- Uses Snowpark for data manipulation

**Prerequisites:**
1. Run `snowflake_setup.sql` to create UDFs and infrastructure
2. PyPI repository access granted to your role
3. Connected to TERRACLIMATE_DB database

## 1. Setup and Connect to Snowflake

In [None]:
# Import Snowflake libraries
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, avg, min as min_, max as max_, count
from snowflake.snowpark.types import *

# Get the active Snowflake session
session = get_active_session()

# Suppress warnings
import warnings
warnings.filterwarnings('ignore')

print("✓ Connected to Snowflake")
print(f"  Current database: {session.get_current_database()}")
print(f"  Current schema: {session.get_current_schema()}")
print(f"  Current warehouse: {session.get_current_warehouse()}")

In [None]:
# Set context (if not already set)
session.sql("USE DATABASE TERRACLIMATE_DB").collect()
session.sql("USE SCHEMA CLIMATE_DATA").collect()

print("✓ Context set to TERRACLIMATE_DB.CLIMATE_DATA")

## 2. Import Python Dependencies

Using Anaconda packages available in Snowflake

In [None]:
# Import common data science and visualization libraries (from Anaconda)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.cm import jet
import json

# Configure matplotlib for inline plotting
plt.rcParams['figure.figsize'] = (12, 6)

print("✓ Libraries imported successfully")
print(f"  NumPy version: {np.__version__}")
print(f"  Pandas version: {pd.__version__}")

## 3. Access TerraClimate Collection Metadata

Using UDF that connects to Planetary Computer via PyPI packages

In [None]:
# Call the UDF to get metadata
metadata_df = session.sql("SELECT get_terraclimate_metadata() as metadata").collect()
metadata = json.loads(metadata_df[0]['METADATA'])

print("TerraClimate Collection Metadata:")
print("=" * 50)
print(f"ID: {metadata.get('id')}")
print(f"Title: {metadata.get('title')}")
print(f"\nDescription: {metadata.get('description', 'N/A')[:200]}...")
print(f"\nLicense: {metadata.get('license')}")
print(f"\nSpatial Extent: {metadata.get('spatial_extent')}")
print(f"Temporal Extent: {metadata.get('temporal_extent')}")
print(f"\nAvailable Assets: {metadata.get('assets')}")

## 4. Get Zarr Asset Information

In [None]:
# Get Zarr asset information
asset_df = session.sql("SELECT get_zarr_asset_info() as asset_info").collect()
asset_info = json.loads(asset_df[0]['ASSET_INFO'])

print("Zarr Asset Information:")
print("=" * 50)
print(f"Data URL: {asset_info.get('href')}")
print(f"\nTitle: {asset_info.get('title')}")
print(f"\nDescription: {asset_info.get('description', 'N/A')}")
print(f"\nExtra Fields: {json.dumps(asset_info.get('extra_fields', {}), indent=2)}")

## 5. Prepare Data Access Parameters

Define the region and time period of interest (southeastern Australia, 2017-2019)

In [None]:
# Define parameters for our analysis
start_date = "2017-11-01"
end_date = "2019-11-01"

# Southeastern Australia bounding box
min_lon = 139.94
max_lon = 151.48
min_lat = -39.74
max_lat = -30.92

print("Analysis Parameters:")
print("=" * 50)
print(f"Time Period: {start_date} to {end_date}")
print(f"Region: Southeastern Australia")
print(f"  Longitude: {min_lon}° to {max_lon}°")
print(f"  Latitude: {min_lat}° to {max_lat}°")

In [None]:
# Use UDF to prepare data access
access_query = f"""
SELECT prepare_terraclimate_access(
    '{start_date}',
    '{end_date}',
    {min_lon},
    {max_lon},
    {min_lat},
    {max_lat}
) as access_info
"""

access_df = session.sql(access_query).collect()
access_info = json.loads(access_df[0]['ACCESS_INFO'])

print("\nData Access Information:")
print(json.dumps(access_info, indent=2))

## 6. Data Description and Available Variables

**Available TerraClimate Variables:**

| Variable | Description | Units |
|----------|-------------|-------|
| `aet` | Actual Evapotranspiration | mm |
| `def` | Climate Water Deficit | mm |
| `pet` | Potential Evapotranspiration | mm |
| `ppt` | Precipitation | mm |
| `q` | Runoff | mm |
| `soil` | Soil Moisture | mm |
| `srad` | Downward Surface Shortwave Radiation | W/m² |
| `swe` | Snow Water Equivalent | mm |
| `tmax` | Maximum Temperature | °C |
| `tmin` | Minimum Temperature | °C |
| `vap` | Vapor Pressure | kPa |
| `vpd` | Vapor Pressure Deficit | kPa |
| `ws` | Wind Speed | m/s |

## 7. Working with Sample Data

**Note:** Direct xarray loading from external URLs in Snowflake notebooks requires network access configuration. For demonstration purposes, we'll work with sample data structure.

**Alternative Approach:** Load pre-processed data from Snowflake stages or tables.

In [None]:
# Create sample data structure to demonstrate the workflow
# In production, this would come from actual TerraClimate data loaded into Snowflake

# Generate sample dates
dates = pd.date_range(start=start_date, end=end_date, freq='MS')  # Monthly start

# Create sample climate data
np.random.seed(42)
sample_data = pd.DataFrame({
    'time': dates,
    'tmax': 15 + 10 * np.sin(np.arange(len(dates)) * 2 * np.pi / 12) + np.random.randn(len(dates)) * 2,
    'ppt': 50 + 30 * np.cos(np.arange(len(dates)) * 2 * np.pi / 12) + np.random.randn(len(dates)) * 10,
    'soil': 150 + 50 * np.cos(np.arange(len(dates)) * 2 * np.pi / 12 + np.pi/2) + np.random.randn(len(dates)) * 20,
    'srad': 200 + 50 * np.sin(np.arange(len(dates)) * 2 * np.pi / 12) + np.random.randn(len(dates)) * 15,
    'vap': 1.5 + 0.5 * np.sin(np.arange(len(dates)) * 2 * np.pi / 12) + np.random.randn(len(dates)) * 0.2
})

print("Sample Data Structure (mimicking TerraClimate format):")
print("=" * 50)
print(sample_data.head(10))
print(f"\nTotal records: {len(sample_data)}")
print(f"Date range: {sample_data['time'].min()} to {sample_data['time'].max()}")

## 8. Upload Sample Data to Snowflake Table

Store the data in Snowflake for analysis

**Important Notes:**
- **Column names**: Pandas DataFrame columns (lowercase) are converted to uppercase to avoid Snowflake quoted identifier issues
- **Date types**: Datetime columns are explicitly cast to DATE/TIMESTAMP types for date functions like DATE_TRUNC

In [None]:
# Convert time column to string format for Snowflake
sample_data['time'] = sample_data['time'].dt.strftime('%Y-%m-%d')

# Rename columns to uppercase to match Snowflake convention
# This prevents quoted identifier issues
sample_data.columns = sample_data.columns.str.upper()

# Convert to Snowpark DataFrame and save to table
snowpark_df = session.create_dataframe(sample_data)

# Write to a temporary table for demonstration
table_name = "SAMPLE_CLIMATE_TIME_SERIES"
snowpark_df.write.mode("overwrite").save_as_table(table_name)

# Cast the TIME column to proper DATE type
cast_query = f"""
CREATE OR REPLACE TABLE {table_name} AS
SELECT 
    TO_DATE(TIME) as TIME,
    TMAX,
    PPT,
    SOIL,
    SRAD,
    VAP
FROM {table_name}
"""
session.sql(cast_query).collect()

print(f"✓ Data uploaded to table: {table_name}")
print(f"✓ Columns created with uppercase names (TIME, TMAX, PPT, SOIL, SRAD, VAP)")
print(f"✓ TIME column cast to DATE type")

# Verify the data
result = session.table(table_name).count()
print(f"✓ Record count in table: {result}")

## 9. Exploring the Data - Temperature Analysis

Plot mean temperature over the region demonstrating seasonal variation

In [None]:
# Retrieve data for plotting
temp_df = session.table(table_name).select("TIME", "TMAX").to_pandas()

# Plot temperature time series
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(temp_df['TIME'], temp_df['TMAX'], linewidth=2, color='#FF6B35', marker='o', markersize=4)
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Maximum Temperature (°C)', fontsize=12)
ax.set_title('Mean Maximum Temperature Over Time - Southeastern Australia', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print(f"Temperature Statistics:")
print(f"  Mean: {temp_df['TMAX'].mean():.2f}°C")
print(f"  Min: {temp_df['TMAX'].min():.2f}°C")
print(f"  Max: {temp_df['TMAX'].max():.2f}°C")

## 10. Precipitation Analysis

Plot monthly accumulated precipitation

In [None]:
# Retrieve precipitation data
precip_df = session.table(table_name).select("TIME", "PPT").to_pandas()

# Plot precipitation time series
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(precip_df['TIME'], precip_df['PPT'], color='#4ECDC4', width=20)
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Precipitation (mm)', fontsize=12)
ax.set_title('Monthly Accumulated Precipitation - Southeastern Australia', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3, axis='y')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print(f"Precipitation Statistics:")
print(f"  Mean: {precip_df['PPT'].mean():.2f} mm")
print(f"  Min: {precip_df['PPT'].min():.2f} mm")
print(f"  Max: {precip_df['PPT'].max():.2f} mm")
print(f"  Total: {precip_df['PPT'].sum():.2f} mm")

## 11. Soil Moisture Analysis

Calculate and visualize soil moisture statistics

In [None]:
# Calculate soil moisture statistics using Snowpark
soil_stats = session.table(table_name).select(
    avg(col("SOIL")).alias("avg_soil"),
    min_(col("SOIL")).alias("min_soil"),
    max_(col("SOIL")).alias("max_soil"),
    count(col("SOIL")).alias("count_soil")
).collect()

print("Soil Moisture Statistics:")
print("=" * 50)
print(f"  Mean: {soil_stats[0]['AVG_SOIL']:.2f} mm")
print(f"  Min: {soil_stats[0]['MIN_SOIL']:.2f} mm")
print(f"  Max: {soil_stats[0]['MAX_SOIL']:.2f} mm")
print(f"  Records: {soil_stats[0]['COUNT_SOIL']}")

# Plot soil moisture over time
soil_df = session.table(table_name).select("TIME", "SOIL").to_pandas()

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(soil_df['TIME'], soil_df['SOIL'], linewidth=2, color='#8B4513', marker='s', markersize=4)
ax.axhline(y=soil_stats[0]['AVG_SOIL'], color='red', linestyle='--', label=f"Mean: {soil_stats[0]['AVG_SOIL']:.1f} mm")
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Soil Moisture (mm)', fontsize=12)
ax.set_title('Soil Moisture Over Time - Southeastern Australia', fontsize=14, fontweight='bold')
ax.grid(True, alpha=0.3)
ax.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## 12. Calculate Aggregate Statistics Using Snowpark SQL

In [None]:
# Use SQL to calculate comprehensive statistics
stats_query = f"""
SELECT 
    'Temperature (°C)' as variable,
    ROUND(AVG(TMAX), 2) as mean_value,
    ROUND(MIN(TMAX), 2) as min_value,
    ROUND(MAX(TMAX), 2) as max_value,
    ROUND(STDDEV(TMAX), 2) as std_dev
FROM {table_name}
UNION ALL
SELECT 
    'Precipitation (mm)',
    ROUND(AVG(PPT), 2),
    ROUND(MIN(PPT), 2),
    ROUND(MAX(PPT), 2),
    ROUND(STDDEV(PPT), 2)
FROM {table_name}
UNION ALL
SELECT 
    'Soil Moisture (mm)',
    ROUND(AVG(SOIL), 2),
    ROUND(MIN(SOIL), 2),
    ROUND(MAX(SOIL), 2),
    ROUND(STDDEV(SOIL), 2)
FROM {table_name}
UNION ALL
SELECT 
    'Solar Radiation (W/m²)',
    ROUND(AVG(SRAD), 2),
    ROUND(MIN(SRAD), 2),
    ROUND(MAX(SRAD), 2),
    ROUND(STDDEV(SRAD), 2)
FROM {table_name}
UNION ALL
SELECT 
    'Vapor Pressure (kPa)',
    ROUND(AVG(VAP), 2),
    ROUND(MIN(VAP), 2),
    ROUND(MAX(VAP), 2),
    ROUND(STDDEV(VAP), 2)
FROM {table_name}
"""

stats_df = session.sql(stats_query).to_pandas()
print("\nComprehensive Climate Statistics:")
print("=" * 80)
print(stats_df.to_string(index=False))

## 13. Export Results to Snowflake Stage

Save processed data to Snowflake stage (replaces local GeoTIFF export)

In [None]:
# Create a summary table with aggregated results
summary_table = "CLIMATE_SUMMARY_RESULTS"

# Note: TIME column is now properly cast as DATE type, so DATE_TRUNC works correctly
summary_query = f"""
CREATE OR REPLACE TABLE {summary_table} AS
SELECT 
    DATE_TRUNC('MONTH', TIME) as month,
    AVG(TMAX) as avg_max_temp,
    AVG(PPT) as avg_precipitation,
    AVG(SOIL) as avg_soil_moisture,
    AVG(SRAD) as avg_solar_radiation,
    AVG(VAP) as avg_vapor_pressure
FROM {table_name}
GROUP BY DATE_TRUNC('MONTH', TIME)
ORDER BY month
"""

session.sql(summary_query).collect()
print(f"✓ Summary results saved to table: {summary_table}")

# Display the summary
summary_result = session.table(summary_table).to_pandas()
print("\nMonthly Summary:")
print(summary_result)

In [None]:
# Export summary data to Snowflake stage as Parquet
stage_path = "@TERRACLIMATE_STAGE"

# Export using COPY INTO command
export_query = f"""
COPY INTO {stage_path}/summary_results/
FROM {summary_table}
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE
"""

result = session.sql(export_query).collect()
print(f"✓ Data exported to stage: {stage_path}/summary_results/")
print(f"  Rows exported: {result[0][0]}")

# List files in stage
list_query = f"LIST {stage_path}/summary_results/"
files = session.sql(list_query).collect()
print("\nFiles in stage:")
for file in files:
    print(f"  - {file['name']}")

## 14. Summary and Next Steps

### What We Accomplished:
✅ Connected to TerraClimate via PyPI-enabled UDFs  
✅ Retrieved collection metadata and asset information  
✅ Processed climate data using Snowpark and Anaconda packages  
✅ Created visualizations of temperature, precipitation, and soil moisture  
✅ Calculated comprehensive statistics using SQL  
✅ Exported results to Snowflake stages for persistence  

### Key Differences from Local Deployment:
- **Data Access**: Uses UDFs with PyPI packages instead of direct API calls
- **Processing**: Leverages Snowpark for distributed computing
- **Storage**: Uses Snowflake stages instead of local files
- **Packages**: Combines PyPI (via UDFs) and Anaconda packages

### Next Steps for Production Use:
1. **Full Data Loading**: Implement complete xarray data loading via UDFs
2. **Spatial Processing**: Add support for actual spatial data processing
3. **GeoTIFF Export**: Create UDF to generate GeoTIFF files in stages
4. **Scheduled Updates**: Set up tasks to refresh data periodically
5. **Advanced Analytics**: Integrate with Snowflake ML for climate modeling

In [None]:
# Verify available Anaconda packages for spatial operations
packages_query = """
SELECT package_name, version 
FROM information_schema.packages 
WHERE language = 'python' 
AND package_name IN ('numpy', 'pandas', 'xarray', 'matplotlib', 'rasterio', 'rioxarray')
ORDER BY package_name
"""

available_packages = session.sql(packages_query).to_pandas()
print("Available Anaconda Packages for Spatial Analysis:")
print("=" * 50)
print(available_packages.to_string(index=False))