From 18373d1086d186974b3071b36f299e100beb7fb3 Mon Sep 17 00:00:00 2001 From: Djcarrillo6 Date: Thu, 16 Nov 2023 17:59:25 -0800 Subject: [PATCH] Changes from PR review Signed-off-by: Djcarrillo6 Fixed import formatting in sample code for gudie. Signed-off-by: Djcarrillo6 Fixed nox formatting of log collection sample module. Signed-off-by: Djcarrillo6 Added types to log_collection_sample.py Signed-off-by: Djcarrillo6 Added type ignore to StramHandler class Signed-off-by: Djcarrillo6 Added formatting change Signed-off-by: Djcarrillo6 --- CHANGELOG.md | 2 +- guides/log_collection.md | 122 +++++++++++++---------- samples/logging/log_collection_sample.py | 39 +++++--- 3 files changed, 96 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 059d3021..ee76d493 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added guide on the document lifecycle API(s) ([#559](https://github.com/opensearch-project/opensearch-py/pull/559)) - Added Windows CI ([#569](https://github.com/opensearch-project/opensearch-py/pull/569)) - Added `client.http` JSON REST request API helpers ([#544](https://github.com/opensearch-project/opensearch-py/pull/544)) -- Added guide on using a Python logging integration with OpenSearch logs ([#579](https://github.com/opensearch-project/opensearch-py/pull/579)) +- Added guide on using a custom Python logging integration with OpenSearch logs ([#579](https://github.com/opensearch-project/opensearch-py/pull/579)) ### Changed - Generate `tasks` client from API specs ([#508](https://github.com/opensearch-project/opensearch-py/pull/508)) - Generate `ingest` client from API specs ([#513](https://github.com/opensearch-project/opensearch-py/pull/513)) diff --git a/guides/log_collection.md b/guides/log_collection.md index 25029886..af83f467 100644 --- a/guides/log_collection.md +++ b/guides/log_collection.md @@ -23,75 +23,82 @@ from opensearchpy import OpenSearch from logging.handlers import QueueHandler, QueueListener ``` -# Setup Connection with OpenSearch Cluster -Let's create a client instance: +# Setup Connection with OpenSearch +Run the following commands to install the docker image: +``` +docker pull opensearchproject/opensearch:latest +``` + +Create a client instance: ```python -opensearch_client = OpenSearch( - "https://admin:admin@localhost:9200", - use_ssl=True, - verify_certs=False, - ssl_show_warn=False, - http_auth=("admin", "admin") +client = OpenSearch( + hosts=['https://@localhost:9200'], + use_ssl=True, + verify_certs=False, + http_auth=('admin', 'admin') ) ``` # Initialize Logger -Now, let's initialize a logger named "OpenSearchLogs" for OpenSearch and set the log level to INFO: +Set the OpenSearch logger level top INFO: ```python # Initialize a logger named "OpenSearchLogs" for OpenSearch & set log level to INFO print("Initializing logger...") os_logger = logging.getLogger("OpenSearchLogs") os_logger.setLevel(logging.INFO) + +# Create a console handler +console_handler = logging.StreamHandler() +console_handler.setLevel(logging.INFO) + +# Add console handler to the logger +os_logger.addHandler(console_handler) ``` -# Define Custom Handler for OpenSearch -Next, let's define a custom handler that logs to OpenSearch: +# Custom Handler For Logs +Define a custom handler that logs to OpenSearch: ```python -# Define a custom handler that logs to OpenSearch class OpenSearchHandler(logging.Handler): - # Initializer / Instance attributes - def __init__(self, opensearch_client): - logging.Handler.__init__(self) - self.os_client = opensearch_client - - # Build index name (e.g., "logs-YYYY-MM-DD") - def _build_index_name(self): - return f"logs-{datetime.date(datetime.now())}" - - # Emit logs to the OpenSearch cluster - def emit(self, record): - document = { - "timestamp": datetime.fromtimestamp(record.created).isoformat(), - "name": record.name, - "level": record.levelname, - "message": record.getMessage(), - "source": { - "file": record.pathname, - "line": record.lineno, - "function": record.funcName - }, - "process": { - "id": record.process, - "name": record.processName - }, - "thread": { - "id": record.thread, - "name": record.threadName - } - } - - # Write the log entry to OpenSearch, handle exceptions - try: - self.os_client.index(index="movies", id=1, body={'title': 'Beauty and the Beast', 'year': 1991}) - except Exception as e: - print(f"Failed to send log to OpenSearch: {e}") + # Initializer / Instance attributes + def __init__(self, opensearch_client): + logging.Handler.__init__(self) + self.os_client = opensearch_client + + # Build index name (e.g., "logs-YYYY-MM-DD") + def _build_index_name(self): + return f"logs-{datetime.date(datetime.now())}" + + # Emit logs to the OpenSearch cluster + def emit(self, record): + document = { + "timestamp": datetime.fromtimestamp(record.created).isoformat(), + "name": record.name, + "level": record.levelname, + "message": record.getMessage(), + "source": { + "file": record.pathname, + "line": record.lineno, + "function": record.funcName, + }, + "process": {"id": record.process, "name": record.processName}, + "thread": {"id": record.thread, "name": record.threadName}, + } + + # Write the log entry to OpenSearch, handle exceptions + try: + self.os_client.index( + index=self._build_index_name(), + body=document, + ) + except Exception as e: + print(f"Failed to send log to OpenSearch: {e}") ``` # Create OpenSearch Handler and Add to Logger -Now, let's create an instance of OpenSearchHandler and add it to the logger: +Create an instance of OpenSearchHandler and add it to the logger: ```python print("Creating an instance of OpenSearchHandler and adding it to the logger...") @@ -128,4 +135,19 @@ print("Log Collection Guide has completed running") ``` # Sample Code -See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide. \ No newline at end of file +See [log_collection_sample.py](/samples/logging/log_collection_sample.py) for a working sample of the concepts in this guide. This Python script is a guide for setting up and running a custom log collection system using the OpenSearch service. The script will create a logger named "OpenSearchLogs" and set the log level to INFO. It will then create an instance of OpenSearchHandler and add it to the logger. Finally, it will setup asynchronous logging using Queues and send a test log to the OpenSearch cluster. + +Exptected Output From Running [log_collection_sample.py](/samples/logging/log_collection_sample.py): +```python +""" + Running Log Collection Guide + Setting up connection with OpenSearch cluster... + Initializing logger... + Creating an instance of OpenSearchHandler and adding it to the logger... + Setting up asynchronous logging using Queues... + Logger is set up and listener has started. Sending a test log... + This is a test log message + Cleaning up... + Log Collection Guide has completed running +""" +``` \ No newline at end of file diff --git a/samples/logging/log_collection_sample.py b/samples/logging/log_collection_sample.py index 4c9149ae..0303eede 100644 --- a/samples/logging/log_collection_sample.py +++ b/samples/logging/log_collection_sample.py @@ -10,11 +10,13 @@ # Modifications Copyright OpenSearch Contributors. See # GitHub history for details. -from datetime import datetime import logging import queue -from opensearchpy import OpenSearch +from datetime import datetime from logging.handlers import QueueHandler, QueueListener +from typing import Any + +from opensearchpy import OpenSearch # For cleaner output, comment in the two lines below to disable warnings and informational messages # import urllib3 @@ -24,9 +26,13 @@ def run_log_collection_guide() -> None: print("Running Log Collection Guide") + # Create a console handler + console_handler: logging.StreamHandler = logging.StreamHandler() # type: ignore + console_handler.setLevel(logging.INFO) + # Setup connection with the OpenSearch cluster print("Setting up connection with OpenSearch cluster...") - opensearch_client = OpenSearch( + opensearch_client: Any = OpenSearch( "https://admin:admin@localhost:9200", use_ssl=True, verify_certs=False, @@ -36,22 +42,25 @@ def run_log_collection_guide() -> None: # Initialize a logger named "OpenSearchLogs" for OpenSearch print("Initializing logger...") - os_logger = logging.getLogger("OpenSearchLogs") + os_logger: logging.Logger = logging.getLogger("OpenSearchLogs") os_logger.setLevel(logging.INFO) + # Add console handler to the logger + os_logger.addHandler(console_handler) + # Define a custom handler that logs to OpenSearch class OpenSearchHandler(logging.Handler): # Initializer / Instance attributes - def __init__(self, opensearch_client): - logging.Handler.__init__(self) + def __init__(self, opensearch_client: Any) -> None: + super().__init__() self.os_client = opensearch_client # Build index name (e.g., "logs-YYYY-MM-DD") - def _build_index_name(self): + def _build_index_name(self) -> str: return f"logs-{datetime.date(datetime.now())}" # Emit logs to the OpenSearch cluster - def emit(self, record): + def emit(self, record: logging.LogRecord) -> None: document = { "timestamp": datetime.fromtimestamp(record.created).isoformat(), "name": record.name, @@ -66,26 +75,24 @@ def emit(self, record): "thread": {"id": record.thread, "name": record.threadName}, } - # Write the log entry to OpenSearch, handle exceptions try: self.os_client.index( - index="movies", - id=1, - body={"title": "Beauty and the Beast", "year": 1991}, + index=self._build_index_name(), + body=document, ) except Exception as e: print(f"Failed to send log to OpenSearch: {e}") print("Creating an instance of OpenSearchHandler and adding it to the logger...") # Create an instance of OpenSearchHandler and add it to the logger - os_handler = OpenSearchHandler(opensearch_client) + os_handler: OpenSearchHandler = OpenSearchHandler(opensearch_client) os_logger.addHandler(os_handler) print("Setting up asynchronous logging using Queues...") # Setup asynchronous logging using Queues - log_queue = queue.Queue(-1) # no limit on size - os_queue_handler = QueueHandler(log_queue) - os_queue_listener = QueueListener(log_queue, os_handler) + log_queue: queue.Queue[logging.LogRecord] = queue.Queue(-1) # no limit on size + os_queue_handler: logging.Handler = QueueHandler(log_queue) + os_queue_listener: QueueListener = QueueListener(log_queue, os_handler) # Add queue handler to the logger os_logger.addHandler(os_queue_handler)