Implement a LastLogActivity check #99

Merged
merged 2 commits into from Mar 10, 2021
51 changes: 51 additions & 0 deletions doc/source/available_checks.rst
@@ -216,6 +216,57 @@ Requirements

- `requests`_

.. _check-last-log-activity:

LastLogActivity
***************

.. program:: check-last-log-activity

Parses a log file and uses the most recent time contained in the file to determine activity.
For this purpose, the log file lines are iterated from the back until a line matching a configurable regular expression is found.
This expression is used to extract the contained timestamp in that log line, which is then compared to the current time with an allowed delta.
The check only looks at the first line from the back that contains a timestamp.
Further lines are ignored.
A typical use case for this check would be a web server access log file.

This check supports all date formats that are supported by the `dateutil parser <https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse>`_.
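
The procedure described above can be sketched as follows. This is a minimal illustration, not the code of the check: ``last_activity`` is a hypothetical helper, and ``datetime.fromisoformat`` stands in for the dateutil parser so that the sketch needs only the standard library.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def last_activity(
    lines: Iterable[str],
    pattern: "re.Pattern[str]",
    delta: timedelta,
    now: datetime,
    default_tz: timezone = timezone.utc,
) -> Optional[datetime]:
    """Return the newest timestamp if it is recent enough, else None.

    Hypothetical helper for illustration only. It parses ISO 8601
    timestamps, whereas the check itself accepts what dateutil accepts.
    """
    for line in reversed(list(lines)):
        match = pattern.match(line)
        if not match:
            continue
        stamp = datetime.fromisoformat(match.group(1))
        if stamp.tzinfo is None:
            # Naive timestamps are interpreted in the default timezone.
            stamp = stamp.replace(tzinfo=default_tz)
        # Only the first matching line from the back decides the result.
        return stamp if (now - stamp) < delta else None
    # No line matched at all.
    return None


pattern = re.compile(r"^\[(.*)\] .*$")
log = [
    "[2021-03-10T11:00:00] GET /index.html",
    "[2021-03-10T11:58:00] GET /style.css",
    "a line without a timestamp",
]
now = datetime(2021, 3, 10, 12, 0, tzinfo=timezone.utc)

# The newest timestamp is two minutes old, so activity is detected.
recent = last_activity(log, pattern, timedelta(minutes=10), now)
# With only the first line, the newest timestamp is an hour old.
stale = last_activity(log[:1], pattern, timedelta(minutes=10), now)
```

Note that the trailing line without a timestamp is skipped, while older matching lines are never inspected once a match has been found.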

Options
=======

.. option:: log_file

   Path to the log file that should be analyzed.

.. option:: pattern

   A regular expression used to determine whether a line of the log file contains a timestamp to look at.
   The expression must contain exactly one capture group.
   For instance, ``^\[(.*)\] .*$`` might be used to find dates in square brackets at line beginnings.

.. option:: minutes

   The number of minutes that log file timestamps may lie in the past while still counting as activity.
   If a timestamp is older than ``<now> - <minutes>``, no activity is detected.
   default: 10

.. option:: encoding

   The encoding with which to parse the log file. default: ascii

.. option:: timezone

   The timezone to assume in case a timestamp extracted from the log file has no associated timezone information.
   Timezones are expressed using the names from the Olson timezone database (e.g. ``Europe/Berlin``).
   default: ``UTC``
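
As a rough illustration of how these options and their defaults fit together, the following sketch reads them with the standard library ``configparser``. The section name ``example`` and the log file path are placeholders chosen for this sketch, and the layout of a complete configuration file is not shown here.

```python
import configparser
import re
from datetime import timedelta

# Placeholder section name and path, chosen for illustration only.
CONFIG_TEXT = r"""
[example]
log_file = /var/log/example.log
pattern = ^\[(.*)\] .*$
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["example"]

pattern = re.compile(section["pattern"])
# The documented rule: the expression needs exactly one capture group.
if pattern.groups != 1:
    raise ValueError("pattern must have exactly one capture group")

# Options that are not set fall back to the documented defaults.
delta = timedelta(minutes=section.getint("minutes", fallback=10))
encoding = section.get("encoding", "ascii")
tz_name = section.get("timezone", "UTC")

# The capture group yields the text that is later parsed as a date.
extracted = pattern.match("[2021-03-10 12:00:00] request served").group(1)
```

Only ``log_file`` and ``pattern`` have no default, so they are the only options that must be present.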

Requirements
============

* `dateutil`_
* `pytz`_

.. _check-load:

Load
1 change: 1 addition & 0 deletions doc/source/changelog.rst
@@ -15,6 +15,7 @@ New activity checks
-------------------

* :ref:`check-jsonpath`: Similar to the existing :ref:`check-xpath`, the new check requests a JSON URL and evaluates it against a `JSONPath`_ expression to determine activity (:issue:`81`).
* :ref:`check-last-log-activity`: Check log files for most recent contained timestamps (:issue:`98`, :issue:`99`).

Fixed bugs
==========
1 change: 1 addition & 0 deletions doc/source/conf.py
@@ -68,6 +68,7 @@
.. _portalocker: https://portalocker.readthedocs.io
.. _jsonpath-ng: https://github.com/h2non/jsonpath-ng
.. _JSONPath: https://goessner.net/articles/JsonPath/
.. _pytz: https://pythonhosted.org/pytz/

.. |project| replace:: {project}
.. |project_bold| replace:: **{project}**
100 changes: 100 additions & 0 deletions src/autosuspend/checks/activity.py
@@ -780,3 +780,103 @@ def check(self) -> Optional[str]:
return None
except (json.JSONDecodeError, requests.exceptions.RequestException) as error:
raise TemporaryCheckError(error) from error


class LastLogActivity(Activity):
    @classmethod
    def create(cls, name: str, config: configparser.SectionProxy) -> "LastLogActivity":
        import pytz

        try:
            return cls(
                name,
                Path(config["log_file"]),
                re.compile(config["pattern"]),
                timedelta(minutes=config.getint("minutes", fallback=10)),
                config.get("encoding", "ascii"),
                pytz.timezone(config.get("timezone", "UTC")),  # type: ignore
            )
        except KeyError as error:
            raise ConfigurationError(
                "Missing config key {}".format(error),
            ) from error
        except re.error as error:
            raise ConfigurationError(
                "Regular expression is invalid: {}".format(error),
            ) from error
        except ValueError as error:
            raise ConfigurationError(
                "Unable to parse configuration: {}".format(error),
            ) from error

    def __init__(
        self,
        name: str,
        log_file: Path,
        pattern: Pattern,
        delta: timedelta,
        encoding: str,
        default_timezone: timezone,
    ) -> None:
        if delta.total_seconds() < 0:
            raise ValueError("Given delta must be positive")
        if pattern.groups != 1:
            raise ValueError("Given pattern must have exactly one capture group")
        super().__init__(name=name)
        self.log_file = log_file
        self.pattern = pattern
        self.delta = delta
        self.encoding = encoding
        self.default_timezone = default_timezone

    def _safe_parse_date(self, match: str, now: datetime) -> datetime:
        from dateutil.parser import parse
        from dateutil.utils import default_tzinfo

        try:
            match_date = default_tzinfo(parse(match), self.default_timezone)
            if match_date > now:
                raise TemporaryCheckError(
                    "Detected date {} is in the future".format(match_date)
                )
            return match_date
        except ValueError as error:
            raise TemporaryCheckError(
                "Detected date {} cannot be parsed as a date".format(match)
            ) from error
        except OverflowError as error:
            raise TemporaryCheckError(
                "Detected date {} is out of the valid range".format(match)
            ) from error

    def _file_lines_reversed(self) -> Iterable[str]:
        try:
            # Probably not the most effective solution for large log files. Might need
            # optimizations later on.
            return reversed(
                self.log_file.read_text(encoding=self.encoding).splitlines()
            )
        except IOError as error:
            raise TemporaryCheckError(
                "Cannot access log file {}".format(self.log_file)
            ) from error

    def check(self) -> Optional[str]:
        lines = self._file_lines_reversed()

        now = datetime.now(tz=timezone.utc)
        for line in lines:
            match = self.pattern.match(line)
            if not match:
                continue

            match_date = self._safe_parse_date(match.group(1), now)

            # Only check the first line (reverse order) that has a match, not all
            if (now - match_date) < self.delta:
                return "Log activity in {} at {}".format(self.log_file, match_date)
            else:
                return None

        # No line matched at all
        return None
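
The comment in `_file_lines_reversed` notes that reading the whole file may need optimizing for large logs. One possible direction, sketched here under assumptions and not part of this change, is to read the file backwards in fixed-size chunks so that only the tail is loaded before a match is found. `lines_reversed` is a hypothetical helper. It splits on `\n` bytes only, so it assumes an ASCII-compatible encoding and does not reproduce every line boundary that `str.splitlines` recognizes.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterator


def lines_reversed(
    path: Path, encoding: str = "ascii", chunk_size: int = 4096
) -> Iterator[str]:
    """Yield the lines of ``path`` from last to first, reading in chunks.

    Hypothetical helper: assumes an ASCII-compatible encoding in which
    a newline is always the single byte ``\\n``.
    """
    with path.open("rb") as handle:
        handle.seek(0, os.SEEK_END)
        position = handle.tell()
        if position == 0:
            return  # an empty file has no lines
        tail = b""  # start of a line whose beginning lies in an earlier chunk
        at_file_end = True
        while position > 0:
            read_size = min(chunk_size, position)
            position -= read_size
            handle.seek(position)
            parts = (handle.read(read_size) + tail).split(b"\n")
            # The first part may continue into the preceding chunk.
            tail = parts[0]
            for part in reversed(parts[1:]):
                if at_file_end:
                    at_file_end = False
                    if part == b"":
                        # A final newline does not start another line.
                        continue
                yield part.rstrip(b"\r").decode(encoding)
        yield tail.rstrip(b"\r").decode(encoding)


# Compare against the approach used in the diff on a small sample file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.log"
    sample.write_bytes(b"first\nsecond\n\nfourth line\n")
    newest_first = list(lines_reversed(sample, chunk_size=8))
    reference = list(reversed(sample.read_text(encoding="ascii").splitlines()))
```

Because the function is a generator, a caller that stops at the first matching line never reads the earlier parts of the file. Lines are decoded only once they are complete, so a chunk boundary inside a multi-byte character does no harm.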