In [None]:
import sys
sys.path.append('/home/jovyan/api')

from datetime import datetime
from jobs.assets.base import AssetProcessor
from utils.db_utils import init_mongo
import requests
from langfuse import Langfuse
import logging

# Configure logging: root logger at INFO so logging.info/exception output is shown.
# NOTE(review): basicConfig is a no-op if the root logger already has handlers
# (some kernels install one) -- pass force=True if log lines do not appear.
logging.basicConfig(level=logging.INFO)

# Initialize connections.
# `db` is the module-level database handle the helpers below index into
# (e.g. db["raw_assets"]); presumably a PyMongo Database -- confirm in utils.db_utils.
db = init_mongo()

In [None]:
def setup_debug_session(file_hash: str):
    """Set up a new debug processing session for an asset.

    Generates a unique ``debug-<YYYYmmdd_HHMMSS_ffffff>`` run id, stores it as
    ``current_run_id`` on the asset's ``raw_assets`` document, and opens a
    Langfuse trace named ``asset-processing-debug`` whose id is that run id.

    Args:
        file_hash: ``file_hash`` of an existing document in ``db["raw_assets"]``.

    Returns:
        Tuple ``(run_id, trace)``: the generated run id and the Langfuse trace.

    Raises:
        ValueError: If no ``raw_assets`` document has the given ``file_hash``
            (e.g. the notebook's placeholder hash was never replaced).
    """
    # One timestamp for both the run id and the trace metadata so they agree.
    started_at = datetime.now()
    run_id = f"debug-{started_at.strftime('%Y%m%d_%H%M%S_%f')}"

    # Update asset with debug run_id. A filter that matches nothing is not an
    # error for update_one, so check explicitly instead of continuing silently.
    update_result = db["raw_assets"].update_one(
        {"file_hash": file_hash},
        {"$set": {"current_run_id": run_id}}
    )
    if update_result.matched_count == 0:
        raise ValueError(f"No raw asset found with file_hash={file_hash!r}")

    # Initialize trace
    trace = Langfuse().trace(
        name="asset-processing-debug",
        id=run_id,
        metadata={
            "file_hash": file_hash,
            "debug_session": True,
            "timestamp": started_at.isoformat()
        }
    )

    return run_id, trace

In [None]:
def execute_single_processor(file_hash: str, processor_type: str, run_id: str, timeout=900.0):
    """
    Execute a single processor through its HTTP endpoint and return its results.

    Opens a Langfuse span on the processor's trace, POSTs to
    ``http://nginx:80/assets/process_<processor_type>/<file_hash>`` with the span
    id and run id as ``X-Span-ID`` / ``X-Run-ID`` headers, and records either a
    ``<processor_type>_completed`` or ``<processor_type>_error`` event on the span.
    The span is always ended.

    Args:
        file_hash: Hash of the asset to process.
        processor_type: Processor name; must be a key of
            ``AssetProcessor.PROCESSOR_REGISTRY``.
        run_id: Debug run id, forwarded in the ``X-Run-ID`` header.
        timeout: Seconds ``requests.post`` waits for the endpoint; ``None`` waits
            forever. 900 s is a generous guess -- TODO tune to the slowest processor.

    Returns:
        The endpoint's decoded JSON body.

    Raises:
        requests.HTTPError: If the endpoint answers with status >= 400.
        requests.RequestException: On connection errors or timeouts.
        ValueError: If the response body is not valid JSON.
        Unknown ``processor_type`` fails at the registry/``AssetProcessor`` lookup
        (presumably ``KeyError``) before any request is made.
    """
    processor = AssetProcessor(file_hash, processor_type)

    # Create span for this processor
    span = processor.trace.span(
        name=f"{processor_type}_processing_debug",
        metadata={
            "processor_type": processor_type,
            "dependencies": AssetProcessor.PROCESSOR_REGISTRY[processor_type],
            "debug_execution": True
        }
    )

    try:
        # Make API call to processor endpoint; bounded so a stuck processor
        # cannot hang the notebook indefinitely.
        headers = {"X-Span-ID": span.id, "X-Run-ID": run_id}
        response = requests.post(
            f"http://nginx:80/assets/process_{processor_type}/{file_hash}",
            headers=headers,
            timeout=timeout,
        )

        if not response.ok:
            # HTTPError is still an Exception subclass, so existing callers that
            # catch Exception keep working; the response is attached for inspection.
            raise requests.HTTPError(
                f"Processing failed with status {response.status_code}: {response.text}",
                response=response,
            )

        result = response.json()

        span.event(
            name=f"{processor_type}_completed",
            metadata={"result": result}
        )

        return result

    except Exception as e:
        # Langfuse observation levels are upper-case (DEBUG/DEFAULT/WARNING/ERROR);
        # a lower-case value fails validation and the event is dropped.
        span.event(
            name=f"{processor_type}_error",
            metadata={"error": str(e)},
            level="ERROR"
        )
        raise
    finally:
        span.end()

