Commit

setup lock mechanism via django cache to prevent multiple instances of the task from running. moved task lock logic to its own function. fixed log file lock logic
rwblair committed Jan 21, 2016
1 parent 86eb030 commit 6ddc702
Showing 3 changed files with 50 additions and 12 deletions.
29 changes: 29 additions & 0 deletions open_fmri/apps/log_parse/migrations/0002_auto_20160120_2149.py
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('log_parse', '0001_initial'),
]

operations = [
migrations.RemoveField(
model_name='s3file',
name='url',
),
migrations.AddField(
model_name='s3file',
name='filename',
field=models.TextField(default='filename', unique=True),
preserve_default=False,
),
migrations.AlterField(
model_name='logfile',
name='key',
field=models.TextField(unique=True),
),
]
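
After this migration, S3File is looked up by a unique filename instead of a url, and LogFile.key gains a uniqueness constraint. A rough sketch of the model shapes the migration implies, with the remaining fields inferred from how tasks.py uses them (those inferred fields are assumptions, not shown in this commit):

from django.db import models

class S3File(models.Model):
    filename = models.TextField(unique=True)  # replaces the removed url field
    count = models.IntegerField(default=0)    # inferred from tasks.py

class LogFile(models.Model):
    key = models.TextField(unique=True)          # now unique per S3 log key
    parsed = models.BooleanField(default=False)  # inferred from tasks.py
    lock = models.BooleanField(default=False)    # inferred from tasks.py
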
29 changes: 20 additions & 9 deletions open_fmri/apps/log_parse/tasks.py
@@ -5,9 +5,10 @@
from datetime import datetime

from boto.s3.connection import S3Connection
from celery import shared_task, Celery
from celery import Celery, task, shared_task
from celery.utils.log import get_task_logger

from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist

from log_parse.models import LogFile, S3File
@@ -31,19 +32,31 @@
"referer", "user_agent")
# END

@app.task(name='log_parse_task')
def log_parse_task():
lock_id = "parse_log_files"
acquire_lock = lambda: cache.add(lock_id, 'true')
release_lock = lambda: cache.delete(lock_id)

if acquire_lock():
try:
parse_log_files()
finally:
release_lock()
return

def parse_log_files():
"""Parse S3 log files that reside in an S3 bucket
The contents of BUCKET_NAME are iterated over. Already parsed files have
their filename added to PARSED_FILES to prevent duplicate parsing.
"""

aws_access_key = os.environ.get('S3_LOG_ACCESS_KEY')
aws_secret_key = os.environ.get('S3_LOG_SECRET_KEY')
bucket_name = os.environ.get('S3_LOG_BUCKET')
prefix = os.environ.get('S3_LOG_PREFIX')
parsed_files = os.environ.get('S3_LOG_PARSED_FILES')

parsed_files = open(parsed_files, 'r+')
conn = S3Connection(aws_access_key, aws_secret_key)
bucket = conn.get_bucket(bucket_name)
for key in bucket.list(prefix=prefix):
@@ -61,6 +74,10 @@ def parse_log_files():
contents = str(key.get_contents_as_string())
parse_str(contents)

log_file.parsed = True
log_file.lock = False
log_file.save()


def parse_str(contents):
"""Writes the download count for a file referenced in an S3 log to database
@@ -106,9 +123,3 @@ def parse_str(contents):
except ObjectDoesNotExist:
s3_file = S3File(filename=filename, count=parsed_data[filename])
s3_file.save()

@app.task(name='test_parse')
def test_parse():
parse_log_files()
return datetime.now()
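
The lock added above works because cache.add is atomic on backends such as memcached: it stores the key only if it is absent and returns True when the write happened, so exactly one worker can hold the lock at a time. A minimal sketch of the same pattern, with an expiry added so a hard-killed worker cannot hold the lock forever (the timeout value and helper name are illustrative, not part of this commit):

from django.core.cache import cache

LOCK_ID = 'parse_log_files'
LOCK_EXPIRE = 60 * 60  # assumption: let a stale lock expire after an hour

def run_exclusively(func):
    # cache.add() writes the key only if it does not already exist and
    # reports whether the write happened, so only one caller gets the lock.
    if not cache.add(LOCK_ID, 'true', LOCK_EXPIRE):
        return  # another instance of the task is already running
    try:
        func()
    finally:
        cache.delete(LOCK_ID)

Because the committed code passes no timeout to cache.add, the lock lives until release_lock() deletes it; the try/finally ensures that deletion runs even when parse_log_files() raises.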

4 changes: 1 addition & 3 deletions open_fmri/settings/base.py
@@ -181,11 +181,9 @@
Queue('default', Exchange('default'), routing_key='default'),
)

CELERY_IMPORTS = ('open_fmri.apps.dataset.tasks', )

CELERYBEAT_SCHEDULE = {
'Parse Logs': {
'task': 'test_parse',
'task': 'log_parse_task',
'schedule': crontab()
},
}
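
A crontab() with no arguments matches every minute, so the beat scheduler now fires log_parse_task once a minute and relies on the cache lock in tasks.py to keep runs from overlapping. If a slower cadence were wanted, the schedule entry could be narrowed; the nightly time below is illustrative, not from this commit:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'Parse Logs': {
        'task': 'log_parse_task',
        # assumption: parse logs once nightly at 03:30 instead of every minute
        'schedule': crontab(hour=3, minute=30),
    },
}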
