# Example Hub and Local Operation Collections

This notebook demonstrates the complete workflow for uploading and downloading operations collections using the 
`HubAnalysisOpManager` and loading local operations via `IT_ANALYSIS_OP_PATHS`. The workflow includes:

1. Setting up local op collection path via IT_ANALYSIS_OP_PATHS
2. Copying the current hub_op_collection folder to /tmp/
3. Uploading operations to HuggingFace Hub as a private repository
4. Downloading the uploaded collection to the default cache
5. Re-importing interpretune to verify both hub and local operations are available
6. Testing the loaded operations
7. Cleaning up downloaded operations and re-importing
8. Verifying only local operations remain available
9. Final cleanup of the local operations collection

**Note**: This example requires HuggingFace Hub authentication and will create a private repository.

## Setup and Imports

In [None]:
import os
from pathlib import Path

# Import interpretune components
import interpretune
from interpretune.analysis.ops.hub_manager import HubAnalysisOpManager
from interpretune.analysis import IT_ANALYSIS_CACHE, IT_ANALYSIS_HUB_CACHE, IT_ANALYSIS_OP_PATHS, IT_MODULES_CACHE
from interpretune.base.components.cli import IT_BASE

# Import utility functions for op collection demo setup/cleanup
import it_examples.notebooks.example_op_collections.op_collection_demo_utils as op_demo_utils

# Resolve the example op collection directories that live under the interpretune base path
example_op_collections_dir = Path(IT_BASE / "notebooks" / "example_op_collections")
example_hub_op_collection_dir = example_op_collections_dir / "hub_op_collection"
example_local_op_collection_dir = example_op_collections_dir / "local_op_collection"

# Print an environment summary built from the package version, the cache locations,
# the configured op paths and the two example collection directories
op_demo_utils.print_env_summary(
    interpretune.version,
    IT_ANALYSIS_CACHE,
    IT_MODULES_CACHE,
    IT_ANALYSIS_HUB_CACHE,
    IT_ANALYSIS_OP_PATHS,
    example_hub_op_collection_dir,
    example_local_op_collection_dir,
)

## Step 1: Stage example local op collections to a temporary directory

Copy the local_op_collection to /tmp/ and add it to IT_ANALYSIS_OP_PATHS so local operations are loaded.

In [None]:
# Where the local ops come from (bundled example) and where they are staged (temporary dir)
source_local_op_collection = example_local_op_collection_dir
tmp_local_op_collection = Path("/tmp/local_op_collection")

# Copy our local op collection to `tmp_local_op_collection` and add that path to the
# IT_ANALYSIS_OP_PATHS env var. The returned `original_op_paths_env` is handed back to the
# cleanup utility at the end of the notebook.
original_op_paths_env, new_op_paths = op_demo_utils.setup_local_op_collection(
    tmp_local_op_collection=tmp_local_op_collection,
    source_local_op_collection=source_local_op_collection,
)

## Step 2: Copy hub op_collection to /tmp/

Copy the hub op_collection folder to /tmp/ for upload to the hub.

In [None]:
# Where the hub ops come from (bundled example) and where they are staged for upload
source_op_collection = example_hub_op_collection_dir
tmp_op_collection = Path("/tmp/hub_op_collection")

# Stage the hub op collection at `tmp_op_collection` via the demo utility
op_demo_utils.setup_hub_op_collection(
    tmp_op_collection=tmp_op_collection,
    source_op_collection=source_op_collection,
)

## Step 3: Upload operations to HuggingFace Hub

Upload the hub op_collection to HuggingFace Hub as a private repository named "trivial_op_repo".

In [None]:
from huggingface_hub import whoami

# Resolve the current HF user up front: the target repo id is namespaced under this user.
# `whoami()` raises when no valid token is available, so print a hint before re-raising
# instead of letting the cell fail with an unexplained traceback.
try:
    current_user = whoami()['name']
