In [1]:
import yt.wrapper as yt
import uuid
import random

# YTsaurus client configuration (pickling options)
yt.config["pickling"]["dynamic_libraries"]["enable_auto_collection"] = False
yt.config["pickling"]["ignore_system_modules"] = True
yt.config["pickling"]["safe_stream_mode"] = False

# Choose the parent of the working directory: the user's registered home path
# when the attribute exists, otherwise the shared public registry location.
username = yt.get_user_name()
home_path_attr = f"//sys/users/{username}/@user_info/home_path"
if yt.exists(home_path_attr):
    home = yt.get(home_path_attr)
    parent_dir = home
else:
    parent_dir = "//home/orchestracto/public_registry/Hyperplane"

# A random hex suffix gives every run its own directory
working_dir = f"{parent_dir}/{uuid.uuid4().hex}"

yt.create("map_node", working_dir)
print(f"Working directory: {working_dir}")



Working directory: //home/hyperplane/3aca1b5d2ac1476db73e0a637404902a


In [2]:
def prepare_tts_data():
    """Create the sample TTS request table and fill it with example rows.

    The table is (re)created at ``{working_dir}/tts_requests`` with
    ``force=True`` and then populated with 20 sample requests. Each row has
    the keys ``request_id``, ``text``, ``voice`` and ``model_name``. A
    per-voice summary of the written requests is printed.

    Returns:
        str: Path of the table that was written.
    """
    from collections import Counter

    table_path = f"{working_dir}/tts_requests"
    yt.create("table", table_path, force=True)

    # Every sample request uses the same model, so the name is stated once
    model_name = "canopylabs/orpheus-3b-0.1-ft"

    # (request_id, voice, text) triples covering a variety of voices and contexts
    samples = [
        # Customer service examples
        ("customer_welcome", "tara",
         "Welcome to our customer service. How can I help you today?"),
        ("order_status", "leah",
         "I can help you track your order. Please provide your order number, and I'll look that up for you right away."),
        ("technical_support", "leo",
         "I understand you're experiencing technical difficulties. Let me connect you with our specialized support team who can assist you better."),

        # IVR/Phone system examples
        ("phone_menu", "zoe",
         "Thank you for calling. Press 1 for sales, 2 for support, 3 for billing, or stay on the line to speak with a representative."),
        ("hold_message", "dan",
         "All of our agents are currently busy. Your call is important to us. Please stay on the line and you'll be connected shortly."),

        # Notification examples
        ("appointment_reminder", "mia",
         "This is a reminder that you have an appointment scheduled for tomorrow at 2:30 PM. Reply YES to confirm or NO to reschedule."),
        ("delivery_notification", "jess",
         "Good news! Your package has been delivered and is waiting for you at your front door. Thank you for shopping with us."),

        # Voice assistant examples
        ("weather_update", "zac",
         "Today's weather forecast shows partly cloudy skies with a high of 72 degrees. There's a 20% chance of rain in the evening."),
        ("alarm_set", "tara",
         "I've set your alarm for 7:00 AM tomorrow morning. Would you like me to set any additional reminders?"),

        # E-learning/Tutorial examples
        ("tutorial_intro", "leo",
         "Welcome to this tutorial on machine learning basics. In this lesson, we'll explore the fundamental concepts of neural networks."),
        ("course_complete", "leah",
         "Congratulations! You've successfully completed the course. Your certificate has been emailed to you. Keep up the great work!"),

        # Accessibility examples
        ("screen_reader", "mia",
         "New message received from John Smith. Subject: Meeting tomorrow. Would you like me to read the full message?"),

        # Emergency/Alert examples
        ("emergency_alert", "dan",
         "This is an emergency broadcast. Please remain calm and follow the evacuation procedures. Move to the nearest exit immediately."),

        # Marketing/Advertisement examples
        ("special_offer", "zoe",
         "Don't miss out on our special weekend sale! Get 50% off all items in store. This offer ends Sunday at midnight."),

        # Different emotional tones
        ("empathetic_response", "jess",
         "I understand this must be frustrating for you. Let me personally ensure we resolve this issue as quickly as possible."),
        ("enthusiastic_greeting", "zac",
         "Hey there! Welcome to our amazing community! We're so excited to have you join us on this incredible journey!"),

        # Multi-language examples (if supported)
        ("spanish_greeting", "tara",
         "Hola! Bienvenido a nuestro servicio. ¿En qué puedo ayudarte hoy?"),
        ("french_greeting", "leah",
         "Bonjour! Bienvenue dans notre service. Comment puis-je vous aider aujourd'hui?"),

        # Different speech rates/styles
        ("slow_clear_speech", "leo",
         "Please... listen... carefully... to... these... important... instructions."),
        ("natural_conversation", "zac",
         "Oh, hey! Yeah, I totally get what you mean. Let me, uh, let me check that for you real quick."),
    ]

    # Expand the triples into table rows (key order kept as request_id, text, voice, model_name)
    tts_requests = [
        {
            "request_id": request_id,
            "text": text,
            "voice": voice,
            "model_name": model_name,
        }
        for request_id, voice, text in samples
    ]

    yt.write_table(table_path, tts_requests)
    print(f"Created {len(tts_requests)} TTS requests in {table_path}")

    # Print summary by voice
    voice_counts = Counter(req["voice"] for req in tts_requests)

    print("\nRequests by voice:")
    for voice, count in sorted(voice_counts.items()):
        print(f"  {voice}: {count} requests")

    return table_path

