From 7c27b4ae7a0fccdec873c06be4c7d1e236e2ecd4 Mon Sep 17 00:00:00 2001 From: Rafael JP Damaceno Date: Mon, 4 May 2026 19:08:07 -0300 Subject: [PATCH 1/2] Route log parsing tasks by collection size --- core/libs/chkcsv.py | 749 ------------------ log_manager/tasks.py | 433 ++++++---- log_manager/tests.py | 31 + ...collectionlogdirectory_translator_class.py | 19 + log_manager_config/models.py | 6 +- metrics/services/daily_payloads.py | 4 +- metrics/tasks/parse.py | 21 +- metrics/tests/test_tasks.py | 17 + reports/tasks.py | 2 +- 9 files changed, 359 insertions(+), 923 deletions(-) delete mode 100644 core/libs/chkcsv.py create mode 100644 log_manager_config/migrations/0005_alter_collectionlogdirectory_translator_class.py diff --git a/core/libs/chkcsv.py b/core/libs/chkcsv.py deleted file mode 100644 index 15b6231..0000000 --- a/core/libs/chkcsv.py +++ /dev/null @@ -1,749 +0,0 @@ -#! /usr/bin/python -# chkcsv.py -# -# PURPOSE: -# Check the contents of a CSV file, specifically that columns match a -# specified format. -# -# NOTES: -# 1. Column format specifications are stored in a configuration file -# with an INI-file format, where bracketed sections correspond to -# columns and each section contains key-value pairs of format specifications. -# 2. Recognized column specifications are: -# column_required=1|Yes|True|On|0|No|False|Off -# data_required=1|Yes|True|On|0|No|False|Off -# minlen= -# maxlen= -# type=integer|float|string|date|datetime|bool -# pattern= -# 3. Global options in the format specification file are not yet implemented, -# though a section name for them is reserved. -# -# COPYRIGHT: -# Copyright (c) 2011,2018 R.Dreas Nielsen (RDN) -# -# LICENSE: -# GPL v.3 -# This program is free software: you can redistribute it and/or -# modify it under the terms of the GNU General Public License as published -# by the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. This program is distributed in the -# hope that it will be useful, but WITHOUT ANY WARRANTY; without even -# the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR -# PURPOSE. See the GNU General Public License for more details. The GNU -# General Public License is available at http://www.gnu.org/licenses/. -# -# HISTORY: -# Date Remarks -# ---------- -------------------------------------------------------------- -# 2011-09-25 First version. Version 0.8.0.0. RDN. -# 2018-10-27 Converted to run under both Python 2 and 3. Version 1.0.0. RDN. -# 2019-01-02 Corrected handling of next() for csv library. Version 1.0.1. RDN. -# 2018-01-04 Added check for data rows with more columns than column headers. -# Version 1.1.0. RDN. 
-# ============================================================================ - -_version = "1.1.0" -_vdate = "2019-01-04" - -import sys -from optparse import OptionParser - -try: - # Py2 - from ConfigParser import SafeConfigParser as ConfigParser -except: - # Py3 - from configparser import ConfigParser - -import codecs -import csv -import datetime -import os.path -import re -import traceback -import types - -FORMATSPECS = """Format specification options: - column_required=1|Yes|True|On|0|No|False|Off - type=integer|float|string|date|datetime|bool - data_required=1|Yes|True|On|0|No|False|Off - minlen= - maxlen= - pattern= -""" - - -class ChkCsvError(Exception): - """Base class for chkcsv errors.""" - - def __init__(self, errmsg, infile=None, line=None, column=None): - self.errmsg = errmsg - self.infile = infile - self.line = line - self.column = column - - -class CsvChecker: - """Create an object to check a specific column of a defined type. - - :param fmt_spec: A ConfigParser object. - :param colname: The name of the data column. - :param column_required_default: A Boolean indicating whether the column is required by default. - :param data_required_default: A Boolean indicating whether data values are required (non-null) by default. - - After initialization, the 'check()' - method will return a boolean indicating whether a data value is acceptable. - """ - - get_fn = { - "column_required": ConfigParser.getboolean, - "data_required": ConfigParser.getboolean, - "type": ConfigParser.get, - "minlen": ConfigParser.getint, - "maxlen": ConfigParser.getint, - "pattern": ConfigParser.get, - } - datetime_fmts = ( - "%x", - "%c", - "%x %X", - "%m/%d/%Y", - "%m/%d/%y", - "%m/%d/%Y %H%M", - "%m/%d/%Y %I:%M %p", - "%m/%d/%y %H%M", - "%m/%d/%y %I:%M %p", - "%Y-%m-%d %H%M", - "%Y-%m-%d %I:%M %p", - "%Y-%m-%d", - "%Y/%m/%d %H%M", - "%Y/%m/%d %I:%M %p", - "%Y/%m/%d %X", - "%Y/%m/%d", - "%b %d, %Y", - "%b %d, %Y %X", - "%b %d, %Y %I:%M %p", - "%b %d %Y", - "%b %d %Y %X", - "%b %d %Y %I:%M %p", - "%d %b, %Y", - "%d %b, %Y %X", - "%d %b, %Y %I:%M %p", - "%d %b %Y", - "%d %b %Y %X", - "%d %b %Y %I:%M %p", - "%b. %d, %Y", - "%b. %d, %Y %X", - "%b. %d, %Y %I:%M %p", - "%b. %d %Y", - "%b. %d %Y %X", - "%b. %d %Y %I:%M %p", - "%d %b., %Y", - "%d %b., %Y %X", - "%d %b., %Y %I:%M %p", - "%d %b. %Y", - "%d %b. %Y %X", - "%d %b. %Y %I:%M %p", - "%Y", - "%b %Y", - "%b, %Y", - "%b. %Y", - "%b., %Y", - "%b-%Y", - "%b.-%Y", - "%B %d, %Y", - "%B %d, %Y %X", - "%B %d, %Y %I:%M %p", - "%B %d %Y", - "%B %d %Y %X", - "%B %d %Y %I:%M %p", - "%d %B, %Y", - "%d %B, %Y %X", - "%d %B, %Y %I:%M %p", - "%d %B %Y", - "%d %B %Y %X", - "%d %B %Y %I:%M %p", - "%B %Y", - "%B, %Y", - "%B-%Y", - ) - date_fmts = ( - "%x", - "%c", - "%x %X", - "%m/%d/%Y", - "%m/%d/%y", - "%Y-%m-%d", - "%Y/%m/%d", - "%b %d, %Y", - "%b %d %Y", - "%d %b, %Y", - "%d %b %Y", - "%b. %d, %Y", - "%b. %d %Y", - "%d %b., %Y", - "%d %b. %Y", - "%Y", - "%b %Y", - "%b, %Y", - "%b. %Y", - "%b., %Y", - "%b-%Y", - "%b.-%Y", - "%B %d, %Y", - "%B %d %Y", - "%d %B, %Y", - "%d %B %Y", - "%B %Y", - "%B, %Y", - "%B-%Y", - ) - - # Basic format checking functions. These return None if the data are acceptable, - # a textual description of the problem otherwise. 
- def chk_req(self, data): - return "Dado faltando" if len(data) == 0 else None - - def chk_min(self, data): - return ( - None - if (not self.data_required and len(data) == 0) or len(data) >= self.minlen - else "data too short" - ) - - def chk_max(self, data): - return None if len(data) <= self.maxlen else "data too long" - - def chk_pat(self, data): - return None if len(data) == 0 or self.rx.match(data) else "Padrão incompatível" - - def chk_int(self, data): - if len(data) == 0: - return None - try: - x = int(data) - return None - except ValueError: - return "Não é um inteiro" - - def chk_float(self, data): - if len(data) == 0: - return None - try: - x = float(data) - return None - except ValueError: - return "Não é um número com separado de casa decimal" - - def chk_bool(self, data): - if len(data) == 0: - return None - return ( - None - if data - in ( - "True", - "true", - "TRUE", - "T", - "t", - "Yes", - "yes", - "YES", - "Y", - "y", - "False", - "false", - "FALSE", - "F", - "f", - "No", - "no", - "NO", - "N", - "n", - True, - False, - ) - else "Padrão incompatível, tente ['yes', 'no', 'true', 'false', 'y', 'n']" - ) - - def chk_datetime(self, data): - if len(data) == 0: - return None - if type(data) == type(datetime.datetime.now()): - return None - if type(data) == type(datetime.date.today()): - return None - if type(data) != type(""): - if data == None: - return "missing date/time" - try: - data = str(data) - except ValueError: - return "can't convert data to string for date/time test" - for f in self.datetime_fmts: - try: - dt = datetime.datetime.strptime(data, f) - except: - continue - break - else: - return "invalid date/time" - return None - - def chk_date(self, data): - if len(data) == 0: - return None - if type(data) == type(datetime.date.today()): - return None - if type(data) != type(""): - if data == None: - return "missing date" - try: - data = str(data) - except ValueError: - return "can't convert data to string for date test" - for f in self.date_fmts: - try: - dt = datetime.datetime.strptime(data, f) - except: - continue - break - else: - return "invalid date" - return None - - def dispatch(self, check_funcs, data): - errlist = [f(data) for f in check_funcs] - return [e for e in errlist if e] - - def __init__( - self, fmt_spec, colname, column_required_default, data_required_default - ): - self.name = colname - self.data_required = data_required_default - # By default, all columns are required unless there is a specification indicating that it is not. - self.column_required = column_required_default - specs = fmt_spec.options(colname) - # Get the value for each option, using an appropriate function for each expected value type. 
- for spec in specs: - try: - specval = self.get_fn[spec](fmt_spec, colname, spec) - except KeyError: - raise ChkCsvError( - "Unrecognized format specification (%s)" % spec, column=colname - ) - setattr(self, spec, specval) - # Convert any pattern attribute to an rx attribute - if hasattr(self, "pattern"): - try: - self.rx = re.compile(self.pattern) - except: - raise ChkCsvError( - "Invalid regular expression pattern: %s" % self.pattern, - column=colname, - ) - # Create the check method - errfuncs = [] - if self.data_required: - errfuncs.append(self.chk_req) - if hasattr(self, "type"): - if self.type == "string": - if hasattr(self, "minlen"): - errfuncs.append(self.chk_min) - if hasattr(self, "maxlen"): - errfuncs.append(self.chk_max) - if hasattr(self, "pattern"): - errfuncs.append(self.chk_pat) - elif self.type == "integer": - errfuncs.append(self.chk_int) - elif self.type == "float": - errfuncs.append(self.chk_float) - elif self.type == "date": - errfuncs.append(self.chk_date) - if hasattr(self, "pattern"): - errfuncs.append(self.chk_pat) - elif self.type == "datetime": - errfuncs.append(self.chk_datetime) - if hasattr(self, "pattern"): - errfuncs.append(self.chk_pat) - else: - if hasattr(self, "minlen"): - errfuncs.append(self.chk_min) - if hasattr(self, "maxlen"): - errfuncs.append(self.chk_max) - if hasattr(self, "pattern"): - errfuncs.append(self.chk_pat) - self.check = lambda data: self.dispatch(errfuncs, data) - - -def clparser(): - usage_msg = """Usage: %prog [options] -Arguments: - CSV file name The name of a comma-separated-values file to check.""" - vers_msg = "%prog " + "%s %s" % (_version, _vdate) - desc_msg = "Checks the content and format of a CSV file." - parser = OptionParser(usage=usage_msg, version=vers_msg, description=desc_msg) - parser.add_option( - "-s", - "--showspecs", - action="store_true", - dest="showspecs", - default=False, - help="Show the format specifications allowed in the configuration file, and exit.", - ) - parser.add_option( - "-f", - "--formatspec", - action="store", - dest="formatspec", - type="string", - help="Name of the file with the format specification. The default is the name of the CSV file with an extension of fmt.", - ) - parser.add_option( - "-r", - "--required", - action="store_true", - dest="data_required", - default=False, - help="A data value is required in data columns for which the format specification does not include an explicit specification of whether data is required for a column. The default is false (i.e., data are not required).", - ) - parser.add_option( - "-q", - "--columnsnotrequired", - action="store_false", - dest="column_required", - default=True, - help="Columns listed in the format configuration file are not required to be present unless the column_required specification is explicitly set in the configuration file. The default is true (i.e., all columns in the configuration file are required in the CSV file).", - ) - parser.add_option( - "-c", - "--columnexit", - action="store_true", - dest="columnexit", - default=False, - help="Exit immediately if there are more columns in the CSV file header than are specified in the format configuration file.", - ) - parser.add_option( - "-l", - "--linelength", - action="store_false", - dest="linelength", - default=True, - help="Allow rows of the CSV file to have fewer columns than in the column headers. The default is to report an error for short data rows. 
If short data rows are allowed, any row without enough columns to match the format specification will still be reported as an error.", - ) - parser.add_option( - "-i", - "--case-insensitive", - action="store_true", - dest="caseinsensitive", - default=False, - help="Case-insensitive matching of column names in the format configuration file and the CSV file. The default is case-sensitive (i.e., column names must match exactly).", - ) - parser.add_option( - "-e", - "--encoding", - action="store", - type="string", - dest="encoding", - default=None, - help="Character encoding of the CSV file. It should be one of the strings listed at http://docs.python.org/library/codecs.html#standard-encodings.", - ) - parser.add_option( - "-o", - "--optsection", - action="store", - dest="optsection", - type="string", - help="An alternate name for the chkcsv options section in the format specification configuration file.", - ) - parser.add_option( - "-x", - "--exitonerror", - action="store_true", - dest="haltonerror", - default=False, - help="Exit when the first error is found.", - ) - return parser - - -class UTF8Recoder: - """Iterator that reads an encoded stream and reencodes the input to UTF-8.""" - - def __init__(self, f, encoding): - self.reader = codecs.getreader(encoding)(f) - - def __iter__(self): - return self - - def __next__(self): - if sys.version_info < (3,): - return next(self.reader).encode("utf-8") - else: - return next(self.reader) - - def next(self): - return self.__next__() - - -class UnicodeReader: - """A CSV reader which will iterate over lines in the CSV file "f", - which is encoded in the given encoding. - """ - - def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds): - uf = UTF8Recoder(f, encoding) - self.reader = csv.reader(uf, dialect=dialect, **kwds) - - def __iter__(self): - return self - - def next(self): - if sys.version_info < (3,): - row = self.reader.next() - else: - row = next(self.reader) - return [type("")(s, "utf-8") for s in row] - - def __next__(self): - if sys.version_info < (3,): - row = self.reader.next() - else: - row = next(self.reader) - return [type("")(s, "utf-8") for s in row] - - -def show_errors(errlist): - """Write a list of error messages to stderr. - - :param errlist: A tuple of a narrative message, the name of the file - in which the error occurred, the line number of the file, and the column - name of the file. All but the first may be null. - """ - for err in errlist: - sys.stderr.write( - "%s.\n" - % " ".join( - [ - "%s %s" % em - for em in [ - e - for e in zip(("Error:", "in file", "on line", "in column"), err) - if e[1] - ] - ] - ) - ) - - -def read_format_specs( - fmt_file, column_required, data_required, chkopts="chkcsvoptions" -): - """Read format specifications from a file. - - :param fmt_file: The name of the file containing format specifications. - :param column_required: Whether or not the column must be in the CSV file to be checked. - :param data_required: Whether or not a data value is required on every row of the CSV file. - :param chkopts: The name of a section in the format specification file containing additional options. 
- """ - fmtspecs = ConfigParser() - try: - files_read = fmtspecs.read([fmt_file]) - except configparser.Error: - raise ChkCsvError("Error reading format specification file.", fmt_file) - if len(files_read) == 0: - raise ChkCsvError("Error reading format specification file.", fmt_file) - # Convert ConfigParser object into a list of CsvChecker objects - speccols = [sect for sect in fmtspecs.sections() if sect != chkopts] - cols = {} - for col in speccols: - cols[col] = CsvChecker(fmtspecs, col, column_required, data_required) - return cols - - -def check_csv_file( - csv_fname, cols, halt_on_err, columnexit, linelength, caseinsensitive, encoding=None -): - """Check that all of the required columns and data are present in the CSV file, and that - the data conform to the appropriate type and other specifications. - - :param csv_fname: The name of the CSV file to check. - :param cols: A dictionary of specifications (CsvChecker objects) indexed by column name. - :param halt_on_err: Whether to exit on the first error. - :param columnexit: Whether to exit if the CSV file doesn't have exactly the same columns in the format specifications. - :param linelength: Whether to report an error if any data row has a different number of items than indicated by the column headers. - :param casesensitive: Whether column names in the specifications and CSV file should be compared case-insensitively. - :param encoding: The character encoding of the CSV file. - """ - errorlist = [] - dialect = csv.Sniffer().sniff(open(csv_fname, "rt").readline()) - encoding = "utf-8" if not encoding else encoding - if sys.version_info < (3,): - inf = UnicodeReader(open(csv_fname, "rt"), dialect, encoding) - else: - inf = csv.reader(open(csv_fname, mode="rt", encoding=encoding), dialect=dialect) - colnames = next(inf) - req_cols = [c for c in cols if cols[c].column_required] - # Exit if all required columns are not present - if caseinsensitive: - colnames_l = [c.lower() for c in colnames] - req_missing = [col for col in req_cols if not (col.lower() in colnames_l)] - else: - req_missing = [col for col in req_cols if not (col in colnames)] - if len(req_missing) > 0: - errorlist.append( - ( - "The following columns are required, but are not present in the CSV file: %s." - % ", ".join(req_missing), - csv_fname, - 1, - ) - ) - return errorlist - # Exit if there are extra columns and the option to exit is set. - if columnexit: - if caseinsensitive: - speccols_l = [c.lower() for c in cols] - extra = [col for col in colnames if not (col.lower() in speccols_l)] - else: - extra = [col for col in colnames if not (col in cols)] - if len(extra) > 0: - errorlist.append( - ( - "The following columns have no format specifications but are in the CSV file: %s." - % ", ".join(extra), - csv_fname, - 1, - ) - ) - return errorlist - # Column names common to specifications and data file. These will be used - # to index the cols dictionary to get the appropriate check method - # and to index the CSV column name list (colnames) to get the column position. 
- if caseinsensitive: - chkcols = {} - for x in cols: - for y in colnames: - if x.lower() == y.lower(): - chkcols[x] = y - else: - datacols = [col for col in cols if col in colnames] - chkcols = dict(zip(datacols, datacols)) - # Get maximum required column number (index) to check data rows - dataindex = [colnames.index(chkcols[col]) for col in chkcols] - maxindex = max(dataindex) if len(dataindex) > 0 else 0 # 0 if format file is empty - colloc = dict(zip([chkcols[c] for c in chkcols], dataindex)) - # Read and check the CSV file until done (or until an error). - row_no = 1 # Header is row 1. - for datarow in inf: - row_no += 1 - if (len(datarow) > 0) and (len(datarow) < len(colnames)) and linelength: - errorlist.append( - ("fewer data values than column headers", csv_fname, row_no) - ) - if halt_on_err: - return errorlist - if len(datarow) > len(colnames): - errorlist.append( - ("more data values than column headers", csv_fname, row_no) - ) - if halt_on_err: - return errorlist - if len(datarow) < maxindex + 1: - if len(datarow) > 0: - errorlist.append( - ( - "fewer data values than columns in the format specification", - csv_fname, - row_no, - ) - ) - if halt_on_err: - return errorlist - else: - for col in chkcols: - col_errs = cols[col].check(datarow[colloc[chkcols[col]]]) - if len(col_errs) > 0: - errorlist.extend( - [(e, csv_fname, row_no, cols[col].name) for e in col_errs] - ) - if halt_on_err: - return errorlist - return errorlist - - -def main(): - parser = clparser() - (opts, args) = parser.parse_args() - if opts.showspecs: - print(FORMATSPECS) - return 0 - if len(args) == 0: - parser.print_help() - return 0 - if len(args) != 1: - raise ChkCsvError( - "A single argument, the name of the CSV file to check, must be provided." - ) - csv_file = args[0] - if not os.path.exists(csv_file): - raise ChkCsvError("The specified CSV file does not exist.", csv_file) - if opts.formatspec: - fmt_file = opts.formatspec - else: - (fn, ext) = os.path.splitext(csv_file) - fmt_file = "%s.fmt" % fn - if not os.path.exists(fmt_file): - raise ChkCsvError("The format file does not exist.", fmt_file) - # Get format specifications as a list of ChkCsv objects from the configuration file. - if opts.optsection: - chkopts = opts.optsection - else: - chkopts = "chkcsvoptions" - cols = read_format_specs( - fmt_file, opts.column_required, opts.data_required, chkopts - ) - # Check the file - errorlist = check_csv_file( - csv_file, - cols, - opts.haltonerror, - opts.columnexit, - opts.linelength, - opts.caseinsensitive, - opts.encoding, - ) - if len(errorlist) > 0: - show_errors(errorlist) - return 1 - else: - return 0 - - -if __name__ == "__main__": - try: - status = main() - except ChkCsvError as msg: - show_errors([(msg.errmsg, msg.infile, msg.line, msg.column)]) - exit(1) - except SystemExit as x: - sys.exit(x) - except Exception: - strace = traceback.extract_tb(sys.exc_info()[2])[-1:] - lno = strace[0][1] - src = strace[0][3] - sys.stderr.write( - "%s: Uncaught exception %s (%s) on line %s (%s)." 
- % ( - os.path.basename(sys.argv[0]), - str(sys.exc_info()[0]), - sys.exc_info()[1], - lno, - src, - ) - ) - sys.exit(1) - sys.exit(status) diff --git a/log_manager/tasks.py b/log_manager/tasks.py index 10148b1..08da275 100644 --- a/log_manager/tasks.py +++ b/log_manager/tasks.py @@ -1,221 +1,328 @@ import logging import os +from celery import chord from django.conf import settings -from django.contrib.auth import get_user_model from django.utils.translation import gettext as _ +from collection.models import Collection +from config import celery_app from core.utils import date_utils from core.utils.request_utils import _get_user -from config import celery_app -from collection.models import Collection from log_manager_config import models as lmc_models +from metrics.services.resources import extract_celery_queue_name +from metrics.tasks import task_parse_logs -from . import ( - choices, - models, - utils, -) - +from . import choices, models, utils LOGFILE_STAT_RESULT_CTIME_INDEX = 9 -User = get_user_model() - -@celery_app.task(bind=True, name=_('[Log Pipeline] 1. Search Logs (Manual)'), queue='load') -def task_search_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None, trigger_validation=False): - """ - Task to search for log files in the directories defined in the CollectionLogDirectory model. - - Parameters: - collections (list, optional): List of collection acronyms. Defaults to []. - from_date (str, optional): The start date for log discovery in YYYY-MM-DD format. Defaults to None. - until_date (str, optional): The end date for log discovery in YYYY-MM-DD format. Defaults to None. - days_to_go_back (int, optional): The number of days to go back from today for log discovery. Defaults to None. - user_id (int, optional): The ID of the user initiating the task. Defaults to None. - username (str, optional): The username of the user initiating the task. Defaults to None. +@celery_app.task( + bind=True, name=_("[Log Pipeline] 1. Search Logs (Manual)"), queue="load" +) +def task_search_log_files( + self, + collections=None, + from_date=None, + until_date=None, + days_to_go_back=None, + user_id=None, + username=None, + trigger_validation=False, +): """ - user = _get_user(self.request, username=username, user_id=user_id) - - for col in collections or Collection.acron3_list(): - collection = Collection.objects.get(acron3=col) + Search for log files in configured collection directories. - col_configs_dirs = lmc_models.CollectionLogDirectory.objects.filter(config__collection__acron3=col, active=True) - if len(col_configs_dirs) == 0: - logging.error(f'No CollectionLogDirectory found for collection {col}.') - - supported_logfile_extensions = settings.SUPPORTED_LOGFILE_EXTENSIONS - if len(supported_logfile_extensions) == 0: - logging.error('No SupportedLogFile found. Please, add a SupportedLogFile for each of the supported log file formats.') + When trigger_validation=True, this starts the full Search -> Validate -> Parse + chain. Parse callbacks are routed by collection size. 
+ """ + _get_user(self.request, username=username, user_id=user_id) + + from_date_str, until_date_str = date_utils.get_date_range_str( + from_date, until_date, days_to_go_back + ) + visible_dates = date_utils.get_date_objs_from_date_range( + from_date_str, until_date_str + ) + supported_extensions = settings.SUPPORTED_LOGFILE_EXTENSIONS + if not supported_extensions: + logging.error("No supported log file extensions configured.") + + for collection_code in collections or Collection.acron3_list(): + collection = Collection.objects.get(acron3=collection_code) + directories = lmc_models.CollectionLogDirectory.objects.filter( + config__collection__acron3=collection_code, + active=True, + ) + if not directories: + logging.error( + "No CollectionLogDirectory found for collection %s.", collection_code + ) - for cd in col_configs_dirs: - for root, _sub_dirs, files in os.walk(cd.path): + for directory in directories: + for root, _sub_dirs, files in os.walk(directory.path): for name in files: _name, extension = os.path.splitext(name) - if extension.lower() not in supported_logfile_extensions: + if extension.lower() not in supported_extensions: continue - visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back) - logging.debug(f'Visible dates: {visible_dates}') - - _add_log_file(collection, root, name, visible_dates) + file_path = os.path.join(root, name) + file_stat = os.stat(file_path) + file_ctime = date_utils.get_date_obj_from_timestamp( + file_stat.st_ctime + ) + + logging.debug( + "Checking file %s with ctime %s.", file_path, file_ctime + ) + if file_ctime in visible_dates: + models.LogFile.create_or_update( + collection=collection, + path=file_path, + stat_result=file_stat, + hash=utils.hash_file(file_path), + ) if trigger_validation: - task_validate_log_files.apply_async(kwargs={ - "collections": collections, - "from_date": from_date, - "until_date": until_date, - "days_to_go_back": days_to_go_back, - "user_id": user_id, - "username": username, - "trigger_parse": True - }) - - -def _get_visible_dates(from_date, until_date, days_to_go_back): - from_date_str, until_date_str = date_utils.get_date_range_str(from_date, until_date, days_to_go_back) - return date_utils.get_date_objs_from_date_range(from_date_str, until_date_str) - - -def _add_log_file(collection, root, name, visible_dates): - file_path = os.path.join(root, name) - file_ctime = date_utils.get_date_obj_from_timestamp(os.stat(file_path).st_ctime) - - logging.debug(f'Checking file {file_path} with ctime {file_ctime}.') - if file_ctime in visible_dates: - models.LogFile.create_or_update( - collection=collection, - path=file_path, - stat_result=os.stat(file_path), - hash=utils.hash_file(file_path), + task_validate_log_files.apply_async( + kwargs={ + "collections": collections, + "from_date": from_date, + "until_date": until_date, + "days_to_go_back": days_to_go_back, + "user_id": user_id, + "username": username, + "trigger_parse": True, + } ) -@celery_app.task(bind=True, name=_('[Log Pipeline] 2. Validate Logs (Manual)'), timelimit=-1, queue='load') -def task_validate_log_files(self, collections=[], from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None, ignore_date=False, trigger_parse=False, revalidate=False, status_list=None): - """ - Task to validate log files in the database. - - Parameters: - collections (list, optional): List of collection acronyms. Defaults to []. - from_date (str, optional): The start date for log discovery in YYYY-MM-DD format. Defaults to None. 
- until_date (str, optional): The end date for log discovery in YYYY-MM-DD format. Defaults to None. - days_to_go_back (int, optional): The number of days to go back from today for log discovery. Defaults to None. - user_id (int, optional): The ID of the user initiating the task. Defaults to None. - username (str, optional): The username of the user initiating the task. Defaults to None. - ignore_date (bool, optional): If True, ignore the date of the log file. Defaults to False. - revalidate (bool, optional): If True, also revalidate files in statuses from status_list. Defaults to False. - status_list (list, optional): List of status codes to revalidate when revalidate=True. Defaults to [QUE, INV, ERR]. +@celery_app.task( + bind=True, + name=_("[Log Pipeline] 2. Validate Logs (Manual)"), + timelimit=-1, + queue="load", +) +def task_validate_log_files( + self, + collections=None, + from_date=None, + until_date=None, + days_to_go_back=None, + user_id=None, + username=None, + ignore_date=False, + trigger_parse=False, + revalidate=False, + status_list=None, +): """ - cols = collections or Collection.acron3_list() - logging.info(f'Validating log files for collections: {cols}.') + Validate cataloged log files. - visible_dates = _get_visible_dates(from_date, until_date, days_to_go_back) + When trigger_parse=True, one parse orchestration task is enqueued per + collection and routed to the proper parse_ queue. + """ + collection_codes = collections or Collection.acron3_list() + logging.info("Validating log files for collections: %s.", collection_codes) + + from_date_str, until_date_str = date_utils.get_date_range_str( + from_date, until_date, days_to_go_back + ) + visible_dates = date_utils.get_date_objs_from_date_range( + from_date_str, until_date_str + ) if not ignore_date: if not visible_dates: logging.warning("No visible dates found for log validation.") return - logging.info(f'Interval: {visible_dates[0]} to {visible_dates[-1]}.') + logging.info("Interval: %s to %s.", visible_dates[0], visible_dates[-1]) status_filter = [choices.LOG_FILE_STATUS_CREATED] if revalidate: - status_filter += status_list or [choices.LOG_FILE_STATUS_QUEUED, choices.LOG_FILE_STATUS_INVALIDATED, choices.LOG_FILE_STATUS_ERROR] - - tasks = [] - for col in cols: - for log_file in models.LogFile.objects.filter(status__in=status_filter, collection__acron3=col): - file_ctime = date_utils.get_date_obj_from_timestamp(log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX]) - if file_ctime in visible_dates or ignore_date: - tasks.append(task_validate_log_file.s(log_file.hash, user_id, username)) - - if tasks: - if trigger_parse: - from celery import chord - from metrics.tasks import task_parse_logs - chord(tasks)(task_parse_logs.si( - collections=collections, - from_date=from_date, - until_date=until_date, - days_to_go_back=days_to_go_back, - user_id=user_id, - username=username, - )) - else: - for task in tasks: - task.apply_async() - elif trigger_parse: - from metrics.tasks import task_parse_logs - task_parse_logs.apply_async(kwargs={ - "collections": collections, - "from_date": from_date, - "until_date": until_date, - "days_to_go_back": days_to_go_back, - "user_id": user_id, - "username": username, - }) + status_filter += status_list or [ + choices.LOG_FILE_STATUS_QUEUED, + choices.LOG_FILE_STATUS_INVALIDATED, + choices.LOG_FILE_STATUS_ERROR, + ] + + tasks_by_collection = {} + for collection_code in collection_codes: + tasks_by_collection[collection_code] = [] + log_files = models.LogFile.objects.filter( + 
status__in=status_filter, + collection__acron3=collection_code, + ) + for log_file in log_files: + if not ignore_date: + file_ctime = date_utils.get_date_obj_from_timestamp( + log_file.stat_result[LOGFILE_STAT_RESULT_CTIME_INDEX] + ) + if file_ctime not in visible_dates: + continue + + tasks_by_collection[collection_code].append( + task_validate_log_file.s(log_file.hash, user_id, username) + ) + + if trigger_parse: + _enqueue_parse_after_validation( + tasks_by_collection=tasks_by_collection, + from_date=from_date, + until_date=until_date, + days_to_go_back=days_to_go_back, + user_id=user_id, + username=username, + ) + return + for collection_tasks in tasks_by_collection.values(): + for validation_task in collection_tasks: + validation_task.apply_async() -@celery_app.task(bind=True, name=_('[Log Pipeline] Validate Single Log File (Auto)'), timelimit=-1, queue='load') -def task_validate_log_file(self, log_file_hash, user_id=None, username=None): - """ - Task to validate a specific log file. - Parameters: - log_file_id (int): The ID of the log file to validate. - user_id (int, optional): The ID of the user initiating the task. Defaults to None. - username (str, optional): The username of the user initiating the task. Defaults to None. - """ - user = _get_user(self.request, username=username, user_id=user_id) +@celery_app.task( + bind=True, + name=_("[Log Pipeline] Validate Single Log File (Auto)"), + timelimit=-1, + queue="load", +) +def task_validate_log_file(self, log_file_hash, user_id=None, username=None): + """Validate a single LogFile and update its status.""" + _get_user(self.request, username=username, user_id=user_id) log_file = models.LogFile.objects.get(hash=log_file_hash) collection = log_file.collection.acron3 buffer_size, sample_size = _fetch_validation_parameters(collection) - logging.info(f'Validating log file {log_file.path}.') - val_result = utils.validate_file(path=log_file.path, buffer_size=buffer_size, sample_size=sample_size) - if 'datetimes' in val_result.get('content', {}).get('summary', {}): - del val_result['content']['summary']['datetimes'] - - if 'probably_date' in val_result: - if isinstance(val_result['probably_date'], dict): - logging.error(f"Error determining probably_date: {val_result['probably_date'].get('error')}") - val_result['probably_date'] = None - else: - try: - val_result['probably_date'] = date_utils.get_date_str(val_result['probably_date']) - except (ValueError, AttributeError) as e: - logging.error(f'Error serializing probably_date: {e}') - val_result['probably_date'] = None + logging.info("Validating log file %s.", log_file.path) + val_result = utils.validate_file( + path=log_file.path, buffer_size=buffer_size, sample_size=sample_size + ) + _clean_validation_result(val_result) log_file.validation = val_result - log_file.validation.update({'buffer_size': buffer_size, 'sample_size': sample_size}) + log_file.validation.update({"buffer_size": buffer_size, "sample_size": sample_size}) - if val_result.get('is_valid', {}).get('all', False): - log_file.date = val_result.get('probably_date') or None + if val_result.get("is_valid", {}).get("all", False): + log_file.date = val_result.get("probably_date") or None log_file.status = choices.LOG_FILE_STATUS_QUEUED - else: log_file.status = choices.LOG_FILE_STATUS_INVALIDATED - logging.info(f'Log file {log_file.path} ({log_file.collection.acron3}) has status {log_file.status}.') + logging.info( + "Log file %s (%s) has status %s.", + log_file.path, + log_file.collection.acron3, + log_file.status, + ) 
log_file.save() -def _fetch_validation_parameters(collection, default_buffer_size=0.1, default_sample_size=2048): - col_configs = lmc_models.LogManagerCollectionConfig.objects.filter(collection__acron3=collection).first() - if not col_configs: - logging.warning(f'No LogManagerCollectionConfig found for collection {collection}. Using default values.') - return default_buffer_size, default_sample_size - return col_configs.buffer_size, col_configs.sample_size - - -@celery_app.task(bind=True, name=_('[Log Pipeline] Daily Routine (Auto)'), queue='load') +@celery_app.task(bind=True, name=_("[Log Pipeline] Daily Routine (Auto)"), queue="load") def task_daily_log_ingestion_pipeline(self): """ - Facade task for the daily log ingestion pipeline. - It initiates the Search -> Validate -> Parse chain using default parameters. - No arguments are required, making it easy to schedule periodically. + Start the daily Search -> Validate -> Parse chain with default parameters. """ logging.info("Starting Daily Log Ingestion Pipeline") task_search_log_files.apply_async(kwargs={"trigger_validation": True}) + + +def _enqueue_parse_after_validation( + tasks_by_collection, from_date, until_date, days_to_go_back, user_id, username +): + for collection_code, validation_tasks in tasks_by_collection.items(): + if validation_tasks: + chord(validation_tasks)( + _build_parse_signature( + collection_code, + from_date, + until_date, + days_to_go_back, + user_id, + username, + ) + ) + else: + task_parse_logs.apply_async( + **_build_parse_apply_kwargs( + collection_code, + from_date, + until_date, + days_to_go_back, + user_id, + username, + ) + ) + + +def _build_parse_signature( + collection_code, from_date, until_date, days_to_go_back, user_id, username +): + apply_kwargs = _build_parse_apply_kwargs( + collection_code, + from_date, + until_date, + days_to_go_back, + user_id, + username, + ) + parse_callback = task_parse_logs.si(**apply_kwargs["kwargs"]) + if apply_kwargs.get("queue"): + parse_callback.set(queue=apply_kwargs["queue"]) + return parse_callback + + +def _build_parse_apply_kwargs( + collection_code, from_date, until_date, days_to_go_back, user_id, username +): + collections = [collection_code] + parse_queue = extract_celery_queue_name(collection_code) + apply_kwargs = { + "kwargs": { + "collections": collections, + "from_date": from_date, + "until_date": until_date, + "days_to_go_back": days_to_go_back, + "queue_name": parse_queue, + "user_id": user_id, + "username": username, + }, + "queue": parse_queue, + } + return apply_kwargs + + +def _fetch_validation_parameters( + collection, default_buffer_size=0.1, default_sample_size=2048 +): + col_configs = lmc_models.LogManagerCollectionConfig.objects.filter( + collection__acron3=collection + ).first() + if not col_configs: + logging.warning( + "No LogManagerCollectionConfig found for collection %s. 
Using default values.", + collection, + ) + return default_buffer_size, default_sample_size + return col_configs.buffer_size, col_configs.sample_size + + +def _clean_validation_result(val_result): + if "datetimes" in val_result.get("content", {}).get("summary", {}): + del val_result["content"]["summary"]["datetimes"] + + if "probably_date" not in val_result: + return + + probably_date = val_result["probably_date"] + if isinstance(probably_date, dict): + logging.error("Error determining probably_date: %s", probably_date.get("error")) + val_result["probably_date"] = None + return + + try: + val_result["probably_date"] = date_utils.get_date_str(probably_date) + except (ValueError, AttributeError) as exc: + logging.error("Error serializing probably_date: %s", exc) + val_result["probably_date"] = None diff --git a/log_manager/tests.py b/log_manager/tests.py index 51c1402..8832e25 100644 --- a/log_manager/tests.py +++ b/log_manager/tests.py @@ -56,3 +56,34 @@ def test_validate_log_files_returns_for_empty_visible_date_range(self): self.assertIsNone(result) mocked_signature.assert_not_called() + + def test_validate_log_files_routes_parse_callback_to_collection_parse_queue(self): + with patch("metrics.tasks.task_parse_logs.apply_async") as mocked_apply_async: + tasks.task_validate_log_files.run( + collections=["books"], + from_date="2024-02-01", + until_date="2024-02-02", + trigger_parse=True, + ) + + mocked_apply_async.assert_called_once() + self.assertEqual(mocked_apply_async.call_args.kwargs["queue"], "parse_small") + self.assertEqual( + mocked_apply_async.call_args.kwargs["kwargs"]["queue_name"], + "parse_small", + ) + + def test_validate_log_files_routes_each_collection_parse_to_its_queue(self): + with patch("metrics.tasks.task_parse_logs.apply_async") as mocked_apply_async: + tasks.task_validate_log_files.run( + collections=["books", "scl"], + from_date="2024-02-01", + until_date="2024-02-02", + trigger_parse=True, + ) + + calls = { + call.kwargs["kwargs"]["collections"][0]: call.kwargs["queue"] + for call in mocked_apply_async.call_args_list + } + self.assertEqual(calls, {"books": "parse_small", "scl": "parse_xlarge"}) diff --git a/log_manager_config/migrations/0005_alter_collectionlogdirectory_translator_class.py b/log_manager_config/migrations/0005_alter_collectionlogdirectory_translator_class.py new file mode 100644 index 0000000..fb8a7ee --- /dev/null +++ b/log_manager_config/migrations/0005_alter_collectionlogdirectory_translator_class.py @@ -0,0 +1,19 @@ +# Generated by Django 5.2.12 on 2026-05-04 20:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("log_manager_config", "0004_logmanagercollectionconfig_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="collectionlogdirectory", + name="translator_class", + field=models.CharField( + default="classic", verbose_name="URL Translator Class" + ), + ), + ] diff --git a/log_manager_config/models.py b/log_manager_config/models.py index 8cf3e34..35b5f90 100644 --- a/log_manager_config/models.py +++ b/log_manager_config/models.py @@ -125,7 +125,7 @@ class CollectionLogDirectory(Orderable, CommonControlField): verbose_name=_('URL Translator Class'), blank=False, null=False, - default='URLTranslatorClassicSite', + default='classic', ) def __str__(self): @@ -148,6 +148,7 @@ def load(cls, data, user): directory_name=item.get('directory_name'), path=item.get('path'), active=item.get('active', True), + translator_class=item.get('translator_class', 'classic'), ) 
@classmethod @@ -158,6 +159,7 @@ def create_or_update( directory_name, path, active, + translator_class='classic', ): try: obj = cls.objects.get(config=config, path=path) @@ -172,6 +174,7 @@ def create_or_update( obj.directory_name = directory_name obj.path = path obj.active = active + obj.translator_class = translator_class or 'classic' obj.save() logging.info(f'{config.collection.acron3} - {directory_name} - {path}') @@ -275,4 +278,3 @@ class Meta: models.UniqueConstraint(fields=['config', 'email'], name='unique_config_email') ] - diff --git a/metrics/services/daily_payloads.py b/metrics/services/daily_payloads.py index 0e06af9..8b96f7b 100644 --- a/metrics/services/daily_payloads.py +++ b/metrics/services/daily_payloads.py @@ -8,6 +8,8 @@ from django.conf import settings from django.utils import timezone +from metrics.models import DailyMetricJob + def get_daily_payload_root(): return Path(settings.MEDIA_ROOT) / "metrics" / "daily_payloads" @@ -61,8 +63,6 @@ def delete_payload(storage_path): def cleanup_exported_payloads(collections=None, older_than_days=7): - from metrics.models import DailyMetricJob - root = get_daily_payload_root() if not root.exists(): return 0 diff --git a/metrics/tasks/parse.py b/metrics/tasks/parse.py index 7748922..ad3398c 100644 --- a/metrics/tasks/parse.py +++ b/metrics/tasks/parse.py @@ -205,7 +205,10 @@ def _schedule_parse_logs_reexecution( if robots_source is not None: kwargs["robots_source"] = robots_source - task_wait_parse_logs_wave.apply_async(kwargs=kwargs) + apply_kwargs = {"kwargs": kwargs} + if queue_name: + apply_kwargs["queue"] = queue_name + task_wait_parse_logs_wave.apply_async(**apply_kwargs) return True @@ -257,10 +260,13 @@ def task_wait_parse_logs_wave( if robots_source is not None: kwargs["robots_source"] = robots_source - task_wait_parse_logs_wave.apply_async( - kwargs=kwargs, - countdown=poll_interval_seconds, - ) + apply_kwargs = { + "kwargs": kwargs, + "countdown": poll_interval_seconds, + } + if queue_name: + apply_kwargs["queue"] = queue_name + task_wait_parse_logs_wave.apply_async(**apply_kwargs) return {"wave_completed": False, "reexecution_enqueued": False} kwargs = { @@ -282,5 +288,8 @@ def task_wait_parse_logs_wave( if robots_source is not None: kwargs["robots_source"] = robots_source - task_parse_logs.apply_async(kwargs=kwargs) + apply_kwargs = {"kwargs": kwargs} + if queue_name: + apply_kwargs["queue"] = queue_name + task_parse_logs.apply_async(**apply_kwargs) return {"wave_completed": True, "reexecution_enqueued": True} diff --git a/metrics/tests/test_tasks.py b/metrics/tests/test_tasks.py index 932944f..5ffdaf0 100644 --- a/metrics/tests/test_tasks.py +++ b/metrics/tests/test_tasks.py @@ -102,6 +102,23 @@ def test_wait_parse_logs_wave_rechecks_until_daily_jobs_complete(self): mocked_parse_logs_apply_async.assert_not_called() mocked_wait_apply_async.assert_called_once() + def test_wait_parse_logs_wave_preserves_queue_name(self): + job = DailyMetricJob.objects.create( + collection=self.collection, + access_date=date(2012, 3, 10), + status=DailyMetricJob.STATUS_EXPORTING, + ) + + with patch("metrics.tasks.task_wait_parse_logs_wave.apply_async") as mocked_wait_apply_async: + result = tasks.task_wait_parse_logs_wave.run( + wave_log_hashes=[job.pk], + collections=["books"], + queue_name="parse_small", + ) + + self.assertEqual(result, {"wave_completed": False, "reexecution_enqueued": False}) + self.assertEqual(mocked_wait_apply_async.call_args.kwargs["queue"], "parse_small") + class ResumeDailyMetricJobTests(TestCase): def 
setUp(self):

diff --git a/reports/tasks.py b/reports/tasks.py
index 69a53a1..d4dde0b 100644
--- a/reports/tasks.py
+++ b/reports/tasks.py
@@ -1,4 +1,5 @@
 import logging
+import re
 from collections import defaultdict
 
 from django.core.mail import send_mail
@@ -24,7 +25,6 @@ def _extract_date_from_log_file(lf):
         return date_utils.get_date_obj(probably_date)
 
     try:
-        import re
         match = re.search(r"(\d{4}-\d{2}-\d{2})", lf.path)
         if match:
             return date_utils.get_date_obj(match.group(1))

From 08956d71534c7be8b2520780477a577e69fc8291 Mon Sep 17 00:00:00 2001
From: Rafael JP Damaceno
Date: Mon, 4 May 2026 19:13:30 -0300
Subject: [PATCH 2/2] Update version to v2.0.2

---
 VERSION | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/VERSION b/VERSION
index 38f77a6..e9307ca 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.0.1
+2.0.2
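
For readers skimming the diff above, the routing pattern it introduces can be sketched in isolation. This is a minimal illustration, not the project's code: validate_one, parse_collection and queue_for below are hypothetical stand-ins for task_validate_log_file, task_parse_logs and metrics.services.resources.extract_celery_queue_name, and the "books" -> "parse_small" / "scl" -> "parse_xlarge" mapping only mirrors what the new tests assert, not a confirmed production mapping.

from celery import Celery, chord

app = Celery("routing_sketch", broker="memory://")


@app.task
def validate_one(log_file_hash):
    # Stand-in for task_validate_log_file: validate a single cataloged file.
    return log_file_hash


@app.task
def parse_collection(collections, queue_name):
    # Stand-in for task_parse_logs: parse everything queued for one collection.
    return {"collections": collections, "queue": queue_name}


def queue_for(collection_code):
    # Assumed size-based mapping; the patch delegates this decision to
    # extract_celery_queue_name.
    return {"books": "parse_small", "scl": "parse_xlarge"}.get(
        collection_code, "parse_small"
    )


def enqueue_parse_for_collection(collection_code, log_file_hashes):
    parse_queue = queue_for(collection_code)
    parse_callback = parse_collection.si(
        collections=[collection_code], queue_name=parse_queue
    ).set(queue=parse_queue)

    validations = [validate_one.s(h) for h in log_file_hashes]
    if validations:
        # Validate every file first, then run the parse callback on its own queue.
        chord(validations)(parse_callback)
    else:
        # Nothing left to validate for this collection: go straight to parsing.
        parse_callback.apply_async()

Under these assumptions, enqueue_parse_for_collection("scl", hashes) would land the parse step on the parse_xlarge queue, which is what test_validate_log_files_routes_each_collection_parse_to_its_queue checks for the real tasks.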