except Exception as e:
    print(f"❌ Unable to determine the current HuggingFace user: {e}")
    print("   Authenticate first (e.g. `huggingface-cli login` or the HF_TOKEN env var) and re-run this cell.")
    raise

# Initialize the hub manager
hub_manager = HubAnalysisOpManager()

# Repository configuration (the repo id is `<user>/<repo_name>`)
repo_name = "trivial_op_repo"
private = True
repo_id = f'{current_user}/{repo_name}'

print("Uploading op_collection to HuggingFace Hub...")
print(f"Current HF user: {current_user}")
print(f"Repository: {repo_name}")
print(f"Private: {private}")
print(f"Source folder: {tmp_op_collection}")

try:
    # Upload operations to hub
    # 1. This will create the specified repository if it doesn't exist
    # 2. If the repo exists, it will clean existing operations and upload the new ones in a single commit
    #    - If no files have changed, it will skip the commit and leave the repository unchanged

    upload_result = hub_manager.upload_ops(
        local_dir=tmp_op_collection, repo_id=repo_id, private=private, clean_existing=True)

    # Report the fully-qualified repo id so the message matches the repository actually targeted
    print(f"✓ Successfully uploaded operations (if necessary) to {repo_id}")
    print(f"Upload result (new or latest op repo commit sha): {upload_result}")

except Exception as e:
    # Surface a readable message, then re-raise so later cells do not run against a failed upload
    print(f"❌ Error uploading operations: {e}")
    raise

## Step 4: Download operations to default hub cache

Download the uploaded operations collection to the default `IT_ANALYSIS_HUB_CACHE` location.

In [None]:
print(f"Downloading operations from {repo_id} to default cache...")
print(f"Cache location: {IT_ANALYSIS_HUB_CACHE}")

# Pre-bind download_result so the later cleanup step can reference it safely
download_result = None

try:
    # No cache_dir is passed, so the default IT_ANALYSIS_HUB_CACHE location is used
    download_result = hub_manager.download_ops(repo_id=repo_id)

    print("✓ Successfully downloaded operations to cache")
    print(f"Download result: {download_result}")

    # List every file currently under the hub cache, shown relative to the cache root
    cache_path = Path(IT_ANALYSIS_HUB_CACHE)
    if cache_path.exists():
        print("\nContents of hub cache:")
        cached_files = (entry for entry in cache_path.rglob("*") if entry.is_file())
        for cached_file in cached_files:
            print(f"  - {cached_file.relative_to(cache_path)}")

except Exception as e:
    # Report the failure, then propagate it so the notebook stops here
    print(f"❌ Error downloading operations: {e}")
    raise

## Step 5: Re-import interpretune and verify hub and local operations

Re-import interpretune to pick up both hub and local operations and verify they are available.

In [None]:
# Step 5 cell: force a fresh import of interpretune, then summarize and categorize the
# operations registered on the dispatcher. Statement order matters: the purge must run
# before the imports below.
print("Re-importing interpretune to pick up hub and local operations...")
# Remove interpretune modules from sys.modules to force reimport
op_demo_utils.purge_it_modules_from_sys()

# ruff: noqa: E402

# Re-import interpretune (bound as `it`; DISPATCHER is re-bound from the fresh import)
import interpretune as it
from interpretune import DISPATCHER

print("✓ Interpretune re-imported")

# Get operation definitions and generate summary
operation_definitions = DISPATCHER.registered_ops
op_demo_utils.generate_op_summary(operation_definitions)

# Show operations by type; the six returned groupings are reused by later cells
# (here, `hub_ops` and `local_ops` feed the lazy-instantiation demo below)
canonical_ops, alias_map, hub_ops, local_ops, composed_ops, builtin_ops = \
    op_demo_utils.categorize_operations(operation_definitions)

# Demo lazy operation instantiation
op_demo_utils.demo_lazy_op_instantiation(it, hub_ops, local_ops)