In [4]:
from huggingface_hub import login
from getpass import getpass

# Ask for the Hugging Face token without echoing it, then authenticate.
# The value stays in `hf_token` because bulk_tts_inference reads it when loading the model.
hf_token = getpass(prompt="Paste your Hugging Face token: ")
login(token=hf_token)

In [3]:
# Fixed bulk TTS inference function adapted from working Colab code

import sys
import time
import io
import base64
import os

@yt.aggregator
def bulk_tts_inference(records):
    """Synthesize speech for a stream of TTS request rows.

    The function is wrapped with ``yt.aggregator`` and iterates over
    ``records`` itself. It pip-installs ``snac`` and ``soundfile``, loads the
    SNAC decoder (kept on CPU) and the Orpheus causal LM (moved to CUDA), then
    generates one clip per row, one row at a time.

    ``hf_token`` is not a parameter; it is looked up as a global name when the
    models are loaded.
    NOTE(review): this relies on that global being available wherever the
    function executes — confirm for remote jobs.

    Args:
        records: Iterable of row dicts. ``request_id``, ``text`` and ``voice``
            are read from each row.

    Yields:
        dict: One result row per input row. Successful rows have
        ``success=True``, a base64-encoded FLAC in ``audio_bytes``, plus
        ``duration_seconds``, ``generation_time``, ``text_length`` and
        ``tokens_generated``. Failed rows have ``success=False`` and a message
        in ``error``. If package installation or model loading fails, every
        row is reported as failed and nothing is synthesized.
    """
    import base64
    import io
    import logging
    import subprocess
    import sys
    import time

    # Setup logging to stderr (required for YT jobs)
    logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    logger = logging.getLogger(__name__)

    sample_rate = 24000  # used both for FLAC encoding and for the duration estimate

    def failure_row(record, message, generation_time=0.0):
        """Build a failed-result row with the same columns on every error path."""
        # .get() keeps a malformed input row from raising while an error is being reported
        return {
            "request_id": record.get("request_id"),
            "success": False,
            "text": record.get("text", ""),
            "voice": record.get("voice", ""),
            "error": message,
            "audio_bytes": None,
            "duration_seconds": 0.0,
            "generation_time": generation_time,
        }

    try:
        # Install required packages in the job environment
        logger.info("Installing required packages...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "snac==1.2.0", "-q"])
        subprocess.check_call([sys.executable, "-m", "pip", "install", "soundfile", "-q"])

        # Imports are deferred until after the pip installs above
        from snac import SNAC
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import soundfile as sf

        logger.info("Packages installed successfully")

    except Exception as e:
        logger.error(f"Failed to install/import packages: {str(e)}")
        # Report the failure once per input row, then stop
        for record in records:
            yield failure_row(record, f"Package installation failed: {str(e)}")
        return

    logger.info("Loading Orpheus TTS models...")

    try:
        # Load SNAC model for audio decoding
        snac_model = SNAC.from_pretrained("hubertsiuzdak/snac_24khz")
        snac_model = snac_model.to("cpu")  # Keep on CPU to save VRAM

        # Load Orpheus model
        model_name = "canopylabs/orpheus-3b-0.1-ft"

        # `token` is the current name of the deprecated `use_auth_token` argument
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            token=hf_token,
        )
        model.cuda()
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            token=hf_token,
        )

        logger.info("Models loaded successfully")

    except Exception as e:
        logger.error(f"Failed to load models: {str(e)}")
        # Report the failure once per input row, then stop
        for record in records:
            yield failure_row(record, f"Model loading failed: {str(e)}")
        return

    def audio_to_bytes(audio_data, format="flac"):
        """Encode audio samples into an in-memory file and return its bytes."""
        buffer = io.BytesIO()
        sf.write(buffer, audio_data, sample_rate, format=format.upper())
        return buffer.getvalue()

    def redistribute_codes(code_list):
        """Split a flat code list into the three SNAC layers and decode it.

        ``code_list`` is consumed in groups of 7; the caller trims it to a
        multiple of 7, so only full groups are processed. Position ``k``
        inside a group has ``k * 4096`` subtracted; positions 0 / (1, 4) /
        (2, 3, 5, 6) feed layers 1 / 2 / 3 respectively.
        """
        layer_1 = []
        layer_2 = []
        layer_3 = []
        for start in range(0, len(code_list) - 6, 7):
            frame = code_list[start:start + 7]
            layer_1.append(frame[0])
            layer_2.append(frame[1] - 4096)
            layer_3.append(frame[2] - (2 * 4096))
            layer_3.append(frame[3] - (3 * 4096))
            layer_2.append(frame[4] - (4 * 4096))
            layer_3.append(frame[5] - (5 * 4096))
            layer_3.append(frame[6] - (6 * 4096))
        codes = [
            torch.tensor(layer_1).unsqueeze(0),
            torch.tensor(layer_2).unsqueeze(0),
            torch.tensor(layer_3).unsqueeze(0),
        ]
        # Decoding is inference only, so no autograd graph is needed
        with torch.no_grad():
            return snac_model.decode(codes)

    # Materialize the input before processing
    record_list = list(records)

    # Process records one by one (more stable for GPU memory)
    for record in record_list:
        try:
            request_id = record["request_id"]
            text = record["text"]
            voice = record["voice"]
            prompt = f"{voice}: {text}"

            logger.info(f"Processing {request_id}: {prompt[:50]}...")

            # Wrap the tokenized prompt in the special tokens used by the original code
            input_ids = tokenizer(prompt, return_tensors="pt").input_ids

            start_token = torch.tensor([[128259]], dtype=torch.int64)  # Start of human
            end_tokens = torch.tensor([[128009, 128260]], dtype=torch.int64)  # End of text, End of human

            input_ids = torch.cat([start_token, input_ids, end_tokens], dim=1).to("cuda")
            attention_mask = torch.ones_like(input_ids)  # created on the same device as input_ids

            # Generate speech tokens
            start_time = time.time()

            with torch.no_grad():
                generated_ids = model.generate(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    max_new_tokens=2000,
                    do_sample=True,
                    temperature=0.6,
                    top_p=0.95,
                    repetition_penalty=1.1,
                    num_return_sequences=1,
                    eos_token_id=128258,
                )

            generation_time = time.time() - start_time

            # Parse output as speech (from original code)
            token_to_find = 128257
            token_to_remove = 128258

            # Keep only what follows the last occurrence of token_to_find, if any
            token_indices = (generated_ids == token_to_find).nonzero(as_tuple=True)

            if len(token_indices[1]) > 0:
                last_occurrence_idx = token_indices[1][-1].item()
                cropped_tensor = generated_ids[:, last_occurrence_idx + 1:]
            else:
                cropped_tensor = generated_ids

            # Single sequence per request: take row 0 and drop the end token
            row = cropped_tensor[0]
            masked_row = row[row != token_to_remove]

            # Trim to a whole number of 7-token groups and shift ids into code space
            new_length = (masked_row.size(0) // 7) * 7
            trimmed_row = masked_row[:new_length]
            # One bulk transfer instead of a per-element .item() call
            code_list = [token_id - 128266 for token_id in trimmed_row.tolist()]

            if not code_list:
                yield failure_row(record, "No audio tokens generated", generation_time)
                continue

            # Generate audio
            samples = redistribute_codes(code_list)
            audio_data = samples.detach().squeeze().to("cpu").numpy()

            # Convert to bytes
            audio_bytes = audio_to_bytes(audio_data, "flac")

            # Calculate metrics
            duration_seconds = len(audio_data) / sample_rate

            logger.info(f"✅ {request_id}: {duration_seconds:.2f}s audio generated in {generation_time:.2f}s")

            yield {
                "request_id": request_id,
                "success": True,
                "text": text,
                "voice": voice,
                "audio_bytes": base64.b64encode(audio_bytes).decode('utf-8'),
                "duration_seconds": duration_seconds,
                "generation_time": generation_time,
                "text_length": len(text),
                "tokens_generated": len(code_list),
                "error": None
            }

        except Exception as e:
            # .get() here so a row without request_id cannot raise inside the handler
            logger.error(f"❌ Error processing {record.get('request_id')}: {str(e)}")
            yield failure_row(record, str(e))

# Driver: prepares the input table, runs the map operation and prints a result summary
def run_bulk_tts_inference():
    """Prepare the sample requests, run the TTS map operation and show results.

    Writes the input table via ``prepare_tts_data``, runs
    ``bulk_tts_inference`` over it with ``yt.run_map`` as a single job, then
    prints a summary through ``display_results``.

    Returns:
        str: Path of the result table (``{working_dir}/results``).
    """
    # Input table first, then the output location next to it
    table_path = prepare_tts_data()
    result_path = f"{working_dir}/results"

    print("🚀 Starting bulk TTS inference...")
    print(f"Input: {table_path}")
    print(f"Output: {result_path}")

    # Per-job resources requested for the mapper
    mapper_resources = {
        "gpu_limit": 1,
        "memory_limit": 32 * 1024 ** 3,  # 32GB in bytes
        "cpu_limit": 12,
    }
    operation_spec = {
        "pool_trees": ["hyperplane-gpu-h100"],
        "mapper": mapper_resources,
    }

    yt.run_map(
        bulk_tts_inference,
        table_path,
        result_path,
        job_count=1,
        spec=operation_spec,
    )

    print("✅ Bulk TTS inference completed!")

    display_results(result_path)

    return result_path

def display_results(result_path):
    """Print one line per result row followed by aggregate statistics.

    Args:
        result_path: Path of the result table to read with ``yt.read_table``.

    Rows with a truthy ``success`` contribute their ``duration_seconds`` and
    ``generation_time`` to the totals; other rows are counted as failed and
    their ``error`` is printed. The ratio of total audio duration to total
    generation time is printed as "Average RTF" when the latter is positive.
    """
    print("\n📊 Results:")

    ok_count = 0
    failed_count = 0
    audio_seconds = 0.0
    compute_seconds = 0.0

    for row in yt.read_table(result_path):
        if not row["success"]:
            failed_count += 1
            print(f"❌ {row['request_id']}: {row['error']}")
            continue

        ok_count += 1
        audio_seconds += row["duration_seconds"]
        compute_seconds += row["generation_time"]
        print(f"✅ {row['request_id']}: {row['duration_seconds']:.2f}s audio ({row['voice']})")

    print("\n📈 Summary:")
    print(f"   Successful: {ok_count}")
    print(f"   Failed: {failed_count}")
    print(f"   Total audio generated: {audio_seconds:.1f} seconds")
    print(f"   Total processing time: {compute_seconds:.1f} seconds")
    # Skip the ratio when nothing was generated to avoid dividing by zero
    if compute_seconds > 0:
        print(f"   Average RTF: {audio_seconds/compute_seconds:.2f}x")




In [5]:
# Helper to save audio from results
def save_audio_from_results(result_path, output_dir=None):
    """Write the audio of every successful result row as a file in YT.

    Args:
        result_path: Path of the result table to read.
        output_dir: YT directory that receives the files. Defaults to
            ``{working_dir}/tts_output``. Missing parent nodes are created.

    For each row with a truthy ``success`` and a non-empty ``audio_bytes``,
    the base64 payload is decoded and written to
    ``{output_dir}/{request_id}_{voice}.flac``.
    """
    if output_dir is None:
        output_dir = f"{working_dir}/tts_output"

    # One idempotent call instead of exists()+create(): it also builds missing
    # parents and cannot fail if the node appears between the check and the create.
    yt.create("map_node", output_dir, recursive=True, ignore_existing=True)

    print(f"\n💾 Saving audio files to {output_dir}...")

    saved_names = []
    for record in yt.read_table(result_path):
        if not (record["success"] and record.get("audio_bytes")):
            continue

        # Decode audio
        audio_bytes = base64.b64decode(record["audio_bytes"])

        # Save to YT file
        filename = f"{record['request_id']}_{record['voice']}.flac"
        yt.write_file(f"{output_dir}/{filename}", audio_bytes)

        print(f"✅ Saved: {filename}")
        saved_names.append(filename)

    if not saved_names:
        print("⚠️ No audio files to save")
    else:
        # Use the most recently saved file as the example in the download hint
        example = saved_names[-1]
        print(f"\n📁 Audio files saved in YT at: {output_dir}")
        print(f"   To download locally, use: yt read-file {output_dir}/{example}> {example}")

# TODO: helper to download audio files from YT to the local machine (not implemented here)


In [6]:
if __name__ == "__main__":
    # Optional: set the HF token through the environment
    # os.environ["HF_TOKEN"] = "your_token_here"  # Uncomment and add your token
    try:
        # Run the inference
        result_path = run_bulk_tts_inference()

        # Store each generated clip as a file under the working directory in YT
        save_audio_from_results(result_path, f"{working_dir}/audio")

        print(f"\n🎉 All done! Results saved to: {result_path}")

    except Exception as e:
        # Report the failure and list the usual things to check
        troubleshooting_tips = (
            "1. Make sure you have access to the Orpheus model",
            "2. Set your HuggingFace token: export HF_TOKEN=your_token",
            "3. Check GPU availability in the pool",
            "4. Consider using an open-source model like Bark instead",
        )
        print(f"❌ Error: {e!s}")
        print("\n💡 Troubleshooting tips:")
        print("\n".join(troubleshooting_tips))

Created 20 TTS requests in //home/hyperplane/3aca1b5d2ac1476db73e0a637404902a/tts_requests

Requests by voice:
  dan: 2 requests
  jess: 2 requests
  leah: 3 requests
  leo: 3 requests
  mia: 2 requests
  tara: 3 requests
  zac: 3 requests
  zoe: 2 requests
🚀 Starting bulk TTS inference...
Input: //home/hyperplane/3aca1b5d2ac1476db73e0a637404902a/tts_requests
Output: //home/hyperplane/3aca1b5d2ac1476db73e0a637404902a/results


2025-08-26 13:54:19,861	INFO	Operation started: https://hyperplane.trtr.ai/sls-man/operations/92fbd785-a2f4ba99-1d6803e8-32268da8/details


2025-08-26 13:54:19,914	INFO	( 0 min) operation 92fbd785-a2f4ba99-1d6803e8-32268da8 starting


2025-08-26 13:54:20,478	INFO	( 0 min) operation 92fbd785-a2f4ba99-1d6803e8-32268da8 initializing


2025-08-26 13:54:21,672	INFO	( 0 min) Unrecognized spec: {'enable_partitioned_data_balancing': false, 'mapper': {'title': 'bulk_tts_inference'}}


2025-08-26 13:54:21,724	INFO	( 0 min) operation 92fbd785-a2f4ba99-1d6803e8-32268da8: running=0     completed=0     pending=1     failed=0     aborted=0     lost=0     total=1     blocked=0    


2025-08-26 13:54:58,322	INFO	( 0 min) operation 92fbd785-a2f4ba99-1d6803e8-32268da8 aborted


KeyboardInterrupt: 

In [50]:
from IPython.display import Audio, display

def play_audio_from_yt(audio_path):
    """Render an inline audio player for every ``.flac`` file in a YT directory.

    Args:
        audio_path: YT directory to list. Entries whose name does not end in
            ``.flac`` are skipped.

    Each matching file is read fully into memory, shown with
    ``IPython.display.Audio`` and followed by a line naming the file.
    """
    for name in yt.list(audio_path):
        if not name.endswith(".flac"):
            continue

        yt_path = f"{audio_path}/{name}"

        # Read the whole file from YT into bytes
        audio_data = yt.read_file(yt_path).read()

        # Display using IPython Audio
        display(Audio(audio_data, rate=24000))
        # Name the file itself, not the directory, so players can be told apart
        print(f"🎵 Playing: {yt_path}")

# Play the clips stored under the working directory's "audio" folder
play_audio_from_yt(f"{working_dir}/audio")

🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


🎵 Playing: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/audio


In [44]:
# Run the bulk TTS inference on the built-in sample requests
if __name__ == "__main__":
    result_path = run_bulk_tts_inference()

    # Alternative input: a CSV file (uncomment to use).
    # NOTE(review): run_csv_tts_inference is not defined in the cells shown here.
    # result_path = run_csv_tts_inference("your_tts_requests.csv")

    print(f"\n🎉 All done! Results saved to: {result_path}")
    print("Audio files saved to:", f"{working_dir}/audio/")

Created 1 TTS requests in //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/tts_requests
🚀 Starting bulk TTS inference...
Input: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/tts_requests
Output: //home/hyperplane/2dc81c3ebfb647548bf928e091d9d253/results


2025-07-25 08:54:07,865	INFO	Operation started: https://hyperplane.trtr.ai/sls-man/operations/a4e7e73d-92c9fdb0-1d6803e8-e9093e4b/details


2025-07-25 08:54:07,923	INFO	( 0 min) operation a4e7e73d-92c9fdb0-1d6803e8-e9093e4b starting


2025-07-25 08:54:08,477	INFO	( 0 min) operation a4e7e73d-92c9fdb0-1d6803e8-e9093e4b initializing


2025-07-25 08:54:09,126	INFO	( 0 min) Unrecognized spec: {'enable_partitioned_data_balancing': false, 'mapper': {'title': 'bulk_tts_inference'}}


2025-07-25 08:54:11,710	INFO	( 0 min) operation a4e7e73d-92c9fdb0-1d6803e8-e9093e4b aborted


KeyboardInterrupt: 