From e5cd9e673250ab2ae41b874416eef4072f11c22b Mon Sep 17 00:00:00 2001
From: Peter <60992304+pleeko@users.noreply.github.com>
Date: Tue, 4 Oct 2022 13:05:28 -0400
Subject: [PATCH] Add filter to listDatums (#120)

* add filter to listDatum

* bump version

* pr feedback
---
 package.json                       |  2 +-
 src/lib/types.ts                   |  2 +
 src/services/__tests__/pps.test.ts | 85 ++++++++++++++++++++++++++++--
 src/services/pps.ts                | 19 ++++---
 version.json                       |  2 +-
 5 files changed, 96 insertions(+), 14 deletions(-)

diff --git a/package.json b/package.json
index 6938503..a836f46 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@pachyderm/node-pachyderm",
-  "version": "0.32.0",
+  "version": "0.32.1",
   "description": "node client for pachyderm",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
diff --git a/src/lib/types.ts b/src/lib/types.ts
index a084d2f..b53ef2f 100644
--- a/src/lib/types.ts
+++ b/src/lib/types.ts
@@ -21,6 +21,7 @@ import {
   StartCommitRequest,
   SubscribeCommitRequest,
 } from '../proto/pfs/pfs_pb';
+import {DatumState} from '../proto/pps/pps_pb';
 
 export interface GRPCPlugin {
   onCall?: (args: {requestName: string}) => void;
@@ -138,6 +139,7 @@ export type InspectDatumRequestArgs = {
 export type ListDatumsRequestArgs = {
   jobId: string;
   pipelineName: string;
+  filter?: DatumState[];
 };
 
 export type RenewFileSetRequestArgs = {
diff --git a/src/services/__tests__/pps.test.ts b/src/services/__tests__/pps.test.ts
index 6b694c5..de5cd74 100644
--- a/src/services/__tests__/pps.test.ts
+++ b/src/services/__tests__/pps.test.ts
@@ -165,7 +165,7 @@ describe('services/pps', () => {
   describe('listDatums + inspectDatum', () => {
     jest.setTimeout(60000);
-    it('should list datums for a pipeline job', async () => {
+    it('should inspect a datum for a pipeline job', async () => {
       const {pachClient, inputRepoName} = await createSandBox('listDatums');
       const commit = await pachClient.pfs().startCommit({
         branch: {name: 'master', repo: {name: inputRepoName}},
       });
@@ -197,9 +197,6 @@
       });
 
       expect(datums).toHaveLength(1);
-      expect(datums[0].state).toEqual(DatumState.SUCCESS);
-      expect(datums[0].dataList[0]?.file?.path).toEqual('/dummyData.csv');
-      expect(datums[0].dataList[0]?.sizeBytes).toEqual(5);
 
       const datum = await pachClient.pps().inspectDatum({
         id: datums[0]?.datum?.id || '',
@@ -213,5 +210,85 @@
       expect(datumObject.dataList[0]?.file?.path).toEqual('/dummyData.csv');
       expect(datumObject.dataList[0]?.sizeBytes).toEqual(5);
     });
+
+    it('should list datums for a pipeline job', async () => {
+      const {pachClient, inputRepoName} = await createSandBox('listDatums');
+      const commit = await pachClient.pfs().startCommit({
+        branch: {name: 'master', repo: {name: inputRepoName}},
+      });
+
+      const fileClient = await pachClient.pfs().modifyFile();
+
+      await fileClient
+        .setCommit(commit)
+        .putFileFromBytes('dummyData.csv', Buffer.from('a,b,c'))
+        .end();
+
+      await pachClient.pfs().finishCommit({commit});
+      const jobs = await pachClient.pps().listJobs();
+
+      const jobId = jobs[0]?.job?.id;
+      expect(jobId).toBeDefined();
+
+      await pachClient.pps().inspectJob({
+        id: jobId || '',
+        pipelineName: 'listDatums',
+        wait: true,
+        projectId: 'default',
+      });
+
+      const datums = await pachClient.pps().listDatums({
+        jobId: jobId || '',
+        pipelineName: 'listDatums',
+      });
+
+      expect(datums).toHaveLength(1);
+      expect(datums[0].state).toEqual(DatumState.SUCCESS);
+      expect(datums[0].dataList[0]?.file?.path).toEqual('/dummyData.csv');
+      expect(datums[0].dataList[0]?.sizeBytes).toEqual(5);
+    });
+
+    it('should filter datum list', async () => {
+      const {pachClient, inputRepoName} = await createSandBox('listDatums');
+      const commit = await pachClient.pfs().startCommit({
+        branch: {name: 'master', repo: {name: inputRepoName}},
+      });
+
+      const fileClient = await pachClient.pfs().modifyFile();
+
+      await fileClient
+        .setCommit(commit)
+        .putFileFromBytes('dummyData.csv', Buffer.from('a,b,c'))
+        .end();
+
+      await pachClient.pfs().finishCommit({commit});
+      const jobs = await pachClient.pps().listJobs();
+
+      const jobId = jobs[0]?.job?.id;
+      expect(jobId).toBeDefined();
+
+      await pachClient.pps().inspectJob({
+        id: jobId || '',
+        pipelineName: 'listDatums',
+        wait: true,
+        projectId: 'default',
+      });
+
+      let datums = await pachClient.pps().listDatums({
+        jobId: jobId || '',
+        pipelineName: 'listDatums',
+        filter: [DatumState.FAILED],
+      });
+
+      expect(datums).toHaveLength(0);
+
+      datums = await pachClient.pps().listDatums({
+        jobId: jobId || '',
+        pipelineName: 'listDatums',
+        filter: [DatumState.FAILED, DatumState.SUCCESS],
+      });
+
+      expect(datums).toHaveLength(1);
+    });
   });
 });
diff --git a/src/services/pps.ts b/src/services/pps.ts
index d0992f1..161d6e3 100644
--- a/src/services/pps.ts
+++ b/src/services/pps.ts
@@ -420,16 +420,19 @@ const pps = ({
       });
     },
 
-    listDatums: ({jobId, pipelineName}: ListDatumsRequestArgs) => {
-      const stream = client.listDatum(
-        new ListDatumRequest().setJob(
-          new Job()
-            .setId(jobId)
-            .setPipeline(new Pipeline().setName(pipelineName)),
-        ),
-        credentialMetadata,
+    listDatums: ({jobId, pipelineName, filter}: ListDatumsRequestArgs) => {
+      const request = new ListDatumRequest().setJob(
+        new Job()
+          .setId(jobId)
+          .setPipeline(new Pipeline().setName(pipelineName)),
       );
 
+      if (filter) {
+        request.setFilter(new ListDatumRequest.Filter().setStateList(filter));
+      }
+
+      const stream = client.listDatum(request, credentialMetadata);
+
       return streamToObjectArray(stream);
     },
   };
diff --git a/version.json b/version.json
index 03f9f83..d685d06 100644
--- a/version.json
+++ b/version.json
@@ -1,4 +1,4 @@
 {
-  "pachyderm": "2.3.0",
+  "pachyderm": "2.3.5",
   "kind": "0.12.0"
 }
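Usage note (illustrative, not part of the patch): a minimal sketch of how the new optional `filter` argument to pps().listDatums might be called once 0.32.1 ships. It assumes an already-constructed client instance (written here as `pachClient`, analogous to the one the tests obtain from createSandBox), and that DatumState is re-exported from the package root; if it is not, import it from the generated pps proto bindings, as src/lib/types.ts does. The pipeline name and job id below are placeholders.

import {DatumState} from '@pachyderm/node-pachyderm'; // assumption: DatumState is exposed at the package root

// Compare a filtered datum list against the unfiltered one for a given job.
const compareDatumCounts = async (pachClient: any, jobId: string) => {
  // With `filter`, only datums whose state is in the given list are returned.
  const failed = await pachClient.pps().listDatums({
    jobId,
    pipelineName: 'edges', // placeholder pipeline name
    filter: [DatumState.FAILED],
  });

  // Omitting `filter` keeps the previous behavior: every datum for the job is returned.
  const all = await pachClient.pps().listDatums({
    jobId,
    pipelineName: 'edges', // placeholder pipeline name
  });

  return {failedCount: failed.length, totalCount: all.length};
};

Because `filter` is optional in ListDatumsRequestArgs, existing callers compile and behave unchanged; the request only gains a ListDatumRequest.Filter when a state list is supplied.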