## Step 6: Test executing the loaded operations

Execute simple hub and local operations, both individually and as part of a composite operation, to confirm that loading and execution work correctly.

In [None]:
print("\n🧪 Testing loaded operations with demo data...")

# Pull the demo ops (hub op, local op and their composite) from the re-imported package
from interpretune import trivial_test_op, trivial_local_test_op, composite_trivial_test_op

NUM_BATCHES = 2  # how many test batches to generate
VERBOSE_OP_OUTPUTS = False  # flip to True to log each operation's output

print(f"\n📋 Testing operation pipeline parity of composite vs individual component ops (over {NUM_BATCHES} batches):")

# Outputs are collected per batch so the two execution paths can be compared afterwards
individual_op_output_batches = []
composite_op_output_batches = []

for batch_label, individual_input, composite_input in op_demo_utils.generate_test_batches(NUM_BATCHES):
    # Path A: run the composite op in a single call
    print("\nComposite op execution...")
    if VERBOSE_OP_OUTPUTS:
        print(f"\n--- {batch_label} ---")
        print(f"Input batch: {individual_input}")
    composite_result = composite_trivial_test_op(analysis_batch=composite_input)
    op_demo_utils.maybe_print_output(f"Composite op output batch: {composite_result}", VERBOSE_OP_OUTPUTS)
    composite_op_output_batches.append(composite_result)

    # Path B: run the local op first, then feed its output to the hub op
    print("\nRe-running with individual component ops...")
    local_stage_output = trivial_local_test_op(analysis_batch=individual_input)
    op_demo_utils.maybe_print_output(f"Local op batch output: {local_stage_output}", VERBOSE_OP_OUTPUTS)
    individual_result = trivial_test_op(analysis_batch=local_stage_output)
    op_demo_utils.maybe_print_output(f"Hub output batch: {individual_result}", VERBOSE_OP_OUTPUTS)
    individual_op_output_batches.append(individual_result)

# Compare the two sets of collected outputs using the demo utility
all_match = op_demo_utils.compare_operation_outputs(individual_op_output_batches, composite_op_output_batches)

## Step 7: Clean up hub operations and re-import

Delete the downloaded hub operations folder and re-import interpretune to verify only local operations remain.

In [None]:
# Step 7 cell: remove the downloaded hub repository from the cache, then re-import
# interpretune while capturing its output so the import-time warning can be inspected.
print("Cleaning up downloaded hub operations...")

# Remove only the specific repository we downloaded, not the entire hub cache
# (`download_result` is the value produced by the download cell, or None if it failed)
op_demo_utils.cleanup_hub_repository(download_result)

# Re-import interpretune again
print("\nRe-importing interpretune after cleanup...")

# Capture stdout and stderr during import to check for the expected warning.
# Note that DISPATCHER is re-bound here to the value returned by the utility.
stdout_output, stderr_output, DISPATCHER = op_demo_utils.reimport_interpretune_with_capture()

# NOTE(review): presumably the warning concerns the composite op that can no longer
# resolve its hub component after cleanup; confirm against the utility implementation
op_demo_utils.inspect_err_for_composite_op_warning(stderr_output)

print("\n ✓ Interpretune re-imported after cleanup")

## Step 8: Verify only local operations remain

Verify that only the local and built-in operations are available after hub cleanup.

In [None]:
# Step 8 cell: read the ops registered on the re-imported dispatcher and pass them to
# the demo utility that checks the post-cleanup state.
print("Verifying operations after cleanup...")

# Get operation definitions after cleanup and verify cleanup status
# (DISPATCHER is the instance returned by the re-import in the previous cell)
operation_definitions_after = DISPATCHER.registered_ops
op_demo_utils.verify_cleanup_status(operation_definitions_after)

## Cleanup temporary files

Clean up the temporary files created during this example.

