file download counting logic more strict, new environment variable to control scheduling of cron style tasks, log_parse model now has FK into datasets
rwblair committed Jan 25, 2016
1 parent 36cf0b5 commit d89ee4c
Showing 3 changed files with 44 additions and 19 deletions.
1 change: 1 addition & 0 deletions open_fmri/apps/log_parse/models.py
@@ -9,3 +9,4 @@ class LogFile(models.Model):
class S3File(models.Model):
    filename = models.TextField(unique=True)
    count = models.IntegerField()
+    dataset = models.ForeignKey('dataset.Dataset', null=True)
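The new foreign key lets per-file download counts be rolled up per dataset. A minimal sketch of what that enables, assuming the apps directory is on the import path and that Dataset is looked up by an accession_number field (both assumptions, not shown in this commit):

    # Hedged sketch: link a counted S3File to its Dataset, then sum downloads.
    # 'accession_number' and the import paths are assumptions about the
    # dataset app, not part of this commit.
    from django.db.models import Sum
    from dataset.models import Dataset
    from log_parse.models import S3File

    ds = Dataset.objects.get(accession_number='ds000001')  # assumed field
    s3_file = S3File.objects.get(filename='ds000001/sub001_anat.tgz')
    s3_file.dataset = ds
    s3_file.save()

    downloads = S3File.objects.filter(dataset=ds).aggregate(total=Sum('count'))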
48 changes: 35 additions & 13 deletions open_fmri/apps/log_parse/tasks.py
@@ -59,6 +59,7 @@ def parse_log_files():

    conn = S3Connection(aws_access_key, aws_secret_key)
    bucket = conn.get_bucket(bucket_name)
+    file_count = 0
    for key in bucket.list(prefix=prefix):
        try:
            log_file = LogFile.objects.get(key=key.key)
@@ -77,7 +78,26 @@ def parse_log_files():
        log_file.parsed = True
        log_file.lock = False
        log_file.save()
+
+        # Only parse 100 log files at a time to prevent long running tasks
+        file_count += 1
+        if file_count >= 100:
+            break

+def parse_log_files_locally(path_to_logs):
+    """Parse S3 log files that are stored locally.
+
+    Intended to be run manually; it has none of the locking provisions
+    that the normal task has.
+    """
+    for log in os.listdir(path_to_logs):
+        # Read each log, closing the file handle promptly.
+        with open(os.path.join(path_to_logs, log), 'r') as fp:
+            contents = fp.read()
+        key = "logs/" + log
+        print(key)
+        parse_str(contents)
+        log_file = LogFile(key=key, parsed=True, lock=False)
+        log_file.save()


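The new helper can be driven from python manage.py shell against a directory of S3 logs that were pulled down locally. A minimal usage sketch (the path is illustrative, and the import assumes the apps directory is on the Python path):

    # Hedged usage sketch: parses every file in the directory and records
    # each one as an already-parsed, unlocked LogFile.
    from log_parse.tasks import parse_log_files_locally

    parse_log_files_locally('/tmp/s3_logs/')  # illustrative path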
def parse_str(contents):
    """Writes the download count for a file referenced in an S3 log to the database
@@ -86,30 +106,32 @@ def parse_str(contents):
    count for each time it's seen. For each filename that is seen, an entry in
    the database is created and the count stored.
-    We ignore entries where no bytes are transferred.
+    We ignore entries where no bytes were transferred, where the response was
+    anything other than a 200, where the operation was not a GET OBJECT, or
+    where the filename is blank.
    """
    parsed_data = {}
    for log_line in contents.splitlines():
        match = s3_line_logpat.match(log_line)
        if match is not None:
            parsed_line = [match.group(1+n) for n in range(17)]
-            is_get_request = False
+            is_get_request = True
-            is_valid_file = False
+            count_flag = True
            filename = ''
            for (name, val) in zip(s3_names, parsed_line):
-                if name == 'operation' and val == 'REST.GET.OBJECT':
-                    print(val)
-                    is_get_request = True
-                elif name == 'key' and val is not '-' and val[-1:] is not '/' and is_get_request:
-                    print(val)
-                    filename = val
-                    is_valid_file = True
-                elif name == 'bytes_sent' and val is '-':
-                    print(val)
-                    is_valid_file = False
+                if name == 'operation' and val != 'REST.GET.OBJECT':
+                    count_flag = False
-                else:
-                    pass
-            if is_valid_file:
+                if name == 'http_status' and val != '200':
+                    count_flag = False
+                if name == 'key':
+                    filename = val
+                if name == 'key' and val == '-':
+                    count_flag = False
+                if name == 'bytes_sent' and val == '-':
+                    count_flag = False
+            if count_flag:
                try:
                    parsed_data[filename] += 1
                except KeyError:
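The net effect of the stricter logic: a request is counted only when the operation is REST.GET.OBJECT, the status is 200, the key is present, and bytes were actually sent. A sketch with an invented but format-shaped S3 access log entry (the module's s3_line_logpat regex and s3_names are defined outside this hunk, so an exact match is an assumption):

    # Illustrative only: every value below is invented. The fields the new
    # checks consult are operation (REST.GET.OBJECT), key, http_status (200),
    # and bytes_sent (1048576); any other combination is now skipped.
    sample = ('79a59df900b949e55d96a1e698fbacedEXAMPLE openfmri '
              '[25/Jan/2016:00:00:00 +0000] 192.0.2.10 - 3E57427F3EXAMPLE '
              'REST.GET.OBJECT ds000001/sub001.tar.gz '
              '"GET /ds000001/sub001.tar.gz HTTP/1.1" 200 - 1048576 1048576 '
              '30 25 "-" "curl/7.43.0"')
    parse_str(sample)  # should record one download of ds000001/sub001.tar.gz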
14 changes: 8 additions & 6 deletions open_fmri/settings/base.py
@@ -181,12 +181,14 @@
    Queue('default', Exchange('default'), routing_key='default'),
)

-CELERYBEAT_SCHEDULE = {
-    'Parse Logs': {
-        'task': 'log_parse_task',
-        'schedule': crontab()
-    },
-}
+if os.environ.get('RUN_TASKS', False):
+    CELERYBEAT_SCHEDULE = {
+        'Parse Logs': {
+            'task': 'log_parse_task',
+            'schedule': crontab()
+        },
+    }


# .local.py overrides all the common settings.
try:
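The guard means the beat schedule is only registered on deployments that set RUN_TASKS, so a single host can own the cron-style tasks. One caveat: environment values are strings, so any non-empty value enables the schedule, even 'False':

    # os.environ stores strings; the guard only checks truthiness, so an
    # unset (or empty) RUN_TASKS is the only way to disable the schedule.
    import os

    os.environ['RUN_TASKS'] = 'False'
    print(bool(os.environ.get('RUN_TASKS', False)))  # True: still enabled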
