In [1]:
import requests

API_KEY = '<API_KEY>'
API_SECRET = '<API_SECRET>'
CLUSTER_ID = '<CLUSTER_ID>'

## List the available metrics

In [2]:
response = requests.get('https://api.telemetry.confluent.cloud/v1/metrics/cloud/descriptors', auth=(API_KEY, API_SECRET))
if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json()['data'])

[{'name': 'io.confluent.kafka.server/received_bytes', 'description': 'The delta count of bytes received from the network. Each sample is the number of bytes received since the previous data sample. The count is sampled every 60 seconds.', 'type': 'COUNTER_INT64', 'unit': 'By', 'lifecycle_stage': 'GENERAL_AVAILABILITY', 'labels': [{'description': 'ID of the Kafka cluster', 'key': 'cluster_id'}, {'description': 'Name of the Kafka topic', 'key': 'topic'}, {'description': 'Partition number of the Kafka topic', 'key': 'partition'}]}, {'name': 'io.confluent.kafka.server/sent_bytes', 'description': 'The delta count of bytes sent over the network. Each sample is the number of bytes sent since the previous data point. The count is sampled every 60 seconds.', 'type': 'COUNTER_INT64', 'unit': 'By', 'lifecycle_stage': 'GENERAL_AVAILABILITY', 'labels': [{'description': 'ID of the Kafka cluster', 'key': 'cluster_id'}, {'description': 'Name of the Kafka topic', 'key': 'topic'}, {'description': 'Parti

## List the available topics for a given metric in a specified interval

In [3]:
payload = {
    "filter": {
        "field": "metric.label.cluster_id",
        "op": "EQ",
        "value": CLUSTER_ID
    },
    "group_by": [
        "metric.label.topic"
    ],
    "intervals": [
        "2020-01-13T10:30:00-05:00/2020-01-13T11:00:00-05:00"
    ],
    "limit": 25,
    "metric": "io.confluent.kafka.server/sent_bytes/delta"
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/attributes',
                        auth=(API_KEY, API_SECRET),
                       json=payload)
print(response)
if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

<Response [200]>
{'data': [], 'meta': {'pagination': {'page_size': 25}}}


## Query for bytes sent to consumers per minute grouped by topic

In [4]:
payload = {
    "aggregations": [
        {
            "agg": "SUM",
            "metric": "io.confluent.kafka.server/sent_bytes"
        }
    ],
    "filter": {
        "filters": [
            {
                "field": "metric.label.cluster_id",
                "op": "EQ",
                "value": CLUSTER_ID
            }
        ],
        "op": "AND"
    },
    "granularity": "PT1M",
    "group_by": [
        "metric.label.topic"
    ],
    "intervals": [
        "2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"
    ],
    "limit": 25
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/query',
                        auth=(API_KEY, API_SECRET),
                       json=payload)

if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

{'data': []}


## Query for bytes sent by producers per minute grouped by topic

In [5]:
payload = {
    "aggregations": [
        {
            "agg": "SUM",
            "metric": "io.confluent.kafka.server/received_bytes"
        }
    ],
    "filter": {
        "filters": [
            {
                "field": "metric.label.cluster_id",
                "op": "EQ",
                "value": CLUSTER_ID
            }
        ],
        "op": "AND"
    },
    "granularity": "PT1M",
    "group_by": [
        "metric.label.topic"
    ],
    "intervals": [
        "2019-12-19T11:00:00-05:00/2019-12-19T11:05:00-05:00"
    ],
    "limit": 25
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/query',
                        auth=(API_KEY, API_SECRET),
                       json=payload)

if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

{'data': []}


## Query for max retained bytes per hour over 2 hours for topic named test-topic

In [6]:
payload = {
    "aggregations": [
        {
            "agg": "SUM",
            "metric": "io.confluent.kafka.server/retained_bytes"
        }
    ],
    "filter": {
        "filters": [
            {
                 "field": "metric.label.topic",
                 "op": "EQ",
                 "value": "test-topic"
            },
            {
                "field": "metric.label.cluster_id",
                "op": "EQ",
                "value": CLUSTER_ID
            }
        ],
        "op": "AND"
    },
    "granularity": "PT1M",
    "group_by": [
        "metric.label.topic"
    ],
    "intervals": [
        "2019-12-19T11:00:00-05:00/P0Y0M0DT2H0M0S"
    ],
    "limit": 25
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/query',
                        auth=(API_KEY, API_SECRET),
                       json=payload)

if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

{'data': []}


## Query for max retained bytes per hour over 2 hours for a cluster lkc-XXXX

In [7]:
payload = {
    "aggregations": [
        {
            "agg": "SUM",
            "metric": "io.confluent.kafka.server/retained_bytes"
        }
    ],
    "filter": {
        "filters": [
            {
                "field": "metric.label.cluster_id",
                "op": "EQ",
                "value": CLUSTER_ID
            }
        ],
        "op": "AND"
    },
    "granularity": "PT1H",
    "group_by": [
        "metric.label.cluster_id"
    ],
    "intervals": [
        "2019-12-19T11:00:00-05:00/P0Y0M0DT2H0M0S"
    ],
    "limit": 5
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/query',
                        auth=(API_KEY, API_SECRET),
                       json=payload)

if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

{'data': []}


## Query for metrics currently available for a cluster

In [8]:
payload = {
  "filter": {
    "field": "metric.label.cluster_id",
    "op": "EQ",
    "value": CLUSTER_ID
  }
}

response = requests.post('https://api.telemetry.confluent.cloud/v1/metrics/cloud/available',
                        auth=(API_KEY, API_SECRET),
                       json=payload)

if response.status_code != 200:
    print('oops, something went wrong!!')
else:
    print(response.json())

{'data': [{'metric': 'io.confluent.kafka.server/retained_bytes'}, {'metric': 'io.confluent.kafka.server/active_connection_count'}, {'metric': 'io.confluent.kafka.server/partition_count'}, {'metric': 'io.confluent.kafka.server/request_count'}], 'meta': {'pagination': {'page_size': 100}}}
