In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.3,org.apache.kafka:kafka-clients:3.5.1") \
    .getOrCreate()

# Read the stream from Kafka
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dcgm-metrics") \
    .load()

# Convert the Kafka message to string
processed_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# Output the processed dataframe to the console
query = processed_df.writeStream.outputMode("append").format("console").start()

# Wait for the termination of the streaming query
query.awaitTermination()

In [3]:
from flask import Flask, Response
import secrets
import time
import math
import multiprocessing
from multiprocessing import Process, Queue
from gunicorn.app.base import BaseApplication
import os

# Configuration
NUM_NODES = 8  # Increased to 5096
GPUS_PER_NODE = 4
GPU_MODEL = 'Tesla V100-SXM2-16GB'
GPU_SPECS = {
    'sm_clock': (1230, 1380),
    'mem_clock': 877,
    'power': (300, 350),
    'memory': 16384
}

class GPUSimulator:
    def __init__(self, node_id, gpu_id):
        self.node_id = node_id
        self.gpu_id = gpu_id
        self.uuid = f"GPU-{secrets.token_hex(16)}"
        self.pci_bus_id = f"{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}.0"
        self.driver_version = "450.51.06"
        self.start_time = time.time()
        self.workload_phase = secrets.randbelow(600) / 600

    def simulate_metrics(self):
        elapsed_time = time.time() - self.start_time
        self.workload_phase = ((elapsed_time / 60 + self.workload_phase) % 10) / 10
        workload = (math.sin(self.workload_phase * 2 * math.pi) + 1) / 2
        workload = min(1, max(0, workload + (secrets.randbelow(20) - 10) / 100))
        sm_clock = int(GPU_SPECS['sm_clock'][0] + (GPU_SPECS['sm_clock'][1] - GPU_SPECS['sm_clock'][0]) * workload)
        gpu_temp = 30 + int(55 * workload)
        power_usage = GPU_SPECS['power'][0] + (GPU_SPECS['power'][1] - GPU_SPECS['power'][0]) * workload
        gpu_util = int(100 * workload)
        mem_util = int(90 * workload)
        fb_used = GPU_SPECS['memory'] * mem_util / 100
        fb_free = GPU_SPECS['memory'] - fb_used
        return [
            ('DCGM_FI_DEV_SM_CLOCK', sm_clock, 'SM clock frequency (in MHz)'),
            ('DCGM_FI_DEV_GPU_TEMP', gpu_temp, 'GPU temperature (in C)'),
            ('DCGM_FI_DEV_POWER_USAGE', round(power_usage, 2), 'Power draw (in W)'),
            ('DCGM_FI_DEV_GPU_UTIL', gpu_util, 'GPU utilization (in %)'),
            ('DCGM_FI_DEV_MEM_COPY_UTIL', mem_util, 'Memory utilization (in %)'),
            ('DCGM_FI_DEV_FB_FREE', round(fb_free, 2), 'Frame buffer memory free (in MB)'),
            ('DCGM_FI_DEV_FB_USED', round(fb_used, 2), 'Frame buffer memory used (in MB)')
        ]

def generate_gpu_metrics(node_id, gpu_simulator):
    metrics = gpu_simulator.simulate_metrics()
    formatted_metrics = []
    for name, value, help_text in metrics:
        formatted_metrics.extend([
            f'# HELP {name} {help_text}',
            f'# TYPE {name} gauge',
            f'{name}{{node="{node_id}",gpu="{gpu_simulator.gpu_id}",UUID="{gpu_simulator.uuid}",'
            f'pci_bus_id="{gpu_simulator.pci_bus_id}",device="nvidia{gpu_simulator.gpu_id}",'
            f'modelName="{GPU_MODEL}",Hostname="node{node_id:04d}",'
            f'DCGM_FI_DRIVER_VERSION="{gpu_simulator.driver_version}"}} {value}'
        ])
    return '\n'.join(formatted_metrics)

def get_gpu_metrics(node_id):
    gpu_simulators = [GPUSimulator(node_id, gpu_id) for gpu_id in range(GPUS_PER_NODE)]
    all_metrics = []
    for gpu_simulator in gpu_simulators:
        all_metrics.append(generate_gpu_metrics(node_id, gpu_simulator))
    return '\n'.join(all_metrics)

def worker(node_range, queue):
    app = Flask(__name__)

    @app.route('/metrics/<int:node_id>', methods=['GET'])
    def metrics(node_id):
        if node_id in node_range:
            metrics_data = get_gpu_metrics(node_id)
            return Response(metrics_data, mimetype='text/plain')
        return "Node not found", 404

    class FlaskApplication(BaseApplication):
        def __init__(self, app, options=None):
            self.application = app
            self.options = options or {}
            super().__init__()

        def load_config(self):
            for key, value in self.options.items():
                if key in self.cfg.settings and value is not None:
                    self.cfg.set(key, value)

        def load(self):
            return self.application

    options = {
        'bind': f'0.0.0.0:{queue.get()}',
        'workers': 4,
    }
    FlaskApplication(app, options).run()

