# EarthScope GNSS Stations: Data Processing & Analysis

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/national-data-platform/ep-tutorials/blob/main/examples/earthscope-seismic-streaming.ipynb)

## Why NDP-EP?

This example demonstrates the **real advantage** of using NDP-EP for data management:

- **User A** (first time): Downloads raw CSV from NDP catalog, cleans data, processes by region, stores in NDP-EP
- **User B** (later): Finds processed data in NDP-EP, downloads directly - **much faster!**

### Benefits

| Without NDP-EP | With NDP-EP |
|----------------|-------------|
| Download 150 KB raw CSV | Download 5 KB processed JSON |
| Clean malformed headers | Already cleaned |
| Process 1100+ stations | Already aggregated by region |
| ~10 seconds processing | Instant |

## About EarthScope Data

**EarthScope Consortium** provides high-rate (1Hz) GNSS position data from nearly 1,100 stations across the US. This data supports research on:
- Earthquakes and seismic events
- Volcanic activity
- Ground deformation
- Tectonic plate movement

## Prerequisites

- Access to an NDP-EP API instance
- Valid authentication token

## Setup

In [None]:
!pip install ndp-ep pandas matplotlib folium requests -q

In [None]:
import json
import time
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import folium
from folium.plugins import MarkerCluster
from ndp_ep import Client
import tempfile
import os

## Configuration

In [None]:
# NDP-EP Configuration
API_URL = "http://localhost:8002"  # Change to your API endpoint
TOKEN = "testing_token"            # Change to your token
ORGANIZATION = "earthscope-demo"   # Organization for registering datasets

# Dataset identifiers
DATASET_NAME = "earthscope-stations-processed"
BUCKET_NAME = "seismic-data"
S3_KEY = "earthscope/stations_by_region.json"

# Source data URL (from NDP Global Catalog)
SOURCE_CSV_URL = "https://nationaldataplatform.org/catalog/dataset/811f0bcc-99e5-455c-bcf6-7c63c2634f41/resource/a420cc30-2262-423a-8c63-3ad8d91f2a8f/download/earthscope_converted_data.csv"

# Initialize client
client = Client(base_url=API_URL, token=TOKEN)

# Verify connection
status = client.get_system_status()

print(f"NDP-EP: {status['ep_name']} v{status['api_version']}")
print()
print("Services status:")
print(f"  - Local Catalog: {'✓ Connected' if status.get('backend_connected') else '✗ NOT CONNECTED'}")
print(f"  - S3 Storage:    {'✓ Connected' if status.get('s3_connected') else '✗ NOT CONNECTED'}")

if not status.get('backend_connected'):
    raise RuntimeError("Local catalog is not connected.")
if not status.get('s3_connected'):
    raise RuntimeError("S3 storage is not connected.")

# Ensure organization exists
existing_orgs = client.list_organizations(server="local")
if ORGANIZATION not in existing_orgs:
    print(f"\nCreating organization '{ORGANIZATION}'...")
    client.register_organization({
        "name": ORGANIZATION,
        "title": "EarthScope Demo Organization",
        "description": "Demo organization for EarthScope seismic data"
    })
    print(f"  ✓ Created")

print("\n✓ Ready!")

---

# Step 1: Search for Existing Data

Before downloading anything, let's check if someone has already processed EarthScope station data.

In [None]:
# Search for existing processed data
print("Searching for 'earthscope stations' in NDP-EP local catalog...")
print()

results = client.search_datasets(terms=["earthscope", "stations"], server="local")

# Look for our specific processed dataset
found_dataset = None
for dataset in results:
    if dataset.get('name') == DATASET_NAME:
        found_dataset = dataset
        break

if found_dataset:
    print(f"✓ Found existing dataset: {found_dataset['name']}")
    print(f"  Title: {found_dataset.get('title', 'N/A')}")
    print(f"\n→ Skip to USER B section below!")
    DATA_EXISTS = True
else:
    print("✗ No processed EarthScope station data found in local catalog.")
    print("\n→ Continue with USER A workflow to download and process.")
    DATA_EXISTS = False

---

# USER A: First-Time Processing

**Scenario**: No one has processed EarthScope station data yet. We need to:
1. Download raw CSV from NDP catalog (150 KB, malformed headers)
2. Clean and parse the data
3. Process: aggregate stations by US region
4. Upload processed JSON to NDP-EP S3
5. Register in local catalog for others to find

In [None]:
# Only run if data doesn't exist
if DATA_EXISTS:
    print("⏭ Data already exists. Skip to USER B section.")
else:
    print("="*60)
    print("USER A: Starting full processing workflow")
    print("="*60)
    
    # Start timing
    user_a_start = time.time()

