Skip to content

Commit

Permalink
Backfill command should ignore actioned messages which already exist
Browse files: browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 2, 2016
1 parent: e7c1b15; commit: 3b21ba8
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions casepro/msgs/management/commands/msgbackfill.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -36,14 +36,20 @@ def add_arguments(self, parser):
help='The orgs to backfill')
parser.add_argument('--analyze', dest='analyze', action='store_const', const=True, default=False,
help="Whether to analyze local messages rather than actually back-fill")

parser.add_argument('--no-actioned', dest='ignore_actioned', action='store_const', const=True, default=False,
help="Whether to exclude actioned messages from the back-fill")
parser.add_argument('--no-labelled', dest='ignore_labelled', action='store_const', const=True, default=False,
help="Whether to exclude labelled messages from the back-fill")
parser.add_argument('--no-recent', dest='ignore_recent', action='store_const', const=True, default=False,
help="Whether to exclude recent messages from the back-fill")

parser.add_argument('--exclude-label', dest='exclude_label', default=None,
help="Name of a label to exclude")
parser.add_argument('--labelled-months', dest='labelled_months', type=int, default=NUM_LABELLED_MONTHS,
help="Number of months to fetch labelled messages for")
parser.add_argument('--recent-months', dest='recent_months', type=int, default=NUM_RECENT_MONTHS,
help="Number of months to fetch recent messages for")

def handle(self, *args, **options):
org_ids = options['org_ids']
Expand All @@ -52,6 +58,8 @@ def handle(self, *args, **options):
ignore_labelled = options['ignore_labelled']
ignore_recent = options['ignore_recent']
exclude_label = options['exclude_label']
labelled_months = options['labelled_months']
recent_months = options['recent_months']

if org_ids:
orgs = Org.objects.filter(pk__in=org_ids)
Expand All @@ -72,29 +80,29 @@ def handle(self, *args, **options):
if analyze:
self.analyze(org)
else:
self.backfill(org, ignore_actioned, ignore_labelled, ignore_recent, exclude_label)
self.backfill(org, ignore_actioned, ignore_labelled, ignore_recent, exclude_label, labelled_months, recent_months)

def analyze(self, org):
self.stdout.write("Starting analysis for org %s [%d]..." % (org.name, org.pk))

rapidpro_msg_ids = self.get_message_ids_for_cases_and_actions(org)
backend_ids = self.get_backend_ids_for_cases_and_actions(org)

num_existing = Message.objects.filter(org=org, backend_id__in=rapidpro_msg_ids).count()
num_missing = len(rapidpro_msg_ids) - num_existing
num_existing = Message.objects.filter(org=org, backend_id__in=backend_ids).count()
num_missing = len(backend_ids) - num_existing

self.stdout.write(" > Found %d message ids in cases and actions (%d missing locally)" % (len(rapidpro_msg_ids), num_missing))
self.stdout.write(" > Found %d message ids in cases and actions (%d missing locally)" % (len(backend_ids), num_missing))

def backfill(self, org, ignore_actioned, ignore_labelled, ignore_recent, exclude_label):
def backfill(self, org, ignore_actioned, ignore_labelled, ignore_recent, exclude_label, labelled_months, recent_months):
self.stdout.write("Starting backfill for org %s [%d]..." % (org.name, org.pk))

if not ignore_actioned:
self.backfill_actioned(org)

if not ignore_labelled:
self.backfill_labelled(org, NUM_LABELLED_MONTHS, exclude_label)
self.backfill_labelled(org, labelled_months, exclude_label)

if not ignore_recent:
self.backfill_recent(org, NUM_RECENT_MONTHS)
self.backfill_recent(org, recent_months)

self.stdout.write(" > Finished org with %d messages" % Message.objects.filter(org=org).count())

Expand All @@ -105,17 +113,19 @@ def backfill_actioned(self, org):
from casepro.backend.rapidpro import MessageSyncer
syncer = MessageSyncer(as_handled=True)

msg_ids = self.get_message_ids_for_cases_and_actions(org)
backend_ids = self.get_backend_ids_for_cases_and_actions(org)
existing_ids = set(Message.objects.filter(org=org, backend_id__in=backend_ids).values_list('backend_id', flat=True))
missing_ids = [m for m in backend_ids if m not in existing_ids]

self.stdout.write(" > Found %d message ids in cases and actions" % len(msg_ids))
self.stdout.write(" > Found %d message ids in cases and actions (%d missing locally)" % (len(backend_ids), len(missing_ids)))

label_uuids_by_name = {l.name: l.uuid for l in org.labels.all()}

client = org.get_temba_client(api_version=1)

num_synced = 0

for id_batch in chunks(msg_ids, MSGS_PER_ID_FETCH):
for id_batch in chunks(missing_ids, MSGS_PER_ID_FETCH):
fetched_v1s = client.get_messages(ids=id_batch)
remotes_as_v2s = [self.v1_message_to_v2(m, label_uuids_by_name) for m in fetched_v1s]

Expand All @@ -135,6 +145,8 @@ def backfill_labelled(self, org, num_months, exclude_label):

since = now() - relativedelta(months=num_months)

self.stdout.write(" > Fetching labelled messages since %s..." % since.strftime('%b %d, %Y %H:%M'))

client = org.get_temba_client(api_version=2)

def progress_callback(num_fetched):
Expand All @@ -160,7 +172,7 @@ def backfill_recent(self, org, num_months):
"""
since = now() - relativedelta(months=num_months)

self.stdout.write(" > Fetching all messages since %s..." % since.strftime('%b %d, %Y %H:%M'))
self.stdout.write(" > Fetching recent messages since %s..." % since.strftime('%b %d, %Y %H:%M'))

def progress_callback(num_fetched):
self.api_v2_message_requests[org] += 1
Expand All @@ -171,7 +183,7 @@ def progress_callback(num_fetched):

self.stdout.write(" > Synced messages (%d created, %d updated, %d deleted)" % (created, updated, deleted))

def get_message_ids_for_cases_and_actions(self, org):
def get_backend_ids_for_cases_and_actions(self, org):
ids_to_fetch = set()

for case in org.cases.all():
Expand Down

0 comments on commit 3b21ba8

Please sign in to comment.