## Non-overlapping batches of 24 consecutive samples, read at a 10-second interval, i.e.:

First batch: t1-t24,

Second batch: t25-t48,

Third batch: t49-t72,

etc.
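The batch boundaries above follow directly from the batch size. A minimal sketch of that arithmetic, where the helper name `batch_bounds` is hypothetical and not part of the producer in this notebook:

```python
from typing import Tuple


def batch_bounds(batch_number: int, batch_size: int = 24) -> Tuple[int, int]:
    """Return the 1-based (first, last) sample indices of a non-overlapping batch."""
    if batch_number < 1 or batch_size < 1:
        raise ValueError("batch_number and batch_size must be >= 1")
    first = (batch_number - 1) * batch_size + 1
    last = batch_number * batch_size
    return first, last


for k in (1, 2, 3):
    first, last = batch_bounds(k)
    print(f"Batch {k}: t{first}-t{last}")
```

Because each batch starts right after the previous one ends, no sample index belongs to two batches.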

Let’s build a modular producer that:

- Queries Prometheus every 10 seconds (configurable).
- Collects exactly 24 samples per batch (configurable).
- After collecting a batch, publishes the whole batch as one Kafka message (instead of individual points).
- Can be customized via environment variables or function arguments.
- Prints each batch as it is published, so you can check it.

**Key features:**

- **No buffer per instance, no leftovers:** each time, it collects 24 new points, sends them, and then starts the next batch cleanly.
- **Prints each point as it is collected:** e.g. `[01] 2025-09-18 19:52:40, 33.17233333`.
- **Batch printout:** each time it publishes a batch, it prints all 24 points in the same format.
- **No overlap, no sliding, no duplication.**
- **Always starts fresh:** each run of the script collects new data starting from the moment it is launched.
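Because each Kafka message carries a whole batch, a consumer has to unpack two parallel lists. The sketch below illustrates that step, assuming the message layout used by the producer in this notebook (`instance`, `batch_ts`, `batch_cpu_pct`, `batch_number`). The function name `decode_batch` is hypothetical, the second data point is made up for illustration, and no Kafka connection is needed to try it:

```python
import json
from typing import List, Tuple


def decode_batch(raw: bytes, expected_size: int = 24) -> List[Tuple[str, float]]:
    """Decode one batch message and return (timestamp, value) pairs."""
    msg = json.loads(raw.decode("utf-8"))
    ts_list = msg["batch_ts"]
    values = msg["batch_cpu_pct"]
    if len(ts_list) != len(values):
        raise ValueError("batch_ts and batch_cpu_pct differ in length")
    if len(ts_list) != expected_size:
        raise ValueError(f"expected {expected_size} points, got {len(ts_list)}")
    return list(zip(ts_list, [float(v) for v in values]))


# A fake two-point message, serialized the same way the producer does it.
example = {
    "instance": "172.19.0.3:9100",
    "batch_ts": ["2025-09-18 19:52:40", "2025-09-18 19:52:50"],
    "batch_cpu_pct": [33.17233333, 33.2],  # second value is invented
    "batch_number": 1,
}
raw = json.dumps(example).encode("utf-8")
pairs = decode_batch(raw, expected_size=2)
print(pairs)
```

Checking the batch length on the consumer side is one way to catch a producer that was started with a different `BATCH_SIZE`.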

In [None]:
# %pip install prometheus-api-client kafka-python

import os
import time
import json
import datetime
from typing import List
from prometheus_api_client import PrometheusConnect
from kafka import KafkaProducer

def fresh_batches_and_publish(
    prom_url: str = os.getenv("PROM_URL", "http://prometheus-operated.prometheus.svc.cluster.local:9090"),
    kafka_bootstrap: str = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka.apache-kafka.svc.cluster.local:9092"),
    topic: str = os.getenv("KAFKA_TOPIC", "cpu-batch"),
    prom_query: str = os.getenv(
        "PROM_QUERY",
        'sum by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[1m]))'
    ),
    target_instance: str = os.getenv("TARGET_INSTANCE", None),   # e.g. "172.19.0.3:9100"
    batch_size: int = int(os.getenv("BATCH_SIZE", "24")),
    interval_sec: int = int(os.getenv("INTERVAL_SEC", "10")),
    print_each_batch: bool = True,
):
    """
    Collect exactly batch_size points for each batch, publish, repeat. Start fresh every run.
    Runs infinitely!
    """
    prom = PrometheusConnect(url=prom_url, disable_ssl=True)
    producer = KafkaProducer(
        bootstrap_servers=[s.strip() for s in kafka_bootstrap.split(",") if s.strip()],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    print("============ CONFIG ============")
    print(f"Prometheus: {prom_url}")
    print(f"Query:      {prom_query}")
    print(f"Instance:   {target_instance if target_instance else 'ALL'}")
    print(f"Kafka:      {kafka_bootstrap}")
    print(f"Topic:      {topic}")
    print(f"Interval:   {interval_sec} sec")
    print(f"Batch size: {batch_size}")
    print("================================\n")

    batches_sent = 0

    while True:
        batch_points: List[dict] = []
        print(f"[producer] Collecting batch #{batches_sent+1} ...")
        while len(batch_points) < batch_size:
            tick_ts = int(time.time())
            try:
                result = prom.custom_query(query=prom_query)
            except Exception as e:
                print(f"[producer] Prometheus query FAILED: {e}")
                time.sleep(interval_sec)
                continue

            found = False
            for sample in result:
                inst = sample["metric"].get("instance")
                if target_instance and inst != target_instance:
                    continue
                cpu_val = float(sample["value"][1])
                ts_human = datetime.datetime.fromtimestamp(tick_ts).strftime("%Y-%m-%d %H:%M:%S")
                print(f"[{len(batch_points)+1:02d}] {ts_human}, {cpu_val:.8f}")
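                batch_points.append({
                    "ts": ts_human,
                    # With the default query this is the number of busy cores
                    # (CPU-seconds per second), not a 0-100 percentage.
                    "cpu_pct": cpu_val,
                })
                found = True
                # Take the first matching series only. Without target_instance,
                # that is simply the first series Prometheus returns.
                break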

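            if not found:
                print("[producer] No data for instance, retrying...")
            if len(batch_points) < batch_size:
                # Sleep only for what is left of this interval, so the time spent
                # querying does not stretch the cadence beyond interval_sec.
                time.sleep(max(0.0, interval_sec - (time.time() - tick_ts)))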

        # Build message for Kafka
        msg = {
            "instance": target_instance,
            "batch_ts": [pt["ts"] for pt in batch_points],
            "batch_cpu_pct": [pt["cpu_pct"] for pt in batch_points],
            "batch_number": batches_sent + 1,
        }
        producer.send(topic, value=msg)
        producer.flush()
        if print_each_batch:
            print(f"\n[producer] Published batch #{batches_sent+1} for {target_instance}:")
            for i, (ts, v) in enumerate(zip(msg["batch_ts"], msg["batch_cpu_pct"]), 1):
                print(f"  [{i:02d}] {ts}, {v:.8f}")
            print("-" * 40)
        batches_sent += 1

        # --- Fix: Wait for next *new* interval before starting the next batch ---
        time.sleep(interval_sec)

    # (never exits, runs forever)



In [None]:
fresh_batches_and_publish(
    prom_url="http://prometheus-operated.prometheus.svc.cluster.local:9090",
    kafka_bootstrap="kafka.apache-kafka.svc.cluster.local:9092",
    topic="cpu-batch",
    prom_query='sum by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[1m]))',
    target_instance="172.19.0.3:9100",  # your node
    batch_size=24,
    interval_sec=10,
    print_each_batch=True,
)

### V2

**More robust:** track the last timestamp used and always wait for the next genuinely new point.
This is more robust and accurate than sleeping for a fixed interval, especially in distributed or cloud settings.

**Why “wait for new timestamp” is more robust:**

- **Network delays or Prometheus lag:** if you just `sleep(interval_sec)`, the next point you read may carry the same timestamp as the previous one, for example when the scrape interval and the query interval drift relative to each other.
- **Clock drift:** over a long run, system clocks can slip out of sync, and you might see repeated values.
- **Ordering:** each batch should contain strictly increasing, unique timestamps.
- **Missed points:** if Prometheus is slow or the system hiccups, you wait until a truly new value arrives and then record it, so every batch still has 24 unique timestamps.

**How to implement:**

1. Keep track of the timestamp of the last collected point.
2. In each loop iteration, after fetching from Prometheus:
   - if the timestamp differs from the previous one, accept and store the point;
   - if not, sleep briefly and try again until a new timestamp appears.

Result: you never collect two points with the same timestamp, even if Prometheus scrapes are delayed or your code resumes early.
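The steps above can be tried in isolation, without Prometheus. The following sketch assumes points arrive as `(timestamp, value)` pairs; the name `only_new_points` is hypothetical, and it uses a slightly stricter "newer than the last one" check instead of "different from the last one", so timestamps can never go backwards either:

```python
from typing import Iterable, Iterator, Optional, Tuple


def only_new_points(points: Iterable[Tuple[int, float]]) -> Iterator[Tuple[int, float]]:
    """Yield (timestamp, value) pairs whose timestamp is newer than the last one yielded."""
    last_ts: Optional[int] = None
    for ts, value in points:
        if last_ts is not None and ts <= last_ts:
            continue  # same (or older) timestamp: skip it and wait for the next poll
        last_ts = ts
        yield ts, value


# Simulated polls: the second and third repeat a timestamp that was already seen.
polls = [(100, 1.0), (100, 1.0), (100, 1.1), (110, 1.2), (120, 1.3)]
accepted = list(only_new_points(polls))
print(accepted)  # [(100, 1.0), (110, 1.2), (120, 1.3)]
```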

**Advantages of this approach:**

- No duplicate timestamps, regardless of query speed or Prometheus delays.
- Handles missing data: it waits until a new point is available and makes no assumption about timing.
- Downstream ML code receives batches whose timestamps are unique and in order.

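**How this works:**

- It only accepts a sample when the timestamp returned by Prometheus, truncated to whole seconds, is an exact multiple of `interval_sec`. For an instant query like this one, that timestamp is the query evaluation time, not the time of the underlying scrape, so the value itself can repeat across accepted points.
- With a 10 s interval, you only get times like 20:10:00, 20:10:10, 20:10:20, etc.
- No duplicates: it skips a timestamp it has already seen.
- It runs forever, producing one clean batch after another.
- There is no fixed 10 s sleep; the loop polls about once per second and accepts the first aligned timestamp it sees.
- If Prometheus is "late" or "skips" a sample, or a poll misses the aligned second, you get a larger gap (a multiple of the interval), never a duplicate.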
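The alignment rule is plain modular arithmetic on the timestamp. A minimal sketch of it, using hypothetical helper names (`is_aligned`, `next_aligned`, `seconds_until_next_aligned`) and small made-up timestamps instead of real epoch values:

```python
import math


def is_aligned(ts: int, interval_sec: int = 10) -> bool:
    """True if ts (whole seconds) is an exact multiple of interval_sec."""
    return ts % interval_sec == 0


def next_aligned(ts: float, interval_sec: int = 10) -> int:
    """Smallest multiple of interval_sec that is greater than or equal to ts."""
    whole = math.ceil(ts)
    return ((whole + interval_sec - 1) // interval_sec) * interval_sec


def seconds_until_next_aligned(now: float, interval_sec: int = 10) -> float:
    """How long to wait from now until the next aligned second."""
    return next_aligned(now, interval_sec) - now


print(is_aligned(1200))      # True
print(is_aligned(1203))      # False
print(next_aligned(1203.4))  # 1210
print(next_aligned(1210))    # 1210
```

As a design alternative to polling every second, a producer could sleep for `seconds_until_next_aligned(time.time())` and query once per interval. That is an assumption about how the loop might be restructured, not what the code below does.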

In [1]:
# %pip install prometheus-api-client kafka-python

import os
import time
import json
import datetime
from typing import List

from prometheus_api_client import PrometheusConnect
from kafka import KafkaProducer

def fresh_batches_and_publish(
    prom_url: str = os.getenv("PROM_URL", "http://prometheus-operated.prometheus.svc.cluster.local:9090"),
    kafka_bootstrap: str = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka.apache-kafka.svc.cluster.local:9092"),
    topic: str = os.getenv("KAFKA_TOPIC", "cpu-batch"),
    prom_query: str = os.getenv(
        "PROM_QUERY",
        'sum by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[1m]))'
    ),
    target_instance: str = os.getenv("TARGET_INSTANCE", None),
    batch_size: int = int(os.getenv("BATCH_SIZE", "24")),
    interval_sec: int = int(os.getenv("INTERVAL_SEC", "10")),
    print_each_batch: bool = True,
):
    """
    Collect only points whose timestamp is aligned (a multiple of interval_sec),
    so consecutive points are spaced by interval_sec, or by a multiple of it if
    an aligned second is missed. Runs forever.
    """
    prom = PrometheusConnect(url=prom_url, disable_ssl=True)
    producer = KafkaProducer(
        bootstrap_servers=[s.strip() for s in kafka_bootstrap.split(",") if s.strip()],
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    print("============ CONFIG ============")
    print(f"Prometheus: {prom_url}")
    print(f"Query:      {prom_query}")
    print(f"Instance:   {target_instance if target_instance else 'ALL'}")
    print(f"Kafka:      {kafka_bootstrap}")
    print(f"Topic:      {topic}")
    print(f"Interval:   {interval_sec} sec")
    print(f"Batch size: {batch_size}")
    print("================================\n")

    batches_sent = 0
    last_seen_ts = None

    while True:
        batch_points: List[dict] = []
        print(f"[producer] Collecting batch #{batches_sent+1} ...")
        while len(batch_points) < batch_size:
            try:
                result = prom.custom_query(query=prom_query)
            except Exception as e:
                print(f"[producer] Prometheus query FAILED: {e}")
                time.sleep(2)
                continue

            found = False
            for sample in result:
                inst = sample["metric"].get("instance")
                if target_instance and inst != target_instance:
                    continue
                cpu_val = float(sample["value"][1])
                # For an instant query this is the query evaluation time, in whole seconds.
                prom_ts = int(float(sample["value"][0]))
                # Only accept aligned timestamps (e.g., every 10 seconds: :00, :10, :20, ...)
                if prom_ts % interval_sec != 0:
                    continue
                # Skip timestamps that are not strictly newer than the last accepted one
                if last_seen_ts is not None and prom_ts <= last_seen_ts:
                    continue
                ts_human = datetime.datetime.fromtimestamp(prom_ts).strftime("%Y-%m-%d %H:%M:%S")
                print(f"[{len(batch_points)+1:02d}] {ts_human}, {cpu_val:.8f}")
                batch_points.append({
                    "ts": ts_human,
                    "cpu_pct": cpu_val,
                })
                last_seen_ts = prom_ts
                found = True
                break  # take only one sample per query (for target_instance)

            if not found or (len(batch_points) < batch_size):
                time.sleep(1)  # Check again soon

        msg = {
            "instance": target_instance,
            "batch_ts": [pt["ts"] for pt in batch_points],
            "batch_cpu_pct": [pt["cpu_pct"] for pt in batch_points],
            "batch_number": batches_sent + 1,
        }
        producer.send(topic, value=msg)
        producer.flush()
        if print_each_batch:
            print(f"\n[producer] Published batch #{batches_sent+1} for {target_instance}:")
            for i, (ts, v) in enumerate(zip(msg["batch_ts"], msg["batch_cpu_pct"]), 1):
                print(f"  [{i:02d}] {ts}, {v:.8f}")
            print("-" * 40)
        batches_sent += 1


In [None]:
fresh_batches_and_publish(
    prom_url="http://prometheus-operated.prometheus.svc.cluster.local:9090",
    kafka_bootstrap="kafka.apache-kafka.svc.cluster.local:9092",
    topic="cpu-batch",
    prom_query='sum by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[1m]))',
    target_instance="172.19.0.3:9100",  # ISI cluster node
    batch_size=24,
    interval_sec=10,
    print_each_batch=True,
)


Prometheus: http://prometheus-operated.prometheus.svc.cluster.local:9090
Query:      sum by (instance) (rate(node_cpu_seconds_total{mode!="idle"}[1m]))
Instance:   172.19.0.3:9100
Kafka:      kafka.apache-kafka.svc.cluster.local:9092
Topic:      cpu-batch
Interval:   10 sec
Batch size: 24

[producer] Collecting batch #1 ...
[01] 2025-09-18 20:31:50, 29.61666667
[02] 2025-09-18 20:32:00, 29.61666667
[03] 2025-09-18 20:32:10, 29.61666667
[04] 2025-09-18 20:32:20, 29.53633333
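
The timestamps printed in the run above can be checked for even spacing. A small sketch of such a check, assuming the `%Y-%m-%d %H:%M:%S` format the producer prints; the function name `gaps_in_seconds` is hypothetical:

```python
from datetime import datetime
from typing import List


def gaps_in_seconds(timestamps: List[str], fmt: str = "%Y-%m-%d %H:%M:%S") -> List[int]:
    """Return the gap, in seconds, between each pair of consecutive timestamps."""
    parsed = [datetime.strptime(t, fmt) for t in timestamps]
    return [int((b - a).total_seconds()) for a, b in zip(parsed, parsed[1:])]


# The four points shown in the output above.
batch_ts = [
    "2025-09-18 20:31:50",
    "2025-09-18 20:32:00",
    "2025-09-18 20:32:10",
    "2025-09-18 20:32:20",
]
gaps = gaps_in_seconds(batch_ts)
print(gaps)  # [10, 10, 10]
```

A gap of 20 or 30 in this list would indicate that an aligned second was missed while the batch was being collected.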
