# Resilient Object Streaming in AIStore

This demo shows how to use `ObjectFile` (`aistore.sdk.obj.object_file`) to stream large objects reliably when a read may be interrupted by a `ChunkedEncodingError`, for example because the cluster or one of its nodes becomes briefly unavailable mid-read.

In [None]:
# Step 0: Import Necessary Libraries

import os
import tarfile
import requests
import urllib3

from aistore.sdk.client import Client

In [None]:
# Step 1: Initialize AIStore Client with Retries

AIS_ENDPOINT = "http://localhost:8080"

# Define custom retry logic for requests to AIS. The same logic is used when re-establishing streams (as with object.get().as_file()).
# To retry through a total pod failure, force retries on specific HTTP response codes that are not retried by default.
# In particular, 400 and 404 are what you might see while the client attempts to redirect requests to an object on a missing target.
# urllib3 waits roughly backoff_factor * 2^(retry_count - 1) seconds before each retry and, by default, caps each wait at 120 seconds,
# so with backoff_factor=0.5 the longest waits here are 120 seconds (the uncapped value for the 10th retry would be 256 seconds).
retry = urllib3.Retry(total=10, backoff_factor=0.5, status_forcelist=[400, 404])
client = Client(AIS_ENDPOINT, retry=retry)
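
To get a feel for how long the retry configuration above might block before giving up, the following is a minimal sketch that approximates the wait schedule in plain Python. It does not call urllib3; `backoff_schedule` is a hypothetical helper that mirrors urllib3's documented backoff formula, under the assumptions that there is no wait before the first retry, that the default cap of 120 seconds applies, and that jitter is disabled. Exact behavior may differ between urllib3 versions.

```python
def backoff_schedule(total, backoff_factor, backoff_max=120.0):
    """Approximate the wait (in seconds) before each retry.

    Hypothetical helper: mirrors urllib3's documented formula,
    backoff_factor * 2 ** (retry_number - 1), capped at backoff_max,
    and assumes there is no wait before the first retry.
    """
    waits = []
    for retry_number in range(1, total + 1):
        if retry_number == 1:
            waits.append(0.0)
        else:
            waits.append(min(backoff_max, backoff_factor * 2 ** (retry_number - 1)))
    return waits


schedule = backoff_schedule(total=10, backoff_factor=0.5)
total_wait = sum(schedule)

print("Waits before each retry (s):", schedule)
print("Worst-case total wait (s):", total_wait)
```

Under these assumptions, the waits grow geometrically until they reach the cap, so raising `total` beyond the point where the cap is reached adds a fixed 120 seconds per extra retry rather than doubling the wait.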

In [None]:
# Step 2: Prepare Bucket w/ ASR Tar File

LIBRISPEECH_URL = "http://www.openslr.org/resources/12/dev-clean.tar.gz"
DOWNLOADED_FILE_PATH = "./dev-clean.tar.gz"
EXTRACT_PATH = "./librispeech_extract"
OBJECT_NAME = "librispeech-dev-clean.tar.gz"
BUCKET_NAME = "test-librispeech-bucket"

# Step 2a: Download the compressed tar.gz file
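if not os.path.exists(DOWNLOADED_FILE_PATH):
    response = requests.get(LIBRISPEECH_URL, stream=True, timeout=10)
    # Fail early instead of saving an HTTP error page as the archive
    response.raise_for_status()
    with open(DOWNLOADED_FILE_PATH, "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
                # Report progress after the write so the size includes this chunk
                print(f"\rDownloading {LIBRISPEECH_URL}... {f.tell() / 1024 / 1024:.2f}MB", end="")
    print()  # End the progress line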

# Step 2b: Upload the tar.gz file to AIStore directly
client.bucket(BUCKET_NAME).create(exist_ok=True)
client.bucket(BUCKET_NAME).object(OBJECT_NAME).put_file(DOWNLOADED_FILE_PATH)


The `ObjectFile` implementation catches instances of `ChunkedEncodingError` mid-read and opens a new object stream from the last known position, so the read resumes safely. `max_resume` sets the number of resumes allowed for a single read operation:

In [None]:
# Step 3: Open the Object File & Read

# Step 3a: Stream the object file and use tarfile.open to extract
with client.bucket(BUCKET_NAME).object(OBJECT_NAME).get().as_file(max_resume=3) as file_obj:
    with tarfile.open(fileobj=file_obj, mode='r|*') as tar:
        os.makedirs(EXTRACT_PATH, exist_ok=True)
        tar.extractall(path=EXTRACT_PATH)

# Step 3b: Print the directory structure
def print_directory_tree(startpath, indent="|-- "):
    """Prints the directory tree structure in a simple way."""
    for root, _, files in os.walk(startpath):
        level = root.replace(startpath, "").count(os.sep)
        indent_str = " " * 4 * level + indent
        print(f"{indent_str}{os.path.basename(root)}/")
        subindent = " " * 4 * (level + 1) + indent
        for f in files:
            print(f"{subindent}{f}")

print("Extracted Directory Structure:")
print_directory_tree(EXTRACT_PATH)
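
The resume behavior described for `ObjectFile` can be illustrated with a small, self-contained sketch. This is not the AIStore implementation: `ResumableReader`, `open_stream`, and the simulated failures are hypothetical names introduced here, and the built-in `ConnectionError` stands in for `ChunkedEncodingError`. The sketch assumes the resume budget is counted per `read` call, as described above.

```python
DATA = bytes(range(256)) * 40  # 10240 bytes standing in for a large object
failures = {"remaining": 2}    # number of simulated mid-read drops left


def open_stream(offset, chunk_size=1000):
    """Hypothetical stream factory: yields chunks of DATA starting at offset."""
    def chunks():
        pos, sent = offset, 0
        while pos < len(DATA):
            # Simulate a dropped connection after three chunks, a limited number of times
            if sent == 3 and failures["remaining"] > 0:
                failures["remaining"] -= 1
                raise ConnectionError("simulated mid-read drop")
            yield DATA[pos:pos + chunk_size]
            pos += chunk_size
            sent += 1
    return chunks()


class ResumableReader:
    """Illustrative reader that reopens the stream at the last fetched byte after a failure."""

    def __init__(self, stream_factory, max_resume):
        self._stream_factory = stream_factory
        self._max_resume = max_resume
        self._offset = 0      # bytes already returned to the caller
        self._buffer = b""    # bytes fetched but not yet returned
        self._chunks = stream_factory(0)

    def read(self, size):
        resumes = 0  # assumption: the budget resets on every read call
        while len(self._buffer) < size:
            try:
                self._buffer += next(self._chunks)
            except StopIteration:
                break  # end of object
            except ConnectionError:
                if resumes >= self._max_resume:
                    raise
                resumes += 1
                # Resume from the first byte that has not been fetched yet
                self._chunks = self._stream_factory(self._offset + len(self._buffer))
        out, self._buffer = self._buffer[:size], self._buffer[size:]
        self._offset += len(out)
        return out


reader = ResumableReader(open_stream, max_resume=3)
received = b""
while True:
    block = reader.read(4096)
    if not block:
        break
    received += block

print("Recovered full object:", received == DATA)
```

The key design point is that the reader tracks how many bytes it has fetched, so a reopened stream neither repeats nor skips data. A consumer such as `tarfile` only sees an uninterrupted sequence of bytes.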

In [None]:
# Step 4 (Optional): Clean Up

import shutil

client.bucket(BUCKET_NAME).delete(missing_ok=True)
if os.path.exists(DOWNLOADED_FILE_PATH):
    os.remove(DOWNLOADED_FILE_PATH)
# shutil.rmtree is portable, unlike shelling out to `rm -rf`
shutil.rmtree(EXTRACT_PATH, ignore_errors=True)

For more information, please refer to the [Python SDK documentation](https://github.com/NVIDIA/aistore/blob/main/docs/python_sdk.md#object_file).