def main():
    num_processes = min(multiprocessing.cpu_count(), 16)  # Limit to 16 processes
    nodes_per_process = NUM_NODES // num_processes
    remaining_nodes = NUM_NODES % num_processes

    processes = []
    queue = Queue()

    print(f"Starting simulation for {NUM_NODES} nodes using {num_processes} processes...")

    for i in range(num_processes):
        start_node = i * nodes_per_process
        end_node = start_node + nodes_per_process
        if i == num_processes - 1:
            end_node += remaining_nodes
        node_range = range(start_node, end_node)
        port = 51000 + i
        queue.put(port)
        p = Process(target=worker, args=(node_range, queue))
        processes.append(p)
        p.start()

    try:
        for p in processes:
            p.join()
    except KeyboardInterrupt:
        print("Simulation stopped.")
        for p in processes:
            p.terminate()

if __name__ == '__main__':
    main()

Starting simulation for 8 nodes using 10 processes...


[2024-10-23 14:44:05 +0530] [31989] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [31990] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [31989] [INFO] Listening at: http://0.0.0.0:51000 (31989)
[2024-10-23 14:44:05 +0530] [31991] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [31989] [INFO] Using worker: sync
[2024-10-23 14:44:05 +0530] [31994] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [32005] [INFO] Booting worker with pid: 32005
[2024-10-23 14:44:05 +0530] [31990] [INFO] Listening at: http://0.0.0.0:51001 (31990)
[2024-10-23 14:44:05 +0530] [31999] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [31990] [INFO] Using worker: sync
[2024-10-23 14:44:05 +0530] [32004] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [32010] [INFO] Starting gunicorn 23.0.0
[2024-10-23 14:44:05 +0530] [31994] [INFO] Listening at: http://0.0.0.0:51003 (31994)
[2024-10-23 14:44:05 +0530] [32020] [INFO] Booting worker with pid: 

Simulation stopped.


