Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat : add external http sensor at optimus #150

Merged
merged 12 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "da1e4831cdd697935fbd8724850fcf22be13e0f9"
PROTON_COMMIT := "0d4304b57f799951c9577339664e87ba993daa5f"

.PHONY: build test generate pack-files generate-proto unit-test smoke-test integration-test vet coverage clean install lint

Expand Down
36 changes: 30 additions & 6 deletions api/handler/v1beta1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,23 @@ func (adapt *Adapter) FromJobProto(spec *pb.JobSpecification) (models.JobSpec, e
}
endDate = &end
}

// prep external dependencies
var externalDependencies models.ExternalDependency
// prep dirty dependencies
dependencies := map[string]models.JobSpecDependency{}
for _, dep := range spec.Dependencies {
dependencies[dep.GetName()] = models.JobSpecDependency{
Type: models.JobSpecDependencyType(dep.GetType()),
if dep.GetName() != "" {
dependencies[dep.GetName()] = models.JobSpecDependency{
Type: models.JobSpecDependencyType(dep.GetType()),
}
}
if dep.HttpDependency != nil {
externalDependencies.HTTPDependencies = append(externalDependencies.HTTPDependencies, models.HTTPDependency{
Name: dep.HttpDependency.Name,
RequestParams: dep.HttpDependency.Params,
URL: dep.HttpDependency.Url,
Headers: dep.HttpDependency.Headers,
})
}
}

Expand Down Expand Up @@ -122,9 +133,10 @@ func (adapt *Adapter) FromJobProto(spec *pb.JobSpecification) (models.JobSpec, e
Config: taskConfigs,
Window: window,
},
Dependencies: dependencies,
Hooks: hooks,
Metadata: metadata,
Dependencies: dependencies,
Hooks: hooks,
Metadata: metadata,
ExternalDependencies: externalDependencies,
}, nil
}

Expand Down Expand Up @@ -207,6 +219,18 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err
})
}

//prep external dependencies for proto
for _, httpDep := range spec.ExternalDependencies.HTTPDependencies {
conf.Dependencies = append(conf.Dependencies, &pb.JobDependency{
HttpDependency: &pb.HttpDependency{
Name: httpDep.Name,
Url: httpDep.URL,
Headers: httpDep.Headers,
Params: httpDep.RequestParams,
},
})
}

var taskConfigs []*pb.JobConfigItem
for _, c := range spec.Task.Config {
taskConfigs = append(taskConfigs, &pb.JobConfigItem{
Expand Down
14 changes: 14 additions & 0 deletions api/handler/v1beta1/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ func TestAdapter(t *testing.T) {
Unit: &models.Plugin{Base: execUnit1},
},
},
ExternalDependencies: models.ExternalDependency{
HTTPDependencies: []models.HTTPDependency{
{
Name: "test_http_sensor_1",
RequestParams: map[string]string{
"key_test": "value_test",
},
URL: "http://test/optimus/status/1",
Headers: map[string]string{
"Content-Type": "application/json",
},
},
},
},
}

inProto, err := adapter.ToJobProto(jobSpec)
Expand Down
2,778 changes: 1,460 additions & 1,318 deletions api/proto/odpf/optimus/core/v1beta1/runtime.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1784,6 +1784,29 @@
}
}
},
"v1beta1HttpDependency": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"url": {
"type": "string"
},
"headers": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"params": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
"v1beta1InstanceContext": {
"type": "object",
"properties": {
Expand All @@ -1798,6 +1821,12 @@
"additionalProperties": {
"type": "string"
}
},
"secrets": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
Expand Down Expand Up @@ -1877,6 +1906,9 @@
},
"type": {
"type": "string"
},
"httpDependency": {
"$ref": "#/definitions/v1beta1HttpDependency"
}
}
},
Expand Down
38 changes: 38 additions & 0 deletions ext/scheduler/airflow/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,41 @@ def _xcom_value_has_error(_xcom) -> bool:
blocks=blocks,
)
return failed_alert.execute(context=context)


class ExternalHttpSensor(BaseSensorOperator):
"""
Executes a HTTP GET statement and returns False on failure caused by
404 Not Found

:param method: The HTTP request method to use
:param endpoint: The relative part of the full url
:param request_params: The parameters to be added to the GET url
:param headers: The HTTP headers to be added to the GET request

"""

template_fields = ('endpoint', 'request_params', 'headers')

def __init__(
self,
endpoint: str,
method: str = 'GET',
request_params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
*args,
**kwargs,
) -> None:
kwargs['mode'] = kwargs.get('mode', 'reschedule')
super().__init__(**kwargs)
self.endpoint = endpoint
self.request_params = request_params or {}
self.headers = headers or {}

def poke(self, context: 'Context') -> bool:
self.log.info('Poking: %s', self.endpoint)
r = requests.get(self.endpoint, headers=self.headers, params=self.request_params)
if (r.status_code >= 200 and r.status_code <=300):
return True
return False

22 changes: 21 additions & 1 deletion ext/scheduler/airflow/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from airflow.utils.weight_rule import WeightRule
from kubernetes.client import models as k8s


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down Expand Up @@ -194,6 +195,21 @@
{{- end -}}
{{- end}}

{{- range $_, $httpDependency := $.Job.ExternalDependencies.HTTPDependencies}}
headers_dict_{{$httpDependency.Name}} = { {{- range $k, $v := $httpDependency.Headers}} '{{$k}}': '{{$v}}', {{- end}} }
siddhanta-rath marked this conversation as resolved.
Show resolved Hide resolved
request_params_dict_{{$httpDependency.Name}} = { {{- range $key, $value := $httpDependency.RequestParams}} '{{$key}}': '{{$value}}', {{- end}} }

