Skip to content
This repository has been archived by the owner on Dec 13, 2022. It is now read-only.

Commit

Permalink
CPA-297 Handling custom jobs
Browse files Browse the repository at this point in the history
[x] Created custom jobs for red_flag, summarize_supplier and summarize_buyer
[x] Added new ScheduleRunner class in schedule.py
  • Loading branch information
Suyoj Man Tamrakar committed May 26, 2021
1 parent 1586a4f commit 959cfc4
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 45 deletions.
41 changes: 12 additions & 29 deletions country/management/commands/evaluate_contract_red_flag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.core.management.base import BaseCommand

from country.models import Country, Tender
from country.tasks import clear_redflag, process_redflag, process_redflag6, process_redflag7
from country.models import Country
from country.tasks import evaluate_contract_red_flag


class Command(BaseCommand):
Expand All @@ -10,31 +10,14 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument("country", nargs="+", type=str)

def handle(self, *args, **options):
country_code = options["country"][0]
country_code = str(country_code).upper()
country_list = Country.objects.exclude(country_code_alpha_2="gl").values_list(
"country_code_alpha_2", flat=True
def handle(self, *args, **kwargs):
country_alpha_code = kwargs["country"]
try:
Country.objects.get(country_code_alpha_2=country_alpha_code)
except Exception:
return self.stdout.write("Country alpha code doesnt exist")
evaluate_contract_red_flag.apply_async(
args=(country_alpha_code,),
queue="covid19",
)

if country_code not in country_list:
self.stderr.write("Country code is invalid.")
return

country = Country.objects.get(country_code_alpha_2=country_code)

self.stdout.write(f"Processing Red flag for {country.name}")

country_tenders = Tender.objects.filter(
country_id=country.id, supplier__isnull=False, buyer__isnull=False
).values("id", "buyer__buyer_name", "supplier__supplier_name", "supplier__supplier_address")

for tender in country_tenders:
tender_id = tender["id"]
self.stdout.write("Created task for id :" + str(tender_id))
clear_redflag.apply_async(args=(tender_id,), queue="covid19")
process_redflag6.apply_async(args=(tender_id,), queue="covid19")
process_redflag7.apply_async(args=(tender_id,), queue="covid19")
process_redflag.apply_async(args=(tender_id,), queue="covid19")

self.stdout.write("Done")
return "Done"
12 changes: 5 additions & 7 deletions country/management/commands/summarize_country_buyer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.core.management.base import BaseCommand

from country.models import Buyer, Country
from country.tasks import summarize_buyer
from country.models import Country
from country.tasks import evaluate_country_buyer


class Command(BaseCommand):
Expand All @@ -11,10 +11,8 @@ def add_arguments(self, parser):
def handle(self, *args, **kwargs):
country_alpha_code = kwargs["country"]
try:
country = Country.objects.get(country_code_alpha_2=country_alpha_code)
Country.objects.get(country_code_alpha_2=country_alpha_code)
except Exception:
return self.stdout.write("Country alpha code doesnt exist")
buyers = Buyer.objects.filter(tenders__country=country)
for buyer in buyers:
self.stdout.write("Created tasks for buyer_id" + str(buyer.id))
summarize_buyer.apply_async(args=(buyer.id,), queue="covid19")
evaluate_country_buyer.apply_async(args=(country_alpha_code,), queue="covid19")
return "Done"
12 changes: 5 additions & 7 deletions country/management/commands/summarize_country_supplier.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from django.core.management.base import BaseCommand

from country.models import Country, Supplier
from country.tasks import summarize_supplier
from country.models import Country
from country.tasks import evaluate_country_supplier


class Command(BaseCommand):
Expand All @@ -11,10 +11,8 @@ def add_arguments(self, parser):
def handle(self, *args, **kwargs):
country_alpha_code = kwargs["country"]
try:
country = Country.objects.get(country_code_alpha_2=country_alpha_code)
Country.objects.get(country_code_alpha_2=country_alpha_code)
except Exception:
return self.stdout.write("Country alpha code doesnt exist")
suppliers = Supplier.objects.filter(tenders__country=country)
for supplier in suppliers:
self.stdout.write("Created tasks for supplier_id" + str(supplier.id))
summarize_supplier.apply_async(args=(supplier.id,), queue="covid19")
evaluate_country_supplier.apply_async(args=(country_alpha_code,), queue="covid19")
return "Done"
28 changes: 27 additions & 1 deletion country/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import datetime
import os
import sys
import traceback
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import dateutil.parser
Expand Down Expand Up @@ -1107,3 +1107,29 @@ def summarize_supplier(supplier_id):
except Exception as e:
return e
return "Completed"


@app.task(name="evaluate_country_buyer")
def evaluate_country_buyer(country_code):
buyers = Buyer.objects.filter(country__country_code_alpha_2=country_code)
for buyer in buyers:
summarize_buyer(buyer.id)


@app.task(name="evaluate_country_supplier")
def evaluate_country_supplier(country_code):
suppliers = Supplier.objects.filter(country__country_code_alpha_2=country_code)
for supplier in suppliers:
summarize_supplier(supplier.id)


@app.task(name="evaluate_contract_red_flag")
def evaluate_contract_red_flag(country_code):
country = Country.objects.get(country_code_alpha_2=country_code)
country_tenders = Tender.objects.filter(country_id=country.id, supplier__isnull=False, buyer__isnull=False)
for tender in country_tenders:
tender_id = tender.id
clear_redflag.apply_async(args=(tender_id,), queue="covid19")
process_redflag6.apply_async(args=(tender_id,), queue="covid19")
process_redflag7.apply_async(args=(tender_id,), queue="covid19")
process_redflag.apply_async(args=(tender_id,), queue="covid19")
2 changes: 1 addition & 1 deletion country/tests/commands/test_evaluate_contract_red_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def test_without_country_code(self):
call_command("evaluate_contract_red_flag")

def test_with_country_code(self):
self.assertEquals(call_command("evaluate_contract_red_flag", "mx"), None)
self.assertEquals(call_command("evaluate_contract_red_flag", "MX"), None)

def test_with_country_wrong_code(self):
self.assertEquals(call_command("evaluate_contract_red_flag", "mxss"), None)
113 changes: 113 additions & 0 deletions visualization/helpers/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from datetime import datetime

from celery import Celery

from country.tasks import (
clear_redflag,
country_contract_excel,
delete_dataset,
evaluate_contract_red_flag,
evaluate_country_buyer,
evaluate_country_supplier,
fetch_covid_data,
fetch_equity_data,
fill_contract_values,
import_tender_from_batch_id,
local_currency_to_usd,
process_currency_conversion,
process_redflag,
store_in_temp_table,
summarize_buyer,
summarize_supplier,
)


class ScheduleRunner:
def __init__(self):
from datetime import datetime

self.app = Celery()
self.instance = self.app.control.inspect()
self.scheduled_list = self.instance.scheduled()
self.scheduled_machine = list(self.scheduled_list.keys())[0]
# task_list = self.instance.registered_tasks()[self.scheduled_machine]
self.celery_task_list = {
"evaluate_country_supplier": evaluate_country_supplier,
"clear_redflag": clear_redflag,
"country_contract_excel": country_contract_excel,
"delete_dataset": delete_dataset,
"evaluate_contract_red_flag": evaluate_contract_red_flag,
"evaluate_country_buyer": evaluate_country_buyer,
"fetch_covid_data": fetch_covid_data,
"fetch_equity_data": fetch_equity_data,
"fill_contract_values": fill_contract_values,
"import_tender_from_batch_id": import_tender_from_batch_id,
"local_currency_to_usd": local_currency_to_usd,
"process_currency_conversion": process_currency_conversion,
"process_redflag": process_redflag,
"store_in_temp_table": store_in_temp_table,
"summarize_buyer": summarize_buyer,
"summarize_supplier": summarize_supplier,
}
self.datetime_now = datetime.now()

def every_hour(self, interval):
dt = self.datetime_now
dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0)
dt = dt_start_of_hour + datetime.timedelta(hours=int(interval))
return dt

