Skip to content

Commit

Permalink
Merge 078006f into 517939a
Browse files Browse the repository at this point in the history
  • Loading branch information
jgreben committed Sep 20, 2022
2 parents 517939a + 078006f commit b2acf08
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 42 deletions.
28 changes: 23 additions & 5 deletions dags/aeon_to_lobby.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from plugins.aeon_to_lobby.aeon import user_data
from plugins.aeon_to_lobby.aeon import (
user_transaction_data, route_aeon_post, filtered_users
)
from plugins.aeon_to_lobby.lobbytrack import lobby_post


Expand All @@ -23,8 +25,8 @@ def transform_data(*args, **kwargs):
lobby_users = []
task_instance = kwargs["task_instance"]
aeon_users = task_instance.xcom_pull(
key="return_value", task_ids="get_user_data_from_aeon"
)
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # [['aeonuser2', 111], ['aeonuser2', 222]]

for aeon_user in aeon_users:
# map keys and values for user
Expand Down Expand Up @@ -69,13 +71,18 @@ def transform_data(*args, **kwargs):
) as dag:

aeon_user_data = PythonOperator(
task_id="get_user_data_from_aeon", python_callable=user_data,
task_id="get_user_transaction_data_from_aeon", python_callable=user_transaction_data,
op_kwargs={
"aeon_url": Variable.get("AEON_URL"),
"aeon_key": Variable.get("AEON_KEY"),
"queue_id": Variable.get("SOURCE_QUEUE_ID")
}
)

filtered_user_data = PythonOperator(
task_id="filter_aeon_user_data", python_callable=filtered_users
)

transform_to_lobby_data = PythonOperator(
task_id="transform_aeon_data_to_lobby_json", python_callable=transform_data
)
Expand All @@ -84,5 +91,16 @@ def transform_data(*args, **kwargs):
task_id="post_to_lobbytrack", python_callable=lobby_post
)

route_aeon_post = PythonOperator(
task_id="route_aeon_post", python_callable=route_aeon_post,
op_kwargs={
"aeon_url": Variable.get("AEON_URL"),
"aeon_key": Variable.get("AEON_KEY"),
"queue_id": Variable.get("SOURCE_QUEUE_ID"),
"final_queue": Variable.get("FINAL_QUEUE")
}
)


aeon_user_data >> transform_to_lobby_data >> post_to_lobbytrack
aeon_user_data >> route_aeon_post
aeon_user_data >> filtered_user_data >> transform_to_lobby_data >> post_to_lobbytrack
80 changes: 65 additions & 15 deletions plugins/aeon_to_lobby/aeon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,95 @@
import requests


def user_data(**kwargs):
def user_transaction_data(**kwargs):
users = []
for username in user_requests_in_queue(**kwargs):
user = aeon_user(**kwargs, user=username)
users.append(user)
queue_users = user_requests_in_queue(**kwargs)
for user_transaction in queue_users: # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]]
users.append(user_transaction[0])

return users


def filtered_users(**kwargs):
users = []
task_instance = kwargs["task_instance"]
queue_users = task_instance.xcom_pull(
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # ['aeonuser1@stanford.edu', 'aeonuser1@stanford.edu', 'aesonuser2@gmail.com']
for user in queue_users:
if "@stanford.edu" not in user:
user = aeon_user(**kwargs, user=user)
logging.info(f"Adding {user}")
users.append(user)
else:
logging.info(f"Skipping {user}")

return users


def route_aeon_post(**kwargs):
task_instance = kwargs["task_instance"]
aeon_data = task_instance.xcom_pull(
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]]
aeon_url = kwargs["aeon_url"]
queue = kwargs["final_queue"]
aeon_headers = {"X-AEON-API-KEY": kwargs["aeon_key"], "Accept": "application/json"}

responses = []
for user_transactions in aeon_data:
id = user_transactions[1]
logging.info(f"Routing transactionNumber {id} : {aeon_url}/Requests/{id}/route")
logging.info({"newStatus": {queue}})

response = requests.post(f"{aeon_url}/Requests/{id}/route", headers=aeon_headers, json={"newStatus": queue})

