Skip to content
This repository has been archived by the owner on Dec 13, 2022. It is now read-only.

Commit

Permalink
CPA-297 Handling custom jobs
Browse files Browse the repository at this point in the history
[x] Created custom jobs for red_flag, summarize_supplier and summarize_buyer
  • Loading branch information
Suyoj Man Tamrakar committed May 21, 2021
1 parent 27517e6 commit e83a46b
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 45 deletions.
72 changes: 43 additions & 29 deletions country/management/commands/evaluate_contract_red_flag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from datetime import datetime

from celery import Celery
from django.core.management.base import BaseCommand

from country.models import Country, Tender
from country.tasks import clear_redflag, process_redflag, process_redflag6, process_redflag7
from country.tasks import evaluate_contract_red_flag, round_to_6hour

app = Celery()


class Command(BaseCommand):
Expand All @@ -11,30 +15,40 @@ def add_arguments(self, parser):
parser.add_argument("country", nargs="+", type=str)

def handle(self, *args, **options):
country_code = options["country"][0]
country_code = str(country_code).upper()
country_list = Country.objects.exclude(country_code_alpha_2="gl").values_list(
"country_code_alpha_2", flat=True
)

if country_code not in country_list:
self.stderr.write("Country code is invalid.")
return

country = Country.objects.get(country_code_alpha_2=country_code)

self.stdout.write(f"Processing Red flag for {country.name}")

country_tenders = Tender.objects.filter(
country_id=country.id, supplier__isnull=False, buyer__isnull=False
).values("id", "buyer__buyer_name", "supplier__supplier_name", "supplier__supplier_address")

for tender in country_tenders:
tender_id = tender["id"]
self.stdout.write("Created task for id :" + str(tender_id))
clear_redflag.apply_async(args=(tender_id,), queue="covid19")
process_redflag6.apply_async(args=(tender_id,), queue="covid19")
process_redflag7.apply_async(args=(tender_id,), queue="covid19")
process_redflag.apply_async(args=(tender_id,), queue="covid19")

self.stdout.write("Done")
country_alpha_code = options["country"][0]

present_time = datetime.now()
nearest_hour = round_to_6hour(present_time)
instance = app.control.inspect()
scheduled_list = instance.scheduled()
if bool(scheduled_list):
scheduled_machine = list(scheduled_list.keys())[0]

if len(scheduled_list[scheduled_machine]) == 0:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_contract_red_flag.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour)
return "Done"
else:
task_name = "evaluate_contract_red_flag"
country = [country_alpha_code]
user_data = {"task": task_name, "country": country}
task_list = []
for task in scheduled_list[scheduled_machine]:
task_name = task["request"]["name"]
country = task["request"]["args"]
data = {"task": task_name, "country": country}
if data not in task_list:
task_list.append(data)

if user_data not in task_list:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_contract_red_flag.apply_async(
args=(country_alpha_code,), queue="covid19", eta=nearest_hour
)
return "Done"
else:
self.stdout.write("Already in queue")
return "Exited"
else:
self.stdout.write("Celery queue not executed!!")
return None
48 changes: 41 additions & 7 deletions country/management/commands/summarize_country_buyer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from datetime import datetime

from celery import Celery
from django.core.management.base import BaseCommand

from country.models import Buyer, Country
from country.tasks import summarize_buyer
from country.models import Country
from country.tasks import evaluate_country_buyer, round_to_hour

app = Celery()


class Command(BaseCommand):
Expand All @@ -11,10 +16,39 @@ def add_arguments(self, parser):
def handle(self, *args, **kwargs):
country_alpha_code = kwargs["country"]
try:
country = Country.objects.get(country_code_alpha_2=country_alpha_code)
Country.objects.get(country_code_alpha_2=country_alpha_code)
except Exception:
return self.stdout.write("Country alpha code doesnt exist")
buyers = Buyer.objects.filter(tenders__country=country)
for buyer in buyers:
self.stdout.write("Created tasks for buyer_id" + str(buyer.id))
summarize_buyer.apply_async(args=(buyer.id,), queue="covid19")

present_time = datetime.now()
nearest_hour = round_to_hour(present_time)
instance = app.control.inspect()
scheduled_list = instance.scheduled()
if bool(scheduled_list):
scheduled_machine = list(scheduled_list.keys())[0]
if len(scheduled_list[scheduled_machine]) == 0:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour)
return "Done"
else:
task_name = "evaluate_country_buyer"
country = [country_alpha_code]
user_data = {"task": task_name, "country": country}
task_list = []
for task in scheduled_list[scheduled_machine]:
task_name = task["request"]["name"]
country = task["request"]["args"]
data = {"task": task_name, "country": country}
if data not in task_list:
task_list.append(data)

if user_data not in task_list:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour)
return "Done"
else:
self.stdout.write("Already in queue")
return "Exited"
else:
self.stdout.write("Celery queue not executed!!")
return "Error"
51 changes: 44 additions & 7 deletions country/management/commands/summarize_country_supplier.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
from datetime import datetime

