From 74c7773ca40decfd0d4ed40dc93a6af591bbc190 Mon Sep 17 00:00:00 2001 From: James Liu <37026441+zijianjoy@users.noreply.github.com> Date: Thu, 7 Oct 2021 00:55:22 -0700 Subject: [PATCH] feat(frontend): Runtime DAG in RunDetailsV2. Fix #6673 (#6694) * feat(frontend): Runtime DAG in RunDetailsV2 * remove debug log --- frontend/README.md | 6 + frontend/mock-backend/fixed-data.ts | 93 ++++++++- frontend/mock-backend/mock-api-server.ts | 36 ++++ frontend/src/components/Router.tsx | 42 ++--- frontend/src/lib/v2/DynamicFlow.test.ts | 103 ++++++++++ frontend/src/lib/v2/DynamicFlow.ts | 176 ++++++++++++++++++ frontend/src/lib/v2/StaticFlow.ts | 2 +- frontend/src/mlmd/MlmdUtils.test.ts | 65 +++++++ frontend/src/mlmd/MlmdUtils.ts | 53 +++++- frontend/src/pages/PipelineDetailsV2.test.tsx | 4 +- frontend/src/pages/PipelineDetailsV2.tsx | 7 +- frontend/src/pages/RunDetailsRouter.tsx | 84 +++++++++ frontend/src/pages/RunDetailsV2.test.tsx | 153 +++++++++++++++ frontend/src/pages/RunDetailsV2.tsx | 136 ++++++++++++++ .../v2/{StaticCanvas.tsx => DagCanvas.tsx} | 25 ++- 15 files changed, 946 insertions(+), 39 deletions(-) create mode 100644 frontend/src/lib/v2/DynamicFlow.test.ts create mode 100644 frontend/src/lib/v2/DynamicFlow.ts create mode 100644 frontend/src/pages/RunDetailsRouter.tsx create mode 100644 frontend/src/pages/RunDetailsV2.test.tsx create mode 100644 frontend/src/pages/RunDetailsV2.tsx rename frontend/src/pages/v2/{StaticCanvas.tsx => DagCanvas.tsx} (81%) diff --git a/frontend/README.md b/frontend/README.md index d7d16ec6930..1a64dee5867 100644 --- a/frontend/README.md +++ b/frontend/README.md @@ -55,6 +55,12 @@ development. Run `npm run mock:api` to start a mock backend api server handler so it can serve basic api calls with mock data. +If you want to port real MLMD store to be used for mock backend scenario, you can run the following command. Note that a mock MLMD store doesn't exist yet. + +``` +kubectl port-forward svc/metadata-envoy-service 9090:9090 +``` + ### Proxy to a real cluster This requires you already have a real KFP cluster, you can proxy requests to it. diff --git a/frontend/mock-backend/fixed-data.ts b/frontend/mock-backend/fixed-data.ts index 72f7a1027d6..98e72ef3b90 100644 --- a/frontend/mock-backend/fixed-data.ts +++ b/frontend/mock-backend/fixed-data.ts @@ -16,14 +16,16 @@ import { ApiExperiment } from '../src/apis/experiment'; import { ApiJob } from '../src/apis/job'; import { ApiPipeline, ApiPipelineVersion } from '../src/apis/pipeline'; import { ApiRelationship, ApiResourceType, ApiRunDetail, RunMetricFormat } from '../src/apis/run'; +import v2_lightweight_python_pipeline from './data/v2/pipeline/mock_lightweight_python_functions_v2_pipeline.json'; +import xgboost_sample_pipeline from './data/v2/pipeline/xgboost_sample_pipeline.json'; import helloWorldRun from './hello-world-runtime'; import helloWorldWithStepsRun from './hello-world-with-steps-runtime'; import jsonRun from './json-runtime'; +import largeGraph from './large-graph-runtime'; import coinflipRun from './mock-coinflip-runtime'; import errorRun from './mock-error-runtime'; -import xgboostRun from './mock-xgboost-runtime'; -import largeGraph from './large-graph-runtime'; import retryRun from './mock-retry-runtime'; +import xgboostRun from './mock-xgboost-runtime'; function padStartTwoZeroes(str: string): string { let padded = str || ''; @@ -359,6 +361,11 @@ const jobs: ApiJob[] = [ jobs.push(...generateNJobs()); const experiments: ApiExperiment[] = [ + { + description: 'This experiment includes KFP v2 runs', + id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b', + name: 'KFP v2 Runs', + }, { description: 'This experiment has no runs', id: '7fc01714-4a13-4c05-5902-a8a72c14253b', @@ -408,6 +415,88 @@ const versions: ApiPipelineVersion[] = [ ]; const runs: ApiRunDetail[] = [ + { + pipeline_runtime: { + // workflow_manifest: JSON.stringify(coinflipRun), + }, + run: { + created_at: new Date('2021-05-17T20:58:23.000Z'), + description: 'V2 xgboost', + finished_at: new Date('2021-05-18T21:01:23.000Z'), + id: 'e0115ac1-0479-4194-a22d-01e65e09a32b', + name: 'v2-xgboost-ilbo', + pipeline_spec: { + pipeline_id: PIPELINE_V2_XGBOOST.id, + pipeline_name: PIPELINE_V2_XGBOOST_DEFAULT.name, + workflow_manifest: JSON.stringify(xgboost_sample_pipeline), + }, + resource_references: [ + { + key: { + id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b', + type: ApiResourceType.EXPERIMENT, + }, + relationship: ApiRelationship.OWNER, + }, + ], + scheduled_at: new Date('2021-05-17T20:58:23.000Z'), + status: 'Succeeded', + }, + }, + { + pipeline_runtime: { + // workflow_manifest: JSON.stringify(coinflipRun), + }, + run: { + created_at: new Date('2021-04-17T20:58:23.000Z'), + description: 'V2 two steps run from pipeline template', + finished_at: new Date('2021-04-18T21:01:23.000Z'), + id: 'c1e11ff7-e1af-4a8d-a9e4-718f32934ae0', + name: 'v2-lightweight-two-steps-i5jk', + pipeline_spec: { + pipeline_id: PIPELINE_V2_PYTHON_TWO_STEPS_DEFAULT.id, + pipeline_name: PIPELINE_V2_PYTHON_TWO_STEPS_DEFAULT.name, + workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline), + }, + resource_references: [ + { + key: { + id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b', + type: ApiResourceType.EXPERIMENT, + }, + relationship: ApiRelationship.OWNER, + }, + ], + scheduled_at: new Date('2021-04-17T20:58:23.000Z'), + status: 'Succeeded', + }, + }, + { + pipeline_runtime: { + // workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline), + }, + run: { + created_at: new Date('2021-03-17T20:58:23.000Z'), + description: 'V2 two steps run from SDK', + finished_at: new Date('2021-03-18T21:01:23.000Z'), + id: '3308d0ec-f1b3-4488-a2d3-8ad0f91e88f8', + name: 'v2-lightweight-two-steps-jk4u', + pipeline_spec: { + workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline), + }, + resource_references: [ + { + key: { + id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b', + type: ApiResourceType.EXPERIMENT, + }, + relationship: ApiRelationship.OWNER, + }, + ], + scheduled_at: new Date('2021-03-17T20:58:23.000Z'), + status: 'Succeeded', + }, + }, { pipeline_runtime: { workflow_manifest: JSON.stringify(coinflipRun), diff --git a/frontend/mock-backend/mock-api-server.ts b/frontend/mock-backend/mock-api-server.ts index d51dd953295..84718ca88e4 100644 --- a/frontend/mock-backend/mock-api-server.ts +++ b/frontend/mock-backend/mock-api-server.ts @@ -13,6 +13,7 @@ // limitations under the License. import express from 'express'; +import proxy from 'http-proxy-middleware'; import mockApiMiddleware from './mock-api-middleware'; const app = express(); @@ -28,9 +29,44 @@ app.use((_: any, res: any, next: any) => { next(); }); +export const HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS = { + Connection: 'keep-alive', +}; + +// To enable porting MLMD to mock backend, run following command: +// kubectl port-forward svc/metadata-envoy-service 9090:9090 +/** Proxy metadata requests to the Envoy instance which will handle routing to the metadata gRPC server */ +app.all( + '/ml_metadata.*', + proxy({ + changeOrigin: true, + onProxyReq: proxyReq => { + console.log('Metadata proxied request: ', (proxyReq as any).path); + }, + headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS, + target: getAddress({ host: 'localhost', port: '9090' }), + }), +); + mockApiMiddleware(app as any); app.listen(port, () => { // tslint:disable-next-line:no-console console.log('Server listening at http://localhost:' + port); }); + +export function getAddress({ + host, + port, + namespace, + schema = 'http', +}: { + host: string; + port?: string | number; + namespace?: string; + schema?: string; +}) { + namespace = namespace ? `.${namespace}` : ''; + port = port ? `:${port}` : ''; + return `${schema}://${host}${namespace}${port}`; +} diff --git a/frontend/src/components/Router.tsx b/frontend/src/components/Router.tsx index 92e13710600..27c076f6d94 100644 --- a/frontend/src/components/Router.tsx +++ b/frontend/src/components/Router.tsx @@ -14,41 +14,41 @@ * limitations under the License. */ -import * as React from 'react'; -import ArtifactList from '../pages/ArtifactList'; -import ArtifactDetails from '../pages/ArtifactDetails'; -import Banner, { BannerProps } from '../components/Banner'; import Button from '@material-ui/core/Button'; -import Compare from '../pages/Compare'; import Dialog from '@material-ui/core/Dialog'; import DialogActions from '@material-ui/core/DialogActions'; import DialogContent from '@material-ui/core/DialogContent'; import DialogTitle from '@material-ui/core/DialogTitle'; -import ExecutionList from '../pages/ExecutionList'; -import ExecutionDetails from '../pages/ExecutionDetails'; -import ExperimentDetails from '../pages/ExperimentDetails'; +import Snackbar, { SnackbarProps } from '@material-ui/core/Snackbar'; +import * as React from 'react'; +import { Redirect, Route, Switch } from 'react-router-dom'; +import FrontendFeatures from 'src/pages/FrontendFeatures'; +import RunDetailsRouter from 'src/pages/RunDetailsRouter'; +import { classes, stylesheet } from 'typestyle'; +import Banner, { BannerProps } from '../components/Banner'; +import { commonCss } from '../Css'; +import { Deployments, KFP_FLAGS } from '../lib/Flags'; +import Page404 from '../pages/404'; import AllExperimentsAndArchive, { AllExperimentsAndArchiveTab, } from '../pages/AllExperimentsAndArchive'; -import AllRunsAndArchive, { AllRunsAndArchiveTab } from '../pages/AllRunsAndArchive'; import AllRecurringRunsList from '../pages/AllRecurringRunsList'; +import AllRunsAndArchive, { AllRunsAndArchiveTab } from '../pages/AllRunsAndArchive'; +import ArtifactDetails from '../pages/ArtifactDetails'; +import ArtifactList from '../pages/ArtifactList'; +import Compare from '../pages/Compare'; +import ExecutionDetails from '../pages/ExecutionDetails'; +import ExecutionList from '../pages/ExecutionList'; +import ExperimentDetails from '../pages/ExperimentDetails'; +import { GettingStarted } from '../pages/GettingStarted'; import NewExperiment from '../pages/NewExperiment'; +import NewPipelineVersion from '../pages/NewPipelineVersion'; import NewRun from '../pages/NewRun'; -import Page404 from '../pages/404'; import PipelineDetails from '../pages/PipelineDetails'; import PipelineList from '../pages/PipelineList'; import RecurringRunDetails from '../pages/RecurringRunDetails'; -import RunDetails from '../pages/RunDetails'; import SideNav from './SideNav'; -import Snackbar, { SnackbarProps } from '@material-ui/core/Snackbar'; import Toolbar, { ToolbarProps } from './Toolbar'; -import { Route, Switch, Redirect } from 'react-router-dom'; -import { classes, stylesheet } from 'typestyle'; -import { commonCss } from '../Css'; -import NewPipelineVersion from '../pages/NewPipelineVersion'; -import { GettingStarted } from '../pages/GettingStarted'; -import { KFP_FLAGS, Deployments } from '../lib/Flags'; -import FrontendFeatures from 'src/pages/FrontendFeatures'; export type RouteConfig = { path: string; @@ -193,8 +193,8 @@ const Router: React.FC = ({ configs }) => { { path: RoutePage.RUNS, Component: AllRunsAndArchive, view: AllRunsAndArchiveTab.RUNS }, { path: RoutePage.RECURRING_RUNS, Component: AllRecurringRunsList }, { path: RoutePage.RECURRING_RUN_DETAILS, Component: RecurringRunDetails }, - { path: RoutePage.RUN_DETAILS, Component: RunDetails }, - { path: RoutePage.RUN_DETAILS_WITH_EXECUTION, Component: RunDetails }, + { path: RoutePage.RUN_DETAILS, Component: RunDetailsRouter }, + { path: RoutePage.RUN_DETAILS_WITH_EXECUTION, Component: RunDetailsRouter }, { path: RoutePage.COMPARE, Component: Compare }, { path: RoutePage.FRONTEND_FEATURES, Component: FrontendFeatures }, ]; diff --git a/frontend/src/lib/v2/DynamicFlow.test.ts b/frontend/src/lib/v2/DynamicFlow.test.ts new file mode 100644 index 00000000000..7f1b558b9cf --- /dev/null +++ b/frontend/src/lib/v2/DynamicFlow.test.ts @@ -0,0 +1,103 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as TWO_STEP_PIPELINE from 'src/data/test/mock_lightweight_python_functions_v2_pipeline.json'; +import { PipelineSpec } from 'src/generated/pipeline_spec'; +import { ml_pipelines } from 'src/generated/pipeline_spec/pbjs_ml_pipelines'; +import { Artifact, Event, Execution, Value } from 'src/third_party/mlmd'; +import { TASK_NAME_KEY, updateFlowElementsState } from './DynamicFlow'; +import { convertFlowElements } from './StaticFlow'; + +describe('DynamicFlow', () => { + it('update node status based on MLMD', () => { + // Prepare MLMD objects. + const EXECUTION_PREPROCESS = new Execution() + .setId(3) + .setLastKnownState(Execution.State.COMPLETE); + EXECUTION_PREPROCESS.getCustomPropertiesMap().set( + TASK_NAME_KEY, + new Value().setStringValue('preprocess'), + ); + const EXECUTION_TRAIN = new Execution().setId(4).setLastKnownState(Execution.State.FAILED); + EXECUTION_TRAIN.getCustomPropertiesMap().set( + TASK_NAME_KEY, + new Value().setStringValue('train'), + ); + + const ARTIFACT_OUTPUT_DATA_ONE = new Artifact().setId(1).setState(Artifact.State.LIVE); + const ARTIFACT_OUTPUT_DATA_TWO = new Artifact().setId(2).setState(Artifact.State.PENDING); + const ARTIFACT_MODEL = new Artifact().setId(3).setState(Artifact.State.DELETED); + + const EVENT_PREPROCESS_OUTPUT_DATA_ONE = new Event() + .setExecutionId(3) + .setArtifactId(1) + .setType(Event.Type.OUTPUT) + .setPath(new Event.Path().setStepsList([new Event.Path.Step().setKey('output_dataset_one')])); + const EVENT_PREPROCESS_OUTPUT_DATA_TWO = new Event() + .setExecutionId(3) + .setArtifactId(2) + .setType(Event.Type.OUTPUT) + .setPath( + new Event.Path().setStepsList([new Event.Path.Step().setKey('output_dataset_two_path')]), + ); + const EVENT_OUTPUT_DATA_ONE_TRAIN = new Event().setExecutionId(4).setArtifactId(1); + const EVENT_OUTPUT_DATA_TWO_TRAIN = new Event().setExecutionId(4).setArtifactId(2); + const EVENT_TRAIN_MODEL = new Event() + .setExecutionId(4) + .setArtifactId(3) + .setType(Event.Type.OUTPUT) + .setPath(new Event.Path().setStepsList([new Event.Path.Step().setKey('model')])); + + // Converts to static graph first, its type is Elements. + const jsonObject = TWO_STEP_PIPELINE; + const message = ml_pipelines.PipelineSpec.fromObject(jsonObject['pipelineSpec']); + const buffer = ml_pipelines.PipelineSpec.encode(message).finish(); + const pipelineSpec = PipelineSpec.deserializeBinary(buffer); + const graph = convertFlowElements(pipelineSpec); + + // MLMD objects to provide node states. + const executions: Execution[] = [EXECUTION_PREPROCESS, EXECUTION_TRAIN]; + const events: Event[] = [ + EVENT_PREPROCESS_OUTPUT_DATA_ONE, + EVENT_PREPROCESS_OUTPUT_DATA_TWO, + EVENT_OUTPUT_DATA_ONE_TRAIN, + EVENT_OUTPUT_DATA_TWO_TRAIN, + EVENT_TRAIN_MODEL, + ]; + const artifacts: Artifact[] = [ + ARTIFACT_OUTPUT_DATA_ONE, + ARTIFACT_OUTPUT_DATA_TWO, + ARTIFACT_MODEL, + ]; + + updateFlowElementsState(graph, executions, events, artifacts); + for (let element of graph) { + graph + .filter(e => e.id === element.id) + .forEach(e => { + if (e.id === 'task.preprocess') { + expect(e.data.state).toEqual(EXECUTION_PREPROCESS.getLastKnownState()); + } else if (e.id === 'task.train') { + expect(e.data.state).toEqual(EXECUTION_TRAIN.getLastKnownState()); + } else if (e.id === 'artifact.preprocess.output_dataset_one') { + expect(e.data.state).toEqual(ARTIFACT_OUTPUT_DATA_ONE.getState()); + } else if (e.id === 'artifact.preprocess.output_dataset_two_path') { + expect(e.data.state).toEqual(ARTIFACT_OUTPUT_DATA_TWO.getState()); + } else if (e.id === 'artifact.train.model') { + expect(e.data.state).toEqual(ARTIFACT_MODEL.getState()); + } + }); + } + }); +}); diff --git a/frontend/src/lib/v2/DynamicFlow.ts b/frontend/src/lib/v2/DynamicFlow.ts new file mode 100644 index 00000000000..32fc7c198e2 --- /dev/null +++ b/frontend/src/lib/v2/DynamicFlow.ts @@ -0,0 +1,176 @@ +/* + * Copyright 2021 The Kubeflow Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { ArtifactFlowElementData, ExecutionFlowElementData } from 'src/components/graph/Constants'; +import { + getArtifactNodeKey, + getTaskKeyFromNodeKey, + NodeTypeNames, + PipelineFlowElement, +} from 'src/lib/v2/StaticFlow'; +import { getArtifactNameFromEvent } from 'src/mlmd/MlmdUtils'; +import { Artifact, Event, Execution, Value } from 'src/third_party/mlmd'; + +export const TASK_NAME_KEY = 'task_name'; + +// 1. Get the Pipeline Run context using run ID (FOR subDAG, we need to wait for design) +// 2. Fetch all executions by context. Create Map for task_name => Execution +// 3. Fetch all Events by Context. Create Map for OUTPUT events: execution_id => Events +// 5. Fetch all Artifacts by Context. +// 6. Create Map for artifacts: artifact_id => Artifact +// a. For each task in the flowElements, find its execution state. +// b. For each artifact node, get its task name. +// c. get Execution from Map, then get execution_id. +// d. get Events from Map, then get artifact name from path. +// e. for the Event which matches artifact name, get artifact_id. +// f. get Artifact and update the state. + +// Construct ArtifactNodeKey -> Artifact Map +// for each OUTPUT event, get execution id and artifact id +// get execution task_name from Execution map +// get artifact name from Event path +// get Artifact from Artifact map +// set ArtifactNodeKey -> Artifact. +// Elements change to Map node key => node, edge key => edge +// For each node: (DAG execution doesn't have design yet) +// If TASK: +// Find exeuction from using task_name +// Update with execution state +// If ARTIFACT: +// Get task_name and artifact_name +// Get artifact from Master Map +// Update with artifact state +// IF SUBDAG: (Not designed) +// similar to TASK, but needs to determine subDAG type. +// IMPORTANT: All the updates are in-place for PipelineFlowElement. Therefore it is no return value. + +// Questions: +// How to handle DAG state? +// How to handle subDAG input artifacts and parameters? +// How to handle if-condition? and show the state +// How to handle parallel-for? and list of workers. + +export function updateFlowElementsState( + elems: PipelineFlowElement[], + executions: Execution[], + events: Event[], + artifacts: Artifact[], +) { + // IMPORTANT: PipelineFlowElement update is in-place. + const taskNameToExecution = getTaskNameToExecution(executions); + const executionIdToExectuion = getExectuionIdToExecution(executions); + const artifactIdToArtifact = getArtifactIdToArtifact(artifacts); + const artifactNodeKeyToArtifact = getArtifactNodeKeyToArtifact( + events, + executionIdToExectuion, + artifactIdToArtifact, + ); + + for (let elem of elems) { + let updatedElem = elem; + if (NodeTypeNames.EXECUTION === elem.type) { + const taskName = getTaskKeyFromNodeKey(elem.id); + const execution = taskNameToExecution.get(taskName); + (updatedElem.data as ExecutionFlowElementData).state = execution?.getLastKnownState(); + } else if (NodeTypeNames.ARTIFACT === elem.type) { + const artifact = artifactNodeKeyToArtifact.get(elem.id); + (updatedElem.data as ArtifactFlowElementData).state = artifact?.getState(); + } else if (NodeTypeNames.SUB_DAG === elem.type) { + // TODO: Update sub-dag state based on future design. + } else { + // Edges don't have types yet. + // For any element that don't match the above types, copy over directly. + } + } +} + +function getTaskNameToExecution(executions: Execution[]): Map { + const map = new Map(); + for (let exec of executions) { + const taskName = getTaskName(exec); + if (!taskName) { + continue; + } + map.set(taskName.getStringValue(), exec); + } + return map; +} + +function getExectuionIdToExecution(executions: Execution[]): Map { + const map = new Map(); + for (let exec of executions) { + map.set(exec.getId(), exec); + } + return map; +} + +function getArtifactIdToArtifact(artifacts: Artifact[]): Map { + const map = new Map(); + for (let artifact of artifacts) { + map.set(artifact.getId(), artifact); + } + return map; +} + +function getArtifactNodeKeyToArtifact( + events: Event[], + executionIdToExectuion: Map, + artifactIdToArtifact: Map, +): Map { + const map = new Map(); + const outputEvents = events.filter(event => event.getType() === Event.Type.OUTPUT); + for (let event of outputEvents) { + const executionId = event.getExecutionId(); + const execution = executionIdToExectuion.get(executionId); + if (!execution) { + console.warn("Execution doesn't exist for ID " + executionId); + continue; + } + const taskName = getTaskName(execution); + if (!taskName) { + continue; + } + const artifactId = event.getArtifactId(); + const artifact = artifactIdToArtifact.get(artifactId); + if (!artifact) { + console.warn("Artifact doesn't exist for ID " + artifactId); + continue; + } + const artifactName = getArtifactNameFromEvent(event); + if (!artifactName) { + console.warn("Artifact name doesn't exist in Event. Artifact ID " + artifactId); + continue; + } + const key = getArtifactNodeKey(taskName.getStringValue(), artifactName); + map.set(key, artifact); + } + return map; +} + +function getTaskName(exec: Execution): Value | undefined { + const customProperties = exec.getCustomPropertiesMap(); + if (!customProperties.has(TASK_NAME_KEY)) { + console.warn("task_name key doesn't exist for custom properties of Execution " + exec.getId()); + return undefined; + } + const taskName = customProperties.get(TASK_NAME_KEY); + if (!taskName) { + console.warn( + "task_name value doesn't exist for custom properties of Execution " + exec.getId(), + ); + return undefined; + } + return taskName; +} diff --git a/frontend/src/lib/v2/StaticFlow.ts b/frontend/src/lib/v2/StaticFlow.ts index 9d86850ca18..b187bcc0180 100644 --- a/frontend/src/lib/v2/StaticFlow.ts +++ b/frontend/src/lib/v2/StaticFlow.ts @@ -398,7 +398,7 @@ export function isTaskNode(nodeKey: string) { } const ARTIFACT_NODE_KEY_PREFIX = 'artifact.'; -function getArtifactNodeKey(taskKey: string, artifactKey: string): string { +export function getArtifactNodeKey(taskKey: string, artifactKey: string): string { // id is in pattern artifact.producerTaskKey.outputArtifactKey // Because task name and artifact name cannot contain dot in python. return ARTIFACT_NODE_KEY_PREFIX + taskKey + '.' + artifactKey; diff --git a/frontend/src/mlmd/MlmdUtils.test.ts b/frontend/src/mlmd/MlmdUtils.test.ts index 7bc2dde0bf9..a1d921a7682 100644 --- a/frontend/src/mlmd/MlmdUtils.test.ts +++ b/frontend/src/mlmd/MlmdUtils.test.ts @@ -19,8 +19,11 @@ import { EXECUTION_KEY_CACHED_EXECUTION_ID, filterLinkedArtifactsByType, getArtifactName, + getArtifactNameFromEvent, getContextByExecution, getRunContext, + getArtifactsFromContext, + getEventsByExecutions, } from 'src/mlmd/MlmdUtils'; import { expectWarnings, testBestPractices } from 'src/TestUtils'; import { @@ -34,10 +37,14 @@ import { GetContextByTypeAndNameResponse, } from 'src/third_party/mlmd'; import { + GetArtifactsByContextRequest, + GetArtifactsByContextResponse, GetContextsByExecutionRequest, GetContextsByExecutionResponse, GetContextTypeRequest, GetContextTypeResponse, + GetEventsByExecutionIDsRequest, + GetEventsByExecutionIDsResponse, } from 'src/third_party/mlmd/generated/ml_metadata/proto/metadata_store_service_pb'; import { Workflow, WorkflowSpec, WorkflowStatus } from 'third_party/argo-ui/argo_template'; @@ -158,6 +165,39 @@ describe('MlmdUtils', () => { }); }); + describe('getArtifactNameFromEvent', () => { + it('get the first key of steps list', () => { + const path = new Event.Path(); + path.getStepsList().push(new Event.Path.Step().setKey('key1')); + path.getStepsList().push(new Event.Path.Step().setKey('key2')); + const event = new Event(); + event.setPath(path); + expect(getArtifactNameFromEvent(event)).toEqual('key1'); + }); + }); + + describe('getActifactsFromContext', () => { + it('returns list of artifacts', async () => { + const context = new Context(); + context.setId(2); + const artifacts = [new Artifact().setId(10), new Artifact().setId(20)]; + mockGetArtifactsByContext(context, artifacts); + const artifactResult = await getArtifactsFromContext(context); + expect(artifactResult).toEqual(artifacts); + }); + }); + + describe('getEventsByExecutions', () => { + it('returns list of events', async () => { + const executions = [new Execution().setId(1), new Execution().setId(2)]; + const events = [new Event().setExecutionId(1), new Event().setExecutionId(2)]; + + mockGetEventsByExecutions(executions, events); + const eventsResult = await getEventsByExecutions(executions); + expect(eventsResult).toEqual(events); + }); + }); + describe('filterLinkedArtifactsByType', () => { it('filter input artifacts', () => { const artifactTypeName = 'INPUT'; @@ -224,3 +264,28 @@ function mockGetContextByTypeAndName(contexts: Context[]) { return response; }); } + +function mockGetArtifactsByContext(context: Context, artifacts: Artifact[]) { + jest + .spyOn(Api.getInstance().metadataStoreService, 'getArtifactsByContext') + .mockImplementation((req: GetArtifactsByContextRequest) => { + const response = new GetArtifactsByContextResponse(); + if (req.getContextId() === context.getId()) { + response.setArtifactsList(artifacts); + } + return response; + }); +} + +function mockGetEventsByExecutions(executions: Execution[], events: Event[]) { + jest + .spyOn(Api.getInstance().metadataStoreService, 'getEventsByExecutionIDs') + .mockImplementation((req: GetEventsByExecutionIDsRequest) => { + const response = new GetEventsByExecutionIDsResponse(); + const executionIds = executions.map(e => e.getId()); + if (req.getExecutionIdsList().every((val, index) => val === executionIds[index])) { + response.setEventsList(events); + } + return response; + }); +} diff --git a/frontend/src/mlmd/MlmdUtils.ts b/frontend/src/mlmd/MlmdUtils.ts index 40d5ef6baa3..b9bf2f475a4 100644 --- a/frontend/src/mlmd/MlmdUtils.ts +++ b/frontend/src/mlmd/MlmdUtils.ts @@ -43,6 +43,7 @@ import { GetExecutionsByContextRequest, } from 'src/third_party/mlmd'; import { + GetArtifactsByContextRequest, GetContextsByExecutionRequest, GetContextsByExecutionResponse, GetContextTypeRequest, @@ -92,7 +93,7 @@ async function getKfpRunContext(argoWorkflowName: string): Promise { return await getContext({ name: argoWorkflowName, type: 'KfpRun' }); } -async function getKfpV2RunContext(runID: string): Promise { +export async function getKfpV2RunContext(runID: string): Promise { return await getContext({ name: runID, type: KFP_V2_RUN_CONTEXT_TYPE }); } @@ -205,7 +206,7 @@ function getStringValue(value?: string | number | Struct | null): string | undef return value; } -async function getEventByExecution(execution: Execution): Promise { +export async function getEventByExecution(execution: Execution): Promise { const executionId = execution.getId(); if (!executionId) { throw new Error('Execution must have an ID'); @@ -373,8 +374,54 @@ export function filterLinkedArtifactsByType( } export function getArtifactName(linkedArtifact: LinkedArtifact): string | undefined { - return linkedArtifact.event + return getArtifactNameFromEvent(linkedArtifact.event); +} + +export function getArtifactNameFromEvent(event: Event): string | undefined { + return event .getPath() ?.getStepsList()[0] .getKey(); } + +export async function getArtifactsFromContext(context: Context): Promise { + const request = new GetArtifactsByContextRequest(); + request.setContextId(context.getId()); + try { + const res = await Api.getInstance().metadataStoreService.getArtifactsByContext(request); + const list = res.getArtifactsList(); + if (list == null) { + throw new Error('response.getExecutionsList() is empty'); + } + // Display name of artifact exists in getCustomPropertiesMap().get('display_name').getStringValue(). + // Note that the actual artifact name is in Event which generates this artifact. + return list; + } catch (err) { + err.message = + `Cannot find executions by context ${context.getId()} with name ${context.getName()}: ` + + err.message; + throw err; + } +} + +export async function getEventsByExecutions(executions: Execution[] | undefined): Promise { + if (!executions) { + return []; + } + const request = new GetEventsByExecutionIDsRequest(); + for (let exec of executions) { + const execId = exec.getId(); + if (!execId) { + throw new Error('Execution must have an ID'); + } + request.addExecutionIds(execId); + } + let response: GetEventsByExecutionIDsResponse; + try { + response = await Api.getInstance().metadataStoreService.getEventsByExecutionIDs(request); + } catch (err) { + err.message = 'Failed to getEventsByExecutionIDs: ' + err.message; + throw err; + } + return response.getEventsList(); +} diff --git a/frontend/src/pages/PipelineDetailsV2.test.tsx b/frontend/src/pages/PipelineDetailsV2.test.tsx index 00c69066c85..e91f9ffd790 100644 --- a/frontend/src/pages/PipelineDetailsV2.test.tsx +++ b/frontend/src/pages/PipelineDetailsV2.test.tsx @@ -45,7 +45,7 @@ describe('PipelineDetailsV2', () => { > , ); - expect(screen.getByTestId('StaticCanvas')).not.toBeNull(); + expect(screen.getByTestId('DagCanvas')).not.toBeNull(); }); it('Render summary card', async () => { @@ -91,7 +91,7 @@ describe('PipelineDetailsV2', () => { > , ); - expect(screen.getByTestId('StaticCanvas')).not.toBeNull(); + expect(screen.getByTestId('DagCanvas')).not.toBeNull(); screen.getByText('flip-coin-op'); }); diff --git a/frontend/src/pages/PipelineDetailsV2.tsx b/frontend/src/pages/PipelineDetailsV2.tsx index 5959895a9a2..aecf3ddab29 100644 --- a/frontend/src/pages/PipelineDetailsV2.tsx +++ b/frontend/src/pages/PipelineDetailsV2.tsx @@ -25,7 +25,7 @@ import { StaticNodeDetailsV2 } from 'src/components/tabs/StaticNodeDetailsV2'; import { isSafari } from 'src/lib/Utils'; import { PipelineFlowElement } from 'src/lib/v2/StaticFlow'; import { commonCss, padding } from '../Css'; -import StaticCanvas from './v2/StaticCanvas'; +import DagCanvas from './v2/DagCanvas'; const TAB_NAMES = ['Graph', 'Pipeline Spec']; @@ -83,12 +83,13 @@ function PipelineDetailsV2({ {selectedTab === 0 && (
- + setFlowElements={() => {}} + > ( + ['run_detail', { id: runId }], + () => Apis.runServiceApi.getRun(runId), + { staleTime: 30000 }, + ); + + if ( + isSuccess && + data && + data.run && + data.run.pipeline_spec && + data.run.pipeline_spec.workflow_manifest + ) { + // TODO(zijianjoy): We need to switch to use pipeline_manifest for new API implementation. + const isIR = isIrPipeline(data.run.pipeline_spec.workflow_manifest); + if (isIR) { + return ( + + ); + } + } + + return ; +} + +// This needs to be changed to use pipeline_manifest vs workflow_manifest to distinguish V1 and V2. +function isIrPipeline(templateString: string) { + if (!templateString) { + return false; + } + try { + const template = jsyaml.safeLoad(templateString); + if (WorkflowUtils.isArgoWorkflowTemplate(template)) { + StaticGraphParser.createGraph(template!); + return false; + } else if (isFeatureEnabled(FeatureKey.V2)) { + const pipelineSpec = WorkflowUtils.convertJsonToV2PipelineSpec(templateString); + convertFlowElements(pipelineSpec); + return true; + } else { + return false; + } + } catch (err) { + return false; + } +} diff --git a/frontend/src/pages/RunDetailsV2.test.tsx b/frontend/src/pages/RunDetailsV2.test.tsx new file mode 100644 index 00000000000..d98f778ddce --- /dev/null +++ b/frontend/src/pages/RunDetailsV2.test.tsx @@ -0,0 +1,153 @@ +/* + * Copyright 2021 The Kubeflow Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { render, screen, waitFor } from '@testing-library/react'; +import React from 'react'; +import { RouteParams } from 'src/components/Router'; +import * as v2PipelineSpec from 'src/data/test/mock_lightweight_python_functions_v2_pipeline.json'; +import { Api } from 'src/mlmd/Api'; +import { KFP_V2_RUN_CONTEXT_TYPE } from 'src/mlmd/MlmdUtils'; +import { mockResizeObserver, testBestPractices } from 'src/TestUtils'; +import { CommonTestWrapper } from 'src/TestWrapper'; +import { + Context, + GetContextByTypeAndNameRequest, + GetContextByTypeAndNameResponse, + GetExecutionsByContextResponse, +} from 'src/third_party/mlmd'; +import { + GetArtifactsByContextResponse, + GetEventsByExecutionIDsResponse, +} from 'src/third_party/mlmd/generated/ml_metadata/proto/metadata_store_service_pb'; +import { PageProps } from './Page'; +import { RunDetailsInternalProps } from './RunDetails'; +import { RunDetailsV2 } from './RunDetailsV2'; + +testBestPractices(); +describe('RunDetailsV2', () => { + const RUN_ID = '1'; + + let updateBannerSpy: any; + let updateDialogSpy: any; + let updateSnackbarSpy: any; + let updateToolbarSpy: any; + let historyPushSpy: any; + + function generateProps(): RunDetailsInternalProps & PageProps { + const pageProps: PageProps = { + history: { push: historyPushSpy } as any, + location: '' as any, + match: { + params: { + [RouteParams.runId]: RUN_ID, + }, + isExact: true, + path: '', + url: '', + }, + toolbarProps: { actions: {}, breadcrumbs: [], pageTitle: '' }, + updateBanner: updateBannerSpy, + updateDialog: updateDialogSpy, + updateSnackbar: updateSnackbarSpy, + updateToolbar: updateToolbarSpy, + }; + return Object.assign(pageProps, { + gkeMetadata: {}, + }); + } + + beforeEach(() => { + mockResizeObserver(); + + updateBannerSpy = jest.fn(); + }); + + it('Render detail page with reactflow', async () => { + render( + + + , + ); + expect(screen.getByTestId('DagCanvas')).not.toBeNull(); + }); + + it('Shows error banner when disconnected from MLMD', async () => { + jest + .spyOn(Api.getInstance().metadataStoreService, 'getContextByTypeAndName') + .mockRejectedValue(new Error('Not connected to MLMD')); + + render( + + + , + ); + + await waitFor(() => + expect(updateBannerSpy).toHaveBeenLastCalledWith( + expect.objectContaining({ + additionalInfo: + 'Cannot find context with {"typeName":"system.PipelineRun","contextName":"1"}: Not connected to MLMD', + message: 'Cannot get MLMD objects from Metadata store.', + mode: 'error', + }), + ), + ); + }); + + it('Shows no banner when connected from MLMD', async () => { + jest + .spyOn(Api.getInstance().metadataStoreService, 'getContextByTypeAndName') + .mockImplementation((request: GetContextByTypeAndNameRequest) => { + const response = new GetContextByTypeAndNameResponse(); + if ( + request.getTypeName() === KFP_V2_RUN_CONTEXT_TYPE && + request.getContextName() === RUN_ID + ) { + response.setContext(new Context()); + } + return response; + }); + jest + .spyOn(Api.getInstance().metadataStoreService, 'getExecutionsByContext') + .mockResolvedValue(new GetExecutionsByContextResponse()); + jest + .spyOn(Api.getInstance().metadataStoreService, 'getArtifactsByContext') + .mockResolvedValue(new GetArtifactsByContextResponse()); + jest + .spyOn(Api.getInstance().metadataStoreService, 'getEventsByExecutionIDs') + .mockResolvedValue(new GetEventsByExecutionIDsResponse()); + + render( + + + , + ); + + await waitFor(() => expect(updateBannerSpy).toHaveBeenLastCalledWith({})); + }); +}); diff --git a/frontend/src/pages/RunDetailsV2.tsx b/frontend/src/pages/RunDetailsV2.tsx new file mode 100644 index 00000000000..ea40dcb53f2 --- /dev/null +++ b/frontend/src/pages/RunDetailsV2.tsx @@ -0,0 +1,136 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as React from 'react'; +import { useState } from 'react'; +import { Elements, FlowElement } from 'react-flow-renderer'; +import { useQuery } from 'react-query'; +import MD2Tabs from 'src/atoms/MD2Tabs'; +import { FlowElementDataBase } from 'src/components/graph/Constants'; +import SidePanel from 'src/components/SidePanel'; +import { commonCss, padding } from 'src/Css'; +import { updateFlowElementsState } from 'src/lib/v2/DynamicFlow'; +import { convertFlowElements } from 'src/lib/v2/StaticFlow'; +import * as WorkflowUtils from 'src/lib/v2/WorkflowUtils'; +import { + getArtifactsFromContext, + getEventsByExecutions, + getExecutionsFromContext, + getKfpV2RunContext, +} from 'src/mlmd/MlmdUtils'; +import { Artifact, Event, Execution } from 'src/third_party/mlmd'; +import { classes } from 'typestyle'; +import { RunDetailsProps } from './RunDetails'; +import DagCanvas from './v2/DagCanvas'; + +interface MlmdPackage { + executions: Execution[]; + artifacts: Artifact[]; + events: Event[]; +} + +interface RunDetailsV2Info { + pipeline_job: string; + runId: string; +} + +export type RunDetailsV2Props = RunDetailsV2Info & RunDetailsProps; + +export function RunDetailsV2(props: RunDetailsV2Props) { + const pipelineJobStr = props.pipeline_job; + const pipelineSpec = WorkflowUtils.convertJsonToV2PipelineSpec(pipelineJobStr); + const elements = convertFlowElements(pipelineSpec); + + const [flowElements, setFlowElements] = useState(elements); + const [layers, setLayers] = useState(['root']); + const [selectedTab, setSelectedTab] = useState(0); + const [selectedNode, setSelectedNode] = useState | null>(null); + + // TODO(zijianjoy): Update elements and states when layers change. + const layerChange = (layers: string[]) => { + setSelectedNode(null); + setLayers(layers); + }; + + const onSelectionChange = (elements: Elements | null) => { + if (!elements || elements?.length === 0) { + setSelectedNode(null); + return; + } + if (elements && elements.length === 1) { + setSelectedNode(elements[0]); + } + }; + + const getNodeName = function(element: FlowElement | null): string { + if (element && element.data && element.data.label) { + return element.data.label; + } + + return 'unknown'; + }; + + // Retrieves MLMD states from the MLMD store. + const { isSuccess, data } = useQuery( + ['mlmd_package', { id: props.runId }], + async () => { + const context = await getKfpV2RunContext(props.runId); + const executions = await getExecutionsFromContext(context); + const artifacts = await getArtifactsFromContext(context); + const events = await getEventsByExecutions(executions); + + return { executions, artifacts, events }; + }, + { + staleTime: 10000, + onError: error => + props.updateBanner({ + message: 'Cannot get MLMD objects from Metadata store.', + additionalInfo: error.message, + mode: 'error', + }), + onSuccess: () => props.updateBanner({}), + }, + ); + + if (isSuccess && data && data.executions && data.events && data.artifacts) { + updateFlowElementsState(flowElements, data.executions, data.events, data.artifacts); + } + + return ( +
+ + {selectedTab === 0 && ( +
+ setFlowElements(elems)} + > + +
+ onSelectionChange(null)} + defaultWidth={'50%'} + > +
+
+ )} +
+ ); +} diff --git a/frontend/src/pages/v2/StaticCanvas.tsx b/frontend/src/pages/v2/DagCanvas.tsx similarity index 81% rename from frontend/src/pages/v2/StaticCanvas.tsx rename to frontend/src/pages/v2/DagCanvas.tsx index e6c85d24978..e14782db410 100644 --- a/frontend/src/pages/v2/StaticCanvas.tsx +++ b/frontend/src/pages/v2/DagCanvas.tsx @@ -28,19 +28,21 @@ import SubDagLayer from 'src/components/graph/SubDagLayer'; import { color } from 'src/Css'; import { getTaskKeyFromNodeKey, NodeTypeNames, NODE_TYPES } from 'src/lib/v2/StaticFlow'; -export interface StaticCanvasProps { +export interface DagCanvasProps { elements: Elements; + setFlowElements: (elements: Elements) => void; + onSelectionChange: (elements: Elements | null) => void; layers: string[]; onLayersUpdate: (layers: string[]) => void; - onSelectionChange: (elements: Elements | null) => void; } -const StaticCanvas = ({ +export default function DagCanvas({ elements, layers, onLayersUpdate, onSelectionChange, -}: StaticCanvasProps) => { + setFlowElements, +}: DagCanvasProps) { const onLoad = (reactFlowInstance: OnLoadParams) => { reactFlowInstance.fitView(); }; @@ -63,7 +65,7 @@ const StaticCanvas = ({ return ( <> -
+
{ + setFlowElements( + elements.map(value => { + if (value.id === node.id) { + return node; + } + return value; + }), + ); + }} > @@ -82,5 +94,4 @@ const StaticCanvas = ({
); -}; -export default StaticCanvas; +}