Skip to content

Commit

Permalink
metadata changes, ui changes
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 committed Jun 13, 2024
1 parent 5e54575 commit 4cea985
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
FlinkConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.source.pipeline.flink.models import FlinkPipeline
from metadata.utils.constants import AUTHORIZATION_HEADER


Expand All @@ -38,3 +39,7 @@ def __init__(self, config: FlinkConnection):
def get_jobs(self):
response = self.client.get(f"jobs/overview")
return response.get("jobs", {})

def get_pipeline_info(self, pipeline_details: FlinkPipeline):
response = self.client.get(f"jobs/{pipeline_details.id}")
return response
16 changes: 11 additions & 5 deletions ingestion/src/metadata/ingestion/source/pipeline/flink/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.flink.models import FlinkPipeline
from metadata.ingestion.source.pipeline.flink.models import FlinkPipeline, FlinkTask
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.logger import ingestion_logger

Expand All @@ -54,11 +54,17 @@ def create(

def get_connections_jobs(self, pipeline_details: FlinkPipeline):
"""Returns the list of tasks linked to connection"""
return [
Task(
name=pipeline_details.name,
pipeline_info = self.client.get_pipeline_info(pipeline_details)
tasks = pipeline_info.get("vertices", {})
om_tasks = []
for task in tasks:
task_obj = FlinkTask(**task)
om_tasks.append(
Task(
name=f"{task_obj.name}_{task_obj.id}",
),
)
]
return om_tasks

def yield_pipeline(
self, pipeline_details: FlinkPipeline
Expand Down
10 changes: 10 additions & 0 deletions ingestion/src/metadata/ingestion/source/pipeline/flink/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ class FlinkPipeline(BaseModel):
state: str
name: str
id: str = Field(alias="jid", default=None)


class FlinkTask(BaseModel):
"""Flink task model"""

id: str
name: str
status: str
start_time: str = Field(alias="start-time", default=None)
end_time: str = Field(alias="end-time", default=None)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import doris from '../assets/img/service-icon-doris.png';
import druid from '../assets/img/service-icon-druid.png';
import dynamodb from '../assets/img/service-icon-dynamodb.png';
import fivetran from '../assets/img/service-icon-fivetran.png';
import flink from '../assets/img/service-icon-flink.png';
import gcs from '../assets/img/service-icon-gcs.png';
import glue from '../assets/img/service-icon-glue.png';
import greenplum from '../assets/img/service-icon-greenplum.png';
Expand Down Expand Up @@ -203,6 +204,7 @@ export const PLUS = plus;
export const NOSERVICE = noService;
export const ICEBERGE = iceberge;
export const TERADATA = teradata;
export const FLINK = flink;
export const excludedService = [
MlModelServiceType.Sklearn,
MetadataServiceType.MetadataES,
Expand Down Expand Up @@ -393,6 +395,7 @@ export const BETA_SERVICES = [
PipelineServiceType.Spline,
PipelineServiceType.Spark,
PipelineServiceType.OpenLineage,
PipelineServiceType.Flink,
DashboardServiceType.QlikSense,
DashboardServiceType.QlikCloud,
DatabaseServiceType.Couchbase,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/d
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import flinkConnection from '../jsons/connectionSchemas/connections/pipeline/flinkConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import KafkaConnectConnection from '../jsons/connectionSchemas/connections/pipeline/kafkaConnectConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
Expand Down Expand Up @@ -92,7 +93,11 @@ export const getPipelineConfig = (type: PipelineServiceType) => {

break;
}
case PipelineServiceType.Flink: {
schema = flinkConnection;

break;
}
default:
break;
}
Expand Down

0 comments on commit 4cea985

Please sign in to comment.