# Import packages and connect to Elasticsearch

In [2]:
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import os

# Read ES_USER / ES_PASSWORD / ES_ENDPOINT from a .env file into the environment
load_dotenv()

ES_USER = os.getenv("ES_USER")
ES_PASSWORD = os.getenv("ES_PASSWORD")
ES_ENDPOINT = os.getenv("ES_ENDPOINT")

# Fail early with a readable message; otherwise the client would be pointed at
# a host literally named "None".
_missing_settings = [
    name
    for name, value in (
        ("ES_USER", ES_USER),
        ("ES_PASSWORD", ES_PASSWORD),
        ("ES_ENDPOINT", ES_ENDPOINT),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

# Keep the credentials out of the URL: a password containing characters such as
# "@", ":" or "/" would corrupt the URL, and URLs tend to end up in logs and
# tracebacks. The client takes them separately through basic_auth.
url = f"https://{ES_ENDPOINT}:9200"

es = Elasticsearch(
    hosts=[url],
    basic_auth=(ES_USER, ES_PASSWORD),
    ca_certs="./http_ca.crt",  # CA certificate used to verify the server's TLS certificate
    verify_certs=True,
)

# Round-trip to the cluster to confirm that the connection and credentials work
print(es.info())

{'name': 'liuxgm.local', 'cluster_name': 'elasticsearch', 'cluster_uuid': '1FpGdI1WRcaqlKk01d0dYw', 'version': {'number': '8.12.0', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '1665f706fd9354802c02146c1e6b5c0fbcddfbc9', 'build_date': '2024-01-11T10:05:27.953830042Z', 'build_snapshot': False, 'lucene_version': '9.9.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'}


# Add sample data to Elasticsearch

In [4]:
index_name = "sample_data"

# Explicit field types for the sample index: client_ip is mapped as an "ip"
# field and message as a "keyword" field. No other field is declared here.
mappings = {
    "mappings": {
        "properties": {
            "client_ip": {"type": "ip"},
            "message": {"type": "keyword"},
        }
    }
}

# Create the index only when it does not exist yet
if not es.indices.exists(index=index_name):
    es.indices.create(index=index_name, body=mappings)

## Now we can index the data using the Elasticsearch Python client's bulk helpers.

In [7]:
# Documents to be indexed. Each source row below is
# (timestamp, client IP, message, event duration) and is expanded into a dict
# with the field names used in the index.
documents = [
    {
        "@timestamp": timestamp,
        "client_ip": client_ip,
        "message": message,
        "event_duration": event_duration,
    }
    for timestamp, client_ip, message, event_duration in [
        ("2023-10-23T12:15:03.360Z", "172.21.2.162", "Connected to 10.1.0.3", 3450233),
        ("2023-10-23T12:27:28.948Z", "172.21.2.113", "Connected to 10.1.0.2", 2764889),
        ("2023-10-23T13:33:34.937Z", "172.21.0.5", "Disconnected", 1232382),
        ("2023-10-23T13:51:54.732Z", "172.21.3.15", "Connection error", 725448),
        ("2023-10-23T13:52:55.015Z", "172.21.3.15", "Connection error", 8268153),
        ("2023-10-23T13:53:55.832Z", "172.21.3.15", "Connection error", 5033755),
        ("2023-10-23T13:55:01.543Z", "172.21.3.15", "Connected to 10.1.0.1", 1756467),
    ]
]

# Prepare the actions for the bulk API. Every action carries a deterministic
# _id built from the document's position in `documents`, so re-running this
# cell overwrites the same documents instead of appending duplicate copies.
actions = [
    {"_index": index_name, "_id": f"{index_name}-{position}", "_source": doc}
    for position, doc in enumerate(documents)
]

# Perform the bulk index operation and capture the response.
# - raise_on_error=False returns per-document failures in `failed`. By default
#   the helper raises BulkIndexError instead, which would leave the
#   `if failed` branch below unreachable.
# - refresh="wait_for" is forwarded to the bulk API so the documents are
#   searchable by the time the queries in the following cells run.
success, failed = bulk(es, actions, raise_on_error=False, refresh="wait_for")

if failed:
    print(f"Some documents failed to index: {failed}")
else:
    print(f"Successfully indexed {success} documents.")

Successfully indexed 7 documents.


## Suppress specific Elasticsearch warnings about default limit of [500] that pollute responses

In [9]:
import warnings
from elasticsearch import ElasticsearchWarning

# Silence only the notice about the implicit "default limit of [N]" that ES|QL
# attaches to queries without a LIMIT. Any other ElasticsearchWarning (for
# example a deprecation notice) stays visible. The pattern is matched against
# the start of the warning text, hence the leading ".*".
# NOTE(review): the pattern assumes the warning text contains
# "default limit of [<number>]" — confirm against the server version in use.
warnings.filterwarnings(
    "ignore",
    message=r".*default limit of \[\d+\]",
    category=ElasticsearchWarning,
)

## Format response to return human-readable tables

In [10]:
def format_response(response_data):
    """Print an ES|QL query response as a plain-text table.

    Args:
        response_data: Mapping with a "columns" entry (a sequence of mappings
            that each carry a "name") and a "values" entry (a sequence of
            rows, each indexable by column position).

    Returns:
        None. A header line, a dashed rule and one line per row are written
        to standard output; cells are left-aligned and joined by " | ".
    """
    headers = [column["name"] for column in response_data["columns"]]

    # Stringify every cell once; a missing value is shown as the text "None".
    text_rows = [
        ["None" if cell is None else str(cell) for cell in row]
        for row in response_data["values"]
    ]

    # A column is as wide as its header or its longest cell, whichever is longer.
    widths = []
    for position, header in enumerate(headers):
        longest_cell = max((len(cells[position]) for cells in text_rows), default=0)
        widths.append(max(len(header), longest_cell))

    template = " | ".join(f"{{:<{width}}}" for width in widths)
    # The rule spans all columns plus the three-character " | " separators.
    rule_length = sum(widths) + 3 * (len(widths) - 1)

    print(template.format(*headers))
    print("-" * rule_length)
    for cells in text_rows:
        print(template.format(*cells))

# Your first ES|QL query

In [12]:
# The simplest query: the FROM source command on its own, reading from the
# sample_data index without any further processing commands.
esql_query = "FROM sample_data"

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message              
------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2
2023-10-23T13:33:34.937Z | 172.21.0.5   | None         | None              | 1232382        | None           | Disconnected         
2023-10-23T13:51:54.732Z | 172.21.3.15  | None         | None              | 725448         | None           | Connection error     
2023-10-23T13:52:55.015Z | 172.21.3.15  | None         | None              | 8268153        | None           | Connection error     
2023-10-23T13:53:55.832Z | 172.21.3.15  | None         | None        

# Processing commands

In [14]:
# A processing command is appended to the source command with a pipe (|).
# LIMIT 3 caps the result at three rows.
esql_query = """
FROM sample_data
| LIMIT 3
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip | client_ip.keyword | event.duration | event_duration | message              
---------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None      | None              | 3450233        | None           | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | None      | None              | 2764889        | None           | Connected to 10.1.0.2
2023-10-23T13:33:34.937Z | 172.21.0.5   | None      | None              | 1232382        | None           | Disconnected         


# Sort a table

In [15]:
# SORT orders the rows by @timestamp; DESC puts the most recent rows first.
esql_query = """
FROM sample_data
| SORT @timestamp DESC
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message              
------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T13:55:01.543Z | None         | 172.21.3.15  | 172.21.3.15       | None           | 1756467        | Connected to 10.1.0.1
2023-10-23T13:55:01.543Z | 172.21.3.15  | None         | None              | 1756467        | None           | Connected to 10.1.0.1
2023-10-23T13:53:55.832Z | None         | 172.21.3.15  | 172.21.3.15       | None           | 5033755        | Connection error     
2023-10-23T13:53:55.832Z | 172.21.3.15  | None         | None              | 5033755        | None           | Connection error     
2023-10-23T13:52:55.015Z | None         | 172.21.3.15  | 172.21.3.15       | None           | 8268153        | Connection error     
2023-10-23T13:52:55.015Z | 172.21.3.15  | None         | None        

# Query the data

In [17]:
# WHERE filters rows: only rows whose event_duration is greater than
# 5000000 are kept.
esql_query = """
FROM sample_data
| WHERE event_duration > 5000000
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip | client_ip   | client_ip.keyword | event.duration | event_duration | message         
---------------------------------------------------------------------------------------------------------------------------
2023-10-23T13:52:55.015Z | None      | 172.21.3.15 | 172.21.3.15       | None           | 8268153        | Connection error
2023-10-23T13:53:55.832Z | None      | 172.21.3.15 | 172.21.3.15       | None           | 5033755        | Connection error


## WHERE supports several operators.
## For example, you can use LIKE to run a wildcard query against the message column:

In [18]:
# LIKE performs a wildcard match: "Connected*" keeps the rows whose message
# starts with "Connected".
esql_query = """
FROM sample_data
| WHERE message LIKE "Connected*"
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message              
------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2
2023-10-23T13:55:01.543Z | 172.21.3.15  | None         | None              | 1756467        | None           | Connected to 10.1.0.1
2023-10-23T12:15:03.360Z | None         | 172.21.2.162 | 172.21.2.162      | None           | 3450233        | Connected to 10.1.0.3
2023-10-23T12:27:28.948Z | None         | 172.21.2.113 | 172.21.2.113      | None           | 2764889        | Connected to 10.1.0.2
2023-10-23T13:55:01.543Z | None         | 172.21.3.15  | 172.21.3.15 

# Chain processing commands

In [19]:
# Commands are applied in order, each one working on the output of the
# previous one: sort newest-first, then keep the first three rows.
esql_query = """
FROM sample_data
| SORT @timestamp DESC
| LIMIT 3
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip   | client_ip   | client_ip.keyword | event.duration | event_duration | message              
----------------------------------------------------------------------------------------------------------------------------------
2023-10-23T13:55:01.543Z | None        | 172.21.3.15 | 172.21.3.15       | None           | 1756467        | Connected to 10.1.0.1
2023-10-23T13:55:01.543Z | 172.21.3.15 | None        | None              | 1756467        | None           | Connected to 10.1.0.1
2023-10-23T13:53:55.832Z | 172.21.3.15 | None        | None              | 5033755        | None           | Connection error     


# Compute values

In [20]:
# EVAL appends a computed column: duration_ms is event_duration divided by
# the floating-point literal 1000000.0.
# NOTE(review): the column name suggests event_duration is in nanoseconds —
# the unit is not stated anywhere in this notebook.
esql_query = """
FROM sample_data
| EVAL duration_ms = event_duration/1000000.0
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message               | duration_ms
--------------------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3 | None       
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2 | None       
2023-10-23T13:33:34.937Z | 172.21.0.5   | None         | None              | 1232382        | None           | Disconnected          | None       
2023-10-23T13:51:54.732Z | 172.21.3.15  | None         | None              | 725448         | None           | Connection error      | None       
2023-10-23T13:52:55.015Z | 172.21.3.15  | None         | None              | 8268153        | None           | Connect

## EVAL supports several functions. For example, to round a number to the closest number with the specified number of digits, use the ROUND function:

In [21]:
# Same computed column as before, wrapped in ROUND(..., 1) so the value is
# rounded to one decimal digit.
esql_query = """
FROM sample_data
| EVAL duration_ms = ROUND(event_duration/1000000.0, 1)
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message               | duration_ms
--------------------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3 | None       
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2 | None       
2023-10-23T13:33:34.937Z | 172.21.0.5   | None         | None              | 1232382        | None           | Disconnected          | None       
2023-10-23T13:51:54.732Z | 172.21.3.15  | None         | None              | 725448         | None           | Connection error      | None       
2023-10-23T13:52:55.015Z | 172.21.3.15  | None         | None              | 8268153        | None           | Connect

# Calculate statistics

In [22]:
# STATS aggregates the rows into a single result row; here the median of
# event_duration is returned in a column named median_duration.
esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration)
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

median_duration
---------------
2764889.0      


## You can calculate multiple stats with one command:

In [24]:
# Several aggregations can be listed in one STATS command, separated by
# commas: the median and the maximum of event_duration.
esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration), max_duration = MAX(event_duration)
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

median_duration | max_duration
------------------------------
2764889.0       | 8268153     


## Use BY to group calculated stats by one or more columns. For example, to calculate the median duration per client IP:

In [26]:
# BY groups the aggregation: one median_duration row per distinct client_ip.
esql_query = """
FROM sample_data
| STATS median_duration = MEDIAN(event_duration) BY client_ip
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

median_duration | client_ip   
------------------------------
None            | None        
1232382.0       | 172.21.0.5  
2764889.0       | 172.21.2.113
3450233.0       | 172.21.2.162
3395111.0       | 172.21.3.15 


# Access columns

In [27]:
# EVAL without an explicit column name: the new column is named after the
# expression itself, i.e. "event_duration/1000000.0".
esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
"""

# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message               | event_duration/1000000.0
---------------------------------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3 | None                    
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2 | None                    
2023-10-23T13:33:34.937Z | 172.21.0.5   | None         | None              | 1232382        | None           | Disconnected          | None                    
2023-10-23T13:51:54.732Z | 172.21.3.15  | None         | None              | 725448         | None           | Connection error      | None                    
2023-10-23T13:52:55.015Z | 172.21.3.15  

## In this query, EVAL adds a new column named event_duration/1000000.0. Because its name contains special characters, to access this column, quote it with backticks:

In [28]:
# The column created by the unnamed EVAL contains special characters ("/" and
# "."), so STATS has to reference it inside backticks.
esql_query = """
FROM sample_data
| EVAL event_duration/1000000.0
| STATS MEDIAN(`event_duration/1000000.0`)
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

MEDIAN(`event_duration/1000000.0`)
----------------------------------
2.764889                          


# Create a histogram

In [29]:
# KEEP reduces the table to the @timestamp column. AUTO_BUCKET is then given
# that field, a target of 24 buckets and the start/end of 2023-10-23; in the
# recorded output every bucket value is aligned to the start of an hour.
esql_query = """
FROM sample_data
| KEEP @timestamp
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | bucket                  
---------------------------------------------------
2023-10-23T12:15:03.360Z | 2023-10-23T12:00:00.000Z
2023-10-23T12:27:28.948Z | 2023-10-23T12:00:00.000Z
2023-10-23T13:33:34.937Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:51:54.732Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:52:55.015Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:53:55.832Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:55:01.543Z | 2023-10-23T13:00:00.000Z
2023-10-23T12:15:03.360Z | 2023-10-23T12:00:00.000Z
2023-10-23T12:27:28.948Z | 2023-10-23T12:00:00.000Z
2023-10-23T13:33:34.937Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:51:54.732Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:52:55.015Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:53:55.832Z | 2023-10-23T13:00:00.000Z
2023-10-23T13:55:01.543Z | 2023-10-23T13:00:00.000Z


## Combine AUTO_BUCKET with STATS ... BY to create a histogram. For example, to count the number of events per hour:

In [30]:
# Histogram of event counts: assign every row to a bucket, then count the
# rows per bucket with STATS COUNT(*) BY bucket.
esql_query = """
FROM sample_data
| KEEP @timestamp, event_duration
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
| STATS COUNT(*) BY bucket
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

COUNT(*) | bucket                  
-----------------------------------
4        | 2023-10-23T12:00:00.000Z
10       | 2023-10-23T13:00:00.000Z


## Or the median duration per hour:

In [31]:
# Same bucketing as the count histogram, but the aggregation is the median
# of event_duration per bucket.
esql_query = """
FROM sample_data
| KEEP @timestamp, event_duration
| EVAL bucket = AUTO_BUCKET (@timestamp, 24, "2023-10-23T00:00:00Z", "2023-10-23T23:59:59Z")
| STATS median_duration = MEDIAN(event_duration) BY bucket
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

median_duration | bucket                  
------------------------------------------
3107561.0       | 2023-10-23T12:00:00.000Z
1756467.0       | 2023-10-23T13:00:00.000Z


# Enrich data

In [46]:
# Define the mapping of the lookup index: both fields are exact-match keywords
mapping = {
    "mappings": {
        "properties": {"client_ip": {"type": "keyword"}, "env": {"type": "keyword"}}
    }
}

# Create the index with the mapping, unless an earlier run already created it
# (an unconditional create fails when the index exists).
if not es.indices.exists(index="clientips"):
    es.indices.create(index="clientips", body=mapping)

# Prepare bulk data. Each row uses its client_ip as the document _id so that
# re-running this cell overwrites the rows instead of duplicating them.
bulk_data = [
    {"index": {"_id": "172.21.0.5"}},
    {"client_ip": "172.21.0.5", "env": "Development"},
    {"index": {"_id": "172.21.2.113"}},
    {"client_ip": "172.21.2.113", "env": "QA"},
    {"index": {"_id": "172.21.2.162"}},
    {"client_ip": "172.21.2.162", "env": "QA"},
    {"index": {"_id": "172.21.3.15"}},
    {"client_ip": "172.21.3.15", "env": "Production"},
    {"index": {"_id": "172.21.3.16"}},
    {"client_ip": "172.21.3.16", "env": "Production"},
]

# Bulk index the data; refresh=True makes the rows searchable before the
# enrich policy is executed below.
es.bulk(index="clientips", body=bulk_data, refresh=True)

# Define the enrich policy: match incoming rows on client_ip against the
# clientips index and make the env field available for enrichment.
policy = {
    "match": {
        "indices": "clientips",
        "match_field": "client_ip",
        "enrich_fields": ["env"],
    }
}

# Put the enrich policy unless it already exists (creating a policy under an
# existing name is rejected). Looking up an unknown name returns an empty
# "policies" list.
if not es.enrich.get_policy(name="clientip_policy")["policies"]:
    es.enrich.put_policy(name="clientip_policy", body=policy)

# Execute the enrich policy and wait for it to complete, so the ENRICH queries
# in the following cells can use it straight away.
es.enrich.execute_policy(name="clientip_policy", wait_for_completion=True)
     

ObjectApiResponse({'status': {'phase': 'COMPLETE'}})

In [47]:
# Keep three columns, convert client_ip to a string, then let ENRICH look the
# value up through clientip_policy and add the matching env column.
# NOTE(review): the TO_STRING conversion is presumably needed because the
# policy's match field in the clientips index is mapped as keyword — confirm.
esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | event_duration | client_ip    | env        
----------------------------------------------------------------------
2023-10-23T12:15:03.360Z | None           | None         | None       
2023-10-23T12:27:28.948Z | None           | None         | None       
2023-10-23T13:33:34.937Z | None           | None         | None       
2023-10-23T13:51:54.732Z | None           | None         | None       
2023-10-23T13:52:55.015Z | None           | None         | None       
2023-10-23T13:53:55.832Z | None           | None         | None       
2023-10-23T13:55:01.543Z | None           | None         | None       
2023-10-23T12:15:03.360Z | 3450233        | 172.21.2.162 | QA         
2023-10-23T12:27:28.948Z | 2764889        | 172.21.2.113 | QA         
2023-10-23T13:33:34.937Z | 1232382        | 172.21.0.5   | Development
2023-10-23T13:51:54.732Z | 725448         | 172.21.3.15  | Production 
2023-10-23T13:52:55.015Z | 8268153        | 172.21.3.15  | Production 
2023-1

## You can use the new env column that's added by the ENRICH command in subsequent commands. For example, to calculate the median duration per environment:

In [48]:
# Same enrichment as in the previous query, followed by an aggregation on the
# added env column: the median event_duration per environment.
esql_query = """
FROM sample_data
| KEEP @timestamp, client_ip, event_duration
| EVAL client_ip = TO_STRING(client_ip)
| ENRICH clientip_policy ON client_ip WITH env
| STATS median_duration = MEDIAN(event_duration) BY env
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

median_duration | env        
-----------------------------
None            | None       
3107561.0       | QA         
1232382.0       | Development
3395111.0       | Production 


# Process data

In [50]:
# DISSECT matches message against the pattern and stores whatever follows
# "Connected to " in a new server_ip column. In the recorded output, rows
# whose message does not match the pattern show None for server_ip.
esql_query = """
FROM sample_data
| DISSECT message "Connected to %{server_ip}"
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

@timestamp               | client.ip    | client_ip    | client_ip.keyword | event.duration | event_duration | message               | server_ip
------------------------------------------------------------------------------------------------------------------------------------------------
2023-10-23T12:15:03.360Z | 172.21.2.162 | None         | None              | 3450233        | None           | Connected to 10.1.0.3 | 10.1.0.3 
2023-10-23T12:27:28.948Z | 172.21.2.113 | None         | None              | 2764889        | None           | Connected to 10.1.0.2 | 10.1.0.2 
2023-10-23T13:33:34.937Z | 172.21.0.5   | None         | None              | 1232382        | None           | Disconnected          | None     
2023-10-23T13:51:54.732Z | 172.21.3.15  | None         | None              | 725448         | None           | Connection error      | None     
2023-10-23T13:52:55.015Z | 172.21.3.15  | None         | None              | 8268153        | None           | Connection error   

In [51]:
# Keep only the "Connected to ..." messages, extract the server IP with
# DISSECT, then count the rows per extracted server_ip.
esql_query = """
FROM sample_data
| WHERE STARTS_WITH(message, "Connected to")
| DISSECT message "Connected to %{server_ip}"
| STATS COUNT(*) BY server_ip
"""
# Send the query to the ES|QL endpoint and print the result as a text table
response = es.esql.query(query=esql_query)
format_response(response)

COUNT(*) | server_ip
--------------------
2        | 10.1.0.3 
2        | 10.1.0.2 
2        | 10.1.0.1 