[2024-10-23 14:45:01 +0530] [31999] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [32030] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [32023] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [31989] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [31994] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [32004] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [32017] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [31990] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [31991] [INFO] Handling signal: term
[2024-10-23 14:45:01 +0530] [32010] [INFO] Handling signal: term
[2024-10-23 14:45:02 +0530] [32164] [INFO] Worker exiting (pid: 32164)
[2024-10-23 14:45:02 +0530] [32155] [INFO] Worker exiting (pid: 32155)
[2024-10-23 14:45:02 +0530] [32134] [INFO] Worker exiting (pid: 32134)
[2024-10-23 14:45:02 +0530] [32084] [INFO] Worker exiting (pid: 32084)
[2024-10-23 14:45:02 +0530] [32176] [INFO] Worker exiting (pid: 32

In [7]:
p = multiprocessing.Process(target=run_simulator, args=(node_range, port))
        simulator_processes.append(p)
        p.start()

RuntimeError: This event loop is already running

In [9]:
import asyncio
import json
import logging
import aiohttp
import uuid
import time
import secrets
import math
import multiprocessing
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from datetime import datetime, timedelta
from flask import Flask, Response
from gunicorn.app.base import BaseApplication

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['10.180.8.24:9092', '10.180.8.24:9093', '10.180.8.24:9094']
DCGM_KAFKA_TOPIC = 'dcgm-metrics-test'
FETCH_INTERVAL = 1  # Seconds
RETRIES = 3
RETRY_DELAY = 5  # Seconds
MAX_NODES = 5096
GPUS_PER_NODE = 4
GPU_MODEL = 'Tesla V100-SXM2-16GB'
GPU_SPECS = {
    'sm_clock': (1230, 1380),
    'mem_clock': 877,
    'power': (300, 350),
    'memory': 16384
}
BENCHMARK_DURATION = 600  # 10 minutes in seconds

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class GPUSimulator:
    def __init__(self, node_id, gpu_id):
        self.node_id = node_id
        self.gpu_id = gpu_id
        self.uuid = f"GPU-{secrets.token_hex(16)}"
        self.pci_bus_id = f"{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}.0"
        self.driver_version = "450.51.06"
        self.start_time = time.time()
        self.workload_phase = secrets.randbelow(600) / 600

    def simulate_metrics(self):
        elapsed_time = time.time() - self.start_time
        self.workload_phase = ((elapsed_time / 60 + self.workload_phase) % 10) / 10
        workload = (math.sin(self.workload_phase * 2 * math.pi) + 1) / 2
        workload = min(1, max(0, workload + (secrets.randbelow(20) - 10) / 100))
        sm_clock = int(GPU_SPECS['sm_clock'][0] + (GPU_SPECS['sm_clock'][1] - GPU_SPECS['sm_clock'][0]) * workload)
        gpu_temp = 30 + int(55 * workload)
        power_usage = GPU_SPECS['power'][0] + (GPU_SPECS['power'][1] - GPU_SPECS['power'][0]) * workload
        gpu_util = int(100 * workload)
        mem_util = int(90 * workload)
        fb_used = GPU_SPECS['memory'] * mem_util / 100
        fb_free = GPU_SPECS['memory'] - fb_used
        return [
            ('DCGM_FI_DEV_SM_CLOCK', sm_clock, 'SM clock frequency (in MHz)'),
            ('DCGM_FI_DEV_GPU_TEMP', gpu_temp, 'GPU temperature (in C)'),
            ('DCGM_FI_DEV_POWER_USAGE', round(power_usage, 2), 'Power draw (in W)'),
            ('DCGM_FI_DEV_GPU_UTIL', gpu_util, 'GPU utilization (in %)'),
            ('DCGM_FI_DEV_MEM_COPY_UTIL', mem_util, 'Memory utilization (in %)'),
            ('DCGM_FI_DEV_FB_FREE', round(fb_free, 2), 'Frame buffer memory free (in MB)'),
            ('DCGM_FI_DEV_FB_USED', round(fb_used, 2), 'Frame buffer memory used (in MB)')
        ]

def generate_gpu_metrics(node_id, gpu_simulator):
    metrics = gpu_simulator.simulate_metrics()
    formatted_metrics = []
    for name, value, help_text in metrics:
        formatted_metrics.extend([
            f'# HELP {name} {help_text}',
            f'# TYPE {name} gauge',
            f'{name}{{node="{node_id}",gpu="{gpu_simulator.gpu_id}",UUID="{gpu_simulator.uuid}",'
            f'pci_bus_id="{gpu_simulator.pci_bus_id}",device="nvidia{gpu_simulator.gpu_id}",'
            f'modelName="{GPU_MODEL}",Hostname="node{node_id:04d}",'
            f'DCGM_FI_DRIVER_VERSION="{gpu_simulator.driver_version}"}} {value}'
        ])
    return '\n'.join(formatted_metrics)

def get_gpu_metrics(node_id):
    gpu_simulators = [GPUSimulator(node_id, gpu_id) for gpu_id in range(GPUS_PER_NODE)]
    all_metrics = []
    for gpu_simulator in gpu_simulators:
        all_metrics.append(generate_gpu_metrics(node_id, gpu_simulator))
    return '\n'.join(all_metrics)

def create_flask_app(node_range):
    app = Flask(__name__)

    @app.route('/metrics/<int:node_id>', methods=['GET'])
    def metrics(node_id):
        if node_id in node_range:
            metrics_data = get_gpu_metrics(node_id)
            return Response(metrics_data, mimetype='text/plain')
        return "Node not found", 404

    return app

class GunicornApp(BaseApplication):
    def __init__(self, app, options=None):
        self.application = app
        self.options = options or {}
        super().__init__()

    def load_config(self):
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key, value)

    def load(self):
        return self.application

def run_simulator(node_range, port):
    app = create_flask_app(node_range)
    options = {
        'bind': f'127.0.0.1:{port}',
        'workers': 4,
    }
    GunicornApp(app, options).run()

# Kafka Producer Functions
async def create_producer():
    try:
        producer = AIOKafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k is not None else None
        )
        await producer.start()
        return producer
    except Exception as e:
        logger.error(f"Failed to create Kafka producer: {e}")
        raise

async def shutdown_producer(producer):
    logger.info("Shutting down producer...")
    await producer.stop()
    logger.info("Producer shutdown complete.")

def parse_dcgm_metrics(metrics):
    lines = metrics.strip().split('\n')
    parsed_metrics = []
    for line in lines:
        if line.startswith("#"):
            continue
        try:
            metric_name, rest = line.split('{', 1)
            labels, value = rest.rsplit('}', 1)
            value = value.strip()
            metric_name = metric_name.strip()

            label_dict = {}
            for label in labels.split(','):
                key, val = label.split('=')
                label_dict[key.strip()] = val.strip('"')

            key = f"{metric_name}_gpu{label_dict.get('gpu', 'unknown')}"

            message = {
                "metric_name": metric_name,
                "value": float(value),
                "timestamp": datetime.now().isoformat(),
                "UUID": str(uuid.uuid4()),
                "gpu": label_dict.get('gpu', 'unknown'),
                "pci_bus_id": label_dict.get('pci_bus_id', 'unknown'),
                "device": label_dict.get('device', 'unknown'),
                "modelName": label_dict.get('modelName', 'unknown'),
                "Hostname": label_dict.get('Hostname', 'unknown'),
                "DCGM_FI_DRIVER_VERSION": label_dict.get('DCGM_FI_DRIVER_VERSION', 'unknown'),
                "err_code": "N/A",
                "err_msg": "N/A"
            }

            parsed_metrics.append((key, message))
        except ValueError:
            logger.error(f"Failed to parse DCGM line: {line}")
    return parsed_metrics

