Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
const promise = this.listFn();
const result = await promise;
const list = result.body;
deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
Object.keys(this.indexCache).forEach((key) => {
const updateObjects = deleteItems(this.indexCache[key], list.items);
if (updateObjects.length !== 0) {
this.indexCache[key] = updateObjects;
} else {
delete this.indexCache[key];
}
});
this.addOrUpdateItems(list.items);
await this.watch.watch(
this.path,
Expand Down
120 changes: 118 additions & 2 deletions src/cache_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ describe('ListWatchCache', () => {
{
metadata: {
name: 'name1',
namespace: 'default',
} as V1ObjectMeta,
} as V1Namespace,
{
metadata: {
name: 'name2',
namespace: 'default',
} as V1ObjectMeta,
} as V1Namespace,
];
Expand All @@ -55,13 +57,32 @@ describe('ListWatchCache', () => {
items: list,
} as V1NamespaceList;

const emptyObj = {
metadata: {
resourceVersion: '123456',
} as V1ListMeta,
items: [
{
metadata: {
name: 'name3',
namespace: 'default',
} as V1ObjectMeta,
} as V1Namespace,
],
} as V1NamespaceList;

var calls = 0;
const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
(resolve, reject) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
if (calls++ === 0) {
resolve({ response: {} as http.IncomingMessage, body: listObj });
} else {
resolve({ response: {} as http.IncomingMessage, body: emptyObj });
}
},
);
};
Expand All @@ -80,25 +101,34 @@ describe('ListWatchCache', () => {
});
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
await promise;
const [pathOut, , watchHandler] = mock.capture(fakeWatch.watch).last();
const [pathOut, , watchHandler, doneHandler] = mock.capture(fakeWatch.watch).last();
expect(pathOut).to.equal('/some/path');
expect(cache.list()).to.deep.equal(list);

expect(cache.get('name1')).to.equal(list[0]);
expect(cache.get('name2')).to.equal(list[1]);

expect(cache.list('default')).to.deep.equal(list);
expect(cache.list('non-existent')).to.be.undefined;

watchHandler('ADDED', {
metadata: {
name: 'name3',
namespace: 'other',
} as V1ObjectMeta,
} as V1Namespace);

expect(cache.list().length).to.equal(3);
expect(cache.get('name3')).to.not.equal(null);

expect(cache.list('default').length).to.equal(2);
expect(cache.list('other').length).to.equal(1);
expect(cache.list('non-existent')).to.be.undefined;

watchHandler('MODIFIED', {
metadata: {
name: 'name3',
namespace: 'other',
resourceVersion: 'baz',
} as V1ObjectMeta,
} as V1Namespace);
Expand All @@ -113,10 +143,26 @@ describe('ListWatchCache', () => {
watchHandler('DELETED', {
metadata: {
name: 'name2',
namespace: 'default',
} as V1ObjectMeta,
} as V1Namespace);
expect(cache.list().length).to.equal(2);
expect(cache.get('name2')).to.equal(undefined);

expect(cache.list('default').length).to.equal(1);
expect(cache.list('other').length).to.equal(1);

watchHandler('ADDED', {
metadata: {
name: 'name2',
namespace: 'default',
} as V1ObjectMeta,
} as V1Namespace);

await doneHandler();
expect(cache.list().length, 'all namespace list').to.equal(1);
expect(cache.list('default').length, 'default namespace list').to.equal(1);
expect(cache.list('other'), 'other namespace list').to.be.undefined;
});

it('should perform work as an informer', async () => {
Expand Down Expand Up @@ -754,6 +800,76 @@ describe('ListWatchCache', () => {
await startPromise;
expect(cache.list().length).to.equal(2);
});

it('should only call update handlers once', async () => {
const fakeWatch = mock.mock(Watch);
const list: V1Namespace[] = [];
const listObj = {
metadata: {
resourceVersion: '12345',
} as V1ListMeta,
items: list,
} as V1NamespaceList;
const listFn: ListPromise<V1Namespace> = function(): Promise<{
response: http.IncomingMessage;
body: V1NamespaceList;
}> {
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
resolve({ response: {} as http.IncomingMessage, body: listObj });
});
};
const watchCalled = new Promise((resolve) => {
mock.when(
fakeWatch.watch(
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
mock.anything(),
),
).thenCall(resolve);
});
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

const addedList1: V1Namespace[] = [];
const addToList1Fn = function(obj: V1Namespace) {
addedList1.push(obj);
};
const addedList2: V1Namespace[] = [];
const addToList2Fn = function(obj: V1Namespace) {
addedList2.push(obj);
};

informer.start();

await watchCalled;
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();

let adds = 0;
informer.on(ADD, () => adds++);
informer.on(ADD, addToList1Fn);
informer.on(ADD, addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name1',
namespace: 'something',
} as V1ObjectMeta,
} as V1Namespace);

informer.off(ADD, addToList2Fn);

watchHandler('ADDED', {
metadata: {
name: 'name2',
namespace: 'something',
} as V1ObjectMeta,
} as V1Namespace);

expect(adds).to.equal(2);
expect(addedList1.length).to.equal(2);
expect(addedList2.length).to.equal(1);
});
});

describe('delete items', () => {
Expand Down