diff --git a/open_fmri/apps/log_parse/models.py b/open_fmri/apps/log_parse/models.py
index b766a70..ff5ff72 100644
--- a/open_fmri/apps/log_parse/models.py
+++ b/open_fmri/apps/log_parse/models.py
@@ -9,3 +9,4 @@ class LogFile(models.Model):
 class S3File(models.Model):
     filename = models.TextField(unique=True)
     count = models.IntegerField()
+    dataset = models.ForeignKey('dataset.Dataset', null=True)
diff --git a/open_fmri/apps/log_parse/tasks.py b/open_fmri/apps/log_parse/tasks.py
index 03bc424..5325ad0 100644
--- a/open_fmri/apps/log_parse/tasks.py
+++ b/open_fmri/apps/log_parse/tasks.py
@@ -59,6 +59,7 @@ def parse_log_files():
     conn = S3Connection(aws_access_key, aws_secret_key)
     bucket = conn.get_bucket(bucket_name)
 
+    file_count = 0
     for key in bucket.list(prefix=prefix):
         try:
             log_file = LogFile.objects.get(key=key.key)
@@ -77,7 +78,26 @@ def parse_log_files():
             log_file.parsed = True
             log_file.lock = False
             log_file.save()
+
+        # Only parse 100 log files at a time to prevent long-running tasks
+        file_count += 1
+        if file_count >= 100:
+            break
 
+def parse_log_files_locally(path_to_logs):
+    """Parse S3 log files that are stored locally
+
+    Intended to be run manually; it has none of the locking provisions
+    that the normal task has.
+    """
+    for log in os.listdir(path_to_logs):
+        contents = open(os.path.join(path_to_logs, log), 'r').read()
+        key = "logs/" + log
+        print(key)
+        parse_str(contents)
+        log_file = LogFile(key=key, parsed=True, lock=False)
+        log_file.save()
+
 def parse_str(contents):
     """Writes the download count for a file referenced in an S3 log to
     database
@@ -86,30 +106,32 @@ def parse_str(contents):
     count for each time its seen. For each filename that is seen an entry in
     the database is created and count stored.
 
-    We ignore entries where no bytes are transferred.
+    We ignore entries where no bytes are transferred, the response was not
+    a 200, the operation was not a GET OBJECT, or the filename is blank.
     """
     parsed_data = {}
     for log_line in contents.splitlines():
         match = s3_line_logpat.match(log_line)
         if match is not None:
             parsed_line = [match.group(1+n) for n in range(17)]
-            is_get_request = False
+            is_get_request = True
             is_valid_file = False
+            count_flag = True
             filename = ''
             for (name, val) in zip(s3_names, parsed_line):
-                if name == 'operation' and val == 'REST.GET.OBJECT':
-                    print(val)
-                    is_get_request = True
-                elif name == 'key' and val is not '-' and val[-1:] is not '/' and is_get_request:
-                    print(val)
-                    filename = val
-                    is_valid_file = True
-                elif name == 'bytes_sent' and val is '-':
-                    print(val)
-                    is_valid_file = False
+                if name == 'operation' and val != 'REST.GET.OBJECT':
+                    count_flag = False
                 else:
                     pass
-            if is_valid_file:
+                if name == 'http_status' and val != '200':
+                    count_flag = False
+                if name == 'key':
+                    filename = val
+                if name == 'key' and val == '-':
+                    count_flag = False
+                if name == 'bytes_sent' and val == '-':
+                    count_flag = False
+            if count_flag:
                 try:
                     parsed_data[filename] += 1
                 except KeyError:
diff --git a/open_fmri/settings/base.py b/open_fmri/settings/base.py
index e5fcdc4..e5c62bc 100644
--- a/open_fmri/settings/base.py
+++ b/open_fmri/settings/base.py
@@ -181,12 +181,14 @@
     Queue('default', Exchange('default'), routing_key='default'),
 )
 
-CELERYBEAT_SCHEDULE = {
-    'Parse Logs': {
-        'task': 'log_parse_task',
-        'schedule': crontab()
-    },
-}
+if os.environ.get('RUN_TASKS', False):
+    CELERYBEAT_SCHEDULE = {
+        'Parse Logs': {
+            'task': 'log_parse_task',
+            'schedule': crontab()
+        },
+    }
+
 
 # .local.py overrides all the common settings.
 try:
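
The new dataset column on S3File is a nullable ForeignKey, so existing rows
are unaffected, but the patch still needs a schema migration (manage.py
makemigrations log_parse) before deploy; on Django 2.0+ the ForeignKey would
also need an explicit on_delete argument. A back-fill pass linking existing
rows to datasets might look like the sketch below. The accession_number field
and the assumption that dataset accessions appear in stored key names are
illustrative guesses, not anything this patch establishes:

    # Hypothetical back-fill, e.g. from a Django shell. Assumes dataset
    # accession numbers (e.g. "ds000001") appear in S3File.filename;
    # Dataset.accession_number is a guessed field name.
    from dataset.models import Dataset
    from log_parse.models import S3File

    for ds in Dataset.objects.all():
        S3File.objects.filter(
            filename__contains=ds.accession_number,
            dataset__isnull=True
        ).update(dataset=ds)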
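
parse_log_files now stops after 100 log files per run; with the crontab()
schedule firing every minute, files it skips are simply picked up on later
runs, so no single task runs long. The new parse_log_files_locally helper
bypasses the LogFile locking entirely and should only be run by hand. A
possible invocation, assuming the logs were synced down first and that the
apps directory is importable as log_parse (the same layout the
'dataset.Dataset' reference in models.py implies):

    # e.g. after: aws s3 sync s3://<bucket>/logs/ /tmp/s3_logs/
    # Run inside `python manage.py shell` so Django settings are configured.
    from log_parse.tasks import parse_log_files_locally

    parse_log_files_locally('/tmp/s3_logs')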
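
The rewritten parse_str replaces the tangled if/elif chain with a single
count_flag that any disqualifying field switches off. Note that the old guard
against directory keys (val[-1:] != '/') does not survive the rewrite, so keys
ending in '/' now count. The new filter reduces to the pure predicate below,
shown with illustrative asserts; the field names come from the s3_names tuple
the parser already uses, but the sample values are fabricated:

    # Standalone restatement of the count_flag logic, for illustration only;
    # the real loop walks all 17 groups captured by s3_line_logpat.
    def should_count(fields):
        return (fields.get('operation') == 'REST.GET.OBJECT'
                and fields.get('http_status') == '200'
                and fields.get('key') != '-'
                and fields.get('bytes_sent') != '-')

    assert should_count({'operation': 'REST.GET.OBJECT', 'http_status': '200',
                         'key': 'ds000001/raw.tgz', 'bytes_sent': '1024'})
    assert not should_count({'operation': 'REST.HEAD.OBJECT',
                             'http_status': '200',
                             'key': 'ds000001/raw.tgz', 'bytes_sent': '1024'})
    assert not should_count({'operation': 'REST.GET.OBJECT',
                             'http_status': '304',
                             'key': 'ds000001/raw.tgz', 'bytes_sent': '-'})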
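
Gating CELERYBEAT_SCHEDULE on RUN_TASKS means only deployments that opt in
will schedule the parser, so local and staging environments stay quiet by
default. One caveat: os.environ values are strings, so any non-empty setting,
including RUN_TASKS=0 or RUN_TASKS=false, still enables the schedule. If that
matters, a stricter parse is a small change; this is a sketch of an
alternative for base.py (where os and crontab are already imported), not part
of the patch:

    # Treat only an explicit allow-list of values as "on"; plain
    # os.environ.get('RUN_TASKS', False) is truthy for any non-empty string.
    RUN_TASKS = os.environ.get('RUN_TASKS', '').lower() in ('1', 'true', 'yes')

    if RUN_TASKS:
        CELERYBEAT_SCHEDULE = {
            'Parse Logs': {
                'task': 'log_parse_task',
                'schedule': crontab()
            },
        }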