In [None]:
def check_processor_dependencies(processor_type: str):
    """Print the registered dependencies of ``processor_type`` and return them.

    The entry is read from ``AssetProcessor.PROCESSOR_REGISTRY``. An unknown key
    fails at that lookup, before anything is printed (KeyError if the registry
    is a plain dict). A falsy entry is displayed as ``None``. The raw registry
    value is returned unchanged either way.
    """
    required = AssetProcessor.PROCESSOR_REGISTRY[processor_type]
    shown = required or 'None'
    print(f"Processor: {processor_type}")
    print(f"Dependencies: {shown}")
    return required

In [None]:
def display_processing_dag():
    """Print the processor dependency graph from ``AssetProcessor.PROCESSOR_REGISTRY``.

    Processors whose dependency entry is empty/falsy are listed first as entry
    points. Every other processor follows, together with its dependency entry.
    Output goes to stdout and nothing is returned.
    """
    print("Processing DAG:")
    print("Initial processors (no dependencies):")
    registry_entries = list(AssetProcessor.PROCESSOR_REGISTRY.items())

    roots = [name for name, requires in registry_entries if not requires]
    for name in roots:
        print(f"  - {name}")

    print("\nDependent processors:")
    dependents = [(name, requires) for name, requires in registry_entries if requires]
    for name, requires in dependents:
        print(f"  - {name} (depends on: {requires})")

In [None]:
# Set your file hash and display DAG
# `file_hash` is read by every later cell; it must be the file_hash of an
# existing document in db["raw_assets"] (that is what setup_debug_session filters on).
file_hash = "your_file_hash_here"  # Replace with actual file hash
display_processing_dag()

In [None]:
# Setup debug session.
# Refuse to run with the unreplaced placeholder from the cell above: it would
# open a Langfuse trace and fire processors for an asset that does not exist.
if file_hash == "your_file_hash_here":
    raise ValueError("Set file_hash to a real asset hash before starting a debug session")
run_id, trace = setup_debug_session(file_hash)
print(f"Debug session initialized with run_id: {run_id}")

In [None]:
# Process tables (no dependencies)
# Reset first so a failed re-run cannot leave a stale result from an earlier
# successful run in the kernel.
result_tables = None
try:
    print("Processing tables...")
    check_processor_dependencies("tables")
    result_tables = execute_single_processor(file_hash, "tables", run_id)
    print("Tables processing complete!")
except Exception as e:
    # Keep the notebook going, but log the full traceback for debugging.
    logging.exception("Error processing tables: %s", e)

In [None]:
# Process images (no dependencies)
# Reset first so a failed re-run cannot leave a stale result from an earlier
# successful run in the kernel.
result_images = None
try:
    print("Processing images...")
    check_processor_dependencies("images")
    result_images = execute_single_processor(file_hash, "images", run_id)
    print("Images processing complete!")
except Exception as e:
    # Keep the notebook going, but log the full traceback for debugging.
    logging.exception("Error processing images: %s", e)

In [None]:
# Process refined (no dependencies)
# Reset first so a failed re-run cannot leave a stale result from an earlier
# successful run in the kernel.
result_refined = None
try:
    print("Processing refined...")
    check_processor_dependencies("refined")
    result_refined = execute_single_processor(file_hash, "refined", run_id)
    print("Refined processing complete!")
except Exception as e:
    # Keep the notebook going, but log the full traceback for debugging.
    logging.exception("Error processing refined: %s", e)

In [None]:
# Process refined_metadata (depends on refined)
# Reset first so a failed re-run cannot leave a stale result from an earlier
# successful run in the kernel.
result_refined_metadata = None
try:
    print("Processing refined_metadata...")
    check_processor_dependencies("refined_metadata")
    result_refined_metadata = execute_single_processor(file_hash, "refined_metadata", run_id)
    print("Refined metadata processing complete!")
except Exception as e:
    # Keep the notebook going, but log the full traceback for debugging.
    logging.exception("Error processing refined_metadata: %s", e)