Merge pull request #91 from jonodrew/set-unmatched-bonus
Change task to allow a value for unmatched bonus
jonodrew committed Jun 19, 2022
2 parents 49a80ea + 5aaaa6b commit 058d2c2
Showing 17 changed files with 219 additions and 93 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.2.0
current_version = 2.3.0
commit = True
tag = True

28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Users can now edit the weightings, but only for specific attributes, and for the pre-existing calculation

## [2.3.0] - 2022-06-19

### Changed
- The system now uses pickle under the hood, so please be careful: unpickling data that arrives over an unsecured
connection between the machines you're running this on can execute arbitrary code.
- However, the change has made the system significantly faster - a single matching exercise is now down to just
40s, from a best of 97s when we were using JSON to serialize data.
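The pickle risk is worth spelling out with a minimal sketch. The `Evil` class below is a toy stand-in for a malicious payload, and the harmless builtin `abs` stands in for something nastier like `os.system`:

```python
import pickle

class Evil:
    # pickle calls __reduce__ to learn how to rebuild an object; a
    # malicious payload can name any importable callable here. The
    # harmless builtin abs stands in for e.g. os.system.
    def __reduce__(self):
        return (abs, (-5,))

payload = pickle.dumps(Evil())
result = pickle.loads(payload)  # abs(-5) runs during unpickling
print(result)  # 5 - not an Evil instance at all
```

This is why unsecured connections matter: `pickle.loads` on attacker-controlled bytes runs the attacker's choice of callable.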

### Added

- This is the big one. We've added functionality that incrementally steps up an `UnmatchedBonus`. This is
useful if you want to ensure everyone gets at least one mentor. It calculates a lot of values - one client is
calculating 37 different iterations of a three-round program, requiring 111 rounds of matching - so it takes
longer to run. We'll expose this functionality in the front end in a patch very soon, but in the meantime
dig around in the routes section, or add a `"pairing": True` key-value pair to the JSON you send to the
appropriate endpoint.
- Given the huge amount of processing happening, this functionality takes a lot longer than you might expect - it's
enough time to make several cups of tea. On my hardware it's clocking in at around 7 or 8 minutes, and that's a
long time to stare at the same screen. We'll be updating the frontend to give more feedback soon, but for the
moment, either check the logs from Celery or accept that you'll be here a little while.
- A note about the approach: I could have built a system that iterated over potential outcomes sequentially,
stopping when it reached an outcome that scored above a specific threshold. The problem with this: assuming that
each matching process takes _n_ seconds, iterating upwards over _M_ candidate values takes _Mn_ seconds in the
worst case. In the best case, of course, it takes just _n_ seconds!
- My approach instead batches the _M_ candidate values into chunks of ten that are run simultaneously, for a worst
case of roughly _Mn_/10 seconds. This is generally faster, although not in the case where the very first outcome
is the one we want. Given that I can't predict things will be perfect every time, I've opted for the approach that
only looks slower.
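To make the arithmetic concrete, here is a small sketch of the two worst cases, using the ~40s-per-matching-run figure quoted above (illustrative numbers, not measurements):

```python
import math

def sequential_worst_case(m: int, n: int) -> int:
    # Try candidate bonus values one at a time: the worst case
    # runs all m of them back to back.
    return m * n

def batched_worst_case(m: int, n: int, batch: int = 10) -> int:
    # Run candidates in simultaneous batches of `batch`; each batch
    # costs roughly n seconds, so the worst case is ceil(m / batch) * n.
    return math.ceil(m / batch) * n

# e.g. 37 candidate bonuses at ~40s per matching run:
print(sequential_worst_case(37, 40))  # 1480 seconds, one at a time
print(batched_worst_case(37, 40))     # 160 seconds in batches of ten
```

The best case flips: sequential stops after one run (_n_ seconds), while the batched version still pays for a full batch.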

## [2.2.0] - 2022-05-26

### Changed
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,8 +1,8 @@
FROM python:3.9-slim-bullseye AS parent
MAINTAINER CS LGBTQ+
COPY ./app /app
COPY ./requirements.txt /requirements.txt
RUN pip install -r requirements.txt
COPY ./app /app

FROM parent AS web
CMD ["gunicorn", "app:create_app()"]
11 changes: 8 additions & 3 deletions app/classes.py
@@ -28,7 +28,7 @@ class CSPerson(Person):
def __init__(self, **kwargs):
self.biography = kwargs.get("biography")
self.both = True if kwargs.get("both mentor and mentee") == "yes" else False
self.map_input_to_model(kwargs)
kwargs = self.map_input_to_model(kwargs)
super(CSPerson, self).__init__(**kwargs)
self._connections: list[CSPerson] = []

@@ -44,18 +44,23 @@ def val_grade_to_str(cls, grade: int):
def connections(self) -> list["CSPerson"]:
return self._connections

@connections.setter
def connections(self, new_connections):
self._connections = new_connections

@staticmethod
def map_input_to_model(data: dict):
data["role"] = data["job title"]
data["email"] = data["email address"]
data["email"] = data.get("email address", data.get("email"))
data["grade"] = CSPerson.str_grade_to_val(data.get("grade", ""))
return data

def map_model_to_output(self, data: dict):
data["job title"] = data.pop("role")
data["email address"] = data.pop("email")
data["grade"] = self.val_grade_to_str(int(data.get("grade", "0")))
data["biography"] = self.biography
data["both mentor and mentee"] = "yes" if self.both else "no"
return data

def to_dict_for_output(self, depth=1) -> dict:
return {
2 changes: 1 addition & 1 deletion app/config.py
@@ -16,4 +16,4 @@ class TestConfig(Config):
"interval_max": 0.5,
}
if os.environ.get("REDIS_URL") is None:
os.environ["REDIS_URL"] = "redis@redis"
os.environ["REDIS_URL"] = "redis://localhost:6379/0"
7 changes: 5 additions & 2 deletions app/extensions.py
@@ -2,9 +2,12 @@

from celery import Celery

celery = Celery(
"app",
celery_app = Celery(
"celery_app",
backend=os.environ["REDIS_URL"],
broker=os.environ["REDIS_URL"],
include=["app.tasks.tasks"],
accept_content=["pickle", "json"],
task_serializer="pickle",
result_serializer="pickle",
)
10 changes: 5 additions & 5 deletions app/helpers.py
@@ -1,10 +1,11 @@
import csv
import functools
import math
import operator
import pathlib
import string
import random
from matching.rules.rule import AbstractRule

import matching.rules.rule as rl


@@ -51,6 +52,7 @@ def random_string():
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))


@functools.lru_cache
def known_file(path_to_file, role_type: str, quantity=50):
padding_size = int(math.log10(quantity)) + 1
pathlib.Path(path_to_file).mkdir(parents=True, exist_ok=True)
@@ -77,12 +79,11 @@ def known_data(role_type: str):
"grade": "EO" if role_type == "mentor" else "AA",
"organisation": f"Department of {role_type.capitalize()}s",
"biography": "Test biography",
"profession": "Policy",
}
if role_type == "mentor":
data["profession"] = "Policy"
data["characteristics"] = "bisexual, transgender"
elif role_type == "mentee":
data["target profession"] = "Policy"
data["match with similar identity"] = "yes"
data["identity to match"] = "bisexual"
else:
@@ -172,7 +173,7 @@ def random_file(role_type: str, quantity: int = 50):
file_writer.writerows(rows)


def base_rules() -> list[AbstractRule]:
def base_rules() -> list[rl.Rule]:
return [
rl.Disqualify(
lambda match: match.mentee.organisation == match.mentor.organisation
@@ -192,5 +193,4 @@ def base_rules() -> list[AbstractRule]:
lambda match: match.mentee.characteristic in match.mentor.characteristics
and match.mentee.characteristic != "",
),
rl.UnmatchedBonus(6),
]
62 changes: 31 additions & 31 deletions app/main/routes.py
@@ -15,14 +15,17 @@
)
import pathlib
import shutil

from werkzeug.utils import secure_filename, redirect

from app.classes import CSMentor, CSMentee, CSParticipantFactory
from app.extensions import celery
from app.classes import CSMentor, CSMentee
from app.extensions import celery_app
from app.main import main_bp
from app.helpers import valid_files, random_string
from app.tasks.tasks import async_process_data, delete_mailing_lists_after_period
from app.tasks.tasks import (
async_process_data,
delete_mailing_lists_after_period,
)
from app.tasks.helpers import most_mentees_with_at_least_one_mentor
from matching.process import create_participant_list_from_path, create_mailing_list


@@ -107,8 +110,15 @@ def delete_files(response):

@main_bp.route("/tasks", methods=["POST"])
def run_task():
"""
This route only accepts JSON encoded requests
"""
current_app.logger.debug(request.get_json())
data_folder = request.get_json()["data_folder"]
try:
optimise_for_pairing = request.get_json()["pairing"]
except KeyError:
optimise_for_pairing = False
folder = pathlib.Path(
os.path.join(current_app.config["UPLOAD_FOLDER"], data_folder)
)
@@ -118,23 +128,18 @@ def delete_upload(response):
shutil.rmtree(folder)
return response

mentors = [
mentor.to_dict()
for mentor in create_participant_list_from_path(
CSMentor,
path_to_data=folder,
)
]
mentees = [
mentee.to_dict()
for mentee in create_participant_list_from_path(
CSMentee,
path_to_data=folder,
)
]
task = async_process_data.delay(
(mentors, mentees),
mentors = create_participant_list_from_path(
CSMentor,
path_to_data=folder,
)
mentees = create_participant_list_from_path(
CSMentee,
path_to_data=folder,
)
if optimise_for_pairing:
task = most_mentees_with_at_least_one_mentor(mentors, mentees)
else:
task = async_process_data.delay(mentors, mentees)
return jsonify(task_id=task.id), 202


@@ -145,19 +150,15 @@ def get_status(task_id):
matched participants as JSON formatted data. This data is then fed into the `create_mailing_list` function,
where the mailing lists are saved to a folder that corresponds to the `task_id`.
"""
task_result = celery.AsyncResult(task_id)
task_result = celery_app.AsyncResult(task_id)
result = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": "processing",
}
if task_result.status == "SUCCESS":
if task_result.ready():
outputs = {}
for matched_participant_list in task_result.result:
participants = [
CSParticipantFactory.create_from_dict(participant_dict)
for participant_dict in matched_participant_list
]
for participants in task_result.result[:2]:
participant_class = participants[0].class_name()
connections = [len(p.connections) for p in participants]
outputs[participant_class] = {
@@ -169,10 +170,9 @@
os.path.join(current_app.config["UPLOAD_FOLDER"], task_id)
),
)
result["task_result"] = url_for(
"main.download", task_id=task_id, count_data=json.dumps(outputs)
)
current_app.logger.debug(outputs)
result["task_result"] = url_for(
"main.download", task_id=task_id, count_data=json.dumps(outputs)
)
return result, 200


12 changes: 6 additions & 6 deletions app/tasks/__init__.py
@@ -1,14 +1,14 @@
from app.extensions import celery
from app.extensions import celery_app


def make_celery(app):
celery.conf.update(app.config)
celery.conf.imports = ("app.tasks.tasks",)
celery_app.conf.update(app.config)
celery_app.conf.imports = ("app.tasks.tasks",)

class ContextTask(celery.Task):
class ContextTask(celery_app.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)

celery.Task = ContextTask
return celery
celery_app.Task = ContextTask
return celery_app
18 changes: 18 additions & 0 deletions app/tasks/helpers.py
@@ -0,0 +1,18 @@
from copy import deepcopy

import celery
from celery.result import AsyncResult
from app.tasks.tasks import async_process_data, find_best_output
from app.classes import CSMentor, CSMentee
from app.helpers import base_rules


def most_mentees_with_at_least_one_mentor(
mentors: list[CSMentor], mentees: list[CSMentee]
) -> AsyncResult:
max_score = sum(rule.results.get(True) for rule in base_rules())
copies = ((deepcopy(mentors), deepcopy(mentees), i) for i in range(max_score))
task = celery.chord(
(async_process_data.si(*data) for data in copies), find_best_output.s()
)()
return task
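For readers unfamiliar with the `celery.chord` pattern used above - a header group of tasks runs in parallel and a single callback receives the list of all their results - the semantics can be simulated without a broker. This is a sketch of the pattern, not Celery's API:

```python
def chord_simulated(header_tasks, callback):
    # A chord runs every task in the header, collects their results
    # in submission order, then hands the whole list to the callback.
    # In this commit, async_process_data plays the header role and
    # find_best_output the callback role.
    results = [task() for task in header_tasks]
    return callback(results)

# Squares stand in for matching runs; sum stands in for find_best_output.
header = [lambda i=i: i * i for i in range(4)]
print(chord_simulated(header, sum))  # 0 + 1 + 4 + 9 = 14
```

The real chord differs in that the header tasks genuinely run concurrently on workers, which is exactly what makes the batched approach in the changelog faster than sequential iteration.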
56 changes: 41 additions & 15 deletions app/tasks/tasks.py
@@ -1,32 +1,58 @@
import functools
import os

import sys
import requests
from typing import Tuple, List
from app.extensions import celery
from typing import Tuple, List, Sequence

from app.classes import CSMentor, CSMentee
from app.extensions import celery_app as celery_app
from matching import process
from app.classes import CSParticipantFactory
from app.helpers import base_rules
from matching.rules.rule import UnmatchedBonus

sys.setrecursionlimit(10000)


@celery.task(name="async_process_data", bind=True)
@celery_app.task(name="async_process_data", bind=True)
def async_process_data(
self,
data_to_process: Tuple[List[dict], List[dict]],
) -> Tuple[List[dict], List[dict]]:
mentor_data, mentee_data = data_to_process
mentors = map(CSParticipantFactory.create_from_dict, mentor_data)
mentees = map(CSParticipantFactory.create_from_dict, mentee_data)
mentors,
mentees,
unmatched_bonus: int = 6,
) -> Tuple[List[CSMentor], List[CSMentee], int]:
all_rules = [base_rules() for _ in range(3)]
for ruleset in all_rules:
ruleset.append(UnmatchedBonus(unmatched_bonus))
matched_mentors, matched_mentees = process.process_data(
list(mentors), list(mentees), all_rules=all_rules
)
matched_as_dict = [participant.to_dict() for participant in matched_mentors], [
participant.to_dict() for participant in matched_mentees
]
return matched_as_dict
return matched_mentors, matched_mentees, unmatched_bonus


@celery_app.task
def find_best_output(
group_result: Sequence[tuple[list[CSMentor], list[CSMentee], int]]
) -> tuple[list[CSMentor], list[CSMentee], int]:
highest_count = (0, 0)
best_outcome = group_result[0]
for participant_tuple in group_result:
mentors, mentees, unmatched_bonus = participant_tuple
one_connection_min_func = functools.partial(
map, lambda participant: len(participant.connections) > 0
)
current_count = (
sum(one_connection_min_func(mentors)),
sum(one_connection_min_func(mentees)),
)
if all(
current > highest for current, highest in zip(current_count, highest_count) # type: ignore
):
best_outcome = participant_tuple
highest_count = current_count # type: ignore
return best_outcome


@celery.task(name="delete_mailing_lists_after_period", bind=True)
@celery_app.task(name="delete_mailing_lists_after_period", bind=True)
def delete_mailing_lists_after_period(self, task_id: str):
url = f"{os.environ.get('SERVICE_URL', 'http://app:5000')}/tasks/{task_id}"
return requests.delete(url).status_code
3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -26,10 +26,11 @@ services:
command: flask run -h 0.0.0.0

worker:
user: nobody
build:
context: "."
target: worker
command: celery --app=app.extensions.celery worker --loglevel=info
command: celery --app=app.extensions.celery_app worker --loglevel=info

environment:
- FLASK_ENV=development
