In [9]:
# подгружаем нужное
from elasticsearch import Elasticsearch # проверено на 8.11.1 https://elasticsearch-py.readthedocs.io/
from opensearchpy import OpenSearch # проверено на 2.4.2 https://github.com/opensearch-project/opensearch-py/blob/main/USER_GUIDE.md
import elastic_external_functions
import pandas
# отключаем по желанию
#import urllib3
#urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Сначала создаём клиенты для подключения к Elasticsearch и OpenSearch.

In [38]:
import os

# Credentials. Real secrets should not be stored in the notebook file, so they are
# read from environment variables; the placeholder values below are only fallbacks.
# A secrets store such as keyring is an even safer option.
#   ELASTIC_API_KEY      - Elasticsearch API key
#   OPENSEARCH_LOGIN     - OpenSearch user name
#   OPENSEARCH_PASSWORD  - OpenSearch password
elastic_api_key = os.environ.get("ELASTIC_API_KEY", "foobarapikey")
opensearch_credentials = (
    os.environ.get("OPENSEARCH_LOGIN", "foo_login"),
    os.environ.get("OPENSEARCH_PASSWORD", "bar_password"),
)

In [39]:
# Elasticsearch client instance.
elastic_client = Elasticsearch(
    hosts=["https://foobarelastic.ru:9201"],
    # Authentication via API key; alternatives are basic_auth=(login, password)
    # or http_auth=(login, password).
    api_key=elastic_api_key,
    # TLS certificates are NOT verified.
    verify_certs=False,
    # Long request timeout; failed requests, including timeouts, are retried.
    request_timeout=300,
    retry_on_timeout=True,
    max_retries=10,
)

In [12]:
# OpenSearch client instance (HTTPS with login/password authentication).
opensearch_client = OpenSearch(
    hosts=[{"host": "foobaropensearch.ru", "port": 9200}],
    http_auth=opensearch_credentials,
    # Optional client-certificate / CA settings:
    # client_cert=client_cert_path,
    # client_key=client_key_path,
    # ca_certs=ca_certs_path,
    use_ssl=True,
    # Certificate and hostname checks are disabled, and the related warning is silenced.
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
    # Gzip compression for request bodies.
    http_compress=True,
    timeout=300,
    max_retries=10,
)

Подготовка прямого запроса к Elasticsearch (без агрегации)

In [34]:
# Parameters of a direct (non-aggregation) search. The time-window bounds are
# stored first so that the range filter below can reuse them.
current_client = elastic_client
current_query = dict(
    index="data-*",
    gte="2024-07-30T14:23:42.840Z",
    lte="2024-07-30T14:23:52.840Z",
)
current_query.update(
    query={
        "bool": {
            "must": [],
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": current_query["gte"],
                            "lte": current_query["lte"],
                        }
                    }
                },
                {"match_phrase": {"destination.ip": "10.10.44.107"}},
            ],
            "should": [],
            "must_not": [],
        }
    },
    sort=[
        {
            "@timestamp": {
                "order": "desc",
                "format": "strict_date_optional_time",
                "unmapped_type": "boolean",
            }
        }
    ],
    fields=[{"field": "*", "include_unmapped": "true"}],
    size=1000,
    # If a response holds exactly `size` hits (e.g. 1000), the new lte is taken
    # from the sort value of hit number size + search_after_shift, i.e. 990.
    search_after_shift=-10,
)

Делаем запрос

In [35]:
# Run the direct search through the project helper. Pagination starts from the
# upper bound of the time window (search_after = [lte]).
current_data = elastic_external_functions.data_taxi(
    elastic_client=current_client,
    search_after=[current_query["lte"]],
    debug=False,
    **{
        param: current_query[param]
        for param in ("index", "query", "sort", "fields", "size", "search_after_shift")
    },
)

In [40]:
# Show a few example fields of the returned documents (adjust the list as needed).
# reindex(columns=...) is used instead of df[[...]] so the cell still renders when
# a column is absent, e.g. when the search returned no documents: missing
# columns are shown filled with NaN instead of raising KeyError.
pandas.DataFrame(current_data).reindex(
    columns=["@timestamp", "network.transport", "event.type", "destination.ip"]
)

Unnamed: 0,@timestamp,network.transport,event.type,destination.ip
0,2024-07-30T14:23:52.001Z,udp,connection,10.10.44.107
1,2024-07-30T14:23:52.001Z,udp,connection,10.10.44.107


Подготовка запроса к Elasticsearch с агрегацией

In [30]:
# Parameters of an aggregation search. The time-window bounds are stored first
# so that the range filter below can reuse them.
current_client = elastic_client
current_query = dict(
    index="data-*",
    gte="2024-07-30T14:31:15.572Z",
    lte="2024-07-30T14:41:15.572Z",
)
current_query.update(
    query={
        "bool": {
            "must": [],
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": current_query["gte"],
                            "lte": current_query["lte"],
                        }
                    }
                },
                {"match_phrase": {"destination.ip": "10.10.44.107"}},
            ],
            "should": [],
            "must_not": [],
        }
    },
    # terms(network.transport) -> terms(destination.ip) -> min/max(@timestamp)
    aggs={
        "2": {
            "terms": {
                "field": "network.transport",
                "order": {"_count": "desc"},
                "size": 5,
                "shard_size": 25,
            },
            "aggs": {
                "3": {
                    "terms": {
                        "field": "destination.ip",
                        "order": {"_count": "desc"},
                        "size": 5,
                        "shard_size": 25,
                    },
                    "aggs": {
                        "4": {"min": {"field": "@timestamp"}},
                        "5": {"max": {"field": "@timestamp"}},
                    },
                }
            },
        }
    },
)

Делаем запрос

In [31]:
# Run the aggregation search through the project helper.
current_data = elastic_external_functions.data_taxi_aggs(
    elastic_client=current_client,
    index=current_query["index"],
    query=current_query["query"],
    aggs=current_query["aggs"],
    # presumably forwarded as the search size: 0 = no hits, aggregation buckets only
    size=0,
    debug=False,
)

In [32]:
# Display the aggregation result as a table.
pandas.DataFrame(data=current_data)

Unnamed: 0,network.transport_terms,destination.ip_terms,doc_count,@timestamp_min,@timestamp_max
0,udp,10.10.44.107,27,2024-07-30T14:31:42.001Z,2024-07-30T14:41:02.001Z
1,tcp,10.10.44.107,19,2024-07-30T14:32:07.000Z,2024-07-30T14:41:02.001Z


Для OpenSearch всё работает аналогично: достаточно указать `current_client = opensearch_client`.