Merged
19 commits
094ea79
refactor: Leverage Job Run API in sensor
Mryashbhardwaj Mar 31, 2022
86f77b2
fix: use UTC time format for query params of job_run api
Mryashbhardwaj Mar 31, 2022
5be7038
fix: use UTC time format for query params of job_run api
Mryashbhardwaj Mar 31, 2022
3ceba72
refactor: CrossTenantDependencySensor to use job_run api
Mryashbhardwaj Apr 1, 2022
6a26481
refactor: merging CrossTenantDependencySensor and SuperExternalTaskSensor
Mryashbhardwaj Apr 1, 2022
1c9b88e
Merge branch 'main' of https://github.com/odpf/optimus into merging_C…
Mryashbhardwaj Apr 4, 2022
d85515c
test: replace SuperExternalTaskSensor with CrossTenantDependencySenso…
Mryashbhardwaj Apr 4, 2022
d65d215
Merge branch 'main' of https://github.com/odpf/optimus into merging_C…
Mryashbhardwaj Apr 5, 2022
61332df
Merge branch 'main' of https://github.com/odpf/optimus
Mryashbhardwaj Apr 6, 2022
afe492a
refactor: remove CrossTenantDependencySensor dependence on SuperExter…
Mryashbhardwaj Apr 6, 2022
f858483
Merge pull request #1 from Mryashbhardwaj/merging_CrossTenantDependen…
Mryashbhardwaj Apr 6, 2022
174080d
refactor: merge CrossTenantDependencySensor with SuperExternalTaskSensor
Mryashbhardwaj Apr 6, 2022
9c78b7e
fix: addressing PR comments
Mryashbhardwaj Apr 6, 2022
b3b8a56
fix: SuperExternalTaskSensor
Mryashbhardwaj Apr 7, 2022
c4d72c8
fix: removed commented code
Mryashbhardwaj Apr 8, 2022
a621fca
Merge branch 'main' of https://github.com/odpf/optimus into Sensor_me…
Mryashbhardwaj Apr 8, 2022
cec224c
fix: revert to older version of salt
siddhanta-rath Apr 8, 2022
e1dcb0e
test: fix failing testcase
siddhanta-rath Apr 8, 2022
2e80d21
test: sync loader test with main
siddhanta-rath Apr 10, 2022
8 changes: 4 additions & 4 deletions config/loader_test.go
@@ -117,8 +117,8 @@ func (s *ConfigTestSuite) TestLoadClientConfig() {
defer s.a.WriteFile(currFilePath, []byte(clientConfig), fs.ModeTemporary)

conf, err := config.LoadClientConfig(config.EmptyPath, config.EmptyFlags)
s.Assert().Error(err)
s.Assert().Nil(conf)
s.Assert().Nil(err)
s.Assert().NotNil(conf)
})
})

@@ -184,8 +184,8 @@ func (s *ConfigTestSuite) TestLoadServerConfig() {

conf, err := config.LoadServerConfig(config.EmptyPath, config.EmptyFlags)

s.Assert().Error(err)
s.Assert().Nil(conf)
s.Assert().Nil(err)
s.Assert().NotNil(conf)
})
})

219 changes: 48 additions & 171 deletions ext/scheduler/airflow2/resources/__lib.py
@@ -113,136 +113,6 @@ def execute(self, context):
except AirflowException as ex:
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))


class SuperExternalTaskSensor(BaseSensorOperator):
"""
Waits for a different DAG or a task in a different DAG to complete for a
specific execution window

:param external_dag_id: The dag_id that contains the task you want to
wait for
:type external_dag_id: str
:param allowed_states: list of allowed states, default is ``['success']``
:type allowed_states: list
:param window_size: size of the window in hours to look for successful
runs in upstream dag. E.g, "24h" will check for last 24 hours from
current execution date of this dag. It checks for number of successful
iterations of upstream dag in provided window. All of them needs to be
successful for this sensor to complete. Defaults to a day of window(24)
:type window_size: str
"""

@apply_defaults
def __init__(self,
external_dag_id,
window_size: str,
window_offset: str,
window_truncate_to: str,
optimus_hostname: str,
*args,
**kwargs):

# Sensors have two modes of operation: 'poke' and 'reschedule'. 'poke'
# mode is like having a while loop: when the scheduler runs the task, the
# sensor keeps checking for the predicate condition until it becomes true. This
# has the effect that once a sensor starts, it keeps taking resources until
# it senses that the predicate has been met. When set to 'reschedule', it exits
# immediately if the predicate is false and is scheduled again at a later time.
# See the documentation for BaseSensorOperator for more information.
kwargs['mode'] = kwargs.get('mode', 'reschedule')

self.upstream_dag = external_dag_id
self.window_size = window_size
self.window_offset = window_offset
self.window_truncate_to = window_truncate_to
self.allowed_upstream_states = [State.SUCCESS]
self._optimus_client = OptimusAPIClient(optimus_hostname)

super(SuperExternalTaskSensor, self).__init__(*args, **kwargs)

@provide_session
def poke(self, context, session=None):

schedule_time = context['next_execution_date']
upstream_schedule = self.get_upstream_schedule_interval(session)

# calculate windows
_, last_upstream_execution_date = self.get_last_upstream_times(schedule_time, upstream_schedule)
task_window = JobSpecTaskWindow(self.window_size, 0, "m", self._optimus_client)
execution_date_window_start, execution_date_window_end = task_window.get(
last_upstream_execution_date.strftime(TIMESTAMP_FORMAT))

self.log.info(
"upstream interval: {}, window size: {}".format(upstream_schedule, self.window_size))
self.log.info(
"waiting for upstream runs between: {} - {} execution dates of airflow dag runs".format(
execution_date_window_start.isoformat(), execution_date_window_end.isoformat()))

# find success iterations we need in window
expected_upstream_executions = self.get_expected_upstream_executions(upstream_schedule,
execution_date_window_start,
execution_date_window_end)
self.log.info("expected upstream executions ({}): {}".format(len(expected_upstream_executions),
expected_upstream_executions))

# upstream dag runs between input window with success state
actual_upstream_executions = [r.execution_date for r in session.query(DagRun.execution_date)
.filter(
DagRun.dag_id == self.upstream_dag,
DagRun.execution_date > execution_date_window_start.replace(tzinfo=utc),
DagRun.execution_date <= execution_date_window_end.replace(tzinfo=utc),
DagRun.external_trigger == False,
DagRun.state.in_(self.allowed_upstream_states)
).order_by(DagRun.execution_date).all()]
self.log.info(
"actual upstream executions ({}): {}".format(len(actual_upstream_executions), actual_upstream_executions))

missing_upstream_executions = set(expected_upstream_executions) - set(actual_upstream_executions)
if len(missing_upstream_executions) > 0:
self.log.info("missing upstream executions : {}".format(missing_upstream_executions))
self.log.warning(
"unable to find enough DagRun instances for upstream '{}' dated between {} and {}(inclusive), rescheduling sensor"
.format(self.upstream_dag, execution_date_window_start.isoformat(),
execution_date_window_end.isoformat()))
return False

return True

def get_upstream_schedule_interval(self, session):
dag_to_wait = session.query(DagModel).filter(
DagModel.dag_id == self.upstream_dag
).first()
# check if valid upstream dag
if not dag_to_wait:
raise AirflowException('The external DAG '
'{} does not exist.'.format(self.upstream_dag))
else:
if not os.path.exists(dag_to_wait.fileloc):
raise AirflowException('The external DAG '
'{} was deleted.'.format(self.upstream_dag))
upstream_schedule = lookup_non_standard_cron_expression(dag_to_wait.schedule_interval)
return upstream_schedule

@staticmethod
def get_last_upstream_times(schedule_time_of_current_job, upstream_schedule_interval):
second_ahead_of_schedule_time = schedule_time_of_current_job + timedelta(seconds=1)
c = croniter(upstream_schedule_interval, second_ahead_of_schedule_time)
last_upstream_schedule_time = c.get_prev(datetime)
last_upstream_execution_date = c.get_prev(datetime)
return last_upstream_schedule_time, last_upstream_execution_date

@staticmethod
def get_expected_upstream_executions(cron_schedule, window_start, window_end):
expected_upstream_executions = []
dag_cron = croniter(cron_schedule, window_start.replace(tzinfo=None))
while True:
next_run = dag_cron.get_next(datetime)
if next_run > window_end.replace(tzinfo=None):
break
expected_upstream_executions.append(next_run.replace(tzinfo=utc))
return expected_upstream_executions


class OptimusAPIClient:
def __init__(self, optimus_host):
self.host = self._add_connection_adapter_if_absent(optimus_host)
@@ -252,13 +122,13 @@ def _add_connection_adapter_if_absent(self, host):
return host
return "http://" + host

def get_job_run_status(self, optimus_project: str, optimus_job: str) -> dict:
url = '{optimus_host}/api/v1beta1/project/{optimus_project}/job/{optimus_job}/status'.format(
def get_job_run(self, optimus_project: str, optimus_job: str, startDate: str, endDate: str) -> dict:
url = '{optimus_host}/api/v1beta1/project/{optimus_project}/job/{optimus_job}/run'.format(
optimus_host=self.host,
optimus_project=optimus_project,
optimus_job=optimus_job,
)
response = requests.get(url)
response = requests.get(url, params={'start_date': startDate, 'end_date': endDate})
self._raise_error_if_request_failed(response)
return response.json()
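
The new endpoint filters runs server-side by a UTC schedule-time range instead of returning every historical status. A minimal sketch of a call, assuming a hypothetical host, project, and job name, and assuming the timestamps follow the ISO-8601 UTC format the query expects:

client = OptimusAPIClient("optimus.example.io")  # "http://" is prepended if the scheme is absent
runs = client.get_job_run(
    optimus_project="sample-project",   # hypothetical project name
    optimus_job="sample-upstream-job",  # hypothetical job name
    startDate="2022-04-01T00:00:00Z",   # assumed UTC format for the query params
    endDate="2022-04-02T00:00:00Z",
)
# The sensor below expects a payload shaped like {"jobRuns": [{"state": ...}, ...]}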

@@ -322,15 +192,32 @@ def get(self, scheduled_at: str) -> (datetime, datetime):
self._parse_datetime(api_response['start']),
self._parse_datetime(api_response['end']),
)

# window start is inclusive
def get_schedule_window(self, scheduled_at: str, upstream_schedule: str) -> (str, str):
api_response = self._fetch_task_window(scheduled_at)

schedule_time_window_start = self._parse_datetime(api_response['start'])
schedule_time_window_end = self._parse_datetime(api_response['end'])

job_cron_iter = croniter(upstream_schedule, schedule_time_window_start)
schedule_time_window_inclusive_start = job_cron_iter.get_next(datetime)
return (
self._parse_datetime_utc_str(schedule_time_window_inclusive_start),
self._parse_datetime_utc_str(schedule_time_window_end),
)

def _parse_datetime(self, timestamp):
return datetime.strptime(timestamp, TIMESTAMP_FORMAT)

def _parse_datetime_utc_str(self, timestamp):
return timestamp.strftime(TIMESTAMP_FORMAT)

def _fetch_task_window(self, scheduled_at: str) -> dict:
return self._optimus_client.get_task_window(scheduled_at, self.size, self.offset, self.truncate_to)
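
get_schedule_window deliberately snaps the raw window start forward to the first tick of the upstream cron schedule, which is what makes the returned start inclusive. A small worked example with croniter, using assumed times and an assumed daily upstream schedule:

from datetime import datetime
from croniter import croniter

# Suppose the task window API returns start=2022-04-01 03:30, end=2022-04-02 03:30.
window_start = datetime(2022, 4, 1, 3, 30)
# With a daily upstream schedule of "0 4 * * *", the first run inside the
# window is 04:00 on Apr 1 — get_next() advances window_start to that tick.
cron_iter = croniter("0 4 * * *", window_start)
inclusive_start = cron_iter.get_next(datetime)  # datetime(2022, 4, 1, 4, 0)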


class CrossTenantDependencySensor(BaseSensorOperator):
class SuperExternalTaskSensor(BaseSensorOperator):

@apply_defaults
def __init__(
@@ -355,44 +242,33 @@ def poke(self, context):
# TODO this needs to be updated to use optimus get job spec
upstream_schedule = self.get_schedule_interval(schedule_time)

_, last_upstream_execution_date = SuperExternalTaskSensor.get_last_upstream_times(
schedule_time,
upstream_schedule)
last_upstream_schedule_time, _ = self.get_last_upstream_times(
schedule_time, upstream_schedule)

# get schedule window
task_window = JobSpecTaskWindow(self.window_size, 0, "m", self._optimus_client)
execution_date_window_start, execution_date_window_end = task_window.get(
last_upstream_execution_date.strftime(TIMESTAMP_FORMAT))

self.log.info(
"upstream interval: {}, window size: {}".format(upstream_schedule, self.window_size))
self.log.info(
"waiting for upstream runs between: {} - {} execution dates of airflow dag run".format(
execution_date_window_start.isoformat(), execution_date_window_end.isoformat()))

# find success iterations we need in window
expected_upstream_executions = SuperExternalTaskSensor.get_expected_upstream_executions(upstream_schedule,
execution_date_window_start,
execution_date_window_end)
self.log.info("expected upstream executions ({}): {}".format(len(expected_upstream_executions),
expected_upstream_executions))

actual_upstream_success_executions = self._get_successful_job_executions()
self.log.info("actual upstream executions ({}): {}".format(len(actual_upstream_success_executions),
actual_upstream_success_executions))

# determine if all expected are present in actual
missing_upstream_executions = set(expected_upstream_executions) - set(actual_upstream_success_executions)
if len(missing_upstream_executions) > 0:
self.log.info("missing upstream executions : {}".format(missing_upstream_executions))
schedule_time_window_start, schedule_time_window_end = task_window.get_schedule_window(
last_upstream_schedule_time.strftime(TIMESTAMP_FORMAT), upstream_schedule)


self.log.info("waiting for upstream runs between: {} - {} schedule times of airflow dag run".format(
schedule_time_window_start, schedule_time_window_end))

if not self._are_all_job_runs_successful(schedule_time_window_start, schedule_time_window_end):
self.log.warning("unable to find enough successful executions for upstream '{}' in "
"'{}' dated between {} and {}(inclusive), rescheduling sensor".
format(self.optimus_job, self.optimus_project, execution_date_window_start.isoformat(),
execution_date_window_end.isoformat()))
format(self.optimus_job, self.optimus_project, schedule_time_window_start,
schedule_time_window_end))
return False

return True

def get_last_upstream_times(self, schedule_time_of_current_job, upstream_schedule_interval):
second_ahead_of_schedule_time = schedule_time_of_current_job + timedelta(seconds=1)
c = croniter(upstream_schedule_interval, second_ahead_of_schedule_time)
last_upstream_schedule_time = c.get_prev(datetime)
last_upstream_execution_date = c.get_prev(datetime)
return last_upstream_schedule_time, last_upstream_execution_date
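
Seeding croniter one second past the current schedule time makes an upstream run that fires exactly at schedule_time count as the "previous" tick; the second get_prev() then steps back one more interval, giving the Airflow-style execution_date. A worked example with assumed times:

from datetime import datetime, timedelta
from croniter import croniter

schedule_time = datetime(2022, 4, 2, 0, 0)  # assumed schedule time of the current job
c = croniter("0 0 * * *", schedule_time + timedelta(seconds=1))  # daily upstream
last_schedule = c.get_prev(datetime)   # 2022-04-02 00:00 — last upstream schedule time
execution_date = c.get_prev(datetime)  # 2022-04-01 00:00 — one interval earlier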

def get_schedule_interval(self, schedule_time):
schedule_time_str = schedule_time.strftime(TIMESTAMP_FORMAT)
job_metadata = self._optimus_client.get_job_metadata(schedule_time_str, self.optimus_project, self.optimus_job)
@@ -401,13 +277,14 @@ def get_schedule_interval(self, schedule_time):

# TODO the api will be updated with getJobRuns even though the field here refers to scheduledAt
# it points to execution_date
def _get_successful_job_executions(self) -> List[datetime]:
api_response = self._optimus_client.get_job_run_status(self.optimus_project, self.optimus_job)
actual_upstream_success_executions = []
for job_run in api_response['statuses']:
if job_run['state'] == 'success':
actual_upstream_success_executions.append(self._parse_datetime(job_run['scheduledAt']))
return actual_upstream_success_executions
def _are_all_job_runs_successful(self, schedule_time_window_start, schedule_time_window_end) -> bool:
api_response = self._optimus_client.get_job_run(self.optimus_project, self.optimus_job, schedule_time_window_start, schedule_time_window_end)
self.log.info("job_run api response :: {}".format(api_response))
for job_run in api_response['jobRuns']:
if job_run['state'] != 'success':
self.log.info("failed for run :: {}".format(job_run))
return False
return True
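
Every run returned for the window must be in the 'success' state for the poke to pass; a single pending or failed run reschedules the sensor. An illustrative response, with the payload shape inferred from the loop above:

api_response = {
    "jobRuns": [
        {"state": "success", "scheduledAt": "2022-04-01T04:00:00Z"},
        {"state": "pending", "scheduledAt": "2022-04-02T04:00:00Z"},  # not 'success', so poke returns False
    ]
}
all(run["state"] == "success" for run in api_response["jobRuns"])  # evaluates to False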

def _parse_datetime(self, timestamp) -> datetime:
try:
13 changes: 6 additions & 7 deletions ext/scheduler/airflow2/resources/base_dag.py
@@ -11,7 +11,7 @@


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor
SuperExternalTaskSensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
@@ -183,20 +183,19 @@

{{- if eq $dependency.Type $.JobSpecDependencyTypeIntra }}
wait_{{$dependency.Job.Name | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
external_dag_id="{{$dependency.Job.Name}}",
window_size={{$baseWindow.Size.String | quote}},
window_offset={{$baseWindow.Offset.String | quote}},
window_truncate_to={{$baseWindow.TruncateTo | quote}},
optimus_hostname="{{$.Hostname}}",
task_id="wait_{{$dependency.Job.Name | trunc 200}}-{{$dependencySchema.Name}}",
upstream_optimus_project="{{$.Namespace.ProjectSpec.Name}}",
upstream_optimus_job="{{$dependency.Job.Name}}",
window_size="{{ $baseWindow.Size.String }}",
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id="wait_{{$dependency.Job.Name | trunc 200}}-{{$dependencySchema.Name}}",
dag=dag
)
{{- end -}}

{{- if eq $dependency.Type $.JobSpecDependencyTypeInter }}
wait_{{$dependency.Job.Name | replace "-" "__dash__" | replace "." "__dot__"}} = CrossTenantDependencySensor(
wait_{{$dependency.Job.Name | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
optimus_hostname="{{$.Hostname}}",
upstream_optimus_project="{{$dependency.Project.Name}}",
upstream_optimus_job="{{$dependency.Job.Name}}",
13 changes: 6 additions & 7 deletions ext/scheduler/airflow2/resources/expected_compiled_template.py
@@ -11,7 +11,7 @@


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor
SuperExternalTaskSensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
@@ -186,17 +186,16 @@
# create upstream sensors

wait_foo__dash__intra__dash__dep__dash__job = SuperExternalTaskSensor(
external_dag_id="foo-intra-dep-job",
window_size="1h0m0s",
window_offset="0s",
window_truncate_to="d",
optimus_hostname="http://airflow.example.io",
task_id="wait_foo-intra-dep-job-bq",
upstream_optimus_project="foo-project",
upstream_optimus_job="foo-intra-dep-job",
window_size="1h0m0s",
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id="wait_foo-intra-dep-job-bq",
dag=dag
)
wait_foo__dash__inter__dash__dep__dash__job = CrossTenantDependencySensor(
wait_foo__dash__inter__dash__dep__dash__job = SuperExternalTaskSensor(
optimus_hostname="http://airflow.example.io",
upstream_optimus_project="foo-external-project",
upstream_optimus_job="foo-inter-dep-job",