## **Nugen Intelligence**
<img src="https://nugen.in/logo.png" alt="Nugen Logo" width="200"/>

Domain-aligned foundational models at industry-leading speeds and zero-data retention! To learn more, visit [Nugen](https://docs.nugen.in/introduction).


## **LLM Parallelization**

**Understanding LLM Parallelization**

**What is LLM Parallelization?**

LLM parallelization is like having multiple experts work on the same problem simultaneously rather than consulting them one after another. Instead of waiting for one language model to respond before querying the next, we send requests to multiple models concurrently and then combine their insights.

**Why Use Parallelization?**

1. **Diversity of Responses**: Different models may approach problems differently, providing varied perspectives and insights.

2. **Improved Reliability**: By aggregating multiple responses, we can reduce the impact of any single model's biases or errors.

3. **Enhanced Performance**: Because the requests run concurrently, total wall-clock time is roughly that of the slowest request rather than the sum of all of them.
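To make the performance point concrete, here is a small self-contained sketch that simulates model calls with `asyncio.sleep` instead of real API requests. The `fake_model_call` helper and its latencies are made up for illustration. Awaiting the calls one by one should take roughly the sum of the latencies, while running them with `asyncio.gather` should take roughly the longest one.

```python
import asyncio
import time
from typing import List

async def fake_model_call(name: str, latency: float) -> str:
    """Hypothetical stand-in for an LLM API call: waits, then answers."""
    await asyncio.sleep(latency)
    return f"{name} answered"

async def sequential(latencies: List[float]) -> List[str]:
    # Await each call before starting the next one
    results = []
    for i, latency in enumerate(latencies):
        results.append(await fake_model_call(f"model-{i}", latency))
    return results

async def concurrent(latencies: List[float]) -> List[str]:
    # Start all calls at once and wait for all of them together
    return await asyncio.gather(
        *(fake_model_call(f"model-{i}", latency) for i, latency in enumerate(latencies))
    )

def timed(coro_fn, latencies: List[float]) -> float:
    # Note: asyncio.run cannot be used inside Jupyter's running event loop;
    # there, time `await coro_fn(latencies)` directly instead.
    start = time.perf_counter()
    asyncio.run(coro_fn(latencies))
    return time.perf_counter() - start

latencies = [0.3, 0.2, 0.1]
seq_time = timed(sequential, latencies)   # roughly 0.6 s (sum of latencies)
conc_time = timed(concurrent, latencies)  # roughly 0.3 s (longest latency)
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

The gain applies because the work is I/O-bound waiting; the calls share one thread and simply overlap their waiting time.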

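## **Code Analysis: Parallel LLM Processing**

**Overview**

The implementation is organized as four functions: `clean_response` tidies raw model output, `create_payload` builds the API request, `run_llm_parallel` makes one asynchronous API call with retries, and `parallel_workflow` sends a prompt to several models concurrently and aggregates their answers. Let's examine each component: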

**Core Concepts and Architecture**

**Understanding Parallel Processing in LLMs**

In traditional LLM processing, we send a query to one model and wait for its response. However, parallel processing allows us to:

1. Send the same query to multiple models simultaneously
2. Get different perspectives on the same problem
3. Combine these perspectives into a more comprehensive answer

Think of it like consulting multiple experts at once rather than one at a time. Besides saving time, this can lead to more well-rounded answers.
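The three steps above map onto a simple fan-out/fan-in pattern. The sketch below uses hypothetical stub "models" (plain async functions returning canned answers) and combines their outputs with a majority vote. Majority voting is a simpler stand-in for the LLM aggregator used later in this section, and it only works when answers can be compared directly, such as short numeric results.

```python
import asyncio
from collections import Counter
from typing import Awaitable, Callable, List

# Hypothetical stub models; step 2 (independent answers) is simulated
# with canned replies instead of real API calls
async def model_a(question: str) -> str:
    return "30"

async def model_b(question: str) -> str:
    return "30"

async def model_c(question: str) -> str:
    return "20"  # one model gets it wrong

async def ask_all(question: str,
                  models: List[Callable[[str], Awaitable[str]]]) -> List[str]:
    # Step 1: send the same question to every model concurrently
    return await asyncio.gather(*(m(question) for m in models))

def majority_vote(answers: List[str]) -> str:
    # Step 3: combine by picking the most common answer
    # (Counter.most_common orders equal counts by first occurrence)
    return Counter(answers).most_common(1)[0][0]

# In Jupyter, use `await ask_all(...)` instead of asyncio.run
answers = asyncio.run(ask_all("How many apples in total?", [model_a, model_b, model_c]))
print(answers)                 # ['30', '30', '20']
print(majority_vote(answers))  # 30
```

Here the single wrong answer is outvoted, which illustrates how aggregation can reduce the impact of one model's error.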

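**Imports and Installation**

The only third-party dependency is `aiohttp` (`pip install aiohttp`); everything else comes from the standard library.

```python
import asyncio
from typing import List, Optional, Tuple

import aiohttp
```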

### **Part 1: Key Components and Their Functions**

**Response Cleaning**

The response cleaning mechanism is like having an editor who polishes raw text:

```python
import re

def clean_response(response: str) -> str:
    """
    Clean and normalize model responses by removing repetition and keeping essential content.

    Args:
        response: Raw text response from the language model

    Returns:
        Up to the first three unique, non-question lines, joined by spaces
    """
    # Split into lines and remove empty ones
    lines = [line.strip() for line in response.split('\n') if line.strip()]

    # Remove duplicates while preserving order
    seen = set()
    unique_lines = []
    for line in lines:
        # Skip questions: lines ending in '?' or starting with the word
        # "What" or "How" (\b keeps lines like "However, ..." from being dropped)
        if (line not in seen and
            not line.endswith('?') and
            not re.match(r'(What|How)\b', line)):
            seen.add(line)
            unique_lines.append(line)

    # Take only the first three meaningful lines
    # (this can cut off a step-by-step answer before its final result)
    relevant_lines = unique_lines[:3]

    return ' '.join(relevant_lines)
```

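**This function performs several cleanup tasks:**

- Strips surrounding whitespace and drops empty lines
- Removes duplicate lines while preserving their original order
- Drops lines that look like questions (ending in `?` or starting with "What" or "How")
- Keeps only the first three remaining lines and joins them with spaces

Think of it as a newspaper editor who cuts a rough draft down to a short, clear summary. The three-line cap is aggressive: it can cut a step-by-step answer off before the final result, which may explain why several intermediate responses in the example output at the end of this section stop early.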
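Because the three-line cap is hard-coded, one option is to expose it as a parameter. The following is a hypothetical variant (`clean_response_v2` and its `max_lines` argument are names introduced here, not part of the original code). It follows the same steps, using `dict.fromkeys` for order-preserving de-duplication and a word-boundary regex so that lines such as "However, ..." are not mistaken for questions.

```python
import re
from typing import Optional

# Matches lines that start with the whole word "What" or "How"
QUESTION_START = re.compile(r"(What|How)\b")

def clean_response_v2(response: str, max_lines: Optional[int] = 3) -> str:
    """Hypothetical variant of clean_response with a configurable line cap.

    max_lines=None keeps every remaining line.
    """
    # Strip whitespace and drop empty lines
    lines = [line.strip() for line in response.splitlines() if line.strip()]
    # Drop question-like lines (ending in '?' or starting with the word What/How)
    kept = [line for line in lines
            if not line.endswith("?") and not QUESTION_START.match(line)]
    # dict.fromkeys removes duplicates while preserving first-seen order
    unique = list(dict.fromkeys(kept))
    if max_lines is not None:
        unique = unique[:max_lines]
    return " ".join(unique)

raw = """How many apples did they pick?
Her mom picked 20 apples.
Her mom picked 20 apples.
However, Jenna picked only half as many: 10 apples.
Together they picked 20 + 10 = 30 apples.
The answer is 30."""

print(clean_response_v2(raw))                  # default cap drops "The answer is 30."
print(clean_response_v2(raw, max_lines=None))  # keeps all four unique lines
```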

**Payload Creation**

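The payload creation process is like preparing a standardized questionnaire: every request uses the same settings, namely a cap of 1000 tokens (`max_tokens`), a low temperature (0.1) for focused, consistent answers, and stop sequences that end generation at the first blank line or `###`. If a system prompt is given, it is placed before the user prompt, separated by a blank line.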

```python
def create_payload(prompt: str, model: str, system_prompt: Optional[str] = None) -> dict:
    """
    Create the API request payload with appropriate parameters.

    Args:
        prompt: The user's input prompt
        model: Name of the model to use
        system_prompt: Optional system-level instructions

    Returns:
        Dictionary containing the formatted API request payload
    """
    full_prompt = f"{system_prompt}\n\n{prompt}" if system_prompt else prompt
    return {
        "max_tokens": 1000,
        "model": model,
        "prompt": full_prompt,
        "temperature": 0.1,
        "stop": ["\n\n", "###"]
    }
```

### **Part 2: The Parallel Workflow**

**Asynchronous LLM Calls**

The async functionality is like having multiple phone lines to talk to different experts simultaneously:


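```python
async def run_llm_parallel(api_key: str,
                           user_prompt: str,
                           model: str,
                           system_prompt: Optional[str] = None) -> str:
    """
    Make one asynchronous call to the Nugen API with retry logic and response handling.
    Several calls run concurrently when scheduled together with asyncio.gather.

    Args:
        api_key: Authentication key for the API
        user_prompt: The prompt to send to the model
        model: Name of the model to use
        system_prompt: Optional system-level instructions

    Returns:
        Cleaned response from the model

    Raises:
        aiohttp.ClientError: if the request still fails on the last attempt
        RuntimeError: if every attempt was rate limited (HTTP 429)
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = create_payload(user_prompt, model, system_prompt)
    max_attempts = 3

    # Reuse one session for all attempts instead of opening a new one per try
    async with aiohttp.ClientSession() as session:
        for retry_attempt in range(max_attempts):
            is_last_attempt = retry_attempt == max_attempts - 1
            try:
                async with session.post(
                    "https://api.nugen.in/inference/completions",
                    json=payload,
                    headers=headers
                ) as response:
                    # Handle rate limiting with exponential backoff (1 s, then 2 s)
                    if response.status == 429:
                        if not is_last_attempt:
                            await asyncio.sleep(2 ** retry_attempt)
                        continue

                    response.raise_for_status()
                    result = await response.json()
                    raw_response = result["choices"][0]["text"]

                    return clean_response(raw_response)

            except aiohttp.ClientError as e:
                if is_last_attempt:
                    print(f"{model}: failed after {max_attempts} attempts: {e}")
                    raise
                await asyncio.sleep(2 ** retry_attempt)

    # Only reached if every attempt returned HTTP 429; never return None silently
    raise RuntimeError(f"{model}: still rate limited after {max_attempts} attempts")
```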


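This function:

- Makes a single non-blocking API call; the parallelism comes from scheduling several of these calls together with `asyncio.gather`
- Retries a failed or rate-limited request, making up to three attempts
- Waits exponentially longer between attempts (1 s, then 2 s)
- Cleans the raw completion with `clean_response` before returning it

Think of it as an efficient assistant who places a call, redials with increasing pauses when the line is busy, and hands back a tidy summary of the answer.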
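In `run_llm_parallel`, the retry loop is interleaved with the HTTP code. The same idea can be written as a reusable helper. In this sketch, `retry_with_backoff`, `TransientError`, and `flaky_call` are hypothetical names; failures are modelled as exceptions, so an HTTP 429 status would first have to be turned into one. The helper waits `base_delay`, then twice that, and so on, and lets the error from the final attempt propagate instead of returning nothing.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

class TransientError(Exception):
    """Hypothetical error type standing in for HTTP 429 or a dropped connection."""

async def retry_with_backoff(op: Callable[[], Awaitable[T]],
                             max_attempts: int = 3,
                             base_delay: float = 1.0) -> T:
    # All attempts except the last: swallow transient errors and back off
    for attempt in range(1, max_attempts):
        try:
            return await op()
        except TransientError:
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Final attempt: any error propagates to the caller
    return await op()

# Stub that fails twice, then succeeds
calls = 0

async def flaky_call() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise TransientError(f"attempt {calls} rate limited")
    return "ok"

# A small base_delay keeps the demo fast; in Jupyter use `await` instead of asyncio.run
result = asyncio.run(retry_with_backoff(flaky_call, base_delay=0.01))
print(result, calls)  # ok 3
```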

The overall workflow works like a panel discussion:

- Multiple experts (proposer models) receive the same question
- They provide their answers independently and simultaneously
- A moderator (aggregator model) synthesizes their responses
- The final answer combines the best insights from all participants
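When the panel is large, sending every request at the same moment makes rate limiting (HTTP 429) more likely. One common way to cap how many calls are in flight is an `asyncio.Semaphore`. The sketch below wraps a hypothetical stub call (`fake_call`) and records the peak concurrency to show the limit holding; the limit of 2 is an arbitrary example value, not a Nugen recommendation.

```python
import asyncio
from typing import Dict, List, Tuple

async def fake_call(i: int, state: Dict[str, int]) -> int:
    """Hypothetical stand-in for an API call that tracks how many run at once."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.05)  # simulated network latency
    state["active"] -= 1
    return i

async def gather_limited(n: int, limit: int) -> Tuple[List[int], int]:
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}

    async def guarded(i: int) -> int:
        # At most `limit` calls are inside this block at the same time
        async with semaphore:
            return await fake_call(i, state)

    results = await asyncio.gather(*(guarded(i) for i in range(n)))
    return list(results), state["peak"]

# In Jupyter, use `await gather_limited(6, limit=2)` instead of asyncio.run
results, peak = asyncio.run(gather_limited(6, limit=2))
print(results, peak)  # [0, 1, 2, 3, 4, 5] 2
```

`asyncio.gather` still returns results in the order the coroutines were passed, so the cap does not affect how responses are numbered later.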

**Parallel Workflow**

The `parallel_workflow` function acts like a conductor in an orchestra, coordinating multiple AI models to work together on a single task. It first gathers individual perspectives from the proposer models and then has the aggregator model combine them into a single, harmonious final response.

```python
async def parallel_workflow(api_key: str,
                            prompt: str,
                            proposer_models: List[str],
                            aggregator_model: str,
                            aggregator_prompt: str) -> Tuple[str, List[str]]:
    """
    Orchestrate parallel LLM calls and aggregate their responses.

    Args:
        api_key: Authentication key for the API
        prompt: The main prompt to process
        proposer_models: List of models to generate initial responses
        aggregator_model: Model to use for combining responses
        aggregator_prompt: Instructions for aggregating responses

    Returns:
        Tuple containing (final aggregated response, list of individual responses)
    """
    # Add instructions to prevent repetition
    enhanced_prompt = f"{prompt}\nProvide a clear, concise answer without repetition."

    # Get responses from all proposer models concurrently
    # (if any single call raises, gather propagates that error)
    proposed_responses = await asyncio.gather(
        *[run_llm_parallel(api_key, enhanced_prompt, model)
          for model in proposer_models]
    )

    # Build the aggregator instructions without stray indentation or blank lines
    enhanced_aggregator_prompt = (
        f"{aggregator_prompt}\n"
        "Important: Provide a single, clear answer without repeating the question.\n"
        "Synthesize these responses into one coherent answer:"
    )

    # Number the intermediate responses: "1. ...", "2. ...", ...
    formatted_responses = "\n".join(
        f"{i+1}. {response}"
        for i, response in enumerate(proposed_responses)
    )

    # Get final aggregated response
    final_output = await run_llm_parallel(
        api_key=api_key,
        user_prompt=prompt,
        model=aggregator_model,
        system_prompt=f"{enhanced_aggregator_prompt}\n{formatted_responses}"
    )

    return final_output, proposed_responses
```


**Purpose:** Orchestrates parallel LLM API calls and aggregates multiple model responses into one final output.

**Core Components:**

1. Enhances original prompt to prevent repetition
2. Runs multiple models in parallel using asyncio.gather
3. Aggregates responses using a designated aggregator model


**Flow:**

1. Creates enhanced prompt with anti-repetition instruction
2. Gets parallel responses from proposer models
3. Formats intermediate responses numerically
4. Generates final aggregated output using aggregator model


**Key Features:**

1. Concurrent proposer calls, so wall-clock time is roughly that of the slowest proposer plus one aggregator call
2. Structured response formatting
3. Two-stage processing (propose → aggregate)
4. Returns both final and individual responses
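Because `parallel_workflow` calls `asyncio.gather` without `return_exceptions=True`, a single failing proposer makes the whole workflow raise. If you would rather aggregate whatever succeeded, one option is sketched below. `propose` is a hypothetical stub standing in for `run_llm_parallel`, and the model names are placeholders.

```python
import asyncio
from typing import List, Tuple

async def propose(model: str, prompt: str) -> str:
    """Hypothetical stand-in for run_llm_parallel."""
    if model == "broken-model":
        raise ConnectionError(f"{model} is unavailable")
    return f"{model}: 30"

async def gather_successes(models: List[str], prompt: str) -> Tuple[List[str], List[str]]:
    # return_exceptions=True places exceptions in the result list instead of raising
    results = await asyncio.gather(
        *(propose(m, prompt) for m in models), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [m for m, r in zip(models, results) if isinstance(r, BaseException)]
    return ok, failed

# In Jupyter, use `await gather_successes(...)` instead of asyncio.run
ok, failed = asyncio.run(
    gather_successes(["model-a", "broken-model", "model-b"], "How many apples?")
)
print(ok)      # ['model-a: 30', 'model-b: 30']
print(failed)  # ['broken-model']
```

The surviving responses could then be numbered and passed to the aggregator as before; you may still want to raise an error if `ok` is empty.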

### **Part 3: Practical Usage and Examples**

Here's how to initialize and use the system:

```python
import os

async def main():
    # Read the API key from an environment variable instead of hard-coding a secret
    # (the name NUGEN_API_KEY is our choice; use whatever your setup defines)
    api_key = os.environ["NUGEN_API_KEY"]
    user_prompt = """Jenna and her mother picked some apples from their apple farm.
    Jenna picked half as many apples as her mom. If her mom got 20 apples, how many apples did they both pick?"""

    reference_models = [
        "nugen-flash-instruct",  # model IDs used in this example; check Nugen's docs for current names
        "llama-v3p2-3b-instruct",
        "llama-v3p1-8b-instruct",
        "llama-v3p1-405b-instruct"
    ]

    answer, intermediate_responses = await parallel_workflow(
        api_key=api_key,
        prompt=user_prompt,
        proposer_models=reference_models,
        aggregator_model="nugen-flash-instruct",
        aggregator_prompt="Synthesize these responses into a single clear answer."
    )

    # Print cleaned results
    for i, response in enumerate(intermediate_responses):
        print(f"Intermediate Response {i+1}:\n{response}\n")
    print(f"Final Answer: {answer}")

# Jupyter/IPython already runs an event loop, so the coroutine can be awaited
# directly in a cell. In a standalone script, call asyncio.run(main()) instead.
await main()
```

Output:

```text
Intermediate Response 1:
## Step 1: Determine the number of apples Jenna picked. Jenna picked half as many apples as her mom. Since her mom picked 20 apples, Jenna picked 20 / 2 = 10 apples. ## Step 2: Calculate the total number of apples they both picked.

Intermediate Response 2:
Jenna picked 10 apples. Her mom picked 20 apples. Together they picked 30 apples. The answer is 30.

Intermediate Response 3:
## Step 1: Determine the number of apples Jenna's mom picked. Jenna's mom picked 20 apples. ## Step 2: Calculate the number of apples Jenna picked.

Intermediate Response 4:
Use proper grammar and spelling. ## Step 1: Calculate the number of apples Jenna picked. Jenna picked half as many apples as her mom. Since her mom picked 20 apples, Jenna picked 20 / 2 = 10 apples.

Final Answer: The answer is 30. The final answer is: $\boxed{30}$
```
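The aggregator's reply is free text. If you need the numeric result programmatically, a small regular expression can pull it out. This sketch (the `extract_final_answer` helper is hypothetical) first looks for a LaTeX `\boxed{...}` value, as in the final answer above, then falls back to the last number in the text. It is a heuristic and will not cope with every phrasing; for example, a response that ends on an intermediate step would yield that step's number.

```python
import re
from typing import Optional

def extract_final_answer(text: str) -> Optional[str]:
    """Hypothetical helper: pull a final numeric answer out of model output."""
    # Prefer an explicit LaTeX \boxed{...} answer if one is present
    boxed = re.search(r"\\boxed\{([^{}]*)\}", text)
    if boxed:
        return boxed.group(1).strip()
    # Otherwise fall back to the last number that appears in the text
    numbers = re.findall(r"-?\d+(?:\.\d+)?", text)
    return numbers[-1] if numbers else None

final = r"The answer is 30. The final answer is: $\boxed{30}$"
print(extract_final_answer(final))                              # 30
print(extract_final_answer("Together they picked 30 apples."))  # 30
```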
