[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sci-ndp/ndp-ep-py/blob/main/docs/source/tutorials/s3_to_dataset_integration.ipynb)
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/sci-ndp/ndp-ep-py/main?filepath=docs%2Fsource%2Ftutorials%2Fs3_to_dataset_integration.ipynb)

# S3 Object to Dataset Integration Tutorial

Welcome to the comprehensive tutorial on integrating S3 objects with dataset registration!

## 🎯 What You'll Learn

This tutorial demonstrates a complete data publishing workflow:

- **📤 S3 Object Upload**: Upload data files to S3 buckets
- **🔗 URL Generation**: Generate time-limited presigned URLs for S3 objects
- **📊 Metadata Extraction**: Compute file metadata (size, SHA-256 hash, CSV row and column counts)
- **📋 Dataset Registration**: Register datasets with S3 URLs and metadata
- **🔍 Resource Discovery**: Search and verify registered datasets
- **🧹 Resource Management**: Proper cleanup and lifecycle management

## 🔧 Use Cases

Perfect for:
- **Data Publishing**: Publishing research datasets with metadata
- **Data Lake Integration**: Connecting S3 storage with catalog systems
- **Automated Workflows**: ETL pipelines with dataset registration
- **Data Sharing**: Creating discoverable data resources
- **Research Data Management**: Academic data publication workflows

In [None]:
# Install required libraries\n
!pip install ndp-ep\n\n
# Import required modules\n
import io\n
import json\n
import time\n
import getpass\n
import hashlib\n
from datetime import datetime, timezone\n
from typing import List, Dict, Any, Optional\n
from ndp_ep import APIClient\n\n
print(\"✅ Libraries installed and imported successfully!\")\n
print(\"📚 Ready to start S3 to Dataset integration tutorial\")

## 1. 🔐 Setup and Configuration

In [None]:
# Configuration
# Fall back to a local development server when the prompt is left empty.
api_url = input("Enter API base URL [http://localhost:8000]: ").strip()
if not api_url:
    api_url = "http://localhost:8000"

# getpass keeps the token out of the notebook output. Strip it once here so
# whitespace picked up while pasting is neither accepted by the emptiness
# check nor sent to the API as part of the token.
api_token = getpass.getpass("API Token: ").strip()

if not api_token:
    raise ValueError("❌ API token is required")

print("✅ Configuration complete")

In [None]:
# Initialize client
client = APIClient(base_url=api_url, token=api_token)

# Test functionality: these calls contact the API right away. Presumably a
# wrong URL or token surfaces here rather than midway through the workflow.
system_status = client.get_system_status()
buckets = client.list_buckets()
orgs = client.list_organizations(server="local")

print("✅ Client initialized successfully")
print(f"🌐 Connected to: {api_url}")
print(f"🪣 S3 buckets available: {len(buckets)}")
print(f"🏢 Organizations available: {len(orgs)}")

## 2. 📤 Step 1: Upload File to S3 with Metadata

In [None]:
# Workflow configuration
# The Unix timestamp distinguishes runs (at one-second resolution). The
# resulting bucket name is lowercase letters and digits only.
WORKFLOW_PREFIX = "s3integration"
TIMESTAMP = int(time.time())
WORKFLOW_BUCKET = f"{WORKFLOW_PREFIX}{TIMESTAMP}"

# Register under the first organization the server reports. Otherwise fall
# back to a placeholder, which must name a real organization for dataset
# registration to succeed.
DATASET_ORG = orgs[0] if orgs else "example_org_name"
if not orgs:
    print("⚠️  No organizations found; using placeholder 'example_org_name' - "
          "dataset registration will likely fail unless it exists.")

print("📊 Workflow Configuration:")
print(f"   Bucket: {WORKFLOW_BUCKET}")
print(f"   Organization: {DATASET_ORG}")

In [None]:
# Create S3 bucket
print(f"🪣 Creating S3 bucket: {WORKFLOW_BUCKET}")

try:
    # NOTE(review): the bucket name is passed both positionally and as
    # `name=`; confirm this matches the APIClient.create_bucket signature.
    bucket_result = client.create_bucket(WORKFLOW_BUCKET, name=WORKFLOW_BUCKET)
except Exception as e:
    # The rest of the workflow needs the bucket, so report and stop.
    print(f"❌ Bucket creation failed: {e}")
    raise
else:
    print("✅ Bucket created successfully")
    print(f"📍 Bucket name: {WORKFLOW_BUCKET}")

In [None]:
# Sample research data
# A small in-memory CSV (header plus five rows, no trailing newline) together
# with the descriptive fields reused by the upload and registration steps.
sample_data = dict(
    file_name="research_data.csv",
    content="\n".join([
        "id,name,measurement,timestamp",
        "1,Sample A,23.5,2024-01-01T10:00:00Z",
        "2,Sample B,45.2,2024-01-01T11:00:00Z",
        "3,Sample C,67.8,2024-01-01T12:00:00Z",
        "4,Sample D,12.3,2024-01-01T13:00:00Z",
        "5,Sample E,89.1,2024-01-01T14:00:00Z",
    ]),
    description="Research dataset with experimental measurements",
    content_type="text/csv",
    tags=["research", "measurements", "csv", "experiment"],
)

print("📄 Sample data prepared:")
print("   File: " + sample_data["file_name"])
print("   Description: " + sample_data["description"])
print("   Type: " + sample_data["content_type"])
# Size is reported in encoded bytes, which is what actually gets uploaded.
print("   Size: {} bytes".format(len(sample_data["content"].encode("utf-8"))))

In [None]:
# Upload file to S3 with metadata extraction
import csv  # only this cell parses CSV

print("📤 Uploading file to S3...")

file_data = sample_data['content'].encode('utf-8')
object_key = f"data/{sample_data['file_name']}"

# Extract comprehensive metadata. Size and hash describe the exact bytes uploaded.
file_hash = hashlib.sha256(file_data).hexdigest()
file_size = len(file_data)
created_at = datetime.now(timezone.utc).isoformat()

# CSV-specific metadata. Parse with the csv module instead of splitting on
# '\n' and ',' so trailing newlines, CRLF endings and quoted commas do not
# distort the counts.
csv_records = [
    record
    for record in csv.reader(io.StringIO(sample_data['content']))
    if record  # csv.reader yields [] for blank lines; ignore them
]
csv_rows = max(len(csv_records) - 1, 0)  # Exclude header
csv_columns = len(csv_records[0]) if csv_records else 0

metadata = {
    "file_name": sample_data['file_name'],
    "file_size": file_size,
    "content_type": sample_data['content_type'],
    "sha256_hash": file_hash,
    "description": sample_data['description'],
    "tags": sample_data['tags'],
    "created_at": created_at,
    "format": "CSV",
    "csv_rows": csv_rows,
    "csv_columns": csv_columns,
    "workflow_id": f"{WORKFLOW_PREFIX}_{TIMESTAMP}",
}

try:
    upload_result = client.upload_object(
        bucket_name=WORKFLOW_BUCKET,
        object_key=object_key,
        file_data=file_data,
        content_type=sample_data['content_type'],
    )
except Exception as e:
    # Later steps reference the uploaded object, so stop here on failure.
    print(f"❌ Upload failed: {e}")
    raise
else:
    print("✅ File uploaded successfully")
    print(f"📍 S3 URI: s3://{WORKFLOW_BUCKET}/{object_key}")
    print(f"📊 Size: {file_size} bytes")
    print(f"🔒 SHA-256: {file_hash[:16]}...")
    print(f"📋 Rows: {csv_rows}, Columns: {csv_columns}")

## 3. 🔗 Step 2: Generate Secure Access URL

In [None]:
# Generate presigned download URL
print("🔗 Generating secure access URL...")

# 604800 s = 7 days, the longest expiry AWS S3 accepts for SigV4 presigned URLs.
URL_EXPIRATION = 604800  # 7 days

try:
    url_data = client.generate_presigned_download_url(
        bucket_name=WORKFLOW_BUCKET,
        object_key=object_key,
        expiration=URL_EXPIRATION,
    )
    download_url = url_data.get('url', '')
    # A response without a URL is a failure. Do not report success and
    # record an empty link in the dataset metadata.
    if not download_url:
        raise ValueError("response did not contain a 'url' field")
except Exception as e:
    # Best effort: continue without a URL. The registration step only adds
    # URL fields when `download_url` is non-empty.
    print(f"❌ URL generation failed: {e}")
    download_url = ""
else:
    url_generated_at = datetime.now(timezone.utc).isoformat()

    print("✅ Presigned URL generated successfully")
    print(f"🔗 URL: {download_url[:60]}...")
    print(f"⏰ Expires in: {URL_EXPIRATION // 86400} days")
    # isoformat() of an aware datetime ends in "+00:00"; keep the
    # seconds-resolution prefix and mark it as UTC.
    print(f"📅 Generated at: {url_generated_at[:19]}Z")

    # Add URL info to metadata
    metadata.update({
        "download_url": download_url,
        "url_expires_days": URL_EXPIRATION // 86400,
        "url_generated_at": url_generated_at,
    })

## 4. 📋 Step 3: Register Dataset with S3 References

In [None]:
# Create comprehensive dataset with S3 metadata
print("📋 Registering dataset with S3 references...")

dataset_name = f"{WORKFLOW_PREFIX}_dataset_{TIMESTAMP}"
dataset_title = f"{metadata['description']} - {metadata['file_name']}"

# Read the column names from the CSV's first line instead of hard-coding
# them, so the description cannot drift from the data that was uploaded.
content_lines = sample_data['content'].splitlines()
csv_header = (
    ", ".join(column.strip() for column in content_lines[0].split(','))
    if content_lines
    else "N/A"
)

# Create rich dataset description
dataset_description = f"""Data Integration Workflow Dataset

{metadata['description']}

File Information:
- Original file: {metadata['file_name']}
- Size: {metadata['file_size']} bytes
- Format: {metadata['format']}
- Content type: {metadata['content_type']}
- SHA-256 hash: {metadata['sha256_hash']}

S3 Storage:
- Bucket: {WORKFLOW_BUCKET}
- Object key: {object_key}
- S3 URI: s3://{WORKFLOW_BUCKET}/{object_key}

CSV Details:
- Data rows: {metadata['csv_rows']}
- Columns: {metadata['csv_columns']}
- Header: {csv_header}

Access Information:
- Download URL expires in {metadata.get('url_expires_days', 'N/A')} days
- Created: {metadata['created_at'][:19]}Z
- Workflow ID: {metadata['workflow_id']}"""

print(f"📝 Dataset name: {dataset_name}")
print(f"📖 Title: {dataset_title}")
print(f"🏢 Organization: {DATASET_ORG}")

In [None]:
# Register dataset with comprehensive metadata
# NOTE(review): `tags` is passed as a list of plain strings; confirm that
# register_general_dataset accepts this shape for the target catalog.
dataset_data = {
    "name": dataset_name,
    "title": dataset_title,
    "owner_org": DATASET_ORG,
    "notes": dataset_description,
    "tags": metadata['tags'],
    "license_id": "cc-by",
    "private": False,
    # Numeric values are converted with str(), so every extras value is a string.
    "extras": {
        # File metadata
        "file_name": metadata['file_name'],
        "file_size": str(metadata['file_size']),
        "content_type": metadata['content_type'],
        "sha256_hash": metadata['sha256_hash'],
        "format": metadata['format'],

        # S3 location
        "s3_bucket": WORKFLOW_BUCKET,
        "s3_object_key": object_key,
        "s3_uri": f"s3://{WORKFLOW_BUCKET}/{object_key}",

        # CSV-specific metadata
        "csv_rows": str(metadata['csv_rows']),
        "csv_columns": str(metadata['csv_columns']),

        # Workflow tracking
        "workflow_id": metadata['workflow_id'],
        "created_at": metadata['created_at'],
    },
}

# Add URL information only when a presigned URL was actually generated.
# The URL keys in `metadata` exist only in that case.
if download_url:
    dataset_data['extras'].update({
        "download_url": metadata['download_url'],
        "url_expires_days": str(metadata['url_expires_days']),
        "url_generated_at": metadata['url_generated_at'],
    })

try:
    registration_result = client.register_general_dataset(dataset_data, server="local")
    dataset_id = registration_result.get('id', 'unknown')
except Exception as e:
    print(f"❌ Dataset registration failed: {e}")
    raise
else:
    print("✅ Dataset registered successfully")
    print(f"🆔 Dataset ID: {dataset_id}")
    print(f"📊 Metadata fields: {len(dataset_data['extras'])}")
    print("🔗 S3 integration: Complete")
    print(f"🏷️  Tags: {', '.join(metadata['tags'])}")

## 5. 🔍 Step 4: Verify Integration and Discovery

In [None]:
# Verify dataset through search
print("🔍 Verifying dataset integration...")

try:
    # Search by workflow prefix
    search_results = client.search_datasets(
        terms=[WORKFLOW_PREFIX],
        server="local",
    )

    # Every run of this notebook shares the prefix, so datasets left over
    # from earlier runs can match as well.
    workflow_datasets = [
        ds for ds in search_results
        if ds.get('name', '').startswith(WORKFLOW_PREFIX)
    ]

    print("✅ Search verification successful")
    print(f"📊 Found {len(workflow_datasets)} workflow datasets")

    # Prefer the dataset registered by this run; fall back to any match.
    dataset = next(
        (ds for ds in workflow_datasets if ds.get('name') == dataset_name),
        None,
    )
    if dataset is None and workflow_datasets:
        print(f"⚠️  {dataset_name} not in results (possibly not indexed yet); "
              "showing another workflow dataset")
        dataset = workflow_datasets[0]

    if dataset is not None:
        # Keep "\n" as an escape sequence: a literal line break inside a
        # single-quoted string is a SyntaxError.
        print("\n📋 Dataset Details:")
        print(f"   Name: {dataset.get('name', 'Unknown')}")
        print(f"   Title: {dataset.get('title', 'No title')}")

        # The organization field may be missing, null, or a plain string,
        # so normalize it before reading the name.
        organization = dataset.get('organization') or {}
        org_name = (
            organization.get('name', 'Unknown')
            if isinstance(organization, dict)
            else str(organization)
        )
        print(f"   Organization: {org_name}")

        # Show tags; entries may be dicts with a 'name' key or plain values.
        tags = dataset.get('tags', [])
        if tags:
            tag_names = [
                str(tag.get('name', tag)) if isinstance(tag, dict) else str(tag)
                for tag in tags
            ]
            print(f"   Tags: {', '.join(tag_names)}")

except Exception as e:
    # Verification is informational only; report and continue.
    print(f"❌ Search verification failed: {e}")

In [None]:
# Advanced search demonstrations
# Run a few term searches and, for each one, report the total hit count and
# how many hits belong to this workflow (name starts with WORKFLOW_PREFIX).
print("\n🔎 Advanced Search Demonstrations")
print("=" * 40)

search_queries = [
    {"name": label, "terms": terms}
    for label, terms in (
        ("By tag", ["research"]),
        ("By format", ["csv"]),
        ("By organization", [DATASET_ORG]),
    )
]

for query in search_queries:
    label, terms = query["name"], query["terms"]
    try:
        results = client.search_datasets(terms=terms, server="local")
        workflow_matches = [
            hit for hit in results
            if hit.get('name', '').startswith(WORKFLOW_PREFIX)
        ]
        print(f"🔍 {label}: {len(results)} total, {len(workflow_matches)} workflow matches")
    except Exception as e:
        # One failing query should not stop the remaining demonstrations.
        print(f"❌ {label} search failed: {e}")

## 6. 🧹 Resource Cleanup

In [None]:
# Cleanup confirmation
print("🧹 CLEANUP CONFIRMATION")
print("=" * 30)
print("Resources to delete:")
print(f"   📋 Dataset: {dataset_name}")
print(f"   📁 S3 object: {object_key}")
print(f"   🪣 S3 bucket: {WORKFLOW_BUCKET}")

# Only an explicit "yes" (case-insensitive, surrounding spaces ignored)
# triggers deletion in the next cell; any other answer keeps the resources.
confirmation = input("Proceed with cleanup? (yes/no): ").strip().lower()

if confirmation != 'yes':
    print("🚫 Cleanup cancelled - resources remain for exploration")
else:
    print("✅ Proceeding with cleanup...")

In [None]:
# Execute cleanup if confirmed
if confirmation == 'yes':
    print("\n🗑️  Cleaning up resources...")

    # Delete the catalog entry first, then the object, then the bucket: S3
    # only deletes a bucket once it is empty. Each step is best-effort, so a
    # failure is recorded and the remaining steps still run.
    cleanup_steps = [
        ("Dataset", dataset_name,
         lambda: client.delete_resource_by_name(dataset_name, server="local")),
        ("S3 object", object_key,
         lambda: client.delete_object(WORKFLOW_BUCKET, object_key)),
        ("S3 bucket", WORKFLOW_BUCKET,
         lambda: client.delete_bucket(WORKFLOW_BUCKET)),
    ]
    cleanup_failures = []

    for label, target, delete in cleanup_steps:
        try:
            delete()
        except Exception as e:
            cleanup_failures.append(label)
            print(f"❌ {label} deletion failed: {e}")
        else:
            print(f"✅ {label} deleted: {target}")

    # Report success only when every step succeeded.
    if cleanup_failures:
        print(f"\n⚠️  Cleanup finished with errors ({', '.join(cleanup_failures)}); "
              "remove the remaining resources manually.")
    else:
        print("\n🎉 Cleanup completed successfully!")
else:
    print("\n💡 Resources remain available for further exploration:")
    print(f"   🪣 S3 bucket: {WORKFLOW_BUCKET}")
    print(f"   📁 S3 object: s3://{WORKFLOW_BUCKET}/{object_key}")
    print(f"   📋 Dataset: {dataset_name} (ID: {dataset_id})")
    if download_url:
        print(f"   🔗 Download URL: Available for {metadata.get('url_expires_days', 'N/A')} days")