async def fetch_metrics(session, url):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()

async def fetch_and_send_metrics(producer, session, node_range, port, end_time):
    messages_sent = 0
    while datetime.now() < end_time:
        for node_id in node_range:
            url = f"http://127.0.0.1:{port}/metrics/{node_id}"
            try:
                metrics = await fetch_metrics(session, url)
                parsed_metrics = parse_dcgm_metrics(metrics)
                for attempt in range(RETRIES):
                    try:
                        for key, message in parsed_metrics:
                            await producer.send_and_wait(DCGM_KAFKA_TOPIC, key=key, value=message)
                            messages_sent += 1
                        break
                    except Exception as e:
                        logger.warning(f"Failed to send metrics from node {node_id}, retrying... Attempt {attempt+1}/{RETRIES}")
                        await asyncio.sleep(RETRY_DELAY)
                else:
                    logger.error(f"Failed to send metrics from node {node_id} after multiple attempts.")
            except aiohttp.ClientError as e:
                logger.error(f"Error fetching metrics from node {node_id}: {e}")
            except Exception as e:
                logger.error(f"Unexpected error processing node {node_id}: {e}")
        await asyncio.sleep(FETCH_INTERVAL)
    return messages_sent

# Kafka Consumer Functions
async def create_consumer():
    consumer = AIOKafkaConsumer(
        DCGM_KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id='benchmark-consumer-group',
        auto_offset_reset='earliest'
    )
    await consumer.start()
    return consumer

async def shutdown_consumer(consumer):
    logger.info("Shutting down consumer...")
    await consumer.stop()
    logger.info("Consumer shutdown complete.")

async def consume_metrics(consumer, end_time):
    metrics = {
        'messages_received': 0,
        'bytes_received': 0,
        'processing_time': timedelta(0),
        'errors': 0
    }
    start_time = datetime.now()
    last_log_time = start_time

    try:
        async for msg in consumer:
            if datetime.now() >= end_time:
                break
            metrics['messages_received'] += 1
            metrics['bytes_received'] += len(msg.value)
            try:
                process_start = time.monotonic()
                message_value = json.loads(msg.value.decode('utf-8'))
                # Process message_value here if needed
                process_end = time.monotonic()
                metrics['processing_time'] += timedelta(seconds=process_end - process_start)
            except json.JSONDecodeError:
                metrics['errors'] += 1
                logger.error(f"Failed to decode message: {msg.value}")
            
            # Periodic Logging (every minute)
            now = datetime.now()
            if (now - last_log_time).total_seconds() >= 60:
                log_metrics(metrics, start_time, now)
                last_log_time = now
    except Exception as e:
        logger.error(f"An error occurred during consumption: {e}")
        metrics['errors'] += 1
    
    return metrics

def log_metrics(metrics, start_time, current_time):
    elapsed_time = current_time - start_time
    throughput = metrics['messages_received'] / elapsed_time.total_seconds() if elapsed_time.total_seconds() > 0 else 0
    logger.info(f"[{current_time}] Messages Received: {metrics['messages_received']}, "
                f"Throughput: {throughput:.2f} msg/s, Errors: {metrics['errors']}")

def log_summary(metrics, start_time, end_time):
    elapsed_time = end_time - start_time
    throughput = metrics['messages_received'] / elapsed_time.total_seconds() if elapsed_time.total_seconds() > 0 else 0
    avg_processing_time = metrics['processing_time'].total_seconds() / metrics['messages_received'] if metrics['messages_received'] > 0 else 0
    logger.info("\n--- Summary ---")
    logger.info(f"Start Time: {start_time}")
    logger.info(f"End Time: {end_time}")
    logger.info(f"Elapsed Time: {elapsed_time}")
    logger.info(f"Messages Received: {metrics['messages_received']}")
    logger.info(f"Bytes Received: {metrics['bytes_received']}")
    logger.info(f"Throughput: {throughput:.2f} msg/s")
    logger.info(f"Average Processing Time: {avg_processing_time:.4f} s/msg")
    logger.info(f"Errors: {metrics['errors']}")