from celery import Celery
from django.core.management.base import BaseCommand

from country.models import Country, Supplier
from country.tasks import summarize_supplier
from country.models import Country
from country.tasks import evaluate_country_supplier, round_to_hour

app = Celery()


class Command(BaseCommand):
Expand All @@ -10,11 +15,43 @@ def add_arguments(self, parser):

def handle(self, *args, **kwargs):
country_alpha_code = kwargs["country"]

try:
country = Country.objects.get(country_code_alpha_2=country_alpha_code)
Country.objects.get(country_code_alpha_2=country_alpha_code)
except Exception:
return self.stdout.write("Country alpha code doesnt exist")
suppliers = Supplier.objects.filter(tenders__country=country)
for supplier in suppliers:
self.stdout.write("Created tasks for supplier_id" + str(supplier.id))
summarize_supplier.apply_async(args=(supplier.id,), queue="covid19")
present_time = datetime.now()
nearest_hour = round_to_hour(present_time)
instance = app.control.inspect()
scheduled_list = instance.scheduled()
if bool(scheduled_list):
scheduled_machine = list(scheduled_list.keys())[0]

if len(scheduled_list[scheduled_machine]) == 0:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_country_supplier.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour)
return "Done"
else:
task_name = "evaluate_country_supplier"
country = [country_alpha_code]
user_data = {"task": task_name, "country": country}
task_list = []
for task in scheduled_list[scheduled_machine]:
task_name = task["request"]["name"]
country = task["request"]["args"]
data = {"task": task_name, "country": country}
if data not in task_list:
task_list.append(data)

if user_data not in task_list:
self.stdout.write(f"Task created for time :{nearest_hour}")
evaluate_country_supplier.apply_async(
args=(country_alpha_code,), queue="covid19", eta=nearest_hour
)
return "Done"
else:
self.stdout.write("Already in queue")
return "Exited"
else:
self.stdout.write("Celery queue not executed!!")
return "Error"
56 changes: 55 additions & 1 deletion country/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import datetime
import os
import sys
import traceback
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import dateutil.parser
Expand Down Expand Up @@ -1107,3 +1107,57 @@ def summarize_supplier(supplier_id):
except Exception as e:
return e
return "Completed"


@app.task(name="evaluate_country_buyer")
def evaluate_country_buyer(country_code):
buyers = Buyer.objects.filter(country__country_code_alpha_2=country_code)
for buyer in buyers:
summarize_buyer(buyer.id)


@app.task(name="evaluate_country_supplier")
def evaluate_country_supplier(country_code):
suppliers = Supplier.objects.filter(country__country_code_alpha_2=country_code)
for supplier in suppliers:
summarize_supplier(supplier.id)


@app.task(name="evaluate_contract_red_flag")
def evaluate_contract_red_flag(country_code):
country = Country.objects.get(country_code_alpha_2=country_code)
country_tenders = Tender.objects.filter(country_id=country.id, supplier__isnull=False, buyer__isnull=False)
for tender in country_tenders:
tender_id = tender.id
clear_redflag.apply_async(args=(tender_id,), queue="covid19")
process_redflag6.apply_async(args=(tender_id,), queue="covid19")
process_redflag7.apply_async(args=(tender_id,), queue="covid19")
process_redflag.apply_async(args=(tender_id,), queue="covid19")


def round_to_hour(dt):
dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0)
dt = dt_start_of_hour + datetime.timedelta(hours=1)
return dt


def round_to_6hour(dt):
dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0)
dt = dt_start_of_hour + datetime.timedelta(hours=6)
return dt


def round_to_15(dt):
dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0)
dt_15 = dt.replace(minute=15, second=0, microsecond=0)
dt_30 = dt.replace(minute=30, second=0, microsecond=0)
dt_45 = dt.replace(minute=45, second=0, microsecond=0)
if dt > dt_15 and dt < dt_30:
dt = dt_start_of_hour + datetime.timedelta(minutes=30)
elif dt > dt_30 and dt < dt_45:
dt = dt_start_of_hour + datetime.timedelta(minutes=45)
elif dt < dt_15:
dt = dt_start_of_hour + datetime.timedelta(minutes=15)
else:
dt = dt_start_of_hour + datetime.timedelta(hours=1)
return dt
2 changes: 1 addition & 1 deletion country/tests/commands/test_evaluate_contract_red_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_without_country_code(self):
call_command("evaluate_contract_red_flag")

def test_with_country_code(self):
self.assertEquals(call_command("evaluate_contract_red_flag", "mx"), None)
self.assertEquals(call_command("evaluate_contract_red_flag", "MX"), None)

def test_with_country_wrong_code(self):
self.assertEquals(call_command("evaluate_contract_red_flag", "mxss"), None)

0 comments on commit e83a46b

Please sign in to comment.