# S3 Exchange Library - Examples

This notebook demonstrates how to use the `s3exchange` library for managing S3 artifacts with manifest-based data exchange.

## Table of Contents
1. [Setup and Initialization](#setup)
2. [Basic Object Operations](#basic-ops)
3. [Manifest Operations](#manifests)
4. [Shard Archives](#shards)
5. [ManifestWriter - Incremental Writing](#manifest-writer)
6. [Listing and Filtering](#listing)
7. [Deletion Operations](#deletion)
8. [Error Handling](#errors)

## 1. Setup and Initialization {#setup}

In [None]:
import boto3
from botocore.client import Config
from s3exchange import S3ExchangeStore

from dotenv import load_dotenv
import os

load_dotenv()

S3_ACCESS_KEY_ID = os.getenv("S3_AWS_ACCESS_KEY_ID")
S3_SECRET_ACCESS_KEY = os.getenv("S3_AWS_SECRET_ACCESS_KEY")
S3_REGION = os.getenv("S3_REGION")
S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL")
S3_BUCKET = os.getenv("S3_BUCKET")

# Initialize S3 client
# For local development (e.g., Garage), use endpoint_url
# For AWS S3, omit endpoint_url
# s3_client = boto3.client('s3', endpoint_url='http://garage:3900')
s3_client = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT_URL,
    region_name=S3_REGION,
    aws_access_key_id=S3_ACCESS_KEY_ID,
    aws_secret_access_key=S3_SECRET_ACCESS_KEY,
    config=Config(s3={"addressing_style": "path"}),
)


# Create store instance
store = S3ExchangeStore(
    s3_client=s3_client,
    bucket=S3_BUCKET,
    base_prefix='prod',  # Optional: prefix all keys
    default_vars={'service_name': 'training-service'},
)

print(f"Store initialized with bucket: {store.bucket}")
print(f"Base prefix: {store.base_prefix}")

Store initialized with bucket: wakeworx-artifacts
Base prefix: prod


### Key Templating

The library supports templating with placeholders:

In [2]:
# Resolve template with variables
key = store._resolve_key("training/{job_id}/samples", {"job_id": "123"})
print(f"Resolved key: {key}")

# Create a scoped store for convenience
scoped = store.scope(job_id="123", prefix="training")
key = scoped.key("{prefix}/{job_id}/samples")
print(f"Scoped key: {key}")

Resolved key: prod/training/123/samples
Scoped key: prod/training/123/samples
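
To make the templating behaviour concrete, here is a minimal standalone sketch of how a `{placeholder}` template could be resolved and prefixed. It is not the library's implementation: `resolve_key` and `MissingPlaceholder` are hypothetical names used only for illustration, and the real `_resolve_key` may differ in details such as how `default_vars` are merged.

```python
import string


class MissingPlaceholder(KeyError):
    """Hypothetical error raised when a template variable is not supplied."""


def resolve_key(template: str, variables: dict, base_prefix: str = "") -> str:
    """Fill `{name}` placeholders, then prepend the base prefix if one is set."""
    # Collect the placeholder names that the template actually uses.
    names = [field for _, field, _, _ in string.Formatter().parse(template) if field]
    for name in names:
        if name not in variables:
            raise MissingPlaceholder(name)
    key = template.format(**variables)
    return f"{base_prefix.strip('/')}/{key}" if base_prefix else key


print(resolve_key("training/{job_id}/samples", {"job_id": "123"}, base_prefix="prod"))
# prints: prod/training/123/samples
```

Checking the placeholders before formatting lets the sketch report exactly which variable is missing instead of surfacing a bare `KeyError` from `str.format`.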


## 2. Basic Object Operations {#basic-ops}

### Writing Objects

In [3]:
# Put a single object with metadata
entry = store.put_object(
    key="training/123/samples/file.wav",
    data=b"fake audio data...",
    id="file-001",
    meta={"sr": 16000, "length": 15342},
    content_type="audio/wav",
)

print(f"Created entry: {entry}")
print(f"Entry kind: {entry['kind']}")
print(f"Entry key: {entry['key']}")
print(f"Entry metadata: {entry.get('meta', {})}")

Created entry: {'kind': 'file', 'key': 'training/123/samples/file.wav', 'id': 'file-001', 'meta': {'sr': 16000, 'length': 15342}, 'content_type': 'audio/wav', 'size_bytes': 18, 'etag': '548baa8d1efffaaae5d59f4bedcf09b3'}
Entry kind: file
Entry key: training/123/samples/file.wav
Entry metadata: {'sr': 16000, 'length': 15342}


In [8]:
# Upload the two files that the manifest examples in section 3 refer to
entry = store.put_object(
    key="training/123/samples/file1.wav",
    data=b"fake audio data...",
    id="file-001",
    meta={"sr": 16000, "length": 15342},
    content_type="audio/wav",
)

entry = store.put_object(
    key="training/123/samples/file2.wav",
    data=b"fake audio data...",
    id="file-002",  # Give each object its own id
    meta={"sr": 16000, "length": 15342},
    content_type="audio/wav",
)

### Reading Objects

In [4]:
# Read a single object
stream = store.get_object("training/123/samples/file.wav")
data = stream.read()
print(f"Read {len(data)} bytes")
stream.close()

Read 18 bytes


## 3. Manifest Operations {#manifests}

### Writing Manifests

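Manifests track collections of objects. You can write one in `overwrite` mode, which replaces the manifest, or in `append_parts` mode, which writes the new entries to a part file and updates the root manifest to reference it.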

In [5]:
# Create initial manifest entries
entries = [
    {"kind": "file", "key": "training/123/samples/file1.wav", "id": "001"},
    {"kind": "file", "key": "training/123/samples/file2.wav", "id": "002"},
]

# Write manifest in overwrite mode
store.write_manifest(
    key="training/123/samples/manifest.jsonl",
    entries=entries,
    mode="overwrite",
)

print("Manifest written successfully")

Manifest written successfully


In [None]:
# Append new entries using append_parts mode (recommended for updates)
new_entries = [
    {"kind": "file", "key": "training/123/samples/file3.wav", "id": "003"},
]

store.write_manifest(
    key="training/123/samples/manifest.jsonl",
    entries=new_entries,
    mode="append_parts",  # Creates a part file and updates root manifest
)

print("New entries appended to manifest")
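
The difference between the two modes can be sketched with an in-memory dict standing in for the bucket. This is an illustration under stated assumptions, not the library's code: the part naming scheme (`.parts/part-000001.jsonl`) is hypothetical, and the only things taken from this notebook are the JSONL format and the `manifest_ref` entry kind.

```python
import json

# A plain dict stands in for the bucket: key -> bytes.
bucket: dict[str, bytes] = {}


def dump_jsonl(entries: list[dict]) -> bytes:
    """Serialize entries as one JSON object per line."""
    return "".join(json.dumps(e) + "\n" for e in entries).encode()


def load_jsonl(data: bytes) -> list[dict]:
    """Parse JSONL bytes back into a list of entries."""
    return [json.loads(line) for line in data.decode().splitlines() if line]


def write_manifest(key: str, entries: list[dict], mode: str = "overwrite") -> None:
    if mode == "overwrite":
        bucket[key] = dump_jsonl(entries)
        return
    # append_parts: store the new entries in their own part object,
    # then add a reference to it at the end of the root manifest.
    root = load_jsonl(bucket.get(key, b""))
    part_number = sum(1 for e in root if e["kind"] == "manifest_ref") + 1
    part_key = f"{key}.parts/part-{part_number:06d}.jsonl"  # hypothetical naming
    bucket[part_key] = dump_jsonl(entries)
    bucket[key] = dump_jsonl(root + [{"kind": "manifest_ref", "key": part_key}])


def iter_entries(key: str):
    """Yield file entries, following manifest_ref entries recursively."""
    for entry in load_jsonl(bucket[key]):
        if entry["kind"] == "manifest_ref":
            yield from iter_entries(entry["key"])
        else:
            yield entry


write_manifest("m.jsonl", [{"kind": "file", "key": "a.wav"}])
write_manifest("m.jsonl", [{"kind": "file", "key": "b.wav"}], mode="append_parts")
print([e["key"] for e in iter_entries("m.jsonl")])
# prints: ['a.wav', 'b.wav']
```

In this sketch an append only rewrites the small root manifest and uploads one new part, so existing entries are never re-serialized.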

### Reading from Manifests

In [6]:
# Iterate over manifest entries without fetching objects
print("Manifest entries:")
for entry in store.iter_manifest_entries("training/123/samples/manifest.jsonl"):
    print(f"  - {entry['kind']}: {entry.get('key', entry.get('archive_key', 'N/A'))}")

Manifest entries:
  - file: training/123/samples/file1.wav
  - file: training/123/samples/file2.wav


In [9]:
# Iterate over objects in manifest (lazy iteration)
print("Reading objects from manifest:")
for stream, entry in store.iter_objects("training/123/samples/manifest.jsonl"):
    print(f"  Reading {entry['key']} (id: {entry.get('id', 'N/A')})")
    data = stream.read()
    print(f"    Size: {len(data)} bytes")
    stream.close()

Reading objects from manifest:
  Reading training/123/samples/file1.wav (id: 001)
    Size: 18 bytes
  Reading training/123/samples/file2.wav (id: 002)
    Size: 18 bytes


## 4. Shard Archives {#shards}

Shard archives are tar/tar.gz files containing multiple files with an internal manifest. They're useful for efficiently storing many small files.
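
As a rough picture of what such an archive contains, the sketch below packs a few byte strings into an uncompressed tar next to a `__manifest__.jsonl` member and reads one member back. It uses only the standard library; `build_shard` and `read_member` are hypothetical helpers, and the manifest fields are a reduced subset chosen for illustration rather than the exact schema the library writes.

```python
import io
import json
import tarfile


def build_shard(members: dict[str, bytes]) -> bytes:
    """Pack members into an uncompressed tar with a leading __manifest__.jsonl."""
    manifest = "".join(
        json.dumps({"kind": "file", "member_path": name, "size_bytes": len(data)}) + "\n"
        for name, data in members.items()
    ).encode()
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        for name, data in [("__manifest__.jsonl", manifest), *members.items()]:
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


def read_member(archive: bytes, member_path: str) -> bytes:
    """Extract a single member from the archive bytes."""
    with tarfile.open(fileobj=io.BytesIO(archive), mode="r") as tar:
        return tar.extractfile(member_path).read()


archive = build_shard({"a.txt": b"alpha", "b.txt": b"beta"})
print(read_member(archive, "b.txt"))
# prints: b'beta'
```

One upload then replaces many small PUT requests, and the internal manifest lets a reader discover the members without a separate index object.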

In [None]:
from s3exchange import create_shards
import tempfile
from pathlib import Path

# Create some temporary files for demonstration
temp_dir = Path(tempfile.mkdtemp())
print(f"Creating test files in: {temp_dir}")

# Create sample files
for i in range(5):
    file_path = temp_dir / f"file_{i:03d}.txt"
    file_path.write_text(f"Content of file {i}\n" * 100)
    print(f"  Created {file_path.name} ({file_path.stat().st_size} bytes)")

Creating test files in: /tmp/tmpmazylr0x
  Created file_000.txt (1800 bytes)
  Created file_001.txt (1800 bytes)
  Created file_002.txt (1800 bytes)
  Created file_003.txt (1800 bytes)
  Created file_004.txt (1800 bytes)


In [17]:
# Prepare items for sharding
items = [
    {
        "source": str(temp_dir / f"file_{i:03d}.txt"),
        "member_path": f"file_{i:03d}.txt",
        "id": f"file-{i:03d}",
        "meta": {"index": i, "type": "text"},
        "size_bytes": (temp_dir / f"file_{i:03d}.txt").stat().st_size,
    }
    for i in range(5)
]

# Split into shards (max 3 entries or 1MB per shard for demo)
shard_batches = list(create_shards(
    items,
    max_entries=3,
    max_bytes=1024 * 1024,  # 1 MB
))

print(f"Created {len(shard_batches)} shard batch(es)")
for i, batch in enumerate(shard_batches):
    print(f"  Shard {i}: {len(batch)} items")

print(shard_batches)

Created 2 shard batch(es)
  Shard 0: 3 items
  Shard 1: 2 items
[[{'source': '/tmp/tmpmazylr0x/file_000.txt', 'member_path': 'file_000.txt', 'id': 'file-000', 'meta': {'index': 0, 'type': 'text'}, 'size_bytes': 1800}, {'source': '/tmp/tmpmazylr0x/file_001.txt', 'member_path': 'file_001.txt', 'id': 'file-001', 'meta': {'index': 1, 'type': 'text'}, 'size_bytes': 1800}, {'source': '/tmp/tmpmazylr0x/file_002.txt', 'member_path': 'file_002.txt', 'id': 'file-002', 'meta': {'index': 2, 'type': 'text'}, 'size_bytes': 1800}], [{'source': '/tmp/tmpmazylr0x/file_003.txt', 'member_path': 'file_003.txt', 'id': 'file-003', 'meta': {'index': 3, 'type': 'text'}, 'size_bytes': 1800}, {'source': '/tmp/tmpmazylr0x/file_004.txt', 'member_path': 'file_004.txt', 'id': 'file-004', 'meta': {'index': 4, 'type': 'text'}, 'size_bytes': 1800}]]
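
The batching seen in this output is consistent with a greedy split on entry count and byte size. The following sketch shows one way to do that; `batch_items` is a hypothetical stand-in, and `create_shards` itself may apply its limits differently (for example in how it treats an item that is larger than `max_bytes` on its own).

```python
from typing import Iterable, Iterator


def batch_items(items: Iterable[dict], max_entries: int, max_bytes: int) -> Iterator[list[dict]]:
    """Greedily group items so that a batch is closed before it would exceed a limit."""
    batch: list[dict] = []
    batch_bytes = 0
    for item in items:
        size = item.get("size_bytes", 0)
        # Start a new batch when adding this item would break either limit.
        if batch and (len(batch) >= max_entries or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(item)
        batch_bytes += size
    if batch:
        yield batch


demo = [{"id": f"file-{i:03d}", "size_bytes": 1800} for i in range(5)]
print([len(b) for b in batch_items(demo, max_entries=3, max_bytes=1024 * 1024)])
# prints: [3, 2]
```

In this sketch an oversized item still gets a batch of its own, so no input is ever dropped.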


In [18]:
# Upload each shard
shard_entries = []
for i, batch in enumerate(shard_batches):
    archive_key = f"training/123/samples/shards/shard-{i:05d}.tar.gz"
    shard_entry = store.put_shard_archive(
        archive_key=archive_key,
        shard_items=batch,
        format="tar",
        compression="gzip",
    )
    shard_entries.append(shard_entry)
    print(f"Uploaded shard: {archive_key}")
    print(f"  Entry count: {shard_entry.get('count', 0)}")
    print(f"  Size: {shard_entry.get('size_bytes', 0)} bytes")

Uploaded shard: training/123/samples/shards/shard-00000.tar.gz
  Entry count: 3
  Size: 296 bytes
Uploaded shard: training/123/samples/shards/shard-00001.tar.gz
  Entry count: 2
  Size: 270 bytes


In [19]:
# Write shard entries to manifest
store.put_sharded(
    manifest_key="training/123/samples/manifest.jsonl",
    shard_entries=shard_entries,
    update_mode="append_parts",
)

print("Shard entries added to manifest")

Shard entries added to manifest


### Reading from Shards

Shards are expanded automatically when you iterate over objects with `iter_objects`, so shard members are yielded alongside regular files.

## 5. ManifestWriter - Incremental Writing {#manifest-writer}

The `ManifestWriter` allows you to write manifests incrementally, automatically flushing parts to S3 and managing shard archives. This is ideal for large-scale data pipelines.

### Basic Usage with Loose Files

Write manifest entries incrementally. The writer buffers entries locally and flushes them as part manifests when thresholds are exceeded.
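
The buffering described here can be pictured with a small standalone class. This is a sketch, not `ManifestWriter` itself: `BufferedPartWriter` and its `upload` callback are hypothetical, and it flushes as soon as a threshold is reached, which may not match the exact boundary the real writer uses.

```python
import json


class BufferedPartWriter:
    """Buffers entries and hands a full part to `upload` when a threshold is hit."""

    def __init__(self, upload, part_max_entries: int, part_max_bytes: int):
        self._upload = upload  # callable taking (part_index, jsonl_bytes)
        self._max_entries = part_max_entries
        self._max_bytes = part_max_bytes
        self._lines: list[bytes] = []
        self._bytes = 0
        self.parts_uploaded = 0

    def add(self, entry: dict) -> None:
        line = (json.dumps(entry) + "\n").encode()
        self._lines.append(line)
        self._bytes += len(line)
        if len(self._lines) >= self._max_entries or self._bytes >= self._max_bytes:
            self.flush_part()

    def flush_part(self) -> None:
        if not self._lines:
            return  # nothing buffered, so there is no part to upload
        self.parts_uploaded += 1
        self._upload(self.parts_uploaded, b"".join(self._lines))
        self._lines, self._bytes = [], 0


uploaded = {}
writer = BufferedPartWriter(uploaded.__setitem__, part_max_entries=2, part_max_bytes=1024)
for i in range(5):
    writer.add({"kind": "file", "key": f"file_{i}.wav"})
writer.flush_part()  # flush the remainder, as closing a writer would
print(sorted(uploaded))
# prints: [1, 2, 3]
```

Memory use stays bounded by the thresholds regardless of how many entries pass through the writer.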

In [None]:
from s3exchange import ManifestWriter

# Open a manifest writer (use as context manager)
manifest_key = "training/123/samples/manifest-writer.jsonl"

with store.open_manifest_writer(
    manifest_key,
    mode="append_parts",  # Flush parts incrementally
    part_max_entries=10,  # Small threshold for demo
    part_max_bytes=1024,  # Small threshold for demo
) as writer:
    # Add existing files (assumes objects already exist in S3)
    writer.add_file(
        key="training/123/samples/file1.wav",
        id="file-001",
        meta={"sr": 16000},
    )
    
    # Put new objects and add to manifest
    writer.put_object(
        key="training/123/samples/new_file1.wav",
        data=b"new audio data 1",
        id="new-001",
        meta={"sr": 16000},
    )
    
    writer.put_object(
        key="training/123/samples/new_file2.wav",
        data=b"new audio data 2",
        id="new-002",
        meta={"sr": 16000},
    )
    
    # Check stats
    stats = writer.stats()
    print(f"Writer stats: {stats}")

# On exit, root manifest is automatically updated with part references
print(f"\nManifest written: {manifest_key}")

# Verify manifest
print("\nManifest entries:")
for entry in store.iter_manifest_entries(manifest_key):
    print(f"  - {entry['kind']}: {entry.get('key', entry.get('archive_key', 'N/A'))}")

Writer stats: {'part_entries': 3, 'part_bytes': 485, 'parts_uploaded': 0, 'shard_entries': 0, 'shard_bytes': 0, 'shards_created': 0, 'is_closed': False}

Manifest written: training/123/samples/manifest-writer.jsonl

Manifest entries:
  - file: training/123/samples/file1.wav
  - file: training/123/samples/new_file1.wav
  - file: training/123/samples/new_file2.wav


### Using ManifestWriter with Shards

The writer can automatically create shard archives when shard size policies are configured.

In [None]:
from s3exchange import ShardSizePolicy
import tempfile
from pathlib import Path

# Create temporary files for sharding demo
temp_dir = Path(tempfile.mkdtemp())
print(f"Creating test files in: {temp_dir}")

for i in range(8):
    file_path = temp_dir / f"shard_file_{i:03d}.txt"
    file_path.write_text(f"Content of shard file {i}\n" * 50)
    print(f"  Created {file_path.name} ({file_path.stat().st_size} bytes)")

# Open writer with shard support
manifest_key_shards = "training/123/samples/manifest-writer-shards.jsonl"

with store.open_manifest_writer(
    manifest_key_shards,
    mode="overwrite",
    shard_size=ShardSizePolicy(
        max_entries=3,  # Flush shard after 3 items
        max_bytes=5000,  # Or after 5KB
    ),
    shard_format="tar",
    shard_compression=None,  # Set to "gzip" to compress the archives
) as writer:
    # Add files to shards
    for i in range(8):
        writer.add_to_shard(
            member_path=f"shard_file_{i:03d}.txt",
            source=str(temp_dir / f"shard_file_{i:03d}.txt"),
            id=f"shard-{i:03d}",
            meta={"index": i, "type": "text"},
        )
    
    # Can also mix loose files and shards
    writer.put_object(
        key="training/123/samples/loose_file.wav",
        data=b"loose audio data",
        id="loose-001",
    )
    
    stats = writer.stats()
    print(f"\nWriter stats: {stats}")
    print(f"  Shards created: {stats['shards_created']}")
    print(f"  Shard entries: {stats['shard_entries']}")

# Cleanup temp files
import shutil
shutil.rmtree(temp_dir)

# Verify manifest
print(f"\nManifest entries in {manifest_key_shards}:")
for entry in store.iter_manifest_entries(manifest_key_shards):
    kind = entry['kind']
    if kind == 'shard':
        print(f"  - {kind}: {entry.get('archive_key')} ({entry.get('count', 0)} items)")
    elif kind == 'file':
        print(f"  - {kind}: {entry.get('key')}")
    elif kind == 'manifest_ref':
        print(f"  - {kind}: {entry.get('key')}")

Creating test files in: /tmp/tmpze3edvqy
  Created shard_file_000.txt (1200 bytes)
  Created shard_file_001.txt (1200 bytes)
  Created shard_file_002.txt (1200 bytes)
  Created shard_file_003.txt (1200 bytes)
  Created shard_file_004.txt (1200 bytes)
  Created shard_file_005.txt (1200 bytes)
  Created shard_file_006.txt (1200 bytes)
  Created shard_file_007.txt (1200 bytes)

Writer stats: {'part_entries': 3, 'part_bytes': 643, 'parts_uploaded': 0, 'shard_entries': 2, 'shard_bytes': 2400, 'shards_created': 2, 'is_closed': False}
  Shards created: 2
  Shard entries: 2

Manifest entries in training/123/samples/manifest-writer-shards.jsonl:
  - shard: training/123/samples/shards/shard-000001.tar (3 items)
  - shard: training/123/samples/shards/shard-000002.tar (3 items)
  - file: training/123/samples/loose_file.wav
  - shard: training/123/samples/shards/shard-000003.tar (2 items)


In [3]:
import io
import tarfile

# Download the shard archive and inspect its contents
stream = store.get_object("training/123/samples/shards/shard-000001.tar")
raw_data = stream.read()
stream.close()

# Open the tar archive from the in-memory bytes
with tarfile.open(fileobj=io.BytesIO(raw_data)) as tar:
    # List the member names
    for member in tar.getmembers():
        print(f"  - {member.name}")

    # Print the contents of the internal manifest
    manifest_data = tar.extractfile("__manifest__.jsonl").read()
    print(manifest_data)



  - __manifest__.jsonl
  - shard_file_000.txt
  - shard_file_001.txt
  - shard_file_002.txt
b'{"kind":"file","key":"training/123/samples/shards/shard-000001.tar#shard_file_000.txt","member_path":"shard_file_000.txt","id":"shard-000","meta":{"index":0,"type":"text"},"size_bytes":1200}\n{"kind":"file","key":"training/123/samples/shards/shard-000001.tar#shard_file_001.txt","member_path":"shard_file_001.txt","id":"shard-001","meta":{"index":1,"type":"text"},"size_bytes":1200}\n{"kind":"file","key":"training/123/samples/shards/shard-000001.tar#shard_file_002.txt","member_path":"shard_file_002.txt","id":"shard-002","meta":{"index":2,"type":"text"},"size_bytes":1200}\n'
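
The internal manifest addresses each member with a virtual key of the form `archive_key#member_path`. A minimal way to split such a key is sketched below; `split_virtual_key` is a hypothetical helper, and it assumes that `#` never appears in a real object key, which the library may or may not enforce.

```python
from typing import Optional, Tuple


def split_virtual_key(key: str) -> Tuple[str, Optional[str]]:
    """Split 'archive#member' into (archive_key, member_path).

    Plain object keys come back with member_path set to None.
    """
    archive_key, separator, member_path = key.partition("#")
    return archive_key, (member_path if separator else None)


print(split_virtual_key("training/123/samples/shards/shard-000001.tar#shard_file_000.txt"))
# prints: ('training/123/samples/shards/shard-000001.tar', 'shard_file_000.txt')
print(split_virtual_key("training/123/samples/loose_file.wav"))
# prints: ('training/123/samples/loose_file.wav', None)
```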


In [4]:
# Read objects from the manifest
for stream, entry in store.iter_objects(manifest_key_shards):
    print(f"  - {entry['id']}")
    data = stream.read()
    print(f"    Size: {len(data)} bytes")
    stream.close()

<ExFileObject name='__manifest__.jsonl'> <class 'tarfile.ExFileObject'>
  - shard-000
    Size: 1200 bytes
  - shard-001
    Size: 1200 bytes
  - shard-002
    Size: 1200 bytes
<ExFileObject name='__manifest__.jsonl'> <class 'tarfile.ExFileObject'>
  - shard-003
    Size: 1200 bytes
  - shard-004
    Size: 1200 bytes
  - shard-005
    Size: 1200 bytes
  - loose-001
    Size: 16 bytes
<ExFileObject name='__manifest__.jsonl'> <class 'tarfile.ExFileObject'>
  - shard-006
    Size: 1200 bytes
  - shard-007
    Size: 1200 bytes


In [6]:
store.get_object("training/123/samples/shards/shard-000001.tar#shard_file_000.txt").read()

b'Content of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0\nContent of shard file 0

### Overwrite Mode

In overwrite mode, all entries are buffered and written as a single manifest on close.

In [None]:
manifest_key_overwrite = "training/123/samples/manifest-writer-overwrite.jsonl"

with store.open_manifest_writer(
    manifest_key_overwrite,
    mode="overwrite",  # Single manifest on close
    part_max_entries=10,  # Ignored in overwrite mode
) as writer:
    for i in range(5):
        writer.put_object(
            key=f"training/123/samples/overwrite_file_{i}.wav",
            data=f"data {i}".encode(),
            id=f"overwrite-{i:03d}",
        )
    
    stats = writer.stats()
    print(f"Entries written: {stats['part_entries']}")
    print(f"Parts uploaded: {stats['parts_uploaded']}")  # Should be 0 in overwrite mode

print(f"\nManifest written: {manifest_key_overwrite}")

# Verify - should be a single manifest file (no parts)
print("\nManifest entries:")
for entry in store.iter_manifest_entries(manifest_key_overwrite, resolve_refs=False):
    print(f"  - {entry['kind']}: {entry.get('key', entry.get('archive_key', 'N/A'))}")

### Error Handling

By default, if an exception occurs, the root manifest is not published (preventing partial/incomplete manifests). You can enable `publish_on_error=True` for debugging.

In [None]:
manifest_key_error = "training/123/samples/manifest-writer-error.jsonl"

# Default behavior: don't publish on error
try:
    with store.open_manifest_writer(manifest_key_error) as writer:
        writer.put_object(
            key="training/123/samples/error_file.wav",
            data=b"data",
            id="error-001",
        )
        # Simulate an error
        raise ValueError("Something went wrong!")
except ValueError as e:
    print(f"Caught error: {e}")

# Check if manifest exists (should not exist - not published)
exists = store.exists(manifest_key_error)
print(f"Manifest exists after error: {exists}")

# With publish_on_error=True, manifest would be published even on error
manifest_key_error_publish = "training/123/samples/manifest-writer-error-publish.jsonl"
try:
    with store.open_manifest_writer(
        manifest_key_error_publish,
        publish_on_error=True,  # Publish even on error
    ) as writer:
        writer.put_object(
            key="training/123/samples/error_file_publish.wav",
            data=b"data",
            id="error-publish-001",
        )
        raise ValueError("Something went wrong!")
except ValueError as e:
    print(f"\nCaught error: {e}")

# Check if manifest exists (should exist - published on error)
exists = store.exists(manifest_key_error_publish)
print(f"Manifest exists after error (publish_on_error=True): {exists}")
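
The publish-or-skip decision maps naturally onto a context manager's `__exit__` method. The sketch below shows the pattern in isolation; `PublishingWriter` is a hypothetical class that appends to a list instead of writing to S3, and it is not meant to describe how the library implements `publish_on_error`.

```python
class PublishingWriter:
    """Collects entries and publishes them when the `with` block exits."""

    def __init__(self, published: list, publish_on_error: bool = False):
        self._published = published
        self._publish_on_error = publish_on_error
        self._entries: list = []

    def add(self, entry) -> None:
        self._entries.append(entry)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Publish on a clean exit, or on failure only when explicitly requested.
        if exc_type is None or self._publish_on_error:
            self._published.extend(self._entries)
        return False  # never swallow the caller's exception


def run(publish_on_error: bool) -> list:
    published: list = []
    try:
        with PublishingWriter(published, publish_on_error) as writer:
            writer.add("error-001")
            raise ValueError("Something went wrong!")
    except ValueError:
        pass
    return published


print(run(False), run(True))
# prints: [] ['error-001']
```

Returning `False` from `__exit__` keeps the original exception visible to the caller in both cases.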

### Manual Flush Control

You can manually flush parts or shards if needed.

In [None]:
manifest_key_manual = "training/123/samples/manifest-writer-manual.jsonl"

with store.open_manifest_writer(
    manifest_key_manual,
    mode="append_parts",
    part_max_entries=100,  # High threshold - won't auto-flush
) as writer:
    # Add some entries
    for i in range(5):
        writer.put_object(
            key=f"training/123/samples/manual_file_{i}.wav",
            data=f"data {i}".encode(),
            id=f"manual-{i:03d}",
        )
    
    # Manually flush part
    part_key = writer.flush_part()
    if part_key:
        print(f"Manually flushed part: {part_key}")
    
    # Add more entries
    for i in range(5, 10):
        writer.put_object(
            key=f"training/123/samples/manual_file_{i}.wav",
            data=f"data {i}".encode(),
            id=f"manual-{i:03d}",
        )
    
    stats = writer.stats()
    print(f"\nFinal stats: {stats}")
    print(f"  Parts uploaded: {stats['parts_uploaded']}")

print(f"\nManifest written: {manifest_key_manual}")

In [None]:
# Shards are automatically expanded when iterating objects
print("Reading from shards:")
for stream, entry in store.iter_objects("training/123/samples/manifest.jsonl"):
    if entry.get('archive_key'):
        # This is a shard member
        print(f"  Shard member: {entry['member_path']}")
        print(f"    Archive: {entry['archive_key']}")
        print(f"    Virtual key: {entry['key']}")
    else:
        # Regular file
        print(f"  File: {entry['key']}")
    
    data = stream.read()
    print(f"    Size: {len(data)} bytes")
    stream.close()

## 6. Listing and Filtering {#listing}

In [None]:
# List S3 keys
print("S3 keys with prefix 'training/123/':")
for key in store.list_keys(prefix="training/123/"):
    print(f"  {key}")

In [None]:
# List files in manifest (including shard members)
print("Files in manifest:")
for entry in store.list_manifest_files(
    manifest="training/123/samples/manifest.jsonl",
    include_shards=True,
):
    key = entry.get('key', entry.get('member_path', 'N/A'))
    print(f"  {key}")

In [None]:
# Filter manifest entries by prefix
print("Filtered entries (prefix 'training/123/samples/shards'):")
for entry in store.list_by_manifest_prefix(
    manifest="training/123/samples/manifest.jsonl",
    prefix_filter="training/123/samples/shards",
):
    print(f"  {entry.get('key', entry.get('archive_key', 'N/A'))}")

## 7. Deletion Operations {#deletion}

In [None]:
# Delete a single object
# Note: Uncomment to actually delete
# store.delete_key("training/123/samples/file.wav")
# print("Deleted single object")

In [None]:
# Delete by prefix (with optional regex filter)
# Note: This is commented out to avoid deleting all demo data
# count = store.delete_prefix(
#     prefix="training/123/samples/",
#     regex=r".*\.wav$",  # Only delete .wav files
# )
# print(f"Deleted {count} objects")
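
To preview which keys a prefix and regex combination would select before deleting anything, the filter can be reproduced locally. `select_keys` is a hypothetical helper, and it assumes the regex is applied with `re.search` to the full key; `delete_prefix` may match differently, so treat this as a dry-run aid rather than a guarantee.

```python
import re
from typing import Iterable, List, Optional


def select_keys(keys: Iterable[str], prefix: str, regex: Optional[str] = None) -> List[str]:
    """Return the keys that start with `prefix` and, if given, match `regex`."""
    pattern = re.compile(regex) if regex else None
    return [
        key
        for key in keys
        if key.startswith(prefix) and (pattern is None or pattern.search(key))
    ]


keys = [
    "training/123/samples/file.wav",
    "training/123/samples/manifest.jsonl",
    "training/456/samples/file.wav",
]
print(select_keys(keys, "training/123/samples/", r".*\.wav$"))
# prints: ['training/123/samples/file.wav']
print(len(select_keys(keys, "")))
# prints: 3
```

The second call is a reminder that an empty prefix selects every key.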

In [None]:
# Delete by manifest (recursive)
# Note: This is commented out to avoid deleting all demo data
# report = store.delete_manifest(
#     manifest="training/123/samples/manifest.jsonl",
#     delete_manifests=True,  # Also delete manifest files
#     dedupe=True,  # Avoid double-deletion
# )
# print(f"Deleted {report['deleted_object_count']} objects")
# print(f"Deleted {report['deleted_archive_count']} archives")
# print(f"Deleted {report['deleted_manifest_count']} manifests")

## 8. Error Handling {#errors}

In [None]:
from s3exchange import (
    ObjectNotFoundError,
    ManifestNotFoundError,
    MissingPlaceholderError,
    InvalidManifestError,
    ShardReadError,
)

# Handle object not found
try:
    stream = store.get_object("nonexistent.wav")
except ObjectNotFoundError as e:
    print(f"Object not found: {e.key}")

# Handle missing placeholder
try:
    key = store._resolve_key("training/{job_id}/samples", {})  # Missing job_id
except MissingPlaceholderError as e:
    print(f"Missing placeholder: {e.placeholder}")

# Handle manifest not found
try:
    for entry in store.iter_manifest_entries("nonexistent/manifest.jsonl"):
        pass
except ManifestNotFoundError as e:
    print(f"Manifest not found: {e.manifest_key}")

## 9. Manifest Compaction {#compaction}

Flatten a manifest with many parts into a single clean manifest:

In [None]:
# Compact manifest (combines all parts into one)
# Note: This is commented out to avoid modifying demo data
# report = store.compact_manifest(
#     src_manifest_key="training/123/samples/manifest.jsonl",
#     dst_manifest_key="training/123/samples/manifest-compact.jsonl",
#     resolve_refs=True,  # Resolve all manifest_ref entries
#     expand_shards=False,  # Keep shard entries (set True to expand into files)
# )
# print(f"Compacted {report['total_entries']} entries")

In [None]:
from s3exchange.settings import S3Settings
from s3exchange.store import S3ExchangeStore


settings = S3Settings()

store = S3ExchangeStore(settings)

In [13]:
# Caution: an empty prefix matches every key, so this deletes everything in the store
store.delete_prefix("")

45

In [2]:
list(store.list_keys(""))

['job/1/samples/augmented/manifest.jsonl',
 'job/1/samples/augmented/manifest_0.jsonl',
 'job/1/samples/augmented/manifest_1.jsonl',
 'job/1/samples/augmented/manifest_10.jsonl',
 'job/1/samples/augmented/manifest_11.jsonl',
 'job/1/samples/augmented/manifest_12.jsonl',
 'job/1/samples/augmented/manifest_13.jsonl',
 'job/1/samples/augmented/manifest_14.jsonl',
 'job/1/samples/augmented/manifest_15.jsonl',
 'job/1/samples/augmented/manifest_2.jsonl',
 'job/1/samples/augmented/manifest_3.jsonl',
 'job/1/samples/augmented/manifest_4.jsonl',
 'job/1/samples/augmented/manifest_5.jsonl',
 'job/1/samples/augmented/manifest_6.jsonl',
 'job/1/samples/augmented/manifest_7.jsonl',
 'job/1/samples/augmented/manifest_8.jsonl',
 'job/1/samples/augmented/manifest_9.jsonl',
 'job/1/samples/augmented/shards_0/shard-000001.tar',
 'job/1/samples/augmented/shards_1/shard-000001.tar',
 'job/1/samples/augmented/shards_10/shard-000001.tar',
 'job/1/samples/augmented/shards_11/shard-000001.tar',
 'job/1/sampl

In [12]:
len(list(store.get_manifest("job/1/samples/piper/manifest.jsonl").iter_entries()))

1

In [11]:
len(list(store.get_manifest("job/1/samples/piper/manifest.jsonl").iter_objects()))


1000

In [None]:
list(store.get_manifest("job/1/samples/piper/manifest.jsonl").iter_entries())[0]


{'kind': 'shard',
 'archive_key': 'job/1/samples/piper/shards/shard-000001.tar',
 'format': 'tar',
 'compression': None,
 'internal_manifest_path': '__manifest__.jsonl',
 'count': 1000,
 'size_bytes': 39188480,
 'meta': {'etag': '1ac779a9f261c81ed3ec23a87435a9eb'}}

In [2]:
keys = list(store.get_manifest("job/1/samples/piper/manifest.jsonl").get_keys())
len(keys)


1000

In [7]:
total_read = 0
for stream, entry in store.get_manifest("job/1/samples/piper/manifest.jsonl").iter_objects():
    content = stream.read()
    total_read += len(content)

print(total_read)


34116958


In [8]:
total_read = 0
for stream, entry in store.iter_objects_from_keys(store.get_manifest("job/1/samples/piper/manifest.jsonl").get_keys()):
    content = stream.read()
    total_read += len(content)

print(total_read)



34116958


In [None]:
store.get_object("job/1/samples/piper/manifest.jsonl").read()

b'{"kind":"shard","archive_key":"job/1/samples/piper/shards/shard-000001.tar","format":"tar","compression":null,"internal_manifest_path":"__manifest__.jsonl","count":1000,"size_bytes":39188480,"meta":{"etag":"1ac779a9f261c81ed3ec23a87435a9eb"}}\n'

In [4]:
keys = list(store.get_manifest("job/1/samples/augmented/manifest.jsonl").get_keys())
keys

['job/1/samples/augmented/shards_0/shard-000001.tar#0_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#1_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#2_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#3_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#4_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#5_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#6_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#7_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#8_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#9_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#10_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#11_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#12_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#13_aug_0.wav',
 'job/1/samples/augmented/shards_0/shard-000001.tar#14_aug_0.wav',
 'job

In [11]:
store.get_object_metadata(keys[0])

{'ResponseMetadata': {'HTTPStatusCode': 200,
  'HTTPHeaders': {'last-modified': 'Sun, 08 Feb 2026 01:28:38 GMT',
   'accept-ranges': 'bytes',
   'etag': '"80183da40d35c66d179b242b15500ea2"',
   'content-type': 'application/x-ndjson',
   'content-length': '248',
   'date': 'Sun, 08 Feb 2026 01:35:35 GMT'},
  'RetryAttempts': 0},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2026, 2, 8, 1, 28, 38, tzinfo=tzutc()),
 'ContentLength': 248,
 'ETag': '"80183da40d35c66d179b242b15500ea2"',
 'ContentType': 'application/x-ndjson',
 'Metadata': {}}