diff --git a/country/management/commands/evaluate_contract_red_flag.py b/country/management/commands/evaluate_contract_red_flag.py index 7ec5164..0a2a28e 100644 --- a/country/management/commands/evaluate_contract_red_flag.py +++ b/country/management/commands/evaluate_contract_red_flag.py @@ -1,7 +1,7 @@ from django.core.management.base import BaseCommand -from country.models import Country, Tender -from country.tasks import clear_redflag, process_redflag, process_redflag6, process_redflag7 +from country.models import Country +from country.tasks import evaluate_contract_red_flag class Command(BaseCommand): @@ -10,31 +10,14 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument("country", nargs="+", type=str) - def handle(self, *args, **options): - country_code = options["country"][0] - country_code = str(country_code).upper() - country_list = Country.objects.exclude(country_code_alpha_2="gl").values_list( - "country_code_alpha_2", flat=True + def handle(self, *args, **kwargs): + country_alpha_code = kwargs["country"] + try: + Country.objects.get(country_code_alpha_2=country_alpha_code) + except Exception: + return self.stdout.write("Country alpha code doesnt exist") + evaluate_contract_red_flag.apply_async( + args=(country_alpha_code,), + queue="covid19", ) - - if country_code not in country_list: - self.stderr.write("Country code is invalid.") - return - - country = Country.objects.get(country_code_alpha_2=country_code) - - self.stdout.write(f"Processing Red flag for {country.name}") - - country_tenders = Tender.objects.filter( - country_id=country.id, supplier__isnull=False, buyer__isnull=False - ).values("id", "buyer__buyer_name", "supplier__supplier_name", "supplier__supplier_address") - - for tender in country_tenders: - tender_id = tender["id"] - self.stdout.write("Created task for id :" + str(tender_id)) - clear_redflag.apply_async(args=(tender_id,), queue="covid19") - process_redflag6.apply_async(args=(tender_id,), queue="covid19") - process_redflag7.apply_async(args=(tender_id,), queue="covid19") - process_redflag.apply_async(args=(tender_id,), queue="covid19") - - self.stdout.write("Done") + return "Done" diff --git a/country/management/commands/summarize_country_buyer.py b/country/management/commands/summarize_country_buyer.py index 3f0ff82..499429e 100644 --- a/country/management/commands/summarize_country_buyer.py +++ b/country/management/commands/summarize_country_buyer.py @@ -1,7 +1,7 @@ from django.core.management.base import BaseCommand -from country.models import Buyer, Country -from country.tasks import summarize_buyer +from country.models import Country +from country.tasks import evaluate_country_buyer class Command(BaseCommand): @@ -11,10 +11,8 @@ def add_arguments(self, parser): def handle(self, *args, **kwargs): country_alpha_code = kwargs["country"] try: - country = Country.objects.get(country_code_alpha_2=country_alpha_code) + Country.objects.get(country_code_alpha_2=country_alpha_code) except Exception: return self.stdout.write("Country alpha code doesnt exist") - buyers = Buyer.objects.filter(tenders__country=country) - for buyer in buyers: - self.stdout.write("Created tasks for buyer_id" + str(buyer.id)) - summarize_buyer.apply_async(args=(buyer.id,), queue="covid19") + evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19") + return "Done" diff --git a/country/management/commands/summarize_country_supplier.py b/country/management/commands/summarize_country_supplier.py index 02ef685..cd58009 100644 --- a/country/management/commands/summarize_country_supplier.py +++ b/country/management/commands/summarize_country_supplier.py @@ -1,7 +1,7 @@ from django.core.management.base import BaseCommand -from country.models import Country, Supplier -from country.tasks import summarize_supplier +from country.models import Country +from country.tasks import evaluate_country_supplier class Command(BaseCommand): @@ -11,10 +11,8 @@ def add_arguments(self, parser): def handle(self, *args, **kwargs): country_alpha_code = kwargs["country"] try: - country = Country.objects.get(country_code_alpha_2=country_alpha_code) + Country.objects.get(country_code_alpha_2=country_alpha_code) except Exception: return self.stdout.write("Country alpha code doesnt exist") - suppliers = Supplier.objects.filter(tenders__country=country) - for supplier in suppliers: - self.stdout.write("Created tasks for supplier_id" + str(supplier.id)) - summarize_supplier.apply_async(args=(supplier.id,), queue="covid19") + evaluate_country_supplier.apply_async(args=(country_alpha_code,), queue="covid19") + return "Done" diff --git a/country/tasks.py b/country/tasks.py index 449d438..fe00065 100644 --- a/country/tasks.py +++ b/country/tasks.py @@ -1,8 +1,8 @@ +import datetime import os import sys import traceback from collections import defaultdict -from datetime import datetime from pathlib import Path import dateutil.parser @@ -1107,3 +1107,29 @@ def summarize_supplier(supplier_id): except Exception as e: return e return "Completed" + + +@app.task(name="evaluate_country_buyer") +def evaluate_country_buyer(country_code): + buyers = Buyer.objects.filter(country__country_code_alpha_2=country_code) + for buyer in buyers: + summarize_buyer(buyer.id) + + +@app.task(name="evaluate_country_supplier") +def evaluate_country_supplier(country_code): + suppliers = Supplier.objects.filter(country__country_code_alpha_2=country_code) + for supplier in suppliers: + summarize_supplier(supplier.id) + + +@app.task(name="evaluate_contract_red_flag") +def evaluate_contract_red_flag(country_code): + country = Country.objects.get(country_code_alpha_2=country_code) + country_tenders = Tender.objects.filter(country_id=country.id, supplier__isnull=False, buyer__isnull=False) + for tender in country_tenders: + tender_id = tender.id + clear_redflag.apply_async(args=(tender_id,), queue="covid19") + process_redflag6.apply_async(args=(tender_id,), queue="covid19") + process_redflag7.apply_async(args=(tender_id,), queue="covid19") + process_redflag.apply_async(args=(tender_id,), queue="covid19") diff --git a/country/tests/commands/test_evaluate_contract_red_flag.py b/country/tests/commands/test_evaluate_contract_red_flag.py index fe8148b..f8b254a 100644 --- a/country/tests/commands/test_evaluate_contract_red_flag.py +++ b/country/tests/commands/test_evaluate_contract_red_flag.py @@ -50,7 +50,7 @@ def test_without_country_code(self): call_command("evaluate_contract_red_flag") def test_with_country_code(self): - self.assertEquals(call_command("evaluate_contract_red_flag", "mx"), None) + self.assertEquals(call_command("evaluate_contract_red_flag", "MX"), None) def test_with_country_wrong_code(self): self.assertEquals(call_command("evaluate_contract_red_flag", "mxss"), None) diff --git a/visualization/helpers/scheduler.py b/visualization/helpers/scheduler.py new file mode 100644 index 0000000..8fe1511 --- /dev/null +++ b/visualization/helpers/scheduler.py @@ -0,0 +1,113 @@ +from datetime import datetime + +from celery import Celery + +from country.tasks import ( + clear_redflag, + country_contract_excel, + delete_dataset, + evaluate_contract_red_flag, + evaluate_country_buyer, + evaluate_country_supplier, + fetch_covid_data, + fetch_equity_data, + fill_contract_values, + import_tender_from_batch_id, + local_currency_to_usd, + process_currency_conversion, + process_redflag, + store_in_temp_table, + summarize_buyer, + summarize_supplier, +) + + +class ScheduleRunner: + def __init__(self): + from datetime import datetime + + self.app = Celery() + self.instance = self.app.control.inspect() + self.scheduled_list = self.instance.scheduled() + self.scheduled_machine = list(self.scheduled_list.keys())[0] + # task_list = self.instance.registered_tasks()[self.scheduled_machine] + self.celery_task_list = { + "evaluate_country_supplier": evaluate_country_supplier, + "clear_redflag": clear_redflag, + "country_contract_excel": country_contract_excel, + "delete_dataset": delete_dataset, + "evaluate_contract_red_flag": evaluate_contract_red_flag, + "evaluate_country_buyer": evaluate_country_buyer, + "fetch_covid_data": fetch_covid_data, + "fetch_equity_data": fetch_equity_data, + "fill_contract_values": fill_contract_values, + "import_tender_from_batch_id": import_tender_from_batch_id, + "local_currency_to_usd": local_currency_to_usd, + "process_currency_conversion": process_currency_conversion, + "process_redflag": process_redflag, + "store_in_temp_table": store_in_temp_table, + "summarize_buyer": summarize_buyer, + "summarize_supplier": summarize_supplier, + } + self.datetime_now = datetime.now() + + def every_hour(self, interval): + dt = self.datetime_now + dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0) + dt = dt_start_of_hour + datetime.timedelta(hours=int(interval)) + return dt + + def round_minute(self, interval): + minute = self.datetime_now.minute + base_number = int(str(minute)[0] + "0") + tmp = base_number + interval_list = [] + + while tmp <= minute <= 60: + tmp += interval + interval_list.append(tmp) + + dt = self.datetime_now + dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0) + return dt_start_of_hour + datetime.timedelta(minutes=interval_list[-1]) + + # task_scheduler(task_name='evaluate_country_supplier',interval_name='every_hour',interval=1,'MX') + def task_scheduler(self, task_name, interval_name, interval, country_alpha_code=None): + function_name = getattr(ScheduleRunner, interval_name) + nearest_hour = function_name(self, interval) + if bool(self.scheduled_list): + if len(self.scheduled_list[self.scheduled_machine]) == 0: + print(f"Task created for time :{nearest_hour} {self.celery_task_list[task_name]}") + if country_alpha_code: + self.celery_task_list[task_name].apply_async( + args=(country_alpha_code,), queue="covid19", eta=nearest_hour + ) + else: + self.celery_task_list[task_name].apply_async(queue="covid19", eta=nearest_hour) + return "Done" + else: + country = [country_alpha_code] + user_data = {"task": task_name, "country": country} + task_list = [] + for task in self.scheduled_list[self.scheduled_machine]: + task_name = task["request"]["name"] + country = task["request"]["args"] + data = {"task": task_name, "country": country} + if data not in task_list: + task_list.append(data) + + if user_data not in task_list: + print(f"Task created for time :{nearest_hour} {self.celery_task_list[task_name]}") + if country_alpha_code: + self.celery_task_list[task_name].apply_async( + args=(country_alpha_code,), queue="covid19", eta=nearest_hour + ) + else: + self.celery_task_list[task_name].apply_async(queue="covid19", eta=nearest_hour) + return "Done" + else: + print("Already in queue") + return "Exited" + else: + print("Celery queue not executed!!") + return "Error"