async def run_benchmark(num_nodes):
    num_processes = min(multiprocessing.cpu_count(), 16)
    nodes_per_process = num_nodes // num_processes
    remaining_nodes = num_nodes % num_processes

    # Start GPU simulators
    simulator_processes = []
    for i in range(num_processes):
        start_node = i * nodes_per_process
        end_node = start_node + nodes_per_process + (remaining_nodes if i == num_processes - 1 else 0)
        node_range = range(start_node, end_node)
        port = 51000 + i
        p = multiprocessing.Process(target=run_simulator, args=(node_range, port))
        simulator_processes.append(p)
        p.start()

    # Wait for simulators to start
    await asyncio.sleep(5)

    # Start Kafka producer
    producer = await create_producer()
    consumer = await create_consumer()
    
    end_time = datetime.now() + timedelta(seconds=BENCHMARK_DURATION)
    
    # Start consumer task
    consumer_task = asyncio.create_task(consume_metrics(consumer, end_time))

    # Start producer tasks
    producer_tasks = []
    async with aiohttp.ClientSession() as session:
        for i in range(num_processes):
            start_node = i * nodes_per_process
            end_node = start_node + nodes_per_process + (remaining_nodes if i == num_processes - 1 else 0)
            node_range = range(start_node, end_node)
            port = 51000 + i
            task = asyncio.create_task(fetch_and_send_metrics(producer, session, node_range, port, end_time))
            producer_tasks.append(task)
        
        # Wait for producer tasks to complete
        await asyncio.gather(*producer_tasks)

    # Wait for consumer task to complete and get metrics
    consumer_metrics = await consumer_task

    # Shutdown Kafka producer and consumer
    await shutdown_producer(producer)
    await shutdown_consumer(consumer)

    # Terminate simulator processes
    for p in simulator_processes:
        p.terminate()
        p.join()

    # Log summary
    log_summary(consumer_metrics, end_time - timedelta(seconds=BENCHMARK_DURATION), end_time)

    return consumer_metrics['messages_received'] / BENCHMARK_DURATION

async def main():
    node_counts = [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 5096]
    results = {}

    for num_nodes in node_counts:
        logger.info(f"Running benchmark for {num_nodes} nodes...")
        messages_per_second = await run_benchmark(num_nodes)
        results[num_nodes] = messages_per_second
        logger.info(f"Benchmark result for {num_nodes} nodes: {messages_per_second:.2f} messages/second")

    logger.info("Final Benchmark Results:")
    for num_nodes, messages_per_second in results.items():
        logger.info(f"{num_nodes} nodes: {messages_per_second:.2f} messages/second")

if __name__ == "__main__":
    asyncio.run(main())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [1]:
# optimized_gpu_simulator.py
import asyncio
import json
import logging
import aiohttp
import uuid
import time
import secrets
import math
import multiprocessing
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from datetime import datetime, timedelta
from flask import Flask, Response
from gunicorn.app.base import BaseApplication
from dataclasses import dataclass
from typing import List, Dict, Tuple
import concurrent.futures
import numpy as np

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['10.180.8.24:9092', '10.180.8.24:9093', '10.180.8.24:9094']
DCGM_KAFKA_TOPIC = 'dcgm-metrics-test'
FETCH_INTERVAL = 1  # Seconds
BATCH_SIZE = 100  # Number of nodes to process in one batch
MAX_CONCURRENT_REQUESTS = 50
CONNECTION_TIMEOUT = 30
REQUEST_TIMEOUT = 10

@dataclass
class GPUSpecs:
    sm_clock: Tuple[int, int] = (1230, 1380)
    mem_clock: int = 877
    power: Tuple[int, int] = (300, 350)
    memory: int = 16384
    model: str = 'Tesla V100-SXM2-16GB'