if response.status_code != 200:
logging.error(f"aeon rsponded with: {response.status_code}, {response.text}")
return None

responses.append(response.json())

return responses


def user_requests_in_queue(**kwargs):
usernames = []
data = queue_requests(**kwargs, id=1)
result = []
data = queue_requests(**kwargs, queue=kwargs['queue_id'])
today = datetime.today().strftime("%Y-%m-%d")
tree = objectpath.Tree(data)
result = tree.execute(f"$.*[@.creationDate >= '{today}'].username")
for entry in result:
usernames.append(entry)
generator = tree.execute(f"$.*[@.creationDate >= '{today}']")

for entry in generator:
result.append([entry['username'], entry['transactionNumber']])

if len(result) < 1:
logging.info(f"No entries in queue_requests at this time: {today}")

return usernames
return result


def aeon_user(**kwargs):
aeon_user = kwargs["user"]
aeon_url = kwargs["aeon_url"]

return aeon_request(**kwargs, url=f"{aeon_url}/Users/{aeon_user}")
return aeon_get(**kwargs, url=f"{aeon_url}/Users/{aeon_user}")


def queue_requests(**kwargs):
queue = kwargs["id"]
queue = kwargs["queue"]
aeon_url = kwargs["aeon_url"]

return aeon_request(**kwargs, url=f"{aeon_url}/Queues/{queue}/requests")
return aeon_get(**kwargs, url=f"{aeon_url}/Queues/{queue}/requests")


def aeon_request(**kwargs):
def aeon_get(**kwargs):
aeon_key = kwargs["aeon_key"]
url = kwargs["url"]
aeon_headers = {"X-AEON-API-KEY": aeon_key, "Accept": "application/json"}
response = requests.get(kwargs["url"], headers=aeon_headers)
response = requests.get(url, headers=aeon_headers)
logging.info(f"aeon_get: {url} : {response.status_code}")
logging.info(response.json())

if response.status_code != 200:
logging.error(f"aeon rsponded with: {response.status_code}, {response.text}")
Expand Down
17 changes: 11 additions & 6 deletions plugins/aeon_to_lobby/lobbytrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ def lobby_post(**kwargs):
key="return_value", task_ids="transform_aeon_data_to_lobby_json"
)

responses = []

for user in lobby_users:
logging.info(user)
response = requests.post(lobby_url, headers=lobby_headers, json=user)

if response.status_code != 200:
logging.error(
"lobbytrack api rsponded with:" f"{response.status_code}, {response.text}"
)
response.raise_for_status()
if response.status_code != 200:
logging.error(
"lobbytrack api rsponded with:" f"{response.status_code}, {response.text}"
)
response.raise_for_status()

responses.append(response)

return response.json()
return responses
93 changes: 77 additions & 16 deletions plugins/tests/test_aeon_to_lobby.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,32 @@ def mock_aeon_queue_data():
{
"transactionNumber": 0,
"creationDate": today,
"username": "aeonuser1",
"username": "aeonuser1@stanford.edu",
},
{
"transactionNumber": 1,
"creationDate": today,
"username": "aeonuser2",
"username": "aeonuser1@stanford.edu",
},
{
"transactionNumber": 2,
"creationDate": today,
"username": "aeonuser2@gmail.com",
}
]


# Mock xcom
def mock_xcom_pull_user_data(*args, **kwargs):
def mock_aeon_route_post_response():
return {"transactionNumber": 2, "username": "aeonuser2@gmail.com"}


