Skip to content

Commit

Permalink
Changes from PR review
Browse files Browse the repository at this point in the history
Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>

Fixed import formatting in sample code for gudie.

Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>

Fixed nox formatting of log collection sample module.

Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>

Added types to log_collection_sample.py

Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>

Added type ignore to StramHandler class

Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>

Added formatting change

Signed-off-by: Djcarrillo6 <djcarrillo6@yahoo.com>
  • Loading branch information
Djcarrillo6 committed Nov 19, 2023
1 parent cc91e93 commit 18373d1
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 67 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added guide on the document lifecycle API(s) ([#559](https://github.com/opensearch-project/opensearch-py/pull/559))
- Added Windows CI ([#569](https://github.com/opensearch-project/opensearch-py/pull/569))
- Added `client.http` JSON REST request API helpers ([#544](https://github.com/opensearch-project/opensearch-py/pull/544))
- Added guide on using a Python logging integration with OpenSearch logs ([#579](https://github.com/opensearch-project/opensearch-py/pull/579))
- Added guide on using a custom Python logging integration with OpenSearch logs ([#579](https://github.com/opensearch-project/opensearch-py/pull/579))
### Changed
- Generate `tasks` client from API specs ([#508](https://github.com/opensearch-project/opensearch-py/pull/508))
- Generate `ingest` client from API specs ([#513](https://github.com/opensearch-project/opensearch-py/pull/513))
Expand Down
122 changes: 72 additions & 50 deletions guides/log_collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,75 +23,82 @@ from opensearchpy import OpenSearch
from logging.handlers import QueueHandler, QueueListener
```

# Setup Connection with OpenSearch Cluster
Let's create a client instance:
# Setup Connection with OpenSearch

Run the following commands to install the docker image:
```
docker pull opensearchproject/opensearch:latest
```

Create a client instance:
```python
opensearch_client = OpenSearch(
"https://admin:admin@localhost:9200",
use_ssl=True,
verify_certs=False,
ssl_show_warn=False,
http_auth=("admin", "admin")
client = OpenSearch(
hosts=['https://@localhost:9200'],
use_ssl=True,
verify_certs=False,
http_auth=('admin', 'admin')
)
```

# Initialize Logger
Now, let's initialize a logger named "OpenSearchLogs" for OpenSearch and set the log level to INFO:
Set the OpenSearch logger level top INFO:

```python
# Initialize a logger named "OpenSearchLogs" for OpenSearch & set log level to INFO
print("Initializing logger...")
os_logger = logging.getLogger("OpenSearchLogs")
os_logger.setLevel(logging.INFO)

# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# Add console handler to the logger
os_logger.addHandler(console_handler)
```

# Define Custom Handler for OpenSearch
Next, let's define a custom handler that logs to OpenSearch:
# Custom Handler For Logs
Define a custom handler that logs to OpenSearch:

```python
# Define a custom handler that logs to OpenSearch
class OpenSearchHandler(logging.Handler):
# Initializer / Instance attributes
def __init__(self, opensearch_client):
logging.Handler.__init__(self)
self.os_client = opensearch_client

# Build index name (e.g., "logs-YYYY-MM-DD")
def _build_index_name(self):
return f"logs-{datetime.date(datetime.now())}"

# Emit logs to the OpenSearch cluster
def emit(self, record):
document = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"source": {
"file": record.pathname,
"line": record.lineno,
"function": record.funcName
},
"process": {
"id": record.process,
"name": record.processName
},
"thread": {
"id": record.thread,
"name": record.threadName
}
}

# Write the log entry to OpenSearch, handle exceptions
try:
self.os_client.index(index="movies", id=1, body={'title': 'Beauty and the Beast', 'year': 1991})
except Exception as e:
print(f"Failed to send log to OpenSearch: {e}")
# Initializer / Instance attributes
def __init__(self, opensearch_client):
logging.Handler.__init__(self)
self.os_client = opensearch_client

# Build index name (e.g., "logs-YYYY-MM-DD")
def _build_index_name(self):
return f"logs-{datetime.date(datetime.now())}"

# Emit logs to the OpenSearch cluster
def emit(self, record):
document = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"name": record.name,
"level": record.levelname,
"message": record.getMessage(),
"source": {
"file": record.pathname,
"line": record.lineno,
"function": record.funcName,
},
"process": {"id": record.process, "name": record.processName},
"thread": {"id": record.thread, "name": record.threadName},
}

# Write the log entry to OpenSearch, handle exceptions
try:
self.os_client.index(
index=self._build_index_name(),
body=document,
)
except Exception as e:
print(f"Failed to send log to OpenSearch: {e}")
```

# Create OpenSearch Handler and Add to Logger
Now, let's create an instance of OpenSearchHandler and add it to the logger:
Create an instance of OpenSearchHandler and add it to the logger:

```python
print("Creating an instance of OpenSearchHandler and adding it to the logger...")
Expand Down Expand Up @@ -128,4 +135,19 @@ print("Log Collection Guide has completed running")
```

# Sample Code
See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide.
See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide. This Python script is a guide for setting up and running a custom log collection system using the OpenSearch service. The script will create a logger named "OpenSearchLogs" and set the log level to INFO. It will then create an instance of OpenSearchHandler and add it to the logger. Finally, it will setup asynchronous logging using Queues and send a test log to the OpenSearch cluster.

Exptected Output From Running [log_collection_sample.py](/samples/logging/log_collection_sample.py):
```python
"""
Running Log Collection Guide
Setting up connection with OpenSearch cluster...
Initializing logger...
Creating an instance of OpenSearchHandler and adding it to the logger...
Setting up asynchronous logging using Queues...
Logger is set up and listener has started. Sending a test log...
This is a test log message
Cleaning up...
Log Collection Guide has completed running
"""
```
39 changes: 23 additions & 16 deletions samples/logging/log_collection_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from datetime import datetime
import logging
import queue
from opensearchpy import OpenSearch
from datetime import datetime
from logging.handlers import QueueHandler, QueueListener
from typing import Any

from opensearchpy import OpenSearch

# For cleaner output, comment in the two lines below to disable warnings and informational messages
# import urllib3
Expand All @@ -24,9 +26,13 @@
def run_log_collection_guide() -> None:
print("Running Log Collection Guide")

# Create a console handler
console_handler: logging.StreamHandler = logging.StreamHandler() # type: ignore
console_handler.setLevel(logging.INFO)

# Setup connection with the OpenSearch cluster
print("Setting up connection with OpenSearch cluster...")
opensearch_client = OpenSearch(
opensearch_client: Any = OpenSearch(
"https://admin:admin@localhost:9200",
use_ssl=True,
verify_certs=False,
Expand All @@ -36,22 +42,25 @@ def run_log_collection_guide() -> None:

# Initialize a logger named "OpenSearchLogs" for OpenSearch
print("Initializing logger...")
os_logger = logging.getLogger("OpenSearchLogs")
os_logger: logging.Logger = logging.getLogger("OpenSearchLogs")
os_logger.setLevel(logging.INFO)

# Add console handler to the logger
os_logger.addHandler(console_handler)

# Define a custom handler that logs to OpenSearch
class OpenSearchHandler(logging.Handler):
# Initializer / Instance attributes
def __init__(self, opensearch_client):
logging.Handler.__init__(self)
def __init__(self, opensearch_client: Any) -> None:
super().__init__()
self.os_client = opensearch_client

# Build index name (e.g., "logs-YYYY-MM-DD")
def _build_index_name(self):
def _build_index_name(self) -> str:
return f"logs-{datetime.date(datetime.now())}"

# Emit logs to the OpenSearch cluster
def emit(self, record):
def emit(self, record: logging.LogRecord) -> None:
document = {
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"name": record.name,
Expand All @@ -66,26 +75,24 @@ def emit(self, record):
"thread": {"id": record.thread, "name": record.threadName},
}

# Write the log entry to OpenSearch, handle exceptions
try:
self.os_client.index(
index="movies",
id=1,
body={"title": "Beauty and the Beast", "year": 1991},
index=self._build_index_name(),
body=document,
)
except Exception as e:
print(f"Failed to send log to OpenSearch: {e}")

print("Creating an instance of OpenSearchHandler and adding it to the logger...")
# Create an instance of OpenSearchHandler and add it to the logger
os_handler = OpenSearchHandler(opensearch_client)
os_handler: OpenSearchHandler = OpenSearchHandler(opensearch_client)
os_logger.addHandler(os_handler)

print("Setting up asynchronous logging using Queues...")
# Setup asynchronous logging using Queues
log_queue = queue.Queue(-1) # no limit on size
os_queue_handler = QueueHandler(log_queue)
os_queue_listener = QueueListener(log_queue, os_handler)
log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1) # no limit on size
os_queue_handler: logging.Handler = QueueHandler(log_queue)
os_queue_listener: QueueListener = QueueListener(log_queue, os_handler)

# Add queue handler to the logger
os_logger.addHandler(os_queue_handler)
Expand Down

0 comments on commit 18373d1

Please sign in to comment.