class GPUSimulator:
    def __init__(self, node_id: int, gpu_id: int):
        self.node_id = node_id
        self.gpu_id = gpu_id
        self.uuid = f"GPU-{secrets.token_hex(16)}"
        self.pci_bus_id = f"{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}.0"
        self.driver_version = "450.51.06"
        self.specs = GPUSpecs()
        # Use numpy for faster calculations
        self.workload_phase = np.random.random()
        self.noise = np.random.normal(0, 0.05, size=1000)  # Pre-generate noise
        self.noise_idx = 0

    def get_noise(self) -> float:
        self.noise_idx = (self.noise_idx + 1) % len(self.noise)
        return self.noise[self.noise_idx]

    def simulate_metrics(self) -> List[Tuple[str, float, str]]:
        self.workload_phase = (self.workload_phase + 0.1) % 1.0
        workload = (np.sin(self.workload_phase * 2 * np.pi) + 1) / 2
        workload = np.clip(workload + self.get_noise(), 0, 1)

        sm_clock = int(self.specs.sm_clock[0] + (self.specs.sm_clock[1] - self.specs.sm_clock[0]) * workload)
        gpu_temp = 30 + int(55 * workload)
        power_usage = self.specs.power[0] + (self.specs.power[1] - self.specs.power[0]) * workload
        gpu_util = int(100 * workload)
        mem_util = int(90 * workload)
        fb_used = self.specs.memory * mem_util / 100
        fb_free = self.specs.memory - fb_used

        return [
            ('DCGM_FI_DEV_SM_CLOCK', sm_clock, 'SM clock frequency (in MHz)'),
            ('DCGM_FI_DEV_GPU_TEMP', gpu_temp, 'GPU temperature (in C)'),
            ('DCGM_FI_DEV_POWER_USAGE', round(power_usage, 2), 'Power draw (in W)'),
            ('DCGM_FI_DEV_GPU_UTIL', gpu_util, 'GPU utilization (in %)'),
            ('DCGM_FI_DEV_MEM_COPY_UTIL', mem_util, 'Memory utilization (in %)'),
            ('DCGM_FI_DEV_FB_FREE', round(fb_free, 2), 'Frame buffer memory free (in MB)'),
            ('DCGM_FI_DEV_FB_USED', round(fb_used, 2), 'Frame buffer memory used (in MB)')
        ]

