Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

PrometheusReader implementation #923

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions llama_hub/library.json
Original file line number Diff line number Diff line change
Expand Up @@ -876,5 +876,13 @@
"Capella",
"NoSQL"
]
},
"PrometheusReader": {
"id": "prometheus",
"author": "mjimenezp",
"keywords": [
"prometheus",
"metrics"
]
}
}
35 changes: 35 additions & 0 deletions llama_hub/prometheus/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Prometheus Loader

This loader loads metrics from Prometheus. The user specifies a Prometheus instance to initialize the reader. They then specify the query, the date range, the step, HTTP parameters, and which fields should be used as metadata. Additional metadata can also be attached to the values using the `additional_metadata` parameter.

The loader automatically adds the timestamp to the metadata and creates one document per value returned by the Prometheus query.

## Usage

Here's an example usage of the PrometheusReader.

```python
from llama_index import download_loader
from datetime import datetime, timedelta
import os

PrometheusReader = download_loader('PrometheusReader')

endpoint = "<endpoint>"
size = "<size>"
# query is a PromQL expression, e.g. "rate(http_requests_total[5m])"
query = ""
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
metadata_fields = ['id', "host"]
additional_metadata = {
"collection_date": datetime.now().isoformat(),
}
reader = PrometheusReader(endpoint, size)
documents = reader.load_data(query=query,
start_time=start_time,
end_time=end_time,
step="1m",
get_pararameters=None,
metadata_fields=metadata_fields,
additional_metadata=additional_metadata)
```
4 changes: 4 additions & 0 deletions llama_hub/prometheus/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Init file for the Prometheus reader package; re-exports PrometheusReader."""
from llama_hub.prometheus.base import PrometheusReader

__all__ = ["PrometheusReader"]
72 changes: 72 additions & 0 deletions llama_hub/prometheus/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import List, Optional, Any
from llama_index.readers.base import BaseReader
from llama_index.readers.schema.base import Document


class PrometheusReader(BaseReader):
    """Read metrics from a Prometheus instance into Documents.

    Each value returned by the PromQL query becomes one Document whose text
    is the metric value and whose metadata carries the selected metric labels
    plus the sample timestamp.
    """

    def __init__(self, endpoint: str, size: Optional[int] = 100):
        """Create a reader connected to the Prometheus HTTP API.

        Args:
            endpoint (str): Base URL of the Prometheus server.
            size (Optional[int], optional): Kept for interface compatibility;
                not currently used by any query path. Defaults to 100.

        Raises:
            ImportError: If `prometheus-api-client` is not installed.
        """
        try:
            from prometheus_api_client import PrometheusConnect
        except ImportError as err:
            raise ImportError(
                "`prometheus-api-client` package not found, please run `pip install prometheus-api-client`"
            ) from err

        self.prometheus_url = endpoint
        # Stored but unused; retained so the constructor signature stays stable.
        self.size = size
        self.prom_client = PrometheusConnect(url=self.prometheus_url)

    def load_data(self, query: str, **load_kwargs: Any) -> List[Document]:
        """Load metrics data from prometheus.

        Args:
            query (str): Prometheus Query Language (PromQL) string.
            start_time (Optional[datetime], optional): Start time of the query. Defaults to None.
            end_time (Optional[datetime], optional): End time of the query. Defaults to None.
            step (Optional[str], optional): Step of the query. Defaults to None ("1m" is used).
            get_parameters (Optional[dict], optional): Additional HTTP GET parameters to pass
                to the query. Defaults to None. The misspelled legacy name
                `get_pararameters` is still accepted for backward compatibility.
            metadata_fields (Optional[list], optional): Metric label names to copy into each
                Document's metadata. Defaults to None.
            additional_metadata (Optional[dict], optional): Fixed metadata merged into every
                Document. Defaults to None.

        Returns:
            List[Document]: One Document per (series, sample) pair; the sample
            timestamp is always added under the "timestamp" metadata key.
        """
        start_time = load_kwargs.get("start_time")
        end_time = load_kwargs.get("end_time")
        step = load_kwargs.get("step")
        # Prefer the correctly spelled kwarg, but keep accepting the original
        # misspelling so existing callers are not broken.
        get_parameters = load_kwargs.get(
            "get_parameters", load_kwargs.get("get_pararameters")
        )
        metadata_fields = load_kwargs.get("metadata_fields", [])
        additional_metadata = load_kwargs.get("additional_metadata", {})
        # A range query needs both endpoints; otherwise fall back to an
        # instant query.
        if start_time is not None and end_time is not None:
            params = {
                "query": query,
                "start_time": start_time,
                "end_time": end_time,
                # Default resolution when the caller did not choose one.
                "step": step if step is not None else "1m",
            }
            if get_parameters is not None:
                params["params"] = get_parameters
            result = self.prom_client.custom_query_range(**params)
        else:
            params = {"query": query}
            if get_parameters is not None:
                params["params"] = get_parameters
            result = self.prom_client.custom_query(**params)

        documents = []
        for row in result:
            # Keep only the labels the caller asked to expose as metadata.
            metadata = {
                label: label_value
                for label, label_value in row["metric"].items()
                if label in metadata_fields
            }
            for sample in row["values"]:
                # Merge per-series labels with caller-supplied fixed metadata;
                # sample is a [timestamp, value] pair.
                current_metadata = metadata | additional_metadata
                current_metadata["timestamp"] = sample[0]
                documents.append(
                    Document(text=sample[1], extra_info=current_metadata, embedding=None)
                )
        return documents
1 change: 1 addition & 0 deletions llama_hub/prometheus/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
prometheus-api-client==0.5.4
154 changes: 154 additions & 0 deletions tests/test_prometheus/test_prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from llama_hub.prometheus import PrometheusReader
from datetime import datetime, timedelta


def test_prometheus_reader_with_query_range(mocker) -> None:
    """A range query should yield one Document per sampled value."""
    end_time = datetime.utcnow() - timedelta(days=1)
    start_time = end_time - timedelta(hours=1)
    # One sample every 5 minutes across the hour: 12 samples total.
    samples = [
        [int((start_time + timedelta(minutes=minute)).timestamp()), minute * 100]
        for minute in range(0, 60, 5)
    ]
    mocked_metric_data = [
        {
            "metric": {
                "endpoint": "http-metrics",
                "metric_name": "test_metric",
                "pod": "test-pod",
            },
            "values": samples,
        }
    ]
    mocker.patch(
        "prometheus_api_client.PrometheusConnect.custom_query_range",
        return_value=mocked_metric_data,
    )
    reader = PrometheusReader(endpoint="test-endpoint", size=100)

    documents = reader.load_data(
        query="avg_over_time(test_metric[5m])",
        start_time=start_time,
        end_time=end_time,
    )
    assert len(documents) == 12
    for index, document in enumerate(documents):
        assert document.text == str(index * 5 * 100)


def test_prometheus_reader_with_query(mocker) -> None:
    """An instant query should expose exactly the requested metadata fields."""
    # Samples are 60 seconds apart starting from a fixed base timestamp.
    base_timestamp = 1706898840
    dev_values = [
        "338149320.8186831",
        "338146802.62924147",
        "281811971.070067",
        "281813920.81963813",
        "225405325.11296228",
        "281780621.1695041",
        "281800467.21272033",
        "338153045.11319935",
        "338145581.4473358",
        "338119211.1141539",
    ]
    prod_values = [
        "106213135.11232835",
        "106212621.7800675",
        "106221285.85705297",
        "79666690.5925202",
        "79650036.00729768",
        "106208946.49888971",
        "106216536.73648405",
        "79660663.82172346",
        "106211849.69262712",
        "53101858.29009599",
    ]
    mocked_metric_data = [
        {
            "metric": {
                "Name": "/dev_cicd_test1",
                "metric_name": "docker_memory_usage",
                "host": "dl1.dockerserver.com",
                "offering": "docker",
            },
            "values": [
                [base_timestamp + 60 * offset, value]
                for offset, value in enumerate(dev_values)
            ],
        },
        {
            "metric": {
                "Name": "/prod_cicd_test2",
                "metric_name": "docker_memory_usage",
                "host": "dl1.dockerserver.com",
                "offering": "docker",
            },
            "values": [
                [base_timestamp + 60 * offset, value]
                for offset, value in enumerate(prod_values)
            ],
        },
    ]
    mocker.patch(
        "prometheus_api_client.PrometheusConnect.custom_query",
        return_value=mocked_metric_data,
    )
    reader = PrometheusReader(endpoint="test-endpoint", size=100)
    metadata_fields = ["Name", "metric_name", "host", "offering"]
    documents = reader.load_data(
        query="rate(docker_memory_usage[5m])",
        metadata_fields=metadata_fields,
    )
    assert len(documents) == 20
    for document in documents:
        matched = sum(1 for field in document.metadata if field in metadata_fields)
        assert matched == len(metadata_fields)


def test_prometheus_reader_with_empty_result(mocker) -> None:
    """A query that matches nothing should produce no Documents."""
    mocker.patch(
        "prometheus_api_client.PrometheusConnect.custom_query",
        return_value=[],
    )
    reader = PrometheusReader(endpoint="test-endpoint", size=100)
    documents = reader.load_data(query="rate(non_existing_metric[1m])")
    assert len(documents) == 0


def test_prometheus_reader_with_additional_metadata(mocker) -> None:
    """Caller-supplied additional_metadata must appear on every Document."""
    end_time = datetime.utcnow() - timedelta(days=1)
    start_time = end_time - timedelta(hours=1)
    # One sample every 5 minutes across the hour: 12 samples total.
    samples = [
        [int((start_time + timedelta(minutes=minute)).timestamp()), minute * 100]
        for minute in range(0, 60, 5)
    ]
    mocked_metric_data = [
        {
            "metric": {
                "endpoint": "http-metrics",
                "metric_name": "test_metric",
                "pod": "test-pod",
            },
            "values": samples,
        }
    ]
    mocker.patch(
        "prometheus_api_client.PrometheusConnect.custom_query_range",
        return_value=mocked_metric_data,
    )
    reader = PrometheusReader(endpoint="test-endpoint", size=100)
    additional_metadata = {
        "collector": "prometheus_reader",
        "collection_date": datetime.now().isoformat(),
    }
    documents = reader.load_data(
        query="avg_over_time(test_metric[5m])",
        start_time=start_time,
        end_time=end_time,
        metadata_fields=["endpoint", "metric_name", "pod"],
        additional_metadata=additional_metadata,
    )
    assert len(documents) == 12
    for document in documents:
        assert all(
            key in document.metadata and document.metadata[key] == value
            for key, value in additional_metadata.items()
        )