def round_minute(self, interval):
minute = self.datetime_now.minute
base_number = int(str(minute)[0] + "0")
tmp = base_number
interval_list = []

while tmp <= minute <= 60:
tmp += interval
interval_list.append(tmp)

dt = self.datetime_now
dt_start_of_hour = dt.replace(minute=0, second=0, microsecond=0)
return dt_start_of_hour + datetime.timedelta(minutes=interval_list[-1])

# task_scheduler(task_name='evaluate_country_supplier',interval_name='every_hour',interval=1,'MX')
def task_scheduler(self, task_name, interval_name, interval, country_alpha_code=None):
function_name = getattr(ScheduleRunner, interval_name)
nearest_hour = function_name(self, interval)
if bool(self.scheduled_list):
if len(self.scheduled_list[self.scheduled_machine]) == 0:
print(f"Task created for time :{nearest_hour} {self.celery_task_list[task_name]}")
if country_alpha_code:
self.celery_task_list[task_name].apply_async(
args=(country_alpha_code,), queue="covid19", eta=nearest_hour
)
else:
self.celery_task_list[task_name].apply_async(queue="covid19", eta=nearest_hour)
return "Done"
else:
country = [country_alpha_code]
user_data = {"task": task_name, "country": country}
task_list = []
for task in self.scheduled_list[self.scheduled_machine]:
task_name = task["request"]["name"]
country = task["request"]["args"]
data = {"task": task_name, "country": country}
if data not in task_list:
task_list.append(data)

if user_data not in task_list:
print(f"Task created for time :{nearest_hour} {self.celery_task_list[task_name]}")
if country_alpha_code:
self.celery_task_list[task_name].apply_async(
args=(country_alpha_code,), queue="covid19", eta=nearest_hour
)
else:
self.celery_task_list[task_name].apply_async(queue="covid19", eta=nearest_hour)
return "Done"
else:
print("Already in queue")
return "Exited"
else:
print("Celery queue not executed!!")
return "Error"

0 comments on commit 959cfc4

Please sign in to comment.