In [None]:
# Hand both staged collection paths and the original env value to the cleanup utility
op_demo_utils.cleanup_op_collections(
    tmp_op_collection=tmp_op_collection,
    tmp_local_op_collection=tmp_local_op_collection,
    original_op_paths_env=original_op_paths_env,
)

# Recap of the workflow, emitted one line per print call
workflow_recap = (
    "\n🎉 Hub and Local operations workflow example completed successfully!",
    "\nSummary of what was demonstrated:",
    "1. ✓ Setup local op collection path via IT_ANALYSIS_OP_PATHS environment variable",
    "2. ✓ Copied hub op_collection to /tmp/ with overwrite warning",
    "3. ✓ Uploaded operations to HuggingFace Hub as private repo",
    "4. ✓ Downloaded operations to default hub cache",
    "5. ✓ Re-imported interpretune and verified both hub and local operations",
    "6. ✓ Tested operation instantiation and execution with demo data",
    "7. ✓ Cleaned up hub operations and re-imported",
    "8. ✓ Verified only local and built-in operations remain available",
    "9. ✓ Restored original IT_ANALYSIS_OP_PATHS environment variable",
)
for recap_line in workflow_recap:
    print(recap_line)

## Step 9: Final verification after environment cleanup

Re-import interpretune one final time to verify that local operations are no longer available after restoring IT_ANALYSIS_OP_PATHS to its original value.

In [None]:
print("Final verification: Re-importing interpretune after environment cleanup...")

# Remove interpretune modules from sys.modules to force reimport
op_demo_utils.purge_it_modules_from_sys()

# Re-import interpretune one final time (must follow the purge above)
import interpretune
from interpretune import DISPATCHER

print("✓ Interpretune re-imported after environment cleanup")

# Get operation definitions after complete cleanup and categorize them. Every result carries
# the `_final` suffix so this cell does not overwrite the groupings produced in Step 5.
operation_definitions_final = DISPATCHER.registered_ops
canonical_ops_final, alias_map_final, hub_ops_final, local_ops_final, composed_ops_final, builtin_ops_final = \
    op_demo_utils.categorize_operations(operation_definitions_final)

print("\n📊 Final Operation Summary (after complete cleanup):")
print(f"  Total registered names: {len(operation_definitions_final)}")
print(f"  Unique operations: {len(canonical_ops_final)}")
print(f"  Hub operations: {len(hub_ops_final)}")
print(f"  Local operations: {len(local_ops_final)}")
print(f"  Composed operations: {len(composed_ops_final)}")
print(f"  Built-in operations: {len(builtin_ops_final)}")

# Verify complete cleanup: neither hub nor local ops should remain registered
hub_remaining = len(hub_ops_final)
local_remaining = len(local_ops_final)

if hub_remaining == 0 and local_remaining == 0:
    print("\n🎯 Perfect! Complete cleanup successful - only built-in and composed operations remain!")
elif hub_remaining and local_remaining:
    print(f"\n❌ Cleanup incomplete: {hub_remaining} hub ops and {local_remaining} local ops still present")
else:
    # Exactly one category still has entries; report it with a single shared code path
    if hub_remaining == 0:
        cleaned_kind, leftover_kind, leftover_ops = "Hub", "local", local_ops_final
    else:
        cleaned_kind, leftover_kind, leftover_ops = "Local", "hub", hub_ops_final
    print(f"\n⚠️ {cleaned_kind} operations cleaned up, but {len(leftover_ops)} {leftover_kind} operations still present:")
    for op_name in leftover_ops:
        # List the canonical name followed by any aliases registered for it
        all_names = [op_name] + alias_map_final.get(op_name, [])
        print(f"    - {op_name} (accessible as: {', '.join(all_names)})")

print("\nEnvironment verification:")
print(f"  Current IT_ANALYSIS_OP_PATHS env var: '{os.environ.get('IT_ANALYSIS_OP_PATHS', 'Not set')}'")