def mock_xcom_pull_user_data(**kwargs):
return [
{
"username": "aeonuser",
"username": "aeonuser2@gmail.com",
"lastName": "User",
"firstName": "Aeon",
"eMailAddress": "aeonu@mail.edu",
"eMailAddress": "aeonuser2@gmail.com",
"phone": "999-999-9999",
"address": "123 Charm St",
"address2": "Apt A",
Expand All @@ -58,10 +66,26 @@ def mock_xcom_pull_user_data(*args, **kwargs):
]


class MockTaskInstance(pydantic.BaseModel):
def mock_xcom_pull_aeon_users(**kwargs):
return ['aeonuser1@stanford.edu', 'aeonuser1@stanford.edu', 'aesonuser2@gmail.com']


def mock_xcom_pull_aeon_users_transactions(**kwargs):
return [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ['aesonuser2@gmail.com', 2]]


class MockTaskInstanceUserData(pydantic.BaseModel):
xcom_pull = mock_xcom_pull_user_data


class MockTaskInstanceAeonUsers(pydantic.BaseModel):
xcom_pull = mock_xcom_pull_aeon_users


class MockTaskInstanceAeonUsersTransactions(pydantic.BaseModel):
xcom_pull = mock_xcom_pull_aeon_users_transactions


@pytest.fixture
def mock_queue_requests(monkeypatch, mocker: MockerFixture):
def mock_get_queue_data(*args, **kwargs):
Expand All @@ -74,31 +98,68 @@ def mock_get_queue_data(*args, **kwargs):
monkeypatch.setattr(requests, "get", mock_get_queue_data)


@pytest.fixture
def mock_aeon_post(monkeypatch, mocker: MockerFixture):
def mock_post_transaction_routing(*args, **kwargs):
post_response = mocker.stub()
post_response.status_code = 200
post_response.json = mock_aeon_route_post_response

return post_response

monkeypatch.setattr(requests, "post", mock_post_transaction_routing)


def test_user_data(mock_queue_requests, mock_aeon_variable):
from plugins.aeon_to_lobby.aeon import user_data
user_data = user_data(aeon_url="https://aeon.example.com", aeon_key="123")
from plugins.aeon_to_lobby.aeon import user_transaction_data
user_data = user_transaction_data(
aeon_url="https://aeon.example.us", aeon_key="123", queue_id="1"
)

assert lambda: user_data[0] == mock_aeon_queue_data[0]
# assert lambda: user_data[0] == mock_aeon_queue_data[0]
assert lambda: user_data[0] == ["aeonuser2@gmail.com", 2]
assert len(user_data) == 3


def test_aeon_request_params(caplog):
from plugins.aeon_to_lobby.aeon import aeon_user # noqa
def test_filtered_user_data(mock_queue_requests, mock_aeon_variable):
from plugins.aeon_to_lobby.aeon import filtered_users
filtered_users = filtered_users(
aeon_url="https://aeon.example.us", aeon_key="123", task_instance=MockTaskInstanceAeonUsers
)

assert "aeon rsponded with" not in caplog.text
for user in filtered_users:
assert "@stanford.edu" not in user


def test_find_user_from_request_queue(mock_queue_requests, mock_aeon_variable):
from plugins.aeon_to_lobby.aeon import user_requests_in_queue
user_list = user_requests_in_queue(aeon_url="https://aeon.example.com", aeon_key="123")
user_list = user_requests_in_queue(
aeon_url="https://aeon.example.us", aeon_key="123", queue_id="1"
)

assert user_list == ['aeonuser1', 'aeonuser2']
assert user_list == [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ['aeonuser2@gmail.com', 2]]


def test_transform_data(mock_lobby_variable):
from dags.aeon_to_lobby import transform_data

lobby_users = transform_data(task_instance=MockTaskInstance)
lobby_users = transform_data(task_instance=MockTaskInstanceUserData)

assert lobby_users[0]['LastName'] == "User"
assert lobby_users[0]['Email'] == "aeonuser2@gmail.com"
assert lobby_users[0]['CustomFields'][0]['Name'] == "Address (Street)"
assert lobby_users[0]['CustomFields'][0]['Value'] == "123 Charm St, Apt A"


def test_route_aeon_post(mock_queue_requests, mock_aeon_variable, mock_aeon_post, caplog):
from plugins.aeon_to_lobby.aeon import route_aeon_post
route_aeon_post(
aeon_url="https://aeon.example.us",
aeon_key="123",
queue_id="1",
final_queue="Awaiting Request Processing",
task_instance=MockTaskInstanceAeonUsersTransactions
)

assert "{'newStatus': {'Awaiting Request Processing'}}" in caplog.text
assert "aeon rsponded with" not in caplog.text

0 comments on commit b2acf08

Please sign in to comment.