wait_{{$httpDependency.Name}} = ExternalHttpSensor(
endpoint='{{$httpDependency.URL}}',
headers=headers_dict_{{$httpDependency.Name}},
request_params=request_params_dict_{{$httpDependency.Name}},
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id='wait_{{$httpDependency.Name| trunc 200}}',
dag=dag
)
{{- end}}

# arrange inter task dependencies
####################################

Expand All @@ -202,6 +218,10 @@
wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}

{{- range $_, $t := $.Job.ExternalDependencies.HTTPDependencies }}
wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}

# set inter-dependencies between task and hooks
{{- range $_, $task := .Job.Hooks }}
{{- $hookSchema := $task.Unit.Info }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from airflow.utils.weight_rule import WeightRule
from kubernetes.client import models as k8s


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down
38 changes: 37 additions & 1 deletion ext/scheduler/airflow2/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime
from typing import List
import pendulum

from typing import Any, Callable, Dict, List, Optional
import requests
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils import pod_launcher
Expand Down Expand Up @@ -589,3 +589,39 @@ def _xcom_value_has_error(_xcom) -> bool:
channel=SLACK_CHANNEL
)
return failed_alert.execute(context=context)

class ExternalHttpSensor(BaseSensorOperator):
"""
Executes a HTTP GET statement and returns False on failure caused by
404 Not Found

:param method: The HTTP request method to use
:param endpoint: The relative part of the full url
:param request_params: The parameters to be added to the GET url
:param headers: The HTTP headers to be added to the GET request

"""

template_fields = ('endpoint', 'request_params', 'headers')

def __init__(
self,
endpoint: str,
method: str = 'GET',
request_params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
*args,
**kwargs,
) -> None:
kwargs['mode'] = kwargs.get('mode', 'reschedule')
super().__init__(**kwargs)
self.endpoint = endpoint
self.request_params = request_params or {}
self.headers = headers or {}

def poke(self, context: 'Context') -> bool:
self.log.info('Poking: %s', self.endpoint)
r = requests.get(url=self.endpoint, headers=self.headers, params=self.request_params)
if (r.status_code >= 200 and r.status_code <=300):
return True
return False
22 changes: 21 additions & 1 deletion ext/scheduler/airflow2/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from airflow.utils.weight_rule import WeightRule
from kubernetes.client import models as k8s


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down Expand Up @@ -203,6 +204,21 @@
{{- end -}}
{{- end}}

{{- range $_, $httpDependency := $.Job.ExternalDependencies.HTTPDependencies}}
headers_dict_{{$httpDependency.Name}} = { {{- range $k, $v := $httpDependency.Headers}} '{{$k}}': '{{$v}}', {{- end}} }
request_params_dict_{{$httpDependency.Name}} = { {{- range $key, $value := $httpDependency.RequestParams}} '{{$key}}': '{{$value}}', {{- end}} }

wait_{{$httpDependency.Name}} = ExternalHttpSensor(
endpoint='{{$httpDependency.URL}}',
headers=headers_dict_{{$httpDependency.Name}},
request_params=request_params_dict_{{$httpDependency.Name}},
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
task_id='wait_{{$httpDependency.Name| trunc 200}}',
dag=dag
)
{{- end}}

# arrange inter task dependencies
####################################

Expand All @@ -211,6 +227,10 @@
wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}

{{- range $_, $t := $.Job.ExternalDependencies.HTTPDependencies }}
wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}}
{{- end}}

# set inter-dependencies between task and hooks
{{- range $_, $task := .Job.Hooks }}
{{- $hookSchema := $task.Unit.Info }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from airflow.utils.weight_rule import WeightRule
from kubernetes.client import models as k8s


from __lib import optimus_failure_notify, optimus_sla_miss_notify, SuperKubernetesPodOperator, \
SuperExternalTaskSensor, CrossTenantDependencySensor
SuperExternalTaskSensor, CrossTenantDependencySensor, ExternalHttpSensor

SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60))
SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60))
Expand Down
38 changes: 25 additions & 13 deletions models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,20 @@ const (
// JobSpec represents a job
// internal representation of the job
type JobSpec struct {
ID uuid.UUID
Version int
Name string
Description string
Labels map[string]string
Owner string
Schedule JobSpecSchedule
Behavior JobSpecBehavior
Task JobSpecTask
Dependencies map[string]JobSpecDependency // job name to dependency
Assets JobAssets
Hooks []JobSpecHook
Metadata JobSpecMetadata
ID uuid.UUID
Version int
Name string
Description string
Labels map[string]string
Owner string
Schedule JobSpecSchedule
Behavior JobSpecBehavior
Task JobSpecTask
Dependencies map[string]JobSpecDependency // job name to dependency
Assets JobAssets
Hooks []JobSpecHook
Metadata JobSpecMetadata
ExternalDependencies ExternalDependency //external dependencies for http
}

func (js JobSpec) GetName() string {
Expand Down Expand Up @@ -299,6 +300,17 @@ type JobSpecDependency struct {
Type JobSpecDependencyType
}

type ExternalDependency struct {
HTTPDependencies []HTTPDependency
}

type HTTPDependency struct {
Name string
RequestParams map[string]string
URL string
Headers map[string]string
}

// JobService provides a high-level operations on DAGs
type JobService interface {
// Create constructs a Job and commits it to a storage
Expand Down