Skip to content

Commit

Permalink
Fix #11888: Apache flink connector (#16601)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 authored and harshach committed Jun 30, 2024
1 parent d937839 commit 4b12cd7
Show file tree
Hide file tree
Showing 20 changed files with 819 additions and 0 deletions.
33 changes: 33 additions & 0 deletions ingestion/src/metadata/examples/workflows/flink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
source:
type: flink
serviceName: local_flink
serviceConnection:
config:
type: Flink
hostPort: http://127.0.0.1:8081
verifySSL: no-ssl
sslConfig:
caCertificate: |
-----BEGIN CERTIFICATE-----
sample certificate
-----END CERTIFICATE-----
sslCertificate: |
-----BEGIN CERTIFICATE-----
sample certificate
-----END CERTIFICATE-----
sslKey: |
-----BEGIN PRIVATE KEY-----
sample certificate
-----END PRIVATE KEY-----
sourceConfig:
config:
type: PipelineMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
53 changes: 53 additions & 0 deletions ingestion/src/metadata/ingestion/source/pipeline/flink/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with flink apis
"""

from typing import List, Optional

from metadata.generated.schema.entity.services.connections.pipeline.flinkConnection import (
FlinkConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.source.pipeline.flink.models import (
FlinkPipeline,
FlinkPipelineList,
)
from metadata.utils.constants import AUTHORIZATION_HEADER
from metadata.utils.helpers import clean_uri
from metadata.utils.ssl_registry import get_verify_ssl_fn


class FlinkClient:
"""
Client to interact with flink apis
"""

def __init__(self, config: FlinkConnection):
self.config = config
get_verify_ssl = get_verify_ssl_fn(config.verifySSL)
client_config: ClientConfig = ClientConfig(
base_url=clean_uri(self.config.hostPort),
api_version="",
auth_header=AUTHORIZATION_HEADER,
auth_token=lambda: ("no_token", 0),
verify=get_verify_ssl(config.sslConfig),
)
self.client = REST(client_config)

def get_jobs(self) -> Optional[List[FlinkPipelineList]]:
response = self.client.get("jobs/overview")
return FlinkPipelineList(**response)

def get_pipeline_info(self, pipeline_id: str) -> FlinkPipeline:
response = self.client.get(f"jobs/{pipeline_id}")
return FlinkPipeline(**response)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.flinkConnection import (
FlinkConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.flink.client import FlinkClient


def get_connection(connection: FlinkConnection) -> FlinkClient:
"""
Create connection
"""
return FlinkClient(connection)


def test_connection(
metadata: OpenMetadata,
client: FlinkClient,
service_connection: FlinkConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
"""
Test connection. This can be executed either as part
of a metadata workflow or during an Automation Workflow
"""
test_fn = {"GetPipelines": client.get_jobs}

test_connection_steps(
metadata=metadata,
test_fn=test_fn,
service_type=service_connection.type.value,
automation_workflow=automation_workflow,
)
188 changes: 188 additions & 0 deletions ingestion/src/metadata/ingestion/source/pipeline/flink/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Airbyte source to extract metadata
"""
import traceback
from typing import Any, Iterable, List, Optional

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import (
Pipeline,
PipelineStatus,
StatusType,
Task,
TaskStatus,
)
from metadata.generated.schema.entity.services.connections.pipeline.flinkConnection import (
FlinkConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
SourceUrl,
Timestamp,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.flink.models import FlinkPipeline
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

TASK_STATUS_MAP = {
"RUNNING": StatusType.Pending,
"FAILED": StatusType.Failed,
"CANCELED": StatusType.Failed,
}


class FlinkSource(PipelineServiceSource):
"""
Implements the necessary methods ot extract
Pipeline metadata from Flink's REST API
"""

@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: FlinkConnection = config.serviceConnection.root.config
if not isinstance(connection, FlinkConnection):
raise InvalidSourceException(
f"Expected FlinkConnection, but got {connection}"
)
return cls(config, metadata)

def get_connections_jobs(
self, pipeline_details: FlinkPipeline
) -> Optional[List[Task]]:
"""Returns the list of tasks linked to connection"""
pipeline_info = self.client.get_pipeline_info(pipeline_details.id)
return [
Task(
name=f"{task.name}_{task.id}",
)
for task in pipeline_info.tasks
]

def yield_pipeline(
self, pipeline_details: FlinkPipeline
) -> Iterable[Either[CreatePipelineRequest]]:
"""
Convert a Connection into a Pipeline Entity
:param pipeline_details: pipeline_details object from Flink
:return: Create Pipeline request with tasks
"""
try:
pipeline_request = CreatePipelineRequest(
name=EntityName(pipeline_details.name),
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
sourceUrl=SourceUrl(self.get_source_url(pipeline_details)),
tasks=self.get_connections_jobs(pipeline_details),
)
yield Either(right=pipeline_request)
self.register_record(pipeline_request=pipeline_request)
except TypeError as err:
yield Either(
left=StackTraceError(
name=pipeline_details.name,
error=f"Error to yield pipeline for {pipeline_details}: {err}",
stackTrace=traceback.format_exc(),
)
)

def get_pipelines_list(self) -> Iterable[FlinkPipeline]:
"""Get List of all pipelines"""
for pipeline in self.client.get_jobs().pipelines:
yield pipeline

def get_pipeline_name(self, pipeline_details: FlinkPipeline) -> str:
return pipeline_details.name

def yield_pipeline_lineage_details(
self, pipeline_details: Any
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage between pipeline and data sources"""

def yield_pipeline_status(
self, pipeline_details: FlinkPipeline
) -> Iterable[Either[OMetaPipelineStatus]]:
"""
Get Pipeline Status
"""
try:
task_status = []
for task in self.client.get_pipeline_info(pipeline_details.id).tasks:
task_status.append(
TaskStatus(
name=f"{task.name}_{task.id}",
executionStatus=TASK_STATUS_MAP.get(task.status),
startTime=Timestamp(task.start_time),
endTime=Timestamp(task.end_time),
)
)

pipeline_status = PipelineStatus(
executionStatus=StatusType.Successful,
taskStatus=task_status,
timestamp=Timestamp(pipeline_details.start_time),
)

pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn,
pipeline_status=pipeline_status,
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=pipeline_details.name,
error=f"Wild error ingesting pipeline status {pipeline_details} - {exc}",
stackTrace=traceback.format_exc(),
)
)

def get_source_url(self, pipeline_details: FlinkPipeline) -> Optional[str]:
try:
pipeline_status = pipeline_details.state.lower()
url_status = None
if pipeline_status == "finished" or pipeline_status == "failed":
url_status = "completed"
elif pipeline_status == "running":
url_status = "running"

if url_status:
return f"{self.client.config.hostPort}/#/job/{url_status}/{pipeline_details.id}/overview"
return f"{self.client.config.hostPort}/#/overview"
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unable to get source url: {exc}")
return None
44 changes: 44 additions & 0 deletions ingestion/src/metadata/ingestion/source/pipeline/flink/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Flink Models
"""

from typing import List, Optional

from pydantic import BaseModel, Field


class FlinkTask(BaseModel):
"""Flink task model"""

id: str
name: str
status: str
start_time: Optional[int] = Field(alias="start-time", default=None)
end_time: Optional[int] = Field(alias="end-time", default=None)


class FlinkPipeline(BaseModel):
"""Flink job model"""

state: str
name: str
id: str = Field(alias="jid")
start_time: Optional[int] = Field(alias="start-time", default=None)
end_time: Optional[int] = Field(alias="end-time", default=None)
tasks: Optional[List[FlinkTask]] = Field(alias="vertices", default=[])


class FlinkPipelineList(BaseModel):
"""Flink Pipelines List"""

pipelines: Optional[List[FlinkPipeline]] = Field(alias="jobs", default=[])
Loading

0 comments on commit 4b12cd7

Please sign in to comment.