Skip to content

Commit

Permalink
Fix xcom return values being passed and do not return anything from l…
Browse files Browse the repository at this point in the history
…obbytrack post
  • Loading branch information
jgreben committed Sep 20, 2022
1 parent 078006f commit a9b2022
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 15 deletions.
10 changes: 7 additions & 3 deletions dags/aeon_to_lobby.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def transform_data(*args, **kwargs):
lobby_users = []
task_instance = kwargs["task_instance"]
aeon_users = task_instance.xcom_pull(
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # [['aeonuser2', 111], ['aeonuser2', 222]]
key="return_value", task_ids="filter_aeon_user_data"
)

for aeon_user in aeon_users:
# map keys and values for user
Expand Down Expand Up @@ -80,7 +80,11 @@ def transform_data(*args, **kwargs):
)

filtered_user_data = PythonOperator(
task_id="filter_aeon_user_data", python_callable=filtered_users
task_id="filter_aeon_user_data", python_callable=filtered_users,
op_kwargs={
"aeon_url": Variable.get("AEON_URL"),
"aeon_key": Variable.get("AEON_KEY")
}
)

transform_to_lobby_data = PythonOperator(
Expand Down
15 changes: 9 additions & 6 deletions plugins/aeon_to_lobby/aeon.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def user_transaction_data(**kwargs):
users = []
queue_users = user_requests_in_queue(**kwargs)
for user_transaction in queue_users: # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]]
users.append(user_transaction[0])
users.append(user_transaction)

return users

Expand All @@ -18,14 +18,16 @@ def filtered_users(**kwargs):
task_instance = kwargs["task_instance"]
queue_users = task_instance.xcom_pull(
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # ['aeonuser1@stanford.edu', 'aeonuser1@stanford.edu', 'aesonuser2@gmail.com']
) # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]]

for user in queue_users:
if "@stanford.edu" not in user:
user = aeon_user(**kwargs, user=user)
logging.info(f"Adding {user}")
username = user[0]
if "@stanford.edu" not in username:
user = aeon_user(**kwargs, user=username)
logging.info(f"Adding {username}")
users.append(user)
else:
logging.info(f"Skipping {user}")
logging.info(f"Skipping {username}")

return users

Expand All @@ -35,6 +37,7 @@ def route_aeon_post(**kwargs):
aeon_data = task_instance.xcom_pull(
key="return_value", task_ids="get_user_transaction_data_from_aeon"
) # [['aeonuser1@stanford.edu', 0], ['aeonuser1@stanford.edu', 1], ["aesonuser2@gmail.com", 2]]

aeon_url = kwargs["aeon_url"]
queue = kwargs["final_queue"]
aeon_headers = {"X-AEON-API-KEY": kwargs["aeon_key"], "Accept": "application/json"}
Expand Down
7 changes: 1 addition & 6 deletions plugins/aeon_to_lobby/lobbytrack.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ def lobby_post(**kwargs):
key="return_value", task_ids="transform_aeon_data_to_lobby_json"
)

responses = []

for user in lobby_users:
logging.info(user)
response = requests.post(lobby_url, headers=lobby_headers, json=user)

if response.status_code != 200:
Expand All @@ -30,6 +27,4 @@ def lobby_post(**kwargs):
)
response.raise_for_status()

responses.append(response)

return responses
logging.info(f"{response.status_code}, {response.text}, {user}")

0 comments on commit a9b2022

Please sign in to comment.