From 1d5c858623b4f82c45531a8fda5fa640c31735ab Mon Sep 17 00:00:00 2001 From: Meet Patel Date: Wed, 19 Nov 2025 16:02:57 +0530 Subject: [PATCH 1/4] Added logger and its test cases. Also added dist_utils which serves as utility code when dealing with distributed training. Signed-off-by: meetkuma --- .../finetune/experimental/core/logger.py | 156 ++++++++++++ .../experimental/core/utils/dist_utils.py | 26 ++ .../experimental/tests/test_logger.py | 234 ++++++++++++++++++ 3 files changed, 416 insertions(+) create mode 100644 QEfficient/finetune/experimental/core/logger.py create mode 100644 QEfficient/finetune/experimental/tests/test_logger.py diff --git a/QEfficient/finetune/experimental/core/logger.py b/QEfficient/finetune/experimental/core/logger.py new file mode 100644 index 000000000..cb894e879 --- /dev/null +++ b/QEfficient/finetune/experimental/core/logger.py @@ -0,0 +1,156 @@ +# ----------------------------------------------------------------------------- +# +# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries. +# SPDX-License-Identifier: BSD-3-Clause +# +# ----------------------------------------------------------------------------- + + +import logging +import sys +from pathlib import Path +from typing import Optional +from transformers.utils.logging import get_logger as hf_get_logger + +from .utils.dist_utils import get_rank + + +class Logger: + """Custom logger with console and file logging capabilities.""" + + def __init__( + self, + name: str = "transformers", # We are using "transformers" as default to align with HF logs + log_file: Optional[str] = None, + level: int = logging.INFO, + ): + """ + Initialize the logger. + + Args: + name: Logger name + log_file: Path to log file (if None, log only to console) + level: Logging level + """ + self.logger = hf_get_logger(name) + self.logger.setLevel(level) + + # Clear any existing handlers + self.logger.handlers.clear() + + # Create formatter + self.formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(level) + console_handler.setFormatter(self.formatter) + self.logger.addHandler(console_handler) + + # File handler (if log_file is provided) + if log_file: + # Create directory if it doesn't exist + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + file_handler.setFormatter(self.formatter) + self.logger.addHandler(file_handler) + + def debug(self, message: str) -> None: + """Log debug message.""" + self.logger.debug(message) + + def info(self, message: str) -> None: + """Log info message.""" + self.logger.info(message) + + def warning(self, message: str) -> None: + """Log warning message.""" + self.logger.warning(message) + + def error(self, message: str) -> None: + """Log error message.""" + self.logger.error(message) + + def critical(self, message: str) -> None: + """Log critical message.""" + self.logger.critical(message) + + def log_rank_zero(self, message: str, level: int = logging.INFO) -> None: + """ + Log message only on rank 0 process. + + Args: + message: Message to log + level: Logging level + """ + if get_rank() == 0: + self.logger.log(level, message) + + def log_exception(self, message: str, exception: Exception, raise_exception: bool = True) -> None: + """ + Log exception message and optionally raise the exception. + + Args: + message: Custom message to log + exception: Exception to log + raise_exception: Whether to raise the exception after logging + """ + error_message = f"{message}: {str(exception)}" + self.logger.error(error_message) + + if raise_exception: + raise exception + + def prepare_for_logs(self, output_dir: str, save_metrics: bool = True, log_level: str = "INFO") -> None: + """ + Prepare logger for training logs. + + Args: + output_dir: Output directory for logs + save_metrics: Whether to save metrics to file + log_level: Logging level as string + """ + # Convert string log level to logging constant + level = getattr(logging, log_level.upper(), logging.INFO) + self.logger.setLevel(level) + + # Update existing handlers' levels + for handler in self.logger.handlers: + handler.setLevel(level) + + # Add file handler if saving metrics + if save_metrics: + log_file = Path(output_dir) / "training.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Check if file handler already exists + file_handler_exists = any(isinstance(handler, logging.FileHandler) for handler in self.logger.handlers) + + if not file_handler_exists: + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + file_handler.setFormatter(self.formatter) + self.logger.addHandler(file_handler) + + +# Global logger instance +_logger: Optional[Logger] = None + + +def get_logger(log_file: Optional[str] = None) -> Logger: + """ + Get or create a logger instance. + + Args: + log_file: Path to log file (if None, log only to console) + + Returns: + Logger instance + """ + global _logger + if _logger is None: + _logger = Logger(log_file=log_file) + return _logger diff --git a/QEfficient/finetune/experimental/core/utils/dist_utils.py b/QEfficient/finetune/experimental/core/utils/dist_utils.py index d647b73a6..b9ec0adf5 100644 --- a/QEfficient/finetune/experimental/core/utils/dist_utils.py +++ b/QEfficient/finetune/experimental/core/utils/dist_utils.py @@ -4,3 +4,29 @@ # SPDX-License-Identifier: BSD-3-Clause # # ----------------------------------------------------------------------------- + +import torch.distributed as dist + + +def is_dist_available_and_initialized() -> bool: + """Check if distributed training is available and initialized.""" + return dist.is_available() and dist.is_initialized() + + +def get_rank() -> int: + """Get the rank of the current process in distributed training.""" + if not is_dist_available_and_initialized(): + return 0 + return dist.get_rank() + + +def get_world_size() -> int: + """Get the total number of processes in distributed training.""" + if not is_dist_available_and_initialized(): + return 1 + return dist.get_world_size() + + +def is_main_process() -> bool: + """Check if the current process is the main process (rank 0).""" + return get_rank() == 0 diff --git a/QEfficient/finetune/experimental/tests/test_logger.py b/QEfficient/finetune/experimental/tests/test_logger.py new file mode 100644 index 000000000..2282cded7 --- /dev/null +++ b/QEfficient/finetune/experimental/tests/test_logger.py @@ -0,0 +1,234 @@ +# ----------------------------------------------------------------------------- +# +# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries. +# SPDX-License-Identifier: BSD-3-Clause +# +# ----------------------------------------------------------------------------- + +import logging +from unittest.mock import patch + +import pytest + +from QEfficient.finetune.experimental.core.logger import Logger, get_logger + + +class TestLogger: + def setup_method(self): + """Reset the global logger before each test method""" + import QEfficient.finetune.experimental.core.logger as logger_module + + logger_module._logger = None + + def test_init_console_only(self): + """Test logger initialization with console-only output""" + logger = Logger("test_logger") + + # Check logger attributes + assert logger.logger.name == "test_logger" + assert logger.logger.level == logging.INFO + + # Check handlers - should have console handler only + assert len(logger.logger.handlers) == 1 # Only console handler + assert isinstance(logger.logger.handlers[0], logging.StreamHandler) + + def test_init_with_file(self, tmp_path): + """Test logger initialization with file output""" + log_file = tmp_path / "test.log" + logger = Logger("file_test_logger", str(log_file)) + + # Check handlers - should have both console and file handlers + assert len(logger.logger.handlers) == 2 # Console + file handler + assert isinstance(logger.logger.handlers[0], logging.StreamHandler) + assert isinstance(logger.logger.handlers[1], logging.FileHandler) + + # Check file creation + assert log_file.exists() + + def test_log_levels(self, caplog): + """Test all log levels work correctly""" + logger = Logger("level_test_logger", level=logging.DEBUG) + + with caplog.at_level(logging.DEBUG): + logger.debug("Debug message") + logger.info("Info message") + logger.warning("Warning message") + logger.error("Error message") + logger.critical("Critical message") + + # Check all messages were logged + assert "Debug message" in caplog.text + assert "Info message" in caplog.text + assert "Warning message" in caplog.text + assert "Error message" in caplog.text + assert "Critical message" in caplog.text + + @patch("QEfficient.finetune.experimental.core.logger.get_rank") + def test_log_rank_zero(self, mock_get_rank, caplog): + """Test rank zero logging functionality""" + mock_get_rank.return_value = 0 + logger = Logger("rank_test_logger") + + with caplog.at_level(logging.INFO): + logger.log_rank_zero("Rank zero message") + + assert "Rank zero message" in caplog.text + + @patch("QEfficient.finetune.experimental.core.logger.get_rank") + def test_log_rank_zero_not_zero(self, mock_get_rank, caplog): + """Test that non-rank zero messages are not logged""" + mock_get_rank.return_value = 1 + logger = Logger("rank_test_logger") + + with caplog.at_level(logging.INFO): + logger.log_rank_zero("Should not appear") + + assert "Should not appear" not in caplog.text + + def test_log_exception_raise(self, caplog): + """Test exception logging with raising""" + logger = Logger("exception_test_logger") + + with pytest.raises(ValueError) as exc_info, caplog.at_level(logging.ERROR): + logger.log_exception("Custom error", ValueError("Test exception"), raise_exception=True) + + # The actual logged message is "Custom error: Test exception" + # But the exception itself contains just "Test exception" + assert "Custom error: Test exception" in caplog.text + + def test_log_exception_no_raise(self, caplog): + """Test exception logging without raising""" + logger = Logger("exception_test_logger") + + with caplog.at_level(logging.ERROR): + logger.log_exception("Custom error", ValueError("Test exception"), raise_exception=False) + + # Check that the formatted message was logged + assert "Custom error: Test exception" in caplog.text + + def test_prepare_for_logs(self, tmp_path): + """Test preparing logger for training logs""" + output_dir = tmp_path / "output" + logger = Logger("prepare_test_logger") + + # Prepare for logs + logger.prepare_for_logs(str(output_dir), save_metrics=True, log_level="DEBUG") + + # Check file handler was added + file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] + assert len(file_handlers) == 1 + + # Check file exists + log_file = output_dir / "training.log" + assert log_file.exists() + + # Check log level was updated + assert logger.logger.level == logging.DEBUG + + def test_prepare_for_logs_no_metrics(self, tmp_path): + """Test preparing logger without saving metrics""" + output_dir = tmp_path / "output" + logger = Logger("prepare_test_logger") + + # Prepare for logs without saving metrics + logger.prepare_for_logs(str(output_dir), save_metrics=False, log_level="INFO") + + # Check no file handler was added + file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] + assert len(file_handlers) == 0 + + def test_prepare_for_logs_already_has_file_handler(self, tmp_path): + """Test preparing logger when file handler already exists""" + output_dir = tmp_path / "output" + logger = Logger("prepare_test_logger") + + # Add a file handler manually first + log_file = output_dir / "manual.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + file_handler = logging.FileHandler(str(log_file)) + logger.logger.addHandler(file_handler) + + # Prepare for logs again + logger.prepare_for_logs(str(output_dir), save_metrics=True, log_level="INFO") + + # Should still have only one file handler + file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] + assert len(file_handlers) == 1 + + def test_get_logger_singleton(self): + """Test that get_logger returns the same instance""" + logger1 = get_logger() + logger2 = get_logger() + + assert logger1 is logger2 + + def test_get_logger_with_file(self, tmp_path): + """Test get_logger with file parameter""" + log_file = tmp_path / "get_logger_test.log" + logger = get_logger(str(log_file)) + + # Check that we have 2 handlers (console + file) + assert len(logger.logger.handlers) == 2 # Console + file + assert isinstance(logger.logger.handlers[1], logging.FileHandler) + + # Check file exists + assert log_file.exists() + + +class TestLoggerIntegration: + """Integration tests for logger functionality""" + + def setup_method(self): + """Reset the global logger before each test method""" + import QEfficient.finetune.experimental.core.logger as logger_module + + logger_module._logger = None + + def test_complete_workflow(self, tmp_path, caplog): + """Test complete logger workflow""" + # Setup + log_file = tmp_path / "workflow.log" + logger = Logger("workflow_test", str(log_file), logging.DEBUG) + + # Test all methods + logger.debug("Debug test") + logger.info("Info test") + logger.warning("Warning test") + logger.error("Error test") + logger.critical("Critical test") + + # Test exception handling + try: + raise ValueError("Test exception") + except ValueError as e: + logger.log_exception("Caught exception", e, raise_exception=False) + + # Test rank zero logging + with patch("QEfficient.finetune.experimental.core.logger.get_rank") as mock_rank: + mock_rank.return_value = 0 + logger.log_rank_zero("Rank zero test") + + # Verify all messages were logged + with caplog.at_level(logging.DEBUG): + assert "Debug test" in caplog.text + assert "Info test" in caplog.text + assert "Warning test" in caplog.text + assert "Error test" in caplog.text + assert "Critical test" in caplog.text + assert "Caught exception: Test exception" in caplog.text + assert "Rank zero test" in caplog.text + + # Check file was written to + assert log_file.exists() + content = log_file.read_text() + assert "Debug test" in content + assert "Info test" in content + assert "Warning test" in content + assert "Error test" in content + assert "Critical test" in content + assert "Caught exception: Test exception" in content + assert "Rank zero test" in content + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) From 0fb3f0307a3a24193a8861f57deeb5f7327b68c3 Mon Sep 17 00:00:00 2001 From: Meet Patel Date: Thu, 27 Nov 2025 15:17:37 +0530 Subject: [PATCH 2/4] Addressed comments and made some cleanup in tests. Signed-off-by: meetkuma --- .../finetune/experimental/core/logger.py | 26 +++++++++++++----- .../experimental/core/utils/dist_utils.py | 9 ++++++- .../experimental/tests/test_logger.py | 27 +++++++++---------- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/QEfficient/finetune/experimental/core/logger.py b/QEfficient/finetune/experimental/core/logger.py index cb894e879..64efac2d7 100644 --- a/QEfficient/finetune/experimental/core/logger.py +++ b/QEfficient/finetune/experimental/core/logger.py @@ -12,7 +12,21 @@ from typing import Optional from transformers.utils.logging import get_logger as hf_get_logger -from .utils.dist_utils import get_rank +from .utils.dist_utils import get_local_rank + + +# ----------------------------------------------------------------------------- +# Logger usage: +# Initialize logger: +# logger = Logger("my_logger", log_file="logs/output.log", level=logging.DEBUG) +# Log messages: +# logger.info("This is an info message") +# logger.error("This is an error message") +# logger.log_rank_zero("This message is logged only on rank 0") +# logger.log_exception("An error occurred", exception, raise_exception=False) +# Attach file handler later if needed: +# logger.prepare_for_logs(output_dir="logs", log_level="DEBUG") +# ----------------------------------------------------------------------------- class Logger: @@ -86,7 +100,7 @@ def log_rank_zero(self, message: str, level: int = logging.INFO) -> None: message: Message to log level: Logging level """ - if get_rank() == 0: + if get_local_rank() == 0: self.logger.log(level, message) def log_exception(self, message: str, exception: Exception, raise_exception: bool = True) -> None: @@ -104,13 +118,13 @@ def log_exception(self, message: str, exception: Exception, raise_exception: boo if raise_exception: raise exception - def prepare_for_logs(self, output_dir: str, save_metrics: bool = True, log_level: str = "INFO") -> None: + def prepare_for_logs(self, output_dir: Optional[str] = None, log_level: str = "INFO") -> None: """ - Prepare logger for training logs. + Prepare existing logger to log to both console and file with specified + output directory and log level. Args: output_dir: Output directory for logs - save_metrics: Whether to save metrics to file log_level: Logging level as string """ # Convert string log level to logging constant @@ -122,7 +136,7 @@ def prepare_for_logs(self, output_dir: str, save_metrics: bool = True, log_level handler.setLevel(level) # Add file handler if saving metrics - if save_metrics: + if output_dir: log_file = Path(output_dir) / "training.log" log_file.parent.mkdir(parents=True, exist_ok=True) diff --git a/QEfficient/finetune/experimental/core/utils/dist_utils.py b/QEfficient/finetune/experimental/core/utils/dist_utils.py index b9ec0adf5..aed88862d 100644 --- a/QEfficient/finetune/experimental/core/utils/dist_utils.py +++ b/QEfficient/finetune/experimental/core/utils/dist_utils.py @@ -14,12 +14,19 @@ def is_dist_available_and_initialized() -> bool: def get_rank() -> int: - """Get the rank of the current process in distributed training.""" + """Return the global rank of the current process, else 0.""" if not is_dist_available_and_initialized(): return 0 return dist.get_rank() +def get_local_rank() -> int: + """Return the local rank of the current process on its node, else 0.""" + if not is_dist_available_and_initialized(): + return 0 + return dist.get_node_local_rank() + + def get_world_size() -> int: """Get the total number of processes in distributed training.""" if not is_dist_available_and_initialized(): diff --git a/QEfficient/finetune/experimental/tests/test_logger.py b/QEfficient/finetune/experimental/tests/test_logger.py index 2282cded7..a9dc86c8a 100644 --- a/QEfficient/finetune/experimental/tests/test_logger.py +++ b/QEfficient/finetune/experimental/tests/test_logger.py @@ -63,10 +63,10 @@ def test_log_levels(self, caplog): assert "Error message" in caplog.text assert "Critical message" in caplog.text - @patch("QEfficient.finetune.experimental.core.logger.get_rank") - def test_log_rank_zero(self, mock_get_rank, caplog): + @patch("QEfficient.finetune.experimental.core.logger.get_local_rank") + def test_log_rank_zero_positive_case(self, mock_get_local_rank, caplog): """Test rank zero logging functionality""" - mock_get_rank.return_value = 0 + mock_get_local_rank.return_value = 0 logger = Logger("rank_test_logger") with caplog.at_level(logging.INFO): @@ -74,10 +74,10 @@ def test_log_rank_zero(self, mock_get_rank, caplog): assert "Rank zero message" in caplog.text - @patch("QEfficient.finetune.experimental.core.logger.get_rank") - def test_log_rank_zero_not_zero(self, mock_get_rank, caplog): - """Test that non-rank zero messages are not logged""" - mock_get_rank.return_value = 1 + @patch("QEfficient.finetune.experimental.core.logger.get_local_rank") + def test_log_rank_zero_negative_case(self, mock_get_local_rank, caplog): + """Test to verify that only rank‑zero messages are logged""" + mock_get_local_rank.return_value = 1 logger = Logger("rank_test_logger") with caplog.at_level(logging.INFO): @@ -112,7 +112,7 @@ def test_prepare_for_logs(self, tmp_path): logger = Logger("prepare_test_logger") # Prepare for logs - logger.prepare_for_logs(str(output_dir), save_metrics=True, log_level="DEBUG") + logger.prepare_for_logs(str(output_dir), log_level="DEBUG") # Check file handler was added file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] @@ -125,13 +125,12 @@ def test_prepare_for_logs(self, tmp_path): # Check log level was updated assert logger.logger.level == logging.DEBUG - def test_prepare_for_logs_no_metrics(self, tmp_path): - """Test preparing logger without saving metrics""" - output_dir = tmp_path / "output" + def test_prepare_for_logs_no_file_handler(self): + """Test preparing logger without saving to file""" logger = Logger("prepare_test_logger") # Prepare for logs without saving metrics - logger.prepare_for_logs(str(output_dir), save_metrics=False, log_level="INFO") + logger.prepare_for_logs(log_level="INFO") # Check no file handler was added file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] @@ -149,7 +148,7 @@ def test_prepare_for_logs_already_has_file_handler(self, tmp_path): logger.logger.addHandler(file_handler) # Prepare for logs again - logger.prepare_for_logs(str(output_dir), save_metrics=True, log_level="INFO") + logger.prepare_for_logs(str(output_dir), log_level="INFO") # Should still have only one file handler file_handlers = [h for h in logger.logger.handlers if isinstance(h, logging.FileHandler)] @@ -204,7 +203,7 @@ def test_complete_workflow(self, tmp_path, caplog): logger.log_exception("Caught exception", e, raise_exception=False) # Test rank zero logging - with patch("QEfficient.finetune.experimental.core.logger.get_rank") as mock_rank: + with patch("QEfficient.finetune.experimental.core.logger.get_local_rank") as mock_rank: mock_rank.return_value = 0 logger.log_rank_zero("Rank zero test") From 90366a8b7ae8316cec37866f737e70e00dca4b10 Mon Sep 17 00:00:00 2001 From: Meet Patel Date: Thu, 27 Nov 2025 15:22:03 +0530 Subject: [PATCH 3/4] Fixed lint errors. Signed-off-by: meetkuma --- QEfficient/finetune/experimental/core/logger.py | 2 +- QEfficient/finetune/experimental/tests/test_logger.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/QEfficient/finetune/experimental/core/logger.py b/QEfficient/finetune/experimental/core/logger.py index 64efac2d7..cc7c57df0 100644 --- a/QEfficient/finetune/experimental/core/logger.py +++ b/QEfficient/finetune/experimental/core/logger.py @@ -12,7 +12,7 @@ from typing import Optional from transformers.utils.logging import get_logger as hf_get_logger -from .utils.dist_utils import get_local_rank +from QEfficient.finetune.experimental.core.utils.dist_utils import get_local_rank # ----------------------------------------------------------------------------- diff --git a/QEfficient/finetune/experimental/tests/test_logger.py b/QEfficient/finetune/experimental/tests/test_logger.py index a9dc86c8a..0af0c8b51 100644 --- a/QEfficient/finetune/experimental/tests/test_logger.py +++ b/QEfficient/finetune/experimental/tests/test_logger.py @@ -89,7 +89,7 @@ def test_log_exception_raise(self, caplog): """Test exception logging with raising""" logger = Logger("exception_test_logger") - with pytest.raises(ValueError) as exc_info, caplog.at_level(logging.ERROR): + with pytest.raises(ValueError), caplog.at_level(logging.ERROR): logger.log_exception("Custom error", ValueError("Test exception"), raise_exception=True) # The actual logged message is "Custom error: Test exception" From 251f89c15402f9d2d5f6701385a36043f30bd6a1 Mon Sep 17 00:00:00 2001 From: Meet Patel Date: Fri, 28 Nov 2025 17:04:48 +0530 Subject: [PATCH 4/4] Fixed ruff errors. Signed-off-by: meetkuma --- QEfficient/finetune/experimental/core/logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/QEfficient/finetune/experimental/core/logger.py b/QEfficient/finetune/experimental/core/logger.py index cc7c57df0..a1b9c771f 100644 --- a/QEfficient/finetune/experimental/core/logger.py +++ b/QEfficient/finetune/experimental/core/logger.py @@ -10,11 +10,11 @@ import sys from pathlib import Path from typing import Optional + from transformers.utils.logging import get_logger as hf_get_logger from QEfficient.finetune.experimental.core.utils.dist_utils import get_local_rank - # ----------------------------------------------------------------------------- # Logger usage: # Initialize logger: