Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify workflow success in CI on Airflow #48

Merged
merged 8 commits into from Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/provider.yaml
Expand Up @@ -64,6 +64,15 @@ jobs:
astro dev run dags trigger lakeFS_workflow
sleep 30

- name : Run DAG state check script
id : dag_status_id
run: |
python3 dag_status.py

- name : Check failed DAG satus
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved
if: steps.dag_status_id.outputs.status !=0
run : exit 1
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved

- name: Wait until Airflow makes output file available on main
env:
# To avoid the lack of region - see https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html
Expand Down
38 changes: 38 additions & 0 deletions dag_status.py
@@ -0,0 +1,38 @@
import time
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved
import requests

def get_latest_state():
url = "http://localhost:8080/api/v1/dags/lakeFS_workflow/dagRuns"
username="admin"
password="admin"
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved
response = requests.get( url, auth=(username, password))
dag_details = {}
for key in response.json()['dag_runs']:
dag_details[key['execution_date']] = key['state']

# Creates a sorted dictionary (sorted by key)
from collections import OrderedDict

dict1 = OrderedDict(sorted(dag_details.items(), reverse=True))
state = dict1[list(dict1.keys())[0]]
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved
return state

def dag_state():
print("Inside the dag state block")
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved

state=get_latest_state()
timeout = time.time() + 60 * 5 # 5 minutes from now
while ((state != 'success') and (state != 'failed') and (state != 'skipped')):
time.sleep(5)
if time.time() > timeout:
return 1
state=get_latest_state()
print("Dag details for LakeFS workflow",state)
continue
if state=='success':
return 0
else:
return 1

print(dag_state())
vaibhavk1992 marked this conversation as resolved.
Show resolved Hide resolved

1 change: 1 addition & 0 deletions requirements.txt
@@ -1,2 +1,3 @@
lakefs_client~=0.91.0
setuptools~=56.0.0
requests~=2.28.2