Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions examples/typescript/watch/watch-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ kc.loadFromDefault();
const watch = new k8s.Watch(kc);
watch.watch('/api/v1/namespaces',
// optional query parameters can go here.
{},
{
allowWatchBookmarks: true,
},
// callback is called for each received object.
(type, obj) => {
(type, apiObj, watchObj) => {
if (type === 'ADDED') {
// tslint:disable-next-line:no-console
console.log('new object:');
Expand All @@ -18,12 +20,15 @@ watch.watch('/api/v1/namespaces',
} else if (type === 'DELETED') {
// tslint:disable-next-line:no-console
console.log('deleted object:');
} else if (type === 'BOOKMARK') {
// tslint:disable-next-line:no-console
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
} else {
// tslint:disable-next-line:no-console
console.log('unknown type: ' + type);
}
// tslint:disable-next-line:no-console
console.log(obj);
console.log(apiObj);
},
// done callback is called if the watch terminates normally
(err) => {
Expand Down
17 changes: 16 additions & 1 deletion src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface ObjectCache<T> {

export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
private objects: T[] = [];
private resourceVersion: string;
private readonly indexCache: { [key: string]: T[] } = {};
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};

Expand All @@ -24,6 +25,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
this.callbackCache[UPDATE] = [];
this.callbackCache[DELETE] = [];
this.callbackCache[ERROR] = [];
this.resourceVersion = '';
if (autoStart) {
this.doneHandler(null);
}
Expand Down Expand Up @@ -68,11 +70,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
return this.indexCache[namespace] as ReadonlyArray<T>;
}

public latestResourceVersion(): string {
return this.resourceVersion;
}

private async doneHandler(err: any) {
if (err) {
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
return;
}
// TODO: Don't always list here for efficiency
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
// Or if resourceVersion is empty.
const promise = this.listFn();
const result = await promise;
const list = result.body;
Expand Down Expand Up @@ -109,7 +118,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
addOrUpdateObject(namespaceList, obj);
}

private watchHandler(phase: string, obj: T) {
private watchHandler(phase: string, obj: T, watchObj?: any) {
switch (phase) {
case 'ADDED':
case 'MODIFIED':
Expand All @@ -132,6 +141,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
}
}
break;
case 'BOOKMARK':
// nothing to do, here for documentation, mostly.
break;
}
if (watchObj && watchObj.metadata) {
this.resourceVersion = watchObj.metadata.resourceVersion;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class Watch {
public async watch(
path: string,
queryParams: any,
callback: (phase: string, obj: any) => void,
callback: (phase: string, apiObj: any, watchObj?: any) => void,
done: (err: any) => void,
): Promise<any> {
const cluster = this.config.getCurrentCluster();
Expand All @@ -60,7 +60,7 @@ export class Watch {
stream.on('data', (line) => {
try {
const data = JSON.parse(line);
callback(data.type, data.object);
callback(data.type, data.object, data);
} catch (ignore) {
// ignore parse errors
}
Expand Down