# NDP EP Tutorial: S3 Storage to Dataset Registration Workflow

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sci-ndp/pop/blob/main/docs/s3_to_dataset_workflow_tutorial.ipynb)
[![Open in Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/sci-ndp/pop/main?filepath=docs/s3_to_dataset_workflow_tutorial.ipynb)

> 🚀 **Run Online Options:**
> - **Google Colab**: Dependencies installed automatically in the first cell
> - **Binder**: Pre-configured environment, ready to run immediately
> - **Local**: Requires `pip install requests jupyter`

This notebook demonstrates a complete workflow for scientific data management using the NDP EP API:

1. **Upload files to S3 storage** using MinIO endpoints
2. **Generate presigned URLs** for secure access
3. **Register datasets** with S3 URLs as resources
4. **Manage the complete data lifecycle** from storage to discovery

## Use Cases

This workflow is suited to:
- **Research data management**: Store large datasets in S3 and register them for discovery
- **Reproducible science**: Link data files to rich metadata (presigned URLs expire after at most 7 days, so regenerate them for long-term access)
- **Data publishing**: Combine storage with catalog registration for data sharing
- **Institutional repositories**: Manage both storage and metadata in one workflow

## Prerequisites

- Python 3.7+
- `requests` library
- Access to an NDP EP API instance with S3/MinIO configured
- Valid authentication credentials
- Data files to upload

## Workflow Overview

```
Local File → S3 Upload → Generate URL → Register Dataset → Published Data
```

In [None]:
# Install required packages
!pip install requests -q

## 1. Setup and Configuration

First, let's import the necessary libraries and configure our API connection parameters.

In [None]:
import requests
import json
from typing import Dict, Any, Optional
import time
import io
import os
from datetime import datetime
from pprint import pprint

### Configuration Variables

**Important:** Replace these values with your actual API endpoint and credentials.
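
Hard-coding a token in a notebook makes it easy to commit by accident. As one possible alternative, the sketch below reads both settings from environment variables and falls back to the tutorial defaults. The variable names `NDP_EP_API_URL` and `NDP_EP_TOKEN` and the helper `load_config` are assumptions made for this example; NDP EP does not define them.

```python
import os
from typing import Dict, Mapping, Optional


def load_config(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return API settings, preferring environment variables over defaults.

    NDP_EP_API_URL and NDP_EP_TOKEN are hypothetical variable names.
    """
    env = os.environ if env is None else env
    base_url = env.get("NDP_EP_API_URL", "http://localhost:8000")
    return {
        # Drop a trailing slash so f"{base_url}{endpoint}" never contains "//".
        "api_base_url": base_url.rstrip("/"),
        "auth_token": env.get("NDP_EP_TOKEN", "testing_token"),
    }


# Passing a plain dict makes the behaviour easy to check without touching os.environ.
config = load_config({"NDP_EP_API_URL": "https://api.example.org/"})
print(config["api_base_url"])
```

The returned values could then be assigned to `API_BASE_URL` and `AUTH_TOKEN` in the configuration cell.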

In [None]:
# API Configuration
API_BASE_URL = "http://localhost:8000"  # Replace with your API URL

# Authentication Token
AUTH_TOKEN = "testing_token"  # Replace with your actual token

# Request headers with authentication
HEADERS = {
    "Authorization": f"Bearer {AUTH_TOKEN}",
    "Accept": "application/json"
}

print(f"API Base URL: {API_BASE_URL}")
print(f"Token configured: {'✓' if AUTH_TOKEN not in ('', 'your_auth_token_here', 'testing_token') else '✗ Please set your token'}")

### Helper Functions

Let's create utility functions for both S3 and dataset operations.
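
Later cells decide whether a call worked by testing `"error" not in response`. A small predicate can make that check explicit and tolerant of non-dict JSON such as lists. The helper name `is_success` is hypothetical, and the sketch assumes that failures always arrive as the `{"error": True, ...}` dict that `make_api_request` builds.

```python
from typing import Any


def is_success(response: Any) -> bool:
    """Return True unless the value is an error dict like the one make_api_request returns."""
    if isinstance(response, dict):
        # Error dicts carry "error": True; any other dict counts as a success payload.
        return not response.get("error", False)
    # Lists and other parsed JSON values can only come from a successful call.
    return response is not None


print(is_success({"id": "abc123"}))
print(is_success({"error": True, "status_code": 404}))
```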

In [None]:
def make_api_request(method: str, endpoint: str, data: Optional[Dict] = None, 
                    params: Optional[Dict] = None, files: Optional[Dict] = None,
                    custom_headers: Optional[Dict] = None) -> Dict[str, Any]:
    """
    Make an API request with proper error handling for both S3 and dataset endpoints.
    """
    url = f"{API_BASE_URL}{endpoint}"
    
    # Prepare headers
    headers = HEADERS.copy()
    if custom_headers:
        headers.update(custom_headers)
    
    # Remove Content-Type for file uploads to let requests set it
    if files and "Content-Type" in headers:
        del headers["Content-Type"]
    
    try:
        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=data if not files else None,
            data=data if files else None,
            files=files,
            params=params,
            stream=(method == "GET" and "download" in endpoint.lower()),
            timeout=60  # Fail instead of hanging if the server stops responding
        )
        
        print(f"🔗 {method} {url}")
        print(f"📊 Status: {response.status_code}")
        
        if response.status_code in [200, 201, 204]:
            # Handle streaming responses (file downloads)
            if response.headers.get('content-type', '').startswith('application/octet-stream') or \
               'attachment' in response.headers.get('content-disposition', ''):
                print("✅ Success! (File download)")
                return {
                    "success": True,
                    "content": response.content,
                    "headers": dict(response.headers),
                    "status_code": response.status_code
                }
            
            # Handle JSON responses
            try:
                result = response.json()
                print("✅ Success!")
                return result
            except ValueError:
                # Handle non-JSON success responses
                print("✅ Success! (Non-JSON response)")
                return {"success": True, "status_code": response.status_code}
        else:
            print(f"❌ Error: {response.status_code}")
            try:
                error_detail = response.json()
                print(f"Error details: {json.dumps(error_detail, indent=2)}")
            except ValueError:
                print(f"Error text: {response.text}")
            return {"error": True, "status_code": response.status_code, "detail": response.text}
            
    except requests.exceptions.RequestException as e:
        print(f"❌ Request failed: {e}")
        return {"error": True, "exception": str(e)}

def print_response(response: Dict[str, Any], title: str = "Response"):
    """Pretty print API responses."""
    print(f"\n📋 {title}:")
    print("─" * 50)
    if "content" in response:  # File download response
        print(f"File size: {len(response['content'])} bytes")
        print(f"Headers: {response['headers']}")
    else:
        pprint(response)
    print("─" * 50)

def create_sample_file(filename: str, content: Optional[str] = None, size_kb: Optional[int] = None) -> str:
    """Create a sample file for upload testing."""
    if content is None:
        if size_kb:
            # Create file of specific size
            content = "Sample data line for workflow tutorial.\n" * (size_kb * 25)  # Approx 1KB per 25 lines
        else:
            content = f"""# Research Data File - {filename}
# Created: {datetime.now().isoformat()}
# Purpose: NDP EP S3-to-Dataset workflow tutorial

timestamp,temperature,humidity,pressure
2024-01-01T00:00:00Z,23.5,45.2,1013.25
2024-01-01T01:00:00Z,23.2,46.1,1013.15
2024-01-01T02:00:00Z,22.8,47.3,1012.98
2024-01-01T03:00:00Z,22.4,48.0,1012.75
2024-01-01T04:00:00Z,22.1,48.9,1012.60

# This is sample weather data for the tutorial
# In a real workflow, this would be your actual research data
"""
    
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(content)
    
    size_bytes = os.path.getsize(filename)
    print(f"📄 Created sample file: {filename} ({size_bytes} bytes)")
    return filename

## 2. API Connectivity Test

Let's verify that the API responds and that its S3 service is configured. Dataset endpoints are exercised later, during registration.
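
If the API is still starting up (for example, right after launching its containers), a single status check can fail spuriously. The sketch below retries a check with exponential backoff. `wait_until_ready` is a hypothetical helper, and the injectable `sleep` parameter exists only so the logic can be exercised without real delays.

```python
import time
from typing import Callable


def wait_until_ready(check: Callable[[], bool], attempts: int = 5,
                     base_delay: float = 1.0,
                     sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call check() until it returns True, doubling the pause after each failure."""
    for attempt in range(attempts):
        if check():
            return True
        # Do not sleep after the final attempt; there is nothing left to wait for.
        if attempt < attempts - 1:
            sleep(base_delay * (2 ** attempt))
    return False


# Simulated service that becomes ready on the third check.
answers = iter([False, False, True])
print(wait_until_ready(lambda: next(answers), sleep=lambda seconds: None))
```

In this notebook the check could be `lambda: "error" not in make_api_request("GET", "/status/")`.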

In [None]:
# Test API connectivity
print("🧪 Testing API connectivity...")
status_response = make_api_request("GET", "/status/")
print_response(status_response, "API Status")

if "error" not in status_response:
    print("🎉 API is accessible and responding correctly!")
    
    # Test S3 endpoints
    print("\n🧪 Testing S3 service...")
    buckets_response = make_api_request("GET", "/s3/buckets/")
    
    if "error" not in buckets_response:
        print("✅ S3 service is available and configured!")
        print(f"📦 Found {len(buckets_response.get('buckets', []))} existing buckets")
    else:
        print("⚠️ S3 service may not be configured")
else:
    print("⚠️ API connectivity issues. Please check your configuration.")

## 3. Step 1: Prepare and Upload Data to S3

First, we'll create sample research data and upload it to S3 storage.

### Create Sample Research Data

Let's create some sample files that represent typical research data.

In [None]:
# Generate unique identifiers for this workflow
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
workflow_id = f"workflow_{timestamp}"

print(f"🔬 Creating sample research data for workflow: {workflow_id}")

# Create different types of research files
sample_files = [
    {
        "filename": f"temperature_data_{timestamp}.csv",
        "content": f"""# Temperature Measurement Dataset
# Workflow ID: {workflow_id}
# Created: {datetime.now().isoformat()}
# Instrument: Weather Station Network
# Location: Research Site A

timestamp,temperature_celsius,quality_flag,station_id
2024-01-01T00:00:00Z,23.5,GOOD,WS001
2024-01-01T01:00:00Z,23.2,GOOD,WS001
2024-01-01T02:00:00Z,22.8,GOOD,WS001
2024-01-01T03:00:00Z,22.4,SUSPECT,WS001
2024-01-01T04:00:00Z,22.1,GOOD,WS001
2024-01-01T05:00:00Z,21.9,GOOD,WS001
2024-01-01T06:00:00Z,22.3,GOOD,WS001
2024-01-01T07:00:00Z,23.1,GOOD,WS001
2024-01-01T08:00:00Z,24.2,GOOD,WS001
2024-01-01T09:00:00Z,25.8,GOOD,WS001
""",
        "description": "Hourly temperature measurements with quality flags",
        "format": "CSV"
    },
    {
        "filename": f"analysis_results_{timestamp}.json",
        "content": json.dumps({
            "workflow_id": workflow_id,
            "analysis_type": "statistical_summary",
            "created": datetime.now().isoformat(),
            "data_sources": ["temperature_sensors", "humidity_sensors"],
            "results": {
                "temperature_stats": {
                    "mean": 23.21,
                    "std_dev": 1.15,
                    "min": 21.9,
                    "max": 25.8,
                    "n_observations": 10
                },
                "quality_assessment": {
                    "good_data_percentage": 90.0,
                    "suspect_data_count": 1,
                    "missing_data_count": 0
                }
            },
            "methodology": "Standard statistical analysis with outlier detection",
            "software_version": "Analysis Pipeline v2.1"
        }, indent=2),
        "description": "Statistical analysis results in JSON format",
        "format": "JSON"
    },
    {
        "filename": f"methodology_{timestamp}.md",
        "content": f"""# Research Methodology Documentation

**Workflow ID:** {workflow_id}  
**Created:** {datetime.now().isoformat()}  
**Study Type:** Environmental Monitoring

## Overview

This document describes the methodology used for collecting and analyzing environmental data as part of the NDP EP workflow tutorial.

## Data Collection

### Instruments
- Weather Station Network (Model: WS-2024)
- Temperature sensors: ±0.1°C accuracy
- Sampling interval: 1 hour
- Quality control: Automated + manual review

### Locations
- Research Site A: 40.7128°N, 74.0060°W
- Elevation: 10m above sea level
- Environment: Urban research station

## Data Processing

1. **Raw Data Collection**
   - Automated data logger retrieval
   - Timestamp validation
   - Initial quality flags

2. **Quality Control**
   - Range checks: -40°C to +50°C
   - Temporal consistency checks
   - Outlier detection using 3-sigma rule

3. **Statistical Analysis**
   - Descriptive statistics
   - Trend analysis
   - Quality metrics

## Data Management

- **Storage**: S3-compatible object storage
- **Format**: CSV for data, JSON for analysis results
- **Backup**: Automated daily backups
- **Access**: Controlled via NDP EP API

## References

- Environmental Monitoring Standards (EMS-2024)
- Data Quality Guidelines (DQG-v3.2)
- NDP EP Documentation
""",
        "description": "Comprehensive methodology documentation",
        "format": "Markdown"
    }
]

# Create the files
created_files = []
for file_info in sample_files:
    filename = create_sample_file(file_info["filename"], file_info["content"])
    created_files.append({
        "filename": filename,
        "description": file_info["description"],
        "format": file_info["format"]
    })

print(f"\n✅ Created {len(created_files)} sample files for the workflow")
for i, file_info in enumerate(created_files, 1):
    print(f"  {i}. {file_info['filename']} ({file_info['format']}) - {file_info['description']}")

### Create S3 Bucket for Research Data

Now let's create a dedicated bucket for our research workflow.
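
Bucket creation fails if the name breaks S3 naming rules, so it can be worth checking the name locally first. The sketch below covers the core rules: 3 to 63 characters; lowercase letters, digits, dots, and hyphens only; a letter or digit at each end; no consecutive dots. It is a simplified check under those assumptions (it does not reject IP-address-shaped names, for instance), and `is_valid_bucket_name` is a hypothetical helper.

```python
import re

# First and last characters must be a lowercase letter or digit; 3-63 characters overall.
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    """Simplified S3 bucket-name check (not the full rule set)."""
    return _BUCKET_NAME.fullmatch(name) is not None and ".." not in name


print(is_valid_bucket_name("research-data-20240101120000"))
print(is_valid_bucket_name("Research_Data"))
```

The `research-data-{timestamp}` pattern used below passes this check because the timestamp contains only digits.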

In [None]:
# Create a bucket for this workflow
bucket_name = f"research-data-{timestamp}"

print(f"🪣 Creating S3 bucket: {bucket_name}")

bucket_data = {
    "name": bucket_name,
    "region": "us-east-1"
}

bucket_response = make_api_request("POST", "/s3/buckets/", data=bucket_data)
print_response(bucket_response, "Bucket Creation")

if "error" not in bucket_response:
    print(f"\n✅ Bucket '{bucket_name}' created successfully!")
    workflow_bucket = bucket_name
else:
    print("❌ Failed to create bucket. Create one manually, or set workflow_bucket to an existing bucket name.")
    workflow_bucket = None

### Upload Research Files to S3

Let's upload our research files to the S3 bucket.
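
The upload cell deletes each local file afterwards, so recording a checksum beforehand gives you something to compare a later download against. The sketch below hashes a file-like object in chunks with SHA-256. `sha256_of_stream` is a hypothetical helper, and storing the digest (for example in the dataset `extras`) is a suggestion rather than something the NDP EP API requires.

```python
import hashlib
import io
from typing import BinaryIO


def sha256_of_stream(stream: BinaryIO, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex SHA-256 digest of a binary stream, reading it in chunks."""
    digest = hashlib.sha256()
    # iter() with a sentinel stops when read() returns b"" at end of stream.
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()


# io.BytesIO stands in for open(filename, "rb") so the example needs no file on disk.
sample = io.BytesIO(b"timestamp,temperature_celsius\n2024-01-01T00:00:00Z,23.5\n")
print(sha256_of_stream(sample))
```

Chunked reading keeps memory use flat for large data files.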

In [None]:
if workflow_bucket:
    print(f"📤 Uploading research files to bucket: {workflow_bucket}")
    
    uploaded_files = []
    
    for i, file_info in enumerate(created_files, 1):
        filename = file_info["filename"]
        print(f"\n📁 Uploading file {i}/{len(created_files)}: {filename}")
        
        # Upload file
        with open(filename, 'rb') as f:
            files = {"file": (filename, f, "text/plain")}
            form_data = {"object_key": filename}
            
            upload_response = make_api_request(
                "POST", 
                f"/s3/objects/{workflow_bucket}",
                data=form_data,
                files=files
            )
            
            if "error" not in upload_response:
                print(f"✅ Uploaded: {filename}")
                print(f"   📏 Size: {upload_response.get('size', 'N/A')} bytes")
                print(f"   🔑 Key: {upload_response.get('key', 'N/A')}")
                print(f"   📦 Bucket: {upload_response.get('bucket', 'N/A')}")
                
                uploaded_files.append({
                    "filename": filename,
                    "bucket": upload_response.get('bucket'),
                    "key": upload_response.get('key'),
                    "size": upload_response.get('size'),
                    "description": file_info["description"],
                    "format": file_info["format"]
                })
            else:
                print(f"❌ Failed to upload: {filename}")
                print_response(upload_response, "Upload Error")
        
        # Clean up local file
        try:
            os.remove(filename)
        except OSError:
            pass  # File is already gone or cannot be removed; safe to ignore here
    
    print(f"\n📊 Upload Summary:")
    print(f"✅ Successfully uploaded: {len(uploaded_files)} files")
    print(f"📦 Bucket: {workflow_bucket}")
    print(f"💾 Total size: {sum(f.get('size') or 0 for f in uploaded_files)} bytes")
    
else:
    print("⚠️ No bucket available for file upload")
    uploaded_files = []

## 4. Step 2: Generate Presigned URLs

Now we'll generate presigned URLs for our uploaded files. These URLs will be used as resource links in our dataset.
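
Because the links expire, it helps to know exactly when. The sketch below validates a requested lifetime against the 7-day maximum used in this tutorial (604800 seconds) and computes the expiry as a timezone-aware UTC timestamp. `expiry_time` and `MAX_EXPIRES_IN` are hypothetical names introduced for this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_EXPIRES_IN = 7 * 24 * 3600  # 604800 seconds, the maximum used in this tutorial


def expiry_time(expires_in: int, now: Optional[datetime] = None) -> datetime:
    """Return the UTC time at which a URL requested now would expire."""
    if not 1 <= expires_in <= MAX_EXPIRES_IN:
        raise ValueError(f"expires_in must be between 1 and {MAX_EXPIRES_IN} seconds")
    now = now or datetime.now(timezone.utc)
    return now + timedelta(seconds=expires_in)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(expiry_time(604800, now=start).isoformat())  # 2024-01-08T00:00:00+00:00
```

Using UTC avoids the ambiguity of a local-time expiry when collaborators are in other time zones.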

In [None]:
if uploaded_files:
    print("🔗 Generating presigned URLs for uploaded files...")
    
    # Generate presigned URLs (valid for 7 days - maximum allowed)
    presigned_request = {
        "expires_in": 604800  # 7 days in seconds (maximum)
    }
    
    files_with_urls = []
    
    for file_info in uploaded_files:
        bucket = file_info["bucket"]
        key = file_info["key"]
        
        print(f"\n🔗 Generating URL for: {key}")
        
        url_response = make_api_request(
            "POST",
            f"/s3/objects/{bucket}/{key}/presigned-download",
            data=presigned_request
        )
        
        if "error" not in url_response and "url" in url_response:
            print(f"✅ Generated presigned URL")
            print(f"   ⏰ Expires in: {url_response['expires_in']} seconds ({url_response['expires_in']//3600} hours)")
            
            # Add URL to file info
            file_info_with_url = file_info.copy()
            file_info_with_url["presigned_url"] = url_response["url"]
            file_info_with_url["url_expires_in"] = url_response["expires_in"]
            files_with_urls.append(file_info_with_url)
            
        else:
            print(f"❌ Failed to generate URL for: {key}")
            print_response(url_response, "URL Generation Error")
    
    print(f"\n📊 URL Generation Summary:")
    print(f"✅ Generated URLs for: {len(files_with_urls)} files")
    print(f"🔗 URLs valid for: {presigned_request['expires_in']//3600} hours")
    print(f"📅 URLs expire on: {datetime.fromtimestamp(time.time() + presigned_request['expires_in'])}")
    
else:
    print("⚠️ No uploaded files available for URL generation")
    files_with_urls = []

## 5. Step 3: Create Organization (if needed)

Every dataset must belong to an organization. The cell below lists the organizations that already exist in CKAN and selects one; it does not create a new organization.
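
CKAN's action API wraps each reply in an envelope with a `success` flag and a `result` field, and `organization_list` returns organization names by default. The sketch below separates the selection logic from the HTTP call so it can be checked against a sample payload. `choose_organization` and its `preferred` parameter are hypothetical.

```python
from typing import Any, Dict, Optional, Sequence


def choose_organization(ckan_response: Dict[str, Any],
                        preferred: Sequence[str] = ()) -> Optional[str]:
    """Pick an organization name from a parsed organization_list reply."""
    if not ckan_response.get("success"):
        return None
    available = ckan_response.get("result") or []
    # Honour the caller's preference order before falling back to the first entry.
    for name in preferred:
        if name in available:
            return name
    return available[0] if available else None


sample_reply = {"success": True, "result": ["climate_lab", "test_raul"]}
print(choose_organization(sample_reply, preferred=["test_raul"]))  # test_raul
```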

In [None]:
# Check existing organizations in CKAN directly
print("🏢 Checking organizations in local CKAN...")

# Get organizations directly from the CKAN API (change the URL if CKAN is not on localhost:5000)
try:
    ckan_orgs_response = requests.get("http://localhost:5000/api/3/action/organization_list", timeout=30)
    if ckan_orgs_response.status_code == 200:
        ckan_data = ckan_orgs_response.json()
        available_orgs = ckan_data.get('result', [])
        
        print(f"📈 Found {len(available_orgs)} organizations in local CKAN:")
        for i, org in enumerate(available_orgs, 1):
            print(f"  {i}. {org}")
        
        # Use test_raul if available, otherwise use the first one
        if "test_raul" in available_orgs:
            organization_name = "test_raul"
        elif available_orgs:
            organization_name = available_orgs[0]
        else:
            organization_name = None
            
        if organization_name:
            print(f"\n✅ Using organization: {organization_name}")
    else:
        print("❌ Could not retrieve organizations from CKAN")
        organization_name = None
        
except Exception as e:
    print(f"❌ Error connecting to CKAN: {e}")
    print("⚠️ Using fallback organization name")
    organization_name = "test_raul"  # Fallback to known working organization

if not organization_name:
    print("\n⚠️ No organizations available. Please create one manually first.")
    print("   The tutorial requires an existing organization to register datasets.")

print(f"\n🎯 Organization for dataset: {organization_name or 'None available'}")

## 6. Step 4: Register Dataset with S3 Resources

Now we'll create a comprehensive dataset that includes our S3-stored files as resources.
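
CKAN-style catalogs generally restrict the dataset `name` to a URL-safe slug. The sketch below derives one from a free-text title, assuming the rule is lowercase ASCII letters, digits, `-` and `_`, with a length of 2 to 100 characters. Those limits are an assumption to verify against your catalog, and `to_dataset_name` is a hypothetical helper.

```python
import re


def to_dataset_name(title: str, max_length: int = 100) -> str:
    """Turn a title into a lowercase slug using only a-z, 0-9, '-' and '_'."""
    # Replace every run of disallowed characters with a single hyphen.
    slug = re.sub(r"[^a-z0-9_-]+", "-", title.lower()).strip("-_")
    # Truncate, then trim again in case the cut landed on a separator.
    slug = slug[:max_length].rstrip("-_")
    if len(slug) < 2:
        raise ValueError("title does not contain enough usable characters")
    return slug


print(to_dataset_name("Environmental Monitoring 2024!"))  # environmental-monitoring-2024
```

The `environmental_monitoring_{timestamp}` name built below already satisfies these rules.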

In [None]:
if organization_name and files_with_urls:
    print(f"📊 Creating dataset with S3 resources...")
    
    # Prepare resources from our uploaded files
    dataset_resources = []
    for file_info in files_with_urls:
        resource = {
            "url": file_info["presigned_url"],
            "name": file_info["filename"],
            "description": file_info["description"],
            "format": file_info["format"],
            "size": file_info["size"]
        }
        
        # Add mimetype based on format
        format_mimetypes = {
            "CSV": "text/csv",
            "JSON": "application/json",
            "Markdown": "text/markdown",
            "PDF": "application/pdf",
            "XML": "application/xml"
        }
        resource["mimetype"] = format_mimetypes.get(file_info["format"], "application/octet-stream")
        
        dataset_resources.append(resource)
    
    # Create comprehensive dataset payload
    dataset_payload = {
        # Required fields
        "name": f"environmental_monitoring_{timestamp}",
        "title": f"Environmental Monitoring Dataset - Workflow {workflow_id}",
        "owner_org": organization_name,
        
        # Descriptive metadata
        "notes": f"""This dataset contains environmental monitoring data collected as part of research workflow {workflow_id}. 

The dataset includes:
- Temperature measurements with quality control flags
- Statistical analysis results
- Comprehensive methodology documentation

All data files are stored in S3 object storage and accessible via presigned URLs. This demonstrates the complete workflow from data collection through storage to dataset registration in the NDP EP platform.

**Data Collection Period:** {datetime.now().strftime('%Y-%m-%d')}
**Quality Level:** Research Grade
**Access:** Open Access via presigned URLs""",
        
        # Categorization
        "tags": [
            "environmental-monitoring",
            "temperature",
            "research-data",
            "s3-workflow",
            "quality-controlled",
            "open-access",
            "tutorial"
        ],
        "groups": ["environmental", "research", "monitoring"],
        
        # Administrative metadata
        "license_id": "cc-by-4.0",
        "version": "1.0",
        "private": False,
        
        # Extended metadata using extras
        "extras": {
            "workflow_id": workflow_id,
            "creation_method": "S3-to-Dataset API Workflow",
            "storage_backend": "S3 Object Storage",
            "bucket_name": workflow_bucket,
            "data_collection_date": datetime.now().strftime('%Y-%m-%d'),
            "quality_control": "Automated + Manual Review",
            "data_format_standards": "CSV, JSON, Markdown",
            "access_method": "Presigned URLs",
            "url_expiration_hours": str(presigned_request['expires_in']//3600),
            "geographical_coverage": "Research Site A (40.7128°N, 74.0060°W)",
            "temporal_coverage": "2024-01-01 (sample data)",
            "instrument_type": "Weather Station Network",
            "measurement_frequency": "Hourly",
            "data_processing_level": "Level 2 - Quality Controlled",
            "contact_info": "NDP EP Tutorial",
            "methodology_reference": "See included methodology documentation",
            "software_version": "NDP EP API v1.0",
            "backup_location": f"S3 bucket: {workflow_bucket}",
            "checksum_algorithm": "MD5 (via S3 ETag)"
        },
        
        # S3-stored resources
        "resources": dataset_resources
    }
    
    print(f"📋 Dataset will include:")
    print(f"   📊 Name: {dataset_payload['name']}")
    print(f"   📝 Title: {dataset_payload['title']}")
    print(f"   🏢 Organization: {organization_name}")
    print(f"   🏷️ Tags: {len(dataset_payload['tags'])} tags")
    print(f"   📁 Resources: {len(dataset_resources)} S3-stored files")
    print(f"   📦 Storage: S3 bucket '{workflow_bucket}'")
    print(f"   🔗 Access: Presigned URLs (valid {presigned_request['expires_in']//3600} hours)")
    
    # Create the dataset
    print(f"\n🚀 Registering dataset...")
    dataset_response = make_api_request("POST", "/dataset", data=dataset_payload)
    print_response(dataset_response, "Dataset Registration")
    
    if "error" not in dataset_response and "id" in dataset_response:
        dataset_id = dataset_response["id"]
        print(f"\n🎉 Dataset registered successfully!")
        print(f"🆔 Dataset ID: {dataset_id}")
        print(f"📋 Dataset Name: {dataset_payload['name']}")
        print(f"📁 Resources: {len(dataset_resources)} files from S3")
        print(f"📦 S3 Bucket: {workflow_bucket}")
        print(f"🔗 All files accessible via dataset resources")
        
        workflow_dataset_id = dataset_id
    else:
        print("❌ Failed to register dataset")
        workflow_dataset_id = None
        
else:
    print("⚠️ Cannot create dataset - missing organization or S3 files")
    workflow_dataset_id = None

## 7. Verification and Testing

Let's verify that our complete workflow worked correctly by testing access to the data.
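
One extra check is to read the expiry straight from a presigned URL. URLs signed with AWS Signature Version 4 carry `X-Amz-Date` and `X-Amz-Expires` query parameters. The sketch below assumes the API returns URLs in that style, which this tutorial does not confirm, and returns `None` otherwise. `presigned_url_expiry` is a hypothetical helper and the sample URL is made up.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import parse_qs, urlsplit


def presigned_url_expiry(url: str) -> Optional[datetime]:
    """Return the UTC expiry encoded in a SigV4 presigned URL, or None if absent."""
    query = parse_qs(urlsplit(url).query)
    try:
        signed_at = datetime.strptime(query["X-Amz-Date"][0], "%Y%m%dT%H%M%SZ")
        lifetime = int(query["X-Amz-Expires"][0])
    except (KeyError, ValueError):
        return None
    return signed_at.replace(tzinfo=timezone.utc) + timedelta(seconds=lifetime)


example_url = (
    "http://localhost:9000/research-data/temps.csv"
    "?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=20240101T000000Z"
    "&X-Amz-Expires=604800&X-Amz-Signature=abc"
)
print(presigned_url_expiry(example_url))  # 2024-01-08 00:00:00+00:00
```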

In [None]:
if workflow_dataset_id and files_with_urls:
    print("🧪 Verifying workflow completion...")
    
    # Test 1: Verify we can list objects in our S3 bucket
    print("\n1️⃣ Testing S3 bucket access...")
    bucket_objects = make_api_request("GET", f"/s3/objects/{workflow_bucket}")
    
    if "error" not in bucket_objects and "objects" in bucket_objects:
        objects = bucket_objects["objects"]
        print(f"✅ S3 bucket contains {len(objects)} objects")
        for obj in objects:
            print(f"   📄 {obj['key']} ({obj['size']} bytes)")
    else:
        print("❌ Could not access S3 bucket objects")
    
    # Test 2: Test one of the presigned URLs
    print("\n2️⃣ Testing presigned URL access...")
    if files_with_urls:
        test_file = files_with_urls[0]  # Test first file
        test_url = test_file["presigned_url"]
        
        print(f"🔗 Testing access to: {test_file['filename']}")
        
        try:
            # Test the presigned URL directly (without authentication)
            response = requests.get(test_url, timeout=60)
            if response.status_code == 200:
                print(f"✅ Presigned URL works! Downloaded {len(response.content)} bytes")
                
                # Show preview of content if it's text
                if test_file["format"] in ["CSV", "Markdown", "JSON"]:
                    content_preview = response.text[:200]
                    print(f"📄 Content preview:\n{content_preview}...")
            else:
                print(f"❌ Presigned URL failed: {response.status_code}")
        except Exception as e:
            print(f"❌ Error testing presigned URL: {e}")
    
    # Step 3: Summarize the workflow (no further API calls are made here)
    print("\n3️⃣ Workflow verification summary...")
    print(f"✅ Created S3 bucket: {workflow_bucket}")
    print(f"✅ Uploaded {len(uploaded_files)} research files")
    print(f"✅ Generated {len(files_with_urls)} presigned URLs")
    print(f"✅ Registered dataset: {workflow_dataset_id}")
    print(f"✅ Dataset includes {len(dataset_resources)} S3 resources")
    
    total_size_mb = sum(f.get('size') or 0 for f in uploaded_files) / (1024*1024)
    print(f"📊 Total data stored: {total_size_mb:.2f} MB")
    print(f"🔗 All files accessible via presigned URLs for {presigned_request['expires_in']//3600} hours")
    
else:
    print("⚠️ Workflow incomplete - cannot verify")

## 8. Alternative: Create Direct S3 URLs (Optional)

For reference, here's how to build direct ("permanent") S3 URLs. These URLs never expire, but they only work if the bucket is publicly readable.
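
Object keys may contain spaces or other characters that are not valid in a URL path, so a direct URL should percent-encode them. The sketch below builds a path-style URL (`endpoint/bucket/key`) and keeps `/` in the key unescaped so folder-like prefixes stay readable. `direct_object_url` is a hypothetical helper, and the endpoint is whatever address your S3 or MinIO service uses.

```python
from urllib.parse import quote


def direct_object_url(endpoint: str, bucket: str, key: str) -> str:
    """Build a path-style object URL with the bucket and key percent-encoded."""
    # safe="/" leaves path separators inside the key untouched.
    return f"{endpoint.rstrip('/')}/{quote(bucket)}/{quote(key, safe='/')}"


print(direct_object_url("http://localhost:9000/", "research-data", "raw/site a/temps.csv"))
# http://localhost:9000/research-data/raw/site%20a/temps.csv
```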

In [None]:
if workflow_bucket and uploaded_files:
    print("📚 Alternative approach: Direct S3 URLs")
    print("(Note: This requires the bucket to be configured as public)")
    
    direct_urls = []
    for file_info in uploaded_files:
        # Direct S3 URL format (works if bucket is public); replace localhost:9000 with your S3/MinIO endpoint
        direct_url = f"http://localhost:9000/{file_info['bucket']}/{file_info['key']}"
        direct_urls.append({
            "filename": file_info["filename"],
            "direct_url": direct_url,
            "description": file_info["description"]
        })
        
        print(f"🔗 {file_info['filename']}: {direct_url}")
    
    print("\n💡 Direct URLs vs Presigned URLs:")
    print("   • Direct URLs: Permanent but require public bucket")
    print("   • Presigned URLs: Temporary but work with private buckets")
    print("   • For research data, presigned URLs are usually preferred for security")
    
else:
    print("⚠️ No files available for direct URL demonstration")