diff --git a/country/management/commands/evaluate_contract_red_flag.py b/country/management/commands/evaluate_contract_red_flag.py index 7ec5164..c673ccc 100644 --- a/country/management/commands/evaluate_contract_red_flag.py +++ b/country/management/commands/evaluate_contract_red_flag.py @@ -1,7 +1,11 @@ +from datetime import datetime + +from celery import Celery from django.core.management.base import BaseCommand -from country.models import Country, Tender -from country.tasks import clear_redflag, process_redflag, process_redflag6, process_redflag7 +from country.tasks import evaluate_contract_red_flag, round_to_6hour + +app = Celery() class Command(BaseCommand): @@ -11,30 +15,40 @@ def add_arguments(self, parser): parser.add_argument("country", nargs="+", type=str) def handle(self, *args, **options): - country_code = options["country"][0] - country_code = str(country_code).upper() - country_list = Country.objects.exclude(country_code_alpha_2="gl").values_list( - "country_code_alpha_2", flat=True - ) - - if country_code not in country_list: - self.stderr.write("Country code is invalid.") - return - - country = Country.objects.get(country_code_alpha_2=country_code) - - self.stdout.write(f"Processing Red flag for {country.name}") - - country_tenders = Tender.objects.filter( - country_id=country.id, supplier__isnull=False, buyer__isnull=False - ).values("id", "buyer__buyer_name", "supplier__supplier_name", "supplier__supplier_address") - - for tender in country_tenders: - tender_id = tender["id"] - self.stdout.write("Created task for id :" + str(tender_id)) - clear_redflag.apply_async(args=(tender_id,), queue="covid19") - process_redflag6.apply_async(args=(tender_id,), queue="covid19") - process_redflag7.apply_async(args=(tender_id,), queue="covid19") - process_redflag.apply_async(args=(tender_id,), queue="covid19") - - self.stdout.write("Done") + country_alpha_code = options["country"][0] + + present_time = datetime.now() + nearest_hour = round_to_6hour(present_time) + instance = app.control.inspect() + scheduled_list = instance.scheduled() + if bool(scheduled_list): + scheduled_machine = list(scheduled_list.keys())[0] + + if len(scheduled_list[scheduled_machine]) == 0: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_contract_red_flag.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour) + return "Done" + else: + task_name = "evaluate_contract_red_flag" + country = [country_alpha_code] + user_data = {"task": task_name, "country": country} + task_list = [] + for task in scheduled_list[scheduled_machine]: + task_name = task["request"]["name"] + country = task["request"]["args"] + data = {"task": task_name, "country": country} + if data not in task_list: + task_list.append(data) + + if user_data not in task_list: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_contract_red_flag.apply_async( + args=(country_alpha_code,), queue="covid19", eta=nearest_hour + ) + return "Done" + else: + self.stdout.write("Already in queue") + return "Exited" + else: + self.stdout.write("Celery queue not executed!!") + return None diff --git a/country/management/commands/summarize_country_buyer.py b/country/management/commands/summarize_country_buyer.py index 3f0ff82..02ff05f 100644 --- a/country/management/commands/summarize_country_buyer.py +++ b/country/management/commands/summarize_country_buyer.py @@ -1,7 +1,12 @@ +from datetime import datetime + +from celery import Celery from django.core.management.base import BaseCommand -from country.models import Buyer, Country -from country.tasks import summarize_buyer +from country.models import Country +from country.tasks import evaluate_country_buyer, round_to_hour + +app = Celery() class Command(BaseCommand): @@ -11,10 +16,39 @@ def add_arguments(self, parser): def handle(self, *args, **kwargs): country_alpha_code = kwargs["country"] try: - country = Country.objects.get(country_code_alpha_2=country_alpha_code) + Country.objects.get(country_code_alpha_2=country_alpha_code) except Exception: return self.stdout.write("Country alpha code doesnt exist") - buyers = Buyer.objects.filter(tenders__country=country) - for buyer in buyers: - self.stdout.write("Created tasks for buyer_id" + str(buyer.id)) - summarize_buyer.apply_async(args=(buyer.id,), queue="covid19") + + present_time = datetime.now() + nearest_hour = round_to_hour(present_time) + instance = app.control.inspect() + scheduled_list = instance.scheduled() + if bool(scheduled_list): + scheduled_machine = list(scheduled_list.keys())[0] + if len(scheduled_list[scheduled_machine]) == 0: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour) + return "Done" + else: + task_name = "evaluate_country_buyer" + country = [country_alpha_code] + user_data = {"task": task_name, "country": country} + task_list = [] + for task in scheduled_list[scheduled_machine]: + task_name = task["request"]["name"] + country = task["request"]["args"] + data = {"task": task_name, "country": country} + if data not in task_list: + task_list.append(data) + + if user_data not in task_list: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour) + return "Done" + else: + self.stdout.write("Already in queue") + return "Exited" + else: + self.stdout.write("Celery queue not executed!!") + return "Error" diff --git a/country/management/commands/summarize_country_supplier.py b/country/management/commands/summarize_country_supplier.py index 02ef685..10a2e7b 100644 --- a/country/management/commands/summarize_country_supplier.py +++ b/country/management/commands/summarize_country_supplier.py @@ -1,7 +1,12 @@ +from datetime import datetime + +from celery import Celery from django.core.management.base import BaseCommand -from country.models import Country, Supplier -from country.tasks import summarize_supplier +from country.models import Country +from country.tasks import evaluate_country_supplier, round_to_hour + +app = Celery() class Command(BaseCommand): @@ -10,11 +15,43 @@ def add_arguments(self, parser): def handle(self, *args, **kwargs): country_alpha_code = kwargs["country"] + try: - country = Country.objects.get(country_code_alpha_2=country_alpha_code) + Country.objects.get(country_code_alpha_2=country_alpha_code) except Exception: return self.stdout.write("Country alpha code doesnt exist") - suppliers = Supplier.objects.filter(tenders__country=country) - for supplier in suppliers: - self.stdout.write("Created tasks for supplier_id" + str(supplier.id)) - summarize_supplier.apply_async(args=(supplier.id,), queue="covid19") + present_time = datetime.now() + nearest_hour = round_to_hour(present_time) + instance = app.control.inspect() + scheduled_list = instance.scheduled() + if bool(scheduled_list): + scheduled_machine = list(scheduled_list.keys())[0] + + if len(scheduled_list[scheduled_machine]) == 0: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_country_supplier.apply_async(args=(country_alpha_code,), queue="covid19", eta=nearest_hour) + return "Done" + else: + task_name = "evaluate_country_supplier" + country = [country_alpha_code] + user_data = {"task": task_name, "country": country} + task_list = [] + for task in scheduled_list[scheduled_machine]: + task_name = task["request"]["name"] + country = task["request"]["args"] + data = {"task": task_name, "country": country} + if data not in task_list: + task_list.append(data) + + if user_data not in task_list: + self.stdout.write(f"Task created for time :{nearest_hour}") + evaluate_country_supplier.apply_async( + args=(country_alpha_code,), queue="covid19", eta=nearest_hour + ) + return "Done" + else: + self.stdout.write("Already in queue") + return "Exited" + else: + self.stdout.write("Celery queue not executed!!") + return "Error" diff --git a/country/tasks.py b/country/tasks.py index 449d438..26e59cb 100644 --- a/country/tasks.py +++ b/country/tasks.py @@ -1,8 +1,8 @@ +import datetime import os import sys import traceback from collections import defaultdict -from datetime import datetime from pathlib import Path import dateutil.parser @@ -1107,3 +1107,57 @@ def summarize_supplier(supplier_id): except Exception as e: return e return "Completed" + + +@app.task(name="evaluate_country_buyer") +def evaluate_country_buyer(country_code): + buyers = Buyer.objects.filter(country__country_code_alpha_2=country_code) + for buyer in buyers: + summarize_buyer(buyer.id) + + +@app.task(name="evaluate_country_supplier") +def evaluate_country_supplier(country_code): + suppliers = Supplier.objects.filter(country__country_code_alpha_2=country_code) + for supplier in suppliers: + summarize_supplier(supplier.id) + + +@app.task(name="evaluate_contract_red_flag") +def evaluate_contract_red_flag(country_code): + country = Country.objects.get(country_code_alpha_2=country_code) + country_tenders = Tender.objects.filter(country_id=country.id, supplier__isnull=False, buyer__isnull=False) + for tender in country_tenders: + tender_id = tender.id + clear_redflag.apply_async(args=(tender_id,), queue="covid19") + process_redflag6.apply_async(args=(tender_id,), queue="covid19") + process_redflag7.apply_async(args=(tender_id,), queue="covid19") + process_redflag.apply_async(args=(tender_id,), queue="covid19") + + +def round_to_hour(dt): + dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0) + dt = dt_start_of_hour + datetime.timedelta(hours=1) + return dt + + +def round_to_6hour(dt): + dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0) + dt = dt_start_of_hour + datetime.timedelta(hours=6) + return dt + + +def round_to_15(dt): + dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0) + dt_15 = dt.replace(minute=15, second=0, microsecond=0) + dt_30 = dt.replace(minute=30, second=0, microsecond=0) + dt_45 = dt.replace(minute=45, second=0, microsecond=0) + if dt > dt_15 and dt < dt_30: + dt = dt_start_of_hour + datetime.timedelta(minutes=30) + elif dt > dt_30 and dt < dt_45: + dt = dt_start_of_hour + datetime.timedelta(minutes=45) + elif dt < dt_15: + dt = dt_start_of_hour + datetime.timedelta(minutes=15) + else: + dt = dt_start_of_hour + datetime.timedelta(hours=1) + return dt diff --git a/country/tests/commands/test_evaluate_contract_red_flag.py b/country/tests/commands/test_evaluate_contract_red_flag.py index fe8148b..f8b254a 100644 --- a/country/tests/commands/test_evaluate_contract_red_flag.py +++ b/country/tests/commands/test_evaluate_contract_red_flag.py @@ -50,7 +50,7 @@ def test_without_country_code(self): call_command("evaluate_contract_red_flag") def test_with_country_code(self): - self.assertEquals(call_command("evaluate_contract_red_flag", "mx"), None) + self.assertEquals(call_command("evaluate_contract_red_flag", "MX"), None) def test_with_country_wrong_code(self): self.assertEquals(call_command("evaluate_contract_red_flag", "mxss"), None)