### A.1 Download Raw CSV from NDP Catalog

In [None]:
if not DATA_EXISTS:
    print("Downloading raw CSV from NDP catalog...")
    print(f"Source: {SOURCE_CSV_URL[:60]}...")
    print()
    
    download_start = time.time()
    
    response = requests.get(SOURCE_CSV_URL, timeout=60)
    response.raise_for_status()
    
    download_time = time.time() - download_start
    download_kb = len(response.content) / 1024
    
    print(f"✓ Downloaded {download_kb:.1f} KB in {download_time:.2f}s")
    
    raw_csv = response.text

### A.2 Clean and Parse Data

The raw CSV has malformed headers with units embedded. We need to clean it.

In [None]:
if not DATA_EXISTS:
    print("Cleaning and parsing raw data...")
    process_start = time.time()
    
    # Parse raw CSV - headers are malformed
    # Format: Site,Latitude,(deg),Longitude,(deg),EllipElev,(m),X,(m),Y,(m),Z,(m),...
    lines = raw_csv.strip().split('\n')
    
    # Define correct column names
    columns = [
        'site', 'latitude', 'longitude', 'elevation_m',
        'x_m', 'y_m', 'z_m', 'epoch_yr', 'network',
        'status', 'last_update', 'antenna_height_m', 'antenna_type', 'dome'
    ]
    
    # Parse data rows
    data = []
    for line in lines[1:]:  # Skip header
        parts = line.split(',')
        if len(parts) >= 14:
            try:
                row = {
                    'site': parts[0],
                    'latitude': float(parts[1]),
                    'longitude': float(parts[2]),
                    'elevation_m': float(parts[3]),
                    'x_m': float(parts[4]),
                    'y_m': float(parts[5]),
                    'z_m': float(parts[6]),
                    'epoch_yr': float(parts[7]),
                    'network': parts[8],
                    'status': parts[9],
                    'last_update': parts[10],
                    'antenna_height_m': float(parts[11]) if parts[11] else 0,
                    'antenna_type': parts[12],
                    'dome': parts[13]
                }
                data.append(row)
            except (ValueError, IndexError):
                continue
    
    df = pd.DataFrame(data)
    
    print(f"\n✓ Parsed {len(df)} stations")
    print(f"  Columns: {list(df.columns)}")
    print(f"  Networks: {df['network'].nunique()} unique")
    print(f"  Active stations: {len(df[df['status'] == 'ACTIVE'])}")

### A.3 Process: Aggregate by US Region

In [None]:
if not DATA_EXISTS:
    print("Processing: Aggregating stations by US region...")
    
    # Define US regions by longitude/latitude
    def get_region(lat, lon):
        if lon > -100:  # East of -100
            if lat > 40:
                return 'Northeast'
            else:
                return 'Southeast'
        else:  # West of -100
            if lat > 45:
                return 'Pacific Northwest'
            elif lat > 35:
                return 'California'
            else:
                return 'Southwest'
    
    # Only include US stations (continental)
    df_us = df[(df['latitude'] > 24) & (df['latitude'] < 50) & 
               (df['longitude'] > -125) & (df['longitude'] < -66)].copy()
    
    df_us['region'] = df_us.apply(lambda r: get_region(r['latitude'], r['longitude']), axis=1)
    
    # Aggregate statistics by region
    region_stats = {}
    for region in df_us['region'].unique():
        region_df = df_us[df_us['region'] == region]
        
        # Only include sample of stations (10 per region) to keep JSON small
        sample_stations = region_df[['site', 'latitude', 'longitude', 'elevation_m', 'network', 'status']].head(10).to_dict('records')
        
        region_stats[region] = {
            'station_count': len(region_df),
            'active_count': len(region_df[region_df['status'] == 'ACTIVE']),
            'networks': region_df['network'].unique().tolist(),
            'avg_elevation_m': round(region_df['elevation_m'].mean(), 2),
            'min_elevation_m': round(region_df['elevation_m'].min(), 2),
            'max_elevation_m': round(region_df['elevation_m'].max(), 2),
            'center_lat': round(region_df['latitude'].mean(), 4),
            'center_lon': round(region_df['longitude'].mean(), 4),
            'sample_stations': sample_stations  # Only 10 samples, not all
        }
    
    # Create processed dataset
    processed_data = {
        'metadata': {
            'source': 'EarthScope Consortium',
            'processed_date': time.strftime('%Y-%m-%d'),
            'total_stations': len(df_us),
            'regions': list(region_stats.keys())
        },
        'regions': region_stats
    }
    
    process_time = time.time() - process_start
    
    print(f"\n✓ Processing complete in {process_time:.2f}s")
    print(f"\nRegion summary:")
    for region, stats in region_stats.items():
        print(f"  {region}: {stats['station_count']} stations ({stats['active_count']} active)")