class MetricsBatchProcessor:
    def __init__(self, producer, batch_size=BATCH_SIZE):
        self.producer = producer
        self.batch_size = batch_size
        self.batch = []

    async def add_metrics(self, key: str, value: Dict):
        self.batch.append((key, value))
        if len(self.batch) >= self.batch_size:
            await self.flush()

    async def flush(self):
        if not self.batch:
            return
        
        try:
            # Use gather with return_exceptions=True to handle partial failures
            tasks = [
                self.producer.send_and_wait(DCGM_KAFKA_TOPIC, key=key, value=value)
                for key, value in self.batch
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            # Handle any exceptions
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    logging.error(f"Failed to send message {i}: {result}")
                    
        except Exception as e:
            logging.error(f"Batch processing error: {e}")
        finally:
            self.batch.clear()

async def benchmark_worker(worker_id: int, num_nodes: int, port: int) -> Dict:
    metrics = {
        'messages_processed': 0,
        'errors': 0,
        'processing_time': 0
    }

    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)) as session:
        start_node = worker_id * (num_nodes // multiprocessing.cpu_count())
        end_node = start_node + (num_nodes // multiprocessing.cpu_count())
        
        for node_id in range(start_node, end_node):
            try:
                url = f"http://127.0.0.1:{port}/metrics/{node_id}"
                async with session.get(url) as response:
                    if response.status == 200:
                        metrics['messages_processed'] += 1
                    else:
                        metrics['errors'] += 1
            except Exception as e:
                metrics['errors'] += 1
                logging.error(f"Worker {worker_id} error processing node {node_id}: {e}")

    return metrics

async def run_benchmark(num_nodes: int, duration: int = 60) -> Dict:
    num_workers = min(multiprocessing.cpu_count(), 16)
    ports = list(range(51000, 51000 + num_workers))
    
    # Start simulators
    simulator_processes = []
    for i, port in enumerate(ports):
        node_range = range(i * (num_nodes // num_workers), (i + 1) * (num_nodes // num_workers))
        p = multiprocessing.Process(target=run_simulator, args=(node_range, port))
        simulator_processes.append(p)
        p.start()

    await asyncio.sleep(5)  # Wait for simulators to start

    # Run benchmark
    tasks = [
        benchmark_worker(i, num_nodes, port)
        for i, port in enumerate(ports)
    ]
    
    results = await asyncio.gather(*tasks)
    
    # Cleanup
    for p in simulator_processes:
        p.terminate()
        p.join()

    # Aggregate results
    total_metrics = {
        'messages_processed': sum(r['messages_processed'] for r in results),
        'errors': sum(r['errors'] for r in results),
        'processing_time': sum(r['processing_time'] for r in results)
    }

    return total_metrics

In [6]:
import asyncio
import json
import logging
import aiohttp
import uuid
import time
import secrets
import math
import multiprocessing
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from datetime import datetime, timedelta
from flask import Flask, Response
from gunicorn.app.base import BaseApplication
import os

# Configuration
KAFKA_BOOTSTRAP_SERVERS = ['10.180.8.24:9092', '10.180.8.24:9093', '10.180.8.24:9094']
DCGM_KAFKA_TOPIC = 'dcgm-metrics-test'
FETCH_INTERVAL = 1  # Seconds
RETRIES = 3
RETRY_DELAY = 5  # Seconds
MAX_NODES = 8
GPUS_PER_NODE = 4
GPU_MODEL = 'Tesla V100-SXM2-16GB'
GPU_SPECS = {
    'sm_clock': (1230, 1380),
    'mem_clock': 877,
    'power': (300, 350),
    'memory': 16384
}
BENCHMARK_DURATION = 600  # 10 minutes in seconds
LOG_FILE = "benchmark_results.log"

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Setup file handler for logging results to file
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)

class GPUSimulator:
    def __init__(self, node_id, gpu_id):
        self.node_id = node_id
        self.gpu_id = gpu_id
        self.uuid = f"GPU-{secrets.token_hex(16)}"
        self.pci_bus_id = f"{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}:{secrets.randbelow(256):02X}.0"
        self.driver_version = "450.51.06"
        self.start_time = time.time()
        self.workload_phase = secrets.randbelow(600) / 600

    def simulate_metrics(self):
        elapsed_time = time.time() - self.start_time
        self.workload_phase = ((elapsed_time / 60 + self.workload_phase) % 10) / 10
        workload = (math.sin(self.workload_phase * 2 * math.pi) + 1) / 2
        workload = min(1, max(0, workload + (secrets.randbelow(20) - 10) / 100))
        sm_clock = int(GPU_SPECS['sm_clock'][0] + (GPU_SPECS['sm_clock'][1] - GPU_SPECS['sm_clock'][0]) * workload)
        gpu_temp = 30 + int(55 * workload)
        power_usage = GPU_SPECS['power'][0] + (GPU_SPECS['power'][1] - GPU_SPECS['power'][0]) * workload
        gpu_util = int(100 * workload)
        mem_util = int(90 * workload)
        fb_used = GPU_SPECS['memory'] * mem_util / 100
        fb_free = GPU_SPECS['memory'] - fb_used
        return [
            ('DCGM_FI_DEV_SM_CLOCK', sm_clock, 'SM clock frequency (in MHz)'),
            ('DCGM_FI_DEV_GPU_TEMP', gpu_temp, 'GPU temperature (in C)'),
            ('DCGM_FI_DEV_POWER_USAGE', round(power_usage, 2), 'Power draw (in W)'),
            ('DCGM_FI_DEV_GPU_UTIL', gpu_util, 'GPU utilization (in %)'),
            ('DCGM_FI_DEV_MEM_COPY_UTIL', mem_util, 'Memory utilization (in %)'),
            ('DCGM_FI_DEV_FB_FREE', round(fb_free, 2), 'Frame buffer memory free (in MB)'),
            ('DCGM_FI_DEV_FB_USED', round(fb_used, 2), 'Frame buffer memory used (in MB)')
        ]

def generate_gpu_metrics(node_id, gpu_simulator):
    metrics = gpu_simulator.simulate_metrics()
    formatted_metrics = []
    for name, value, help_text in metrics:
        formatted_metrics.extend([
            f'# HELP {name} {help_text}',
            f'# TYPE {name} gauge',
            f'{name}{{node="{node_id}",gpu="{gpu_simulator.gpu_id}",UUID="{gpu_simulator.uuid}",'
            f'pci_bus_id="{gpu_simulator.pci_bus_id}",device="nvidia{gpu_simulator.gpu_id}",'
            f'modelName="{GPU_MODEL}",Hostname="node{node_id:04d}",'
            f'DCGM_FI_DRIVER_VERSION="{gpu_simulator.driver_version}"}} {value}'
        ])
    return '\n'.join(formatted_metrics)

def get_gpu_metrics(node_id):
    gpu_simulators = [GPUSimulator(node_id, gpu_id) for gpu_id in range(GPUS_PER_NODE)]
    all_metrics = []
    for gpu_simulator in gpu_simulators:
        all_metrics.append(generate_gpu_metrics(node_id, gpu_simulator))
    return '\n'.join(all_metrics)

def create_flask_app(node_range):
    app = Flask(__name__)

    @app.route('/metrics/<int:node_id>', methods=['GET'])
    def metrics(node_id):
        if node_id in node_range:
            metrics_data = get_gpu_metrics(node_id)
            return Response(metrics_data, mimetype='text/plain')
        return "Node not found", 404

    return app

class GunicornApp(BaseApplication):
    def __init__(self, app, options=None):
        self.application = app
        self.options = options or {}
        super().__init__()

    def load_config(self):
        for key, value in self.options.items():
            if key in self.cfg.settings and value is not None:
                self.cfg.set(key, value)

    def load(self):
        return self.application

def run_simulator(node_range, port):
    app = create_flask_app(node_range)
    options = {
        'bind': f'127.0.0.1:{port}',
        'workers': 4,
    }
    GunicornApp(app, options).run()

async def create_producer():
    try:
        producer = AIOKafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k is not None else None
        )
        await producer.start()
        return producer
    except Exception as e:
        logger.error(f"Failed to create Kafka producer: {e}")
        raise

async def shutdown_producer(producer):
    logger.info("Shutting down producer...")
    await producer.stop()
    logger.info("Producer shutdown complete.")

async def fetch_metrics(session, url):
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.text()

def log_summary(metrics, start_time, end_time):
    elapsed_time = end_time - start_time
    throughput = metrics['messages_received'] / elapsed_time.total_seconds() if elapsed_time.total_seconds() > 0 else 0
    avg_processing_time = metrics['processing_time'].total_seconds() / metrics['messages_received'] if metrics['messages_received'] > 0 else 0

    summary = (
        f"\n--- Summary ---\n"
        f"Start Time: {start_time}\n"
        f"End Time: {end_time}\n"
        f"Elapsed Time: {elapsed_time}\n"
        f"Messages Received: {metrics['messages_received']}\n"
        f"Bytes Received: {metrics['bytes_received']}\n"
        f"Throughput: {throughput:.2f} msg/s\n"
        f"Average Processing Time: {avg_processing_time:.4f} s/msg\n"
        f"Errors: {metrics['errors']}\n"
    )
    
    logger.info(summary)
    
    # Write summary to a log file
    with open("benchmark_summary.txt", "a") as f:
        f.write(summary)

async def run_benchmark(num_nodes):
    num_processes = min(multiprocessing.cpu_count(), 16)
    nodes_per_process = num_nodes // num_processes
    remaining_nodes = num_nodes % num_processes

    simulator_processes = []
    for i in range(num_processes):
        start_node = i * nodes_per_process
        end_node = start_node + nodes_per_process + (remaining_nodes if i == num_processes - 1 else 0)
        node_range = range(start_node, end_node)
        port = 51000 + i
        p = multiprocessing.Process(target=run_simulator, args=(node_range, port))
        simulator_processes.append(p)
        p.start()

    await asyncio.sleep(5)

    producer = await create_producer()
    consumer = await create_consumer()
    
    end_time = datetime.now() + timedelta(seconds=BENCHMARK_DURATION)
    
    consumer_task = asyncio.create_task(consume_metrics(consumer, end_time))

    producer_tasks = []
    async with aiohttp.ClientSession() as session:
        for i in range(num_processes):
            start_node = i * nodes_per_process
            end_node = start_node + nodes_per_process + (remaining_nodes if i == num_processes - 1 else 0)
            node_range = range(start_node, end_node)
            port = 51000 + i
            task = asyncio.create_task(fetch_and_send_metrics(producer, session, node_range, port, end_time))
            producer_tasks.append(task)
        
        await asyncio.gather(*producer_tasks)

    consumer_metrics = await consumer_task

    await shutdown_producer(producer)
    await shutdown_consumer(consumer)

    for p in simulator_processes:
        p.terminate()
        p.join()

    log_summary(consumer_metrics, end_time - timedelta(seconds=BENCHMARK_DURATION), end_time)

    return consumer_metrics['messages_received'] / BENCHMARK_DURATION

async def main():
    node_counts = [2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 5096]
    results = {}

    for num_nodes in node_counts:
        logger.info(f"Running benchmark for {num_nodes} nodes...")
        messages_per_second = await run_benchmark(num_nodes)
        results[num_nodes] = messages_per_second
        logger.info(f"Benchmark result for {num_nodes}: {messages_per_second:.2f} messages/second")

    logger.info("Final Benchmark Results:")
    with open("benchmark_summary.txt", "a") as f:
        for num_nodes, messages_per_second in results.items():
            result_line = f"{num_nodes} nodes: {messages_per_second:.2f} messages/second\n"
            logger.info(result_line)
            f.write(result_line)

if __name__ == "__main__":
    asyncio.run(main())


RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
import requests
import multiprocessing

def check_node(node_id, base_port):
    url = f"http://localhost:{base_port + node_id}/metrics"
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        return node_id, True, ""
    except requests.exceptions.RequestException as e:
        return node_id, False, str(e)

def main():
    num_nodes = 1024  # Replace with your number of nodes
    base_port = 51000  # Replace with your base port

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:  # Use all CPU cores
        results = pool.starmap(check_node, [(node_id, base_port) for node_id in range(num_nodes)])

    down_servers = []
    all_nodes_ok = True
    for node_id, is_up, error_message in results:
        if not is_up:
            all_nodes_ok = False
            down_servers.append(f"Node {node_id}: {error_message}")

    if all_nodes_ok:
        print("\nAll nodes are running and responding.")
    else:
        print("\nThe following nodes are not running or responding:")
        for server in down_servers:
            print(server)
main()