# Quantized the model "facebook/opt-350m" with BitsAndBytes (load_in_4bit, float16 compute dtype, "nf4" quant type), pushed the quantized model to Hugging Face, and deployed it as an endpoint on RunPod.

In [3]:
import json
import os
import time
from typing import Generator, Dict, Any, List

import requests

# RunPod API configuration
# Both URLs address the same serverless endpoint, so the endpoint ID is kept
# in one place.
RUNPOD_ENDPOINT_ID = "eqm3i8nntmabro"
RUNPOD_API_URL = f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/run"
RUNPOD_STREAM_URL = f"https://api.runpod.ai/v2/{RUNPOD_ENDPOINT_ID}/stream"
# Read the key from the environment so the secret never has to be written
# into (and accidentally committed with) this file. Falls back to an empty
# string, which the API will reject, when RUNPOD_API_KEY is not set.
API_KEY = os.environ.get("RUNPOD_API_KEY", "")

# Prepare the prompt using an Alpaca-style format; the single `{}` placeholder
# is filled with the instruction text via `alpaca_prompt.format(...)`.
alpaca_prompt = """Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{}

### Response:
"""

class RunPodStreamer:
    """Iterate over the tokens of a RunPod job by polling its stream URL.

    Iterating the instance repeatedly issues GET requests against
    ``{RUNPOD_STREAM_URL}/{job_id}`` and yields each token string found under
    ``stream[].output.choices[].tokens`` in the JSON response, until the job
    reports a terminal status or a request fails.

    Args:
        job_id: Identifier of the job to stream.
        api_key: Key sent as a ``Bearer`` token in the ``Authorization`` header.
        timeout: Seconds to wait for each HTTP poll before giving up.
        poll_interval: Seconds to sleep between consecutive polls.
    """

    # Statuses after which the job will never produce further output.
    _FAILURE_STATUSES = ('FAILED', 'CANCELLED', 'TIMED_OUT')

    def __init__(self, job_id: str, api_key: str, timeout: float = 30.0,
                 poll_interval: float = 0.1):
        self.job_id = job_id
        self.api_key = api_key
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {api_key}'
        }
        self.url = f"{RUNPOD_STREAM_URL}/{job_id}"
        self.timeout = timeout
        self.poll_interval = poll_interval
        # Set once the stream has ended (completed, failed, or errored);
        # a finished streamer yields nothing when iterated again.
        self._stop = False

    def _extract_tokens(self, data: Dict) -> List[str]:
        """Collect every token listed in a stream response.

        Walks ``data['stream'][*]['output']['choices'][*]['tokens']`` and
        concatenates the token lists in order. Items missing any of those keys
        are skipped. If the structure is malformed, the error is printed and
        the tokens gathered so far are returned.
        """
        tokens: List[str] = []

        try:
            if 'stream' not in data or not data['stream']:
                return tokens
            for stream_item in data['stream']:
                if 'output' not in stream_item:
                    continue
                output = stream_item['output']
                if 'choices' not in output:
                    continue
                for choice in output['choices']:
                    if 'tokens' in choice:
                        tokens.extend(choice['tokens'])
        except Exception as e:
            print(f"Error extracting tokens: {e}")

        return tokens

    def _fetch_status(self) -> Dict[str, Any]:
        """Poll the stream URL once and return the decoded JSON body.

        Raises whatever ``requests`` raises for connection problems, timeouts,
        or non-2xx responses (via ``raise_for_status``).
        """
        # A timeout is essential here: without one a stalled connection would
        # block the polling loop forever.
        response = requests.get(self.url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def __iter__(self) -> Generator[str, None, None]:
        """Yield tokens as they arrive from the RunPod stream.

        Polls until the job reports ``COMPLETED`` or one of the failure
        statuses. Any error (HTTP failure, timeout, failed job) is printed as
        ``Streaming error: ...`` and ends the iteration instead of propagating.
        """
        # Number of tokens already handed to the consumer.
        # NOTE(review): this treats every response as a cumulative list that
        # repeats previously seen tokens, so only the tail beyond
        # `yielded_count` is new. If the endpoint instead returns only the
        # tokens produced since the last poll, this slicing would drop
        # tokens - confirm against the RunPod /stream documentation.
        yielded_count = 0

        while not self._stop:
            try:
                result = self._fetch_status()
                status = result.get('status')

                if status in ('IN_PROGRESS', 'COMPLETED'):
                    # The final COMPLETED response can still carry tokens, so
                    # it is drained the same way as IN_PROGRESS responses.
                    for token in self._extract_tokens(result)[yielded_count:]:
                        yield token
                        yielded_count += 1

                    if status == 'COMPLETED':
                        self._stop = True
                        break

                elif status == 'FAILED':
                    raise Exception(f"Job failed: {result.get('error', 'Unknown error')}")

                elif status in self._FAILURE_STATUSES:
                    # CANCELLED / TIMED_OUT never become COMPLETED; stop
                    # instead of polling indefinitely.
                    raise Exception(f"Job ended with status {status}")

                time.sleep(self.poll_interval)  # Small delay between polls

            except Exception as e:
                print(f"Streaming error: {e}")
                self._stop = True
                break

def send_runpod_request(
    prompt: str,
    max_new_tokens: int = 100,
    temperature: float = 0.7,
    top_p: float = 0.9,
    timeout: float = 30.0,
) -> str:
    """Submit a generation job to the RunPod endpoint and return its job ID.

    Args:
        prompt: Full prompt text sent as ``input.prompt``.
        max_new_tokens: Value sent as ``input.max_new_tokens``.
        temperature: Value sent as ``input.temperature``.
        top_p: Value sent as ``input.top_p``.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``id`` field of the JSON response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx response (via ``raise_for_status``).
        KeyError: If the response JSON has no ``id`` field.
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {API_KEY}'
    }

    data = {
        "input": {
            "prompt": prompt,
            "max_new_tokens": max_new_tokens,
            # Add other parameters as needed by your RunPod endpoint
            "temperature": temperature,
            "top_p": top_p,
        }
    }

    # Without a timeout a stalled connection would block the caller forever.
    response = requests.post(RUNPOD_API_URL, headers=headers, json=data, timeout=timeout)
    response.raise_for_status()
    result = response.json()

    try:
        return result['id']
    except KeyError:
        # Surface the unexpected payload instead of a bare KeyError('id').
        raise KeyError(f"RunPod response did not contain a job id: {result}") from None

# Main execution: submit one prompt, stream the response to stdout, and
# report latency/throughput metrics for the run.
if __name__ == "__main__":
    # Prepare input text
    prompt_text = alpaca_prompt.format("What is the importance of using renewable energy?")

    # Initialize variables for time measurements.
    # time.perf_counter() is monotonic, so the measured intervals cannot be
    # skewed by wall-clock adjustments the way time.time() differences can.
    start_time = time.perf_counter()
    token_times = []
    first_token_time = None
    model_output = ""
    chunks = []

    try:
        # Send the request to RunPod
        print("Sending request to RunPod...")
        job_id = send_runpod_request(prompt_text)
        print(f"Job ID: {job_id}")

        # Create streamer
        streamer = RunPodStreamer(job_id, API_KEY)

        # Start streaming in main thread
        print("\nResponse:")
        for new_text in streamer:
            chunks.append(new_text)
            print(new_text, end='', flush=True)

            # Record the arrival time of each token/chunk; the first entry
            # doubles as the first-token timestamp.
            arrival = time.perf_counter()
            if first_token_time is None:
                first_token_time = arrival
            token_times.append(arrival)

        # Calculate end-to-end latency (includes job submission and queueing,
        # because the clock started before the request was sent)
        end_time = time.perf_counter()
        end_to_end_latency = end_time - start_time

        model_output = "".join(chunks)

        # Calculate Time To First Token (TTFT); 0 when nothing was streamed
        ttft = first_token_time - start_time if first_token_time is not None else 0

        # Calculate Inter-Token Latency (ITL): mean gap between consecutive
        # arrivals, which is the total span divided by the number of gaps
        if len(token_times) > 1:
            itl = (token_times[-1] - token_times[0]) / (len(token_times) - 1)
        else:
            itl = 0

        # Calculate Throughput
        # NOTE(review): this counts whitespace-separated words (or non-empty
        # lines when the output has no spaces), not model tokens, so the
        # "tokens" figures below are an approximation.
        if ' ' in model_output:
            token_count = len(model_output.split())
        else:
            token_count = len([t for t in model_output.split('\n') if t])
        throughput = token_count / end_to_end_latency if model_output else 0

        print("\n\nPerformance Metrics:")
        print(f"Time To First Token (TTFT): {ttft:.3f} seconds")
        print(f"Inter-token latency (ITL): {itl:.3f} seconds")
        print(f"End-to-end Latency: {end_to_end_latency:.3f} seconds")
        print(f"Throughput: {throughput:.3f} tokens/second")
        print(f"Total tokens: {token_count}")

    except Exception as e:
        print(f"Error: {e}")

Sending request to RunPod...
Job ID: b17fddbe-18e8-4dc4-8af2-6270c7f4460e-u1

Response:
What is accurate?
What can be reitled? What can be delegated? Do we need to have a single response?
What potential problems might arise? Why should I keep sending messages?
From Master Total Array subscribers

### Instruction:
minimal text messages is easy, straightforward methods
comparing simple and valid content
If I send a relevant a message and receive a response, I do not need to read/print the whole message. Note: If I send and receive in

Performance Metrics:
Time To First Token (TTFT): 1.393 seconds
Inter-token latency (ITL): 0.000 seconds
End-to-end Latency: 1.597 seconds
Throughput: 47.598 tokens/second
Total tokens: 76