### A.4 Upload to NDP-EP S3

In [None]:
if not DATA_EXISTS:
    print("Uploading processed data to NDP-EP S3...")
    
    # Create bucket
    try:
        client.create_bucket(BUCKET_NAME)
        print(f"  ✓ Created bucket: {BUCKET_NAME}")
    except:
        print(f"  ✓ Bucket exists: {BUCKET_NAME}")
    
    # Convert to JSON and upload
    json_data = json.dumps(processed_data, indent=2)
    processed_kb = len(json_data.encode()) / 1024
    
    upload_start = time.time()
    client.upload_object(BUCKET_NAME, S3_KEY, json_data.encode(), "application/json")
    upload_time = time.time() - upload_start
    
    print(f"  ✓ Uploaded {processed_kb:.1f} KB in {upload_time:.2f}s")

### A.5 Register in Catalog

In [None]:
if not DATA_EXISTS:
    print("Registering dataset in NDP-EP catalog...")
    
    result = client.register_s3_link({
        "resource_name": DATASET_NAME,
        "resource_title": "EarthScope GNSS Stations - Processed by Region",
        "owner_org": ORGANIZATION,
        "s3_bucket": BUCKET_NAME,
        "s3_key": S3_KEY,
        "resource_s3": f"{BUCKET_NAME}/{S3_KEY}",
        "notes": "Pre-processed EarthScope GNSS station data aggregated by US region. Includes station counts, elevation statistics, and network information."
    })
    
    print(f"  ✓ Registered: {result}")
    
    # Total time for User A
    user_a_total = time.time() - user_a_start
    
    print("\n" + "="*60)
    print("USER A SUMMARY")
    print("="*60)
    print(f"  Download raw CSV:    {download_time:.2f}s ({download_kb:.1f} KB)")
    print(f"  Clean & Process:     {process_time:.2f}s")
    print(f"  Upload to NDP-EP:    {upload_time:.2f}s ({processed_kb:.1f} KB)")
    print(f"  ─────────────────────────────")
    print(f"  TOTAL TIME:          {user_a_total:.2f}s")
    print(f"  Data downloaded:     {download_kb:.1f} KB")
    print("="*60)
    
    # Store for comparison
    USER_A_TIME = user_a_total
    USER_A_KB = download_kb

---

# USER B: Fast Access (Data Already in NDP-EP)

**Scenario**: Another user (or the same user later) needs EarthScope station data by region.

Instead of repeating all the work, they:
1. Search NDP-EP → Find existing processed dataset
2. Download processed JSON directly (only ~5 KB!)
3. Analyze immediately - no cleaning needed

In [None]:
print("="*60)
print("USER B: Fast workflow (data already in NDP-EP)")
print("="*60)

user_b_start = time.time()

### B.1 Search Catalog

In [None]:
print("Searching NDP-EP catalog for 'earthscope stations'...")

search_start = time.time()
results = client.search_datasets(terms=["earthscope", "stations"], server="local")
search_time = time.time() - search_start

# Find our dataset
found = None
for dataset in results:
    if dataset.get('name') == DATASET_NAME:
        found = dataset
        break

if found:
    print(f"\n✓ Found in {search_time:.3f}s: {found['name']}")
    print(f"  Title: {found.get('title', 'N/A')}")
else:
    raise RuntimeError("Dataset not found. Run USER A section first.")

### B.2 Download from NDP-EP (Fast!)

In [None]:
print("Downloading processed data from NDP-EP S3...")

download_start = time.time()

# Download directly from NDP-EP S3
data = client.download_object(BUCKET_NAME, S3_KEY)

download_time_b = time.time() - download_start
download_kb_b = len(data) / 1024

# Parse JSON
processed_data = json.loads(data.decode())

print(f"\n✓ Downloaded {download_kb_b:.2f} KB in {download_time_b:.3f}s")
print(f"\nDataset info:")
print(f"  Source: {processed_data['metadata']['source']}")
print(f"  Processed: {processed_data['metadata']['processed_date']}")
print(f"  Total stations: {processed_data['metadata']['total_stations']}")
print(f"  Regions: {processed_data['metadata']['regions']}")

### B.3 Analyze Immediately (No Processing Needed!)

In [None]:
print("Analyzing regional data...\n")

# Data is already aggregated - just display
print(f"{'Region':<20} {'Stations':<10} {'Active':<10} {'Avg Elev (m)':<15} {'Networks'}")
print("-" * 80)

