diff --git a/dags/aeon_to_lobby.py b/dags/aeon_to_lobby.py index 077cb0f1b..16883b0cc 100644 --- a/dags/aeon_to_lobby.py +++ b/dags/aeon_to_lobby.py @@ -5,7 +5,9 @@ from airflow.operators.python import PythonOperator from airflow.models import Variable -from plugins.aeon_to_lobby.aeon import user_data +from plugins.aeon_to_lobby.aeon import ( + user_transaction_data, route_aeon_post, filtered_users +) from plugins.aeon_to_lobby.lobbytrack import lobby_post @@ -23,8 +25,8 @@ def transform_data(*args, **kwargs): lobby_users = [] task_instance = kwargs["task_instance"] aeon_users = task_instance.xcom_pull( - key="return_value", task_ids="get_user_data_from_aeon" - ) + key="return_value", task_ids="get_user_transaction_data_from_aeon" + ) # [['aeonuser2', 111], ['aeonuser2', 222]] for aeon_user in aeon_users: # map keys and values for user @@ -69,13 +71,18 @@ def transform_data(*args, **kwargs): ) as dag: aeon_user_data = PythonOperator( - task_id="get_user_data_from_aeon", python_callable=user_data, + task_id="get_user_transaction_data_from_aeon", python_callable=user_transaction_data, op_kwargs={ "aeon_url": Variable.get("AEON_URL"), "aeon_key": Variable.get("AEON_KEY"), + "queue_id": Variable.get("SOURCE_QUEUE_ID") } ) + filtered_user_data = PythonOperator( + task_id="filter_aeon_user_data", python_callable=filtered_users + ) + transform_to_lobby_data = PythonOperator( task_id="transform_aeon_data_to_lobby_json", python_callable=transform_data ) @@ -84,5 +91,16 @@ def transform_data(*args, **kwargs): task_id="post_to_lobbytrack", python_callable=lobby_post ) + route_aeon_post = PythonOperator( + task_id="route_aeon_post", python_callable=route_aeon_post, + op_kwargs={ + "aeon_url": Variable.get("AEON_URL"), + "aeon_key": Variable.get("AEON_KEY"), + "queue_id": Variable.get("SOURCE_QUEUE_ID"), + "final_queue": Variable.get("FINAL_QUEUE") + } + ) + -aeon_user_data >> transform_to_lobby_data >> post_to_lobbytrack +aeon_user_data >> route_aeon_post +aeon_user_data >> filtered_user_data >> transform_to_lobby_data >> post_to_lobbytrack diff --git a/plugins/aeon_to_lobby/aeon.py b/plugins/aeon_to_lobby/aeon.py index 273e3d3da..bbb5b56cc 100644 --- a/plugins/aeon_to_lobby/aeon.py +++ b/plugins/aeon_to_lobby/aeon.py @@ -4,45 +4,95 @@ import requests -def user_data(**kwargs): +def user_transaction_data(**kwargs): users = [] - for username in user_requests_in_queue(**kwargs): - user = aeon_user(**kwargs, user=username) - users.append(user) + queue_users = user_requests_in_queue(**kwargs) + for user_transaction in queue_users: # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]] + users.append(user_transaction[0]) return users +def filtered_users(**kwargs): + users = [] + task_instance = kwargs["task_instance"] + queue_users = task_instance.xcom_pull( + key="return_value", task_ids="get_user_transaction_data_from_aeon" + ) # ['aeonuser1@stanford.edu', 'aeonuser1@stanford.edu', 'aesonuser2@gmail.com'] + for user in queue_users: + if "@stanford.edu" not in user: + user = aeon_user(**kwargs, user=user) + logging.info(f"Adding {user}") + users.append(user) + else: + logging.info(f"Skipping {user}") + + return users + + +def route_aeon_post(**kwargs): + task_instance = kwargs["task_instance"] + aeon_data = task_instance.xcom_pull( + key="return_value", task_ids="get_user_transaction_data_from_aeon" + ) # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]] + aeon_url = kwargs["aeon_url"] + queue = kwargs["final_queue"] + aeon_headers = {"X-AEON-API-KEY": kwargs["aeon_key"], "Accept": "application/json"} + + responses = [] + for user_transactions in aeon_data: + id = user_transactions[1] + logging.info(f"Routing transactionNumber {id} : {aeon_url}/Requests/{id}/route") + logging.info({"newStatus": {queue}}) + + response = requests.post(f"{aeon_url}/Requests/{id}/route", headers=aeon_headers, json={"newStatus": queue}) + + if response.status_code != 200: + logging.error(f"aeon rsponded with: {response.status_code}, {response.text}") + return None + + responses.append(response.json()) + + return responses + + def user_requests_in_queue(**kwargs): - usernames = [] - data = queue_requests(**kwargs, id=1) + result = [] + data = queue_requests(**kwargs, queue=kwargs['queue_id']) today = datetime.today().strftime("%Y-%m-%d") tree = objectpath.Tree(data) - result = tree.execute(f"$.*[@.creationDate >= '{today}'].username") - for entry in result: - usernames.append(entry) + generator = tree.execute(f"$.*[@.creationDate >= '{today}']") + + for entry in generator: + result.append([entry['username'], entry['transactionNumber']]) + + if len(result) < 1: + logging.info(f"No entries in queue_requests at this time: {today}") - return usernames + return result def aeon_user(**kwargs): aeon_user = kwargs["user"] aeon_url = kwargs["aeon_url"] - return aeon_request(**kwargs, url=f"{aeon_url}/Users/{aeon_user}") + return aeon_get(**kwargs, url=f"{aeon_url}/Users/{aeon_user}") def queue_requests(**kwargs): - queue = kwargs["id"] + queue = kwargs["queue"] aeon_url = kwargs["aeon_url"] - return aeon_request(**kwargs, url=f"{aeon_url}/Queues/{queue}/requests") + return aeon_get(**kwargs, url=f"{aeon_url}/Queues/{queue}/requests") -def aeon_request(**kwargs): +def aeon_get(**kwargs): aeon_key = kwargs["aeon_key"] + url = kwargs["url"] aeon_headers = {"X-AEON-API-KEY": aeon_key, "Accept": "application/json"} - response = requests.get(kwargs["url"], headers=aeon_headers) + response = requests.get(url, headers=aeon_headers) + logging.info(f"aeon_get: {url} : {response.status_code}") + logging.info(response.json()) if response.status_code != 200: logging.error(f"aeon rsponded with: {response.status_code}, {response.text}") diff --git a/plugins/aeon_to_lobby/lobbytrack.py b/plugins/aeon_to_lobby/lobbytrack.py index 71ad8474c..175a76423 100644 --- a/plugins/aeon_to_lobby/lobbytrack.py +++ b/plugins/aeon_to_lobby/lobbytrack.py @@ -18,13 +18,18 @@ def lobby_post(**kwargs): key="return_value", task_ids="transform_aeon_data_to_lobby_json" ) + responses = [] + for user in lobby_users: + logging.info(user) response = requests.post(lobby_url, headers=lobby_headers, json=user) - if response.status_code != 200: - logging.error( - "lobbytrack api rsponded with:" f"{response.status_code}, {response.text}" - ) - response.raise_for_status() + if response.status_code != 200: + logging.error( + "lobbytrack api rsponded with:" f"{response.status_code}, {response.text}" + ) + response.raise_for_status() + + responses.append(response) - return response.json() + return responses diff --git a/plugins/tests/test_aeon_to_lobby.py b/plugins/tests/test_aeon_to_lobby.py index 31c52f6fd..9d51ae244 100644 --- a/plugins/tests/test_aeon_to_lobby.py +++ b/plugins/tests/test_aeon_to_lobby.py @@ -29,24 +29,32 @@ def mock_aeon_queue_data(): { "transactionNumber": 0, "creationDate": today, - "username": "aeonuser1", + "username": "aeonuser1@stanford.edu", }, { "transactionNumber": 1, "creationDate": today, - "username": "aeonuser2", + "username": "aeonuser1@stanford.edu", + }, + { + "transactionNumber": 2, + "creationDate": today, + "username": "aeonuser2@gmail.com", } ] -# Mock xcom -def mock_xcom_pull_user_data(*args, **kwargs): +def mock_aeon_route_post_response(): + return {"transactionNumber": 2, "username": "aeonuser2@gmail.com"} + + +def mock_xcom_pull_user_data(**kwargs): return [ { - "username": "aeonuser", + "username": "aeonuser2@gmail.com", "lastName": "User", "firstName": "Aeon", - "eMailAddress": "aeonu@mail.edu", + "eMailAddress": "aeonuser2@gmail.com", "phone": "999-999-9999", "address": "123 Charm St", "address2": "Apt A", @@ -58,10 +66,26 @@ def mock_xcom_pull_user_data(*args, **kwargs): ] -class MockTaskInstance(pydantic.BaseModel): +def mock_xcom_pull_aeon_users(**kwargs): + return ['aeonuser1@stanford.edu', 'aeonuser1@stanford.edu', 'aesonuser2@gmail.com'] + + +def mock_xcom_pull_aeon_users_transactions(**kwargs): + return [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ['aesonuser2@gmail.com', 2]] + + +class MockTaskInstanceUserData(pydantic.BaseModel): xcom_pull = mock_xcom_pull_user_data +class MockTaskInstanceAeonUsers(pydantic.BaseModel): + xcom_pull = mock_xcom_pull_aeon_users + + +class MockTaskInstanceAeonUsersTransactions(pydantic.BaseModel): + xcom_pull = mock_xcom_pull_aeon_users_transactions + + @pytest.fixture def mock_queue_requests(monkeypatch, mocker: MockerFixture): def mock_get_queue_data(*args, **kwargs): @@ -74,31 +98,68 @@ def mock_get_queue_data(*args, **kwargs): monkeypatch.setattr(requests, "get", mock_get_queue_data) +@pytest.fixture +def mock_aeon_post(monkeypatch, mocker: MockerFixture): + def mock_post_transaction_routing(*args, **kwargs): + post_response = mocker.stub() + post_response.status_code = 200 + post_response.json = mock_aeon_route_post_response + + return post_response + + monkeypatch.setattr(requests, "post", mock_post_transaction_routing) + + def test_user_data(mock_queue_requests, mock_aeon_variable): - from plugins.aeon_to_lobby.aeon import user_data - user_data = user_data(aeon_url="https://aeon.example.com", aeon_key="123") + from plugins.aeon_to_lobby.aeon import user_transaction_data + user_data = user_transaction_data( + aeon_url="https://aeon.example.us", aeon_key="123", queue_id="1" + ) - assert lambda: user_data[0] == mock_aeon_queue_data[0] + # assert lambda: user_data[0] == mock_aeon_queue_data[0] + assert lambda: user_data[0] == ["aeonuser2@gmail.com", 2] + assert len(user_data) == 3 -def test_aeon_request_params(caplog): - from plugins.aeon_to_lobby.aeon import aeon_user # noqa +def test_filtered_user_data(mock_queue_requests, mock_aeon_variable): + from plugins.aeon_to_lobby.aeon import filtered_users + filtered_users = filtered_users( + aeon_url="https://aeon.example.us", aeon_key="123", task_instance=MockTaskInstanceAeonUsers + ) - assert "aeon rsponded with" not in caplog.text + for user in filtered_users: + assert "@stanford.edu" not in user def test_find_user_from_request_queue(mock_queue_requests, mock_aeon_variable): from plugins.aeon_to_lobby.aeon import user_requests_in_queue - user_list = user_requests_in_queue(aeon_url="https://aeon.example.com", aeon_key="123") + user_list = user_requests_in_queue( + aeon_url="https://aeon.example.us", aeon_key="123", queue_id="1" + ) - assert user_list == ['aeonuser1', 'aeonuser2'] + assert user_list == [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ['aeonuser2@gmail.com', 2]] def test_transform_data(mock_lobby_variable): from dags.aeon_to_lobby import transform_data - lobby_users = transform_data(task_instance=MockTaskInstance) + lobby_users = transform_data(task_instance=MockTaskInstanceUserData) assert lobby_users[0]['LastName'] == "User" + assert lobby_users[0]['Email'] == "aeonuser2@gmail.com" assert lobby_users[0]['CustomFields'][0]['Name'] == "Address (Street)" assert lobby_users[0]['CustomFields'][0]['Value'] == "123 Charm St, Apt A" + + +def test_route_aeon_post(mock_queue_requests, mock_aeon_variable, mock_aeon_post, caplog): + from plugins.aeon_to_lobby.aeon import route_aeon_post + route_aeon_post( + aeon_url="https://aeon.example.us", + aeon_key="123", + queue_id="1", + final_queue="Awaiting Request Processing", + task_instance=MockTaskInstanceAeonUsersTransactions + ) + + assert "{'newStatus': {'Awaiting Request Processing'}}" in caplog.text + assert "aeon rsponded with" not in caplog.text