In [1]:
# NOTE(review): no cell visible in this notebook imports `kafka`; only
# `confluent_kafka` is used — confirm this install is still required.
# `%pip` installs into the running kernel's environment; version pinned to the
# one this cell previously resolved.
%pip install kafka==1.3.5

Collecting kafka
  Downloading kafka-1.3.5-py2.py3-none-any.whl.metadata (6.9 kB)
Downloading kafka-1.3.5-py2.py3-none-any.whl (207 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.2/207.2 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: kafka
Successfully installed kafka-1.3.5


In [2]:
# `%pip` installs into the running kernel's environment; version pinned to the
# one this cell previously resolved so re-runs are reproducible.
%pip install confluent-kafka==2.4.0

Collecting confluent-kafka
  Downloading confluent_kafka-2.4.0-cp311-cp311-manylinux_2_28_aarch64.whl.metadata (2.3 kB)
Downloading confluent_kafka-2.4.0-cp311-cp311-manylinux_2_28_aarch64.whl (14.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.2/14.2 MB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-2.4.0


In [4]:
import sys
import json
import logging
import confluent_kafka
from confluent_kafka import KafkaError, KafkaException


class KafkaObject(object):
    def __init__(
            self,
            bootstrap_servers,
            buffering_max_messages=2000000,
            session_timeout=1740000,
            max_pol_interval_ms=1750000,
            heartbeat_interval_ms=30000,
            connections_max_handle_ms=54000000,
            off_set_reset='earliest'
    ):
        self.bootstrap_servers = bootstrap_servers
        self.producer_conf = {
            'bootstrap.servers': self.bootstrap_servers,
            'queue.buffering.max.messages': buffering_max_messages
        }
        self.consumer_conf = {
            'bootstrap.servers': self.bootstrap_servers,
            'session.timeout.ms': session_timeout,
            'heartbeat.interval.ms': heartbeat_interval_ms,
            'connections.max.idle.ms': connections_max_handle_ms,
            'max.poll.interval.ms': max_pol_interval_ms,
            'fetch.wait.max.ms': 1000,
            'socket.keepalive.enable': 'true',
            'default.topic.config': {
                'auto.offset.reset': off_set_reset
            }
        }
        logging.basicConfig(level=logging.DEBUG)
        self.logger = logging.getLogger('kafka-object')


    def handler(self, msg):
        json_result = json.loads(msg.value().decode())
        # dict_keys(['labels', 'name', 'timestamp', 'value'])
        result = json_result["labels"]
        metric_name = result["__name__"]
        if metric_name == "kube_pod_container_status_restarts_total":
            self.logger.info(json_result["labels"]["pod"])
            self.logger.info(json_result["timestamp"])
            self.logger.info(json_result["value"])

    def producer(
            self,
            msg,
            topic
    ):
        messages_overflow = 0
        producer = confluent_kafka.Producer(**self.producer_conf)
        try:
            producer.produce(topic, value=json.dumps(msg))
        except BufferError as e:
            messages_overflow += 1

        # checking for overflow
        self.logger.error(f'BufferErrors: {messages_overflow}')
        producer.flush()

    def consumer(
            self,
            list_of_topics,
            group_id
    ):
        consumer_config = self.consumer_conf
        consumer_config['group.id'] = group_id
        consumer = confluent_kafka.Consumer(**consumer_config)
        consumer.subscribe(list_of_topics)

        while True:
            msg = consumer.poll()
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write(
                        '%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    # Error
                    raise KafkaException(msg.error())
            else:
                self.handler(msg)

In [5]:
import os
# KAFKA SETTINGS — each value can be overridden through the environment.
KAFKA_HOST = os.getenv("KAFKA_HOST", "broker")
KAFKA_PORT = os.getenv("KAFKA_PORT", 29092)
GROUP_ID = os.getenv("GROUP_ID", "temp8089")
TARGET_TOPICS = ["metrics"]

# TSCALE SETTINGS
# NOTE(review): the user name is read from TSCALE_NAME, not TSCALE_USER —
# confirm this is the intended variable name.
TSCALE_HOST = os.getenv("TSCALE_HOST", "timescaledb")
TSCALE_USER = os.getenv("TSCALE_NAME", "aces")
TSCALE_DB = os.getenv("TSCALE_DB", "aces")
TSCALE_PASS = os.getenv("TSCALE_PASS", "aces")

# NEO4J SETTINGS
NEO4J_HOST = os.getenv("NEO4J_HOST", "neo4j")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASS = os.getenv("NEO4J_PASS", "neo4j290292")


# Kafka helper bound to the configured broker address.
kafka_obj = KafkaObject(bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}')

In [6]:
# Runs the poll loop; it only returns control when an exception
# (e.g. KeyboardInterrupt or KafkaException) is raised.
kafka_obj.consumer(list_of_topics=TARGET_TOPICS, group_id=GROUP_ID)

INFO:kafka-object:metrics-catalogue-79595fbf84-jzxls
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-object:notebook-76d88568cf-swc9j
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-object:prometheus-prometheus-node-exporter-gtlmx
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:3
INFO:kafka-object:zookeeper-656f66f76d-4smcj
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-object:coredns-76f75df574-rqp5p
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-object:kube-apiserver-docker-desktop
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-object:broker-64c5757fcc-rcfvq
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:1
INFO:kafka-object:prometheus-kube-state-metrics-7b74466fbb-57rmd
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:1
INFO:kafka-object:kube-controller-manager-docker-desktop
INFO:kafka-object:2024-06-07T10:54:48Z
INFO:kafka-object:0
INFO:kafka-o

KeyboardInterrupt: 

In [1]:
# `%pip` installs into the running kernel's environment (unlike `!pip`, which
# uses whichever pip is first on the shell PATH).
%pip install psycopg2-binary



In [7]:
import datetime
import os
import psycopg2


class TimeScaleDB(object):
    """Minimal TimescaleDB client holding one psycopg2 connection and cursor."""

    @staticmethod
    def construct_uri(
            host,
            username,
            password,
            database,
            port=5432
    ):
        """Return a ``postgres://`` connection URI built from the given parts.

        The values are interpolated as-is (no percent-encoding is applied).
        """
        this_uri = f"postgres://{username}:{password}@{host}:{port}/{database}"
        return this_uri

    def __init__(
            self,
            host,
            username,
            password,
            database
    ):
        """Open the connection and create the cursor used by the other methods.

        Args:
            host: database host name.
            username: database user.
            password: password for ``username``.
            database: database name.
        """
        self.conn = psycopg2.connect(
            self.construct_uri(host=host, username=username, password=password, database=database)
        )
        self.cursor = self.conn.cursor()

    def close_client(
            self
    ):
        """Commit the current transaction.

        Despite the name, this does not close the connection. It was previously
        nested inside ``__init__`` and therefore never existed as a method, so
        ``create_temp_table`` failed with ``AttributeError``.
        """
        self.conn.commit()

    def create_temp_table(self, table_name="temp_table"):
        """Create ``table_name`` and turn it into a hypertable ranged on ``time``.

        Args:
            table_name: name of the table to create. It is interpolated
                directly into the SQL text, so it must be a trusted identifier.

        Raises:
            Exception: whatever the cursor raises (e.g. the table already
                exists); the transaction is rolled back before re-raising so
                the connection stays usable.
        """
        table_creation_query = f"""
            CREATE TABLE {table_name} ( 
                time TIMESTAMPTZ NOT NULL,
                pod TEXT,
                num_of_restarts INTEGER
            )"""
        create_hyper_table = f"""SELECT create_hypertable('{table_name}', by_range('time'))"""
        try:
            self.cursor.execute(table_creation_query)
            self.cursor.execute(create_hyper_table)
        except Exception:
            # A failed statement leaves the transaction aborted; reset it.
            self.conn.rollback()
            raise
        self.close_client()

In [114]:
# TSCALE SETTINGS — each value can be overridden through the environment.
# NOTE(review): the user name is read from TSCALE_NAME, not TSCALE_USER —
# confirm this is the intended variable name.
TSCALE_HOST = os.getenv("TSCALE_HOST", "timescaledb")
TSCALE_USER = os.getenv("TSCALE_NAME", "aces")
TSCALE_DB = os.getenv("TSCALE_DB", "aces")
TSCALE_PASS = os.getenv("TSCALE_PASS", "aces")

In [115]:
# Opens the database connection (and cursor) used by the cells below.
tdb = TimeScaleDB(host=TSCALE_HOST, username=TSCALE_USER, password=TSCALE_PASS, database=TSCALE_DB)

In [116]:
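# One-time setup: uncomment to create the `temp_table` hypertable. Left commented
# out because the CREATE TABLE statement fails if the table already exists.
# tdb.create_temp_table()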

In [117]:
def upsert_num_of_restarts(pod_id, num_of_restarts, time, tdb_client, close_connection=True):
    """Insert or update the restart count stored for a pod in ``metrics_values``.

    Looks up a row for ``pod_id`` and the restart metric. If one exists and its
    value differs from ``num_of_restarts`` the row is updated; if none exists a
    new row is inserted; if the value is unchanged nothing is written.

    All statements run through ``tdb_client`` (previously the global ``tdb``
    was used for everything after the first query) and use bound parameters
    instead of string interpolation.

    Args:
        pod_id: pod name to match against the ``pod`` column.
        num_of_restarts: restart count to store in ``value``.
        time: timestamp to store in ``time``.
        tdb_client: object exposing ``cursor`` and ``conn`` (e.g. ``TimeScaleDB``).
        close_connection: when true (the default, matching the previous
            behaviour) the connection is closed before returning, so the
            client cannot be reused. Pass ``False`` to keep it open.
    """
    target_metric = 'kube_pod_container_status_restarts_total'
    target_node = 'node1'
    cursor = tdb_client.cursor
    conn = tdb_client.conn

    cursor.execute(
        """
        SELECT time, pod, value FROM metrics_values
        WHERE pod=%s AND metric=%s
        """,
        (pod_id, target_metric)
    )
    entry_exists_for_pod = cursor.fetchone()
    if entry_exists_for_pod:
        print("record exists")
        print(entry_exists_for_pod)
        # Row layout follows the SELECT above: (time, pod, value).
        num_of_restarts_old = entry_exists_for_pod[2]
        if num_of_restarts_old == num_of_restarts:
            print("it is the same do not update")
        else:
            print("update")
            cursor.execute(
                """
                UPDATE metrics_values
                SET value=%s, time=%s
                WHERE pod=%s AND metric=%s
                """,
                (num_of_restarts, time, pod_id, target_metric)
            )
            conn.commit()
    else:
        print("fresh insert")
        cursor.execute(
            "INSERT INTO metrics_values (time, metric, node, pod, value) VALUES (%s, %s, %s, %s, %s);",
            (time, target_metric, target_node, pod_id, num_of_restarts)
        )
        conn.commit()
    if close_connection:
        conn.close()

In [118]:
# Example run against the live client: record 19 restarts for pod "neo4j-0".
upsert_num_of_restarts(
    pod_id="neo4j-0",
    num_of_restarts=19,
    time="2024-06-05 08:37:39.000000 +00:00",
    tdb_client=tdb,
)

record exists
(datetime.datetime(2024, 6, 5, 8, 36, 39, tzinfo=datetime.timezone.utc), 'neo4j-0', 8.0)
update
