Skip to content

Commit

Permalink
Add filter to listDatums (#120)
Browse files Browse the repository at this point in the history
* add filter to listDatum

* bump version

* pr feedback
  • Loading branch information
pleeko committed Oct 4, 2022
1 parent 7528bed commit e5cd9e6
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 14 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@pachyderm/node-pachyderm",
"version": "0.32.0",
"version": "0.32.1",
"description": "node client for pachyderm",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
2 changes: 2 additions & 0 deletions src/lib/types.ts
Expand Up @@ -21,6 +21,7 @@ import {
StartCommitRequest,
SubscribeCommitRequest,
} from '../proto/pfs/pfs_pb';
import {DatumState} from '../proto/pps/pps_pb';

export interface GRPCPlugin {
onCall?: (args: {requestName: string}) => void;
Expand Down Expand Up @@ -138,6 +139,7 @@ export type InspectDatumRequestArgs = {
export type ListDatumsRequestArgs = {
jobId: string;
pipelineName: string;
filter?: DatumState[];
};

export type RenewFileSetRequestArgs = {
Expand Down
85 changes: 81 additions & 4 deletions src/services/__tests__/pps.test.ts
Expand Up @@ -165,7 +165,7 @@ describe('services/pps', () => {
describe('listDatums + inspectDatum', () => {
jest.setTimeout(60000);

it('should list datums for a pipeline job', async () => {
it('should inspect a datum for a pipeline job', async () => {
const {pachClient, inputRepoName} = await createSandBox('listDatums');
const commit = await pachClient.pfs().startCommit({
branch: {name: 'master', repo: {name: inputRepoName}},
Expand Down Expand Up @@ -197,9 +197,6 @@ describe('services/pps', () => {
});

expect(datums).toHaveLength(1);
expect(datums[0].state).toEqual(DatumState.SUCCESS);
expect(datums[0].dataList[0]?.file?.path).toEqual('/dummyData.csv');
expect(datums[0].dataList[0]?.sizeBytes).toEqual(5);

const datum = await pachClient.pps().inspectDatum({
id: datums[0]?.datum?.id || '',
Expand All @@ -213,5 +210,85 @@ describe('services/pps', () => {
expect(datumObject.dataList[0]?.file?.path).toEqual('/dummyData.csv');
expect(datumObject.dataList[0]?.sizeBytes).toEqual(5);
});

it('should list datums for a pipeline job', async () => {
const {pachClient, inputRepoName} = await createSandBox('listDatums');
const commit = await pachClient.pfs().startCommit({
branch: {name: 'master', repo: {name: inputRepoName}},
});

const fileClient = await pachClient.pfs().modifyFile();

await fileClient
.setCommit(commit)
.putFileFromBytes('dummyData.csv', Buffer.from('a,b,c'))
.end();

await pachClient.pfs().finishCommit({commit});
const jobs = await pachClient.pps().listJobs();

const jobId = jobs[0]?.job?.id;
expect(jobId).toBeDefined();

await pachClient.pps().inspectJob({
id: jobId || '',
pipelineName: 'listDatums',
wait: true,
projectId: 'default',
});

const datums = await pachClient.pps().listDatums({
jobId: jobId || '',
pipelineName: 'listDatums',
});

expect(datums).toHaveLength(1);
expect(datums[0].state).toEqual(DatumState.SUCCESS);
expect(datums[0].dataList[0]?.file?.path).toEqual('/dummyData.csv');
expect(datums[0].dataList[0]?.sizeBytes).toEqual(5);
});

it('should filter datum list', async () => {
const {pachClient, inputRepoName} = await createSandBox('listDatums');
const commit = await pachClient.pfs().startCommit({
branch: {name: 'master', repo: {name: inputRepoName}},
});

const fileClient = await pachClient.pfs().modifyFile();

await fileClient
.setCommit(commit)
.putFileFromBytes('dummyData.csv', Buffer.from('a,b,c'))
.end();

await pachClient.pfs().finishCommit({commit});
const jobs = await pachClient.pps().listJobs();

const jobId = jobs[0]?.job?.id;
expect(jobId).toBeDefined();

await pachClient.pps().inspectJob({
id: jobId || '',
pipelineName: 'listDatums',
wait: true,
projectId: 'default',
});

let datums = await pachClient.pps().listDatums({
jobId: jobId || '',
pipelineName: 'listDatums',
filter: [DatumState.FAILED],
});

expect(datums).toHaveLength(0);

datums = await pachClient.pps().listDatums({
jobId: jobId || '',
pipelineName: 'listDatums',
filter: [DatumState.FAILED, DatumState.SUCCESS],
});

expect(datums).toHaveLength(1);
});
});
});
19 changes: 11 additions & 8 deletions src/services/pps.ts
Expand Up @@ -420,16 +420,19 @@ const pps = ({
});
},

listDatums: ({jobId, pipelineName}: ListDatumsRequestArgs) => {
const stream = client.listDatum(
new ListDatumRequest().setJob(
new Job()
.setId(jobId)
.setPipeline(new Pipeline().setName(pipelineName)),
),
credentialMetadata,
listDatums: ({jobId, pipelineName, filter}: ListDatumsRequestArgs) => {
const request = new ListDatumRequest().setJob(
new Job()
.setId(jobId)
.setPipeline(new Pipeline().setName(pipelineName)),
);

if (filter) {
request.setFilter(new ListDatumRequest.Filter().setStateList(filter));
}

const stream = client.listDatum(request, credentialMetadata);

return streamToObjectArray<DatumInfo, DatumInfo.AsObject>(stream);
},
};
Expand Down
2 changes: 1 addition & 1 deletion version.json
@@ -1,4 +1,4 @@
{
"pachyderm": "2.3.0",
"pachyderm": "2.3.5",
"kind": "0.12.0"
}

0 comments on commit e5cd9e6

Please sign in to comment.