for region, stats in processed_data['regions'].items():
    networks = ', '.join(stats['networks'][:3])
    if len(stats['networks']) > 3:
        networks += f" +{len(stats['networks'])-3} more"
    print(f"{region:<20} {stats['station_count']:<10} {stats['active_count']:<10} {stats['avg_elevation_m']:<15} {networks}")

### B.4 Visualize

In [None]:
print("Creating visualization...")
viz_start = time.time()

# Create map
m = folium.Map(location=[39.0, -98.0], zoom_start=4, tiles='CartoDB positron')

# Color by region
colors = {
    'California': 'red',
    'Pacific Northwest': 'blue',
    'Southwest': 'orange',
    'Northeast': 'green',
    'Southeast': 'purple'
}

# Add stations from processed data
for region, stats in processed_data['regions'].items():
    color = colors.get(region, 'gray')
    
    # Add region center marker
    folium.Marker(
        location=[stats['center_lat'], stats['center_lon']],
        popup=f"<b>{region}</b><br>{stats['station_count']} stations",
        icon=folium.Icon(color=color, icon='info-sign')
    ).add_to(m)
    
    # Add sample station markers
    for station in stats.get('sample_stations', []):
        folium.CircleMarker(
            location=[station['latitude'], station['longitude']],
            radius=3,
            color=color,
            fill=True,
            fillOpacity=0.7,
            popup=f"{station['site']} ({station['network']})"
        ).add_to(m)

# Add legend
legend_html = '''
<div style="position: fixed; bottom: 50px; left: 50px; z-index: 1000; background-color: white; padding: 10px; border-radius: 5px; border: 2px solid gray;">
<b>Regions</b><br>
''' + ''.join([f'<i style="background:{c};width:10px;height:10px;display:inline-block;margin-right:5px;"></i>{r}<br>' for r, c in colors.items()]) + '</div>'

m.get_root().html.add_child(folium.Element(legend_html))

viz_time = time.time() - viz_start
print(f"\n✓ Visualization ready in {viz_time:.2f}s")

In [None]:
# Display map
m

In [None]:
# User B total time
user_b_total = time.time() - user_b_start

print("\n" + "="*60)
print("USER B SUMMARY")
print("="*60)
print(f"  Search catalog:      {search_time:.3f}s")
print(f"  Download from S3:    {download_time_b:.3f}s ({download_kb_b:.2f} KB)")
print(f"  Visualization:       {viz_time:.2f}s")
print(f"  ─────────────────────────────")
print(f"  TOTAL TIME:          {user_b_total:.2f}s")
print(f"  Data downloaded:     {download_kb_b:.2f} KB")
print("="*60)

USER_B_TIME = user_b_total
USER_B_KB = download_kb_b

---

# Comparison: User A vs User B

In [None]:
# Get User A values if available, otherwise use typical values
try:
    ua_time = USER_A_TIME
    ua_kb = USER_A_KB
except:
    ua_time = 8.0   # Typical time
    ua_kb = 149.5   # Typical download size

ub_time = USER_B_TIME
ub_kb = USER_B_KB

print("\n" + "="*60)
print("COMPARISON: NDP-EP BENEFITS")
print("="*60)
print()
print(f"                    USER A          USER B")
print(f"                    (first time)    (with NDP-EP)")
print(f"  ──────────────────────────────────────────────")
print(f"  Time:             {ua_time:>6.2f}s         {ub_time:>6.2f}s")
print(f"  Data downloaded:  {ua_kb:>6.1f} KB       {ub_kb:>6.2f} KB")
print(f"  Processing:       Required        None")
print(f"  Data cleaning:    Required        None")
print(f"  ──────────────────────────────────────────────")
print()
if ua_time > ub_time:
    print(f"  Time saved:      {ua_time - ub_time:.2f}s ({(1 - ub_time/ua_time)*100:.0f}% faster)")
if ua_kb > ub_kb:
    print(f"  Data saved:      {ua_kb - ub_kb:.1f} KB ({(1 - ub_kb/ua_kb)*100:.0f}% less)")
print()
print("="*60)

---

## Summary

### Why Use NDP-EP?

**1. Avoid Redundant Work**
- Process data once, use many times
- No need to re-download and re-clean raw data

**2. Faster Access**
- Local S3 is faster than external sources
- Pre-processed data is smaller and cleaner

**3. Data Discovery**
- Search catalog before downloading
- Find datasets others have already prepared

**4. Collaboration**
- Share processed data with your team
- Standard metadata for discovery

## Cleanup (Optional)

In [None]:
# Uncomment to clean up all resources

# client.delete_resource_by_name(DATASET_NAME)
# client.delete_object(BUCKET_NAME, S3_KEY)
# client.delete_bucket(BUCKET_NAME)

print("Done!")