Skip to content

Commit

Permalink
migrate k8s hooks to SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
invincibleJai committed Nov 25, 2021
1 parent 9124162 commit a4014dd
Show file tree
Hide file tree
Showing 31 changed files with 1,332 additions and 1,155 deletions.
@@ -0,0 +1,298 @@
import * as _ from 'lodash';
import { Dispatch } from 'react-redux';
import { ActionType as Action, action } from 'typesafe-actions';
// import { k8sWatch } from '@console/internal/module/k8s/resource';
import { K8sModel, MatchLabels, Selector } from '../../../api/common-types';
import { K8sResourceCommon, FilterValue } from '../../../extensions/console-types';
import { getReferenceForModel } from '../../../utils/k8s/k8s-ref';
import { k8sList, k8sGet } from '../../../utils/k8s/k8s-resource';
import { k8sWatch } from '../../../utils/k8s/k8s-utils';
import { getImpersonate } from '../../core/reducers/coreSelectors';

type K8sResourceKind = K8sResourceCommon & {
spec?: {
selector?: Selector | MatchLabels;
[key: string]: any;
};
status?: { [key: string]: any };
data?: { [key: string]: any };
};

export enum ActionType {
SetResourcesInFlight = 'setResourcesInFlight',
StartWatchK8sObject = 'startWatchK8sObject',
StartWatchK8sList = 'startWatchK8sList',
ModifyObject = 'modifyObject',
StopWatchK8s = 'stopWatchK8s',

Errored = 'errored',
Loaded = 'loaded',
BulkAddToList = 'bulkAddToList',
UpdateListFromWS = 'updateListFromWS',
FilterList = 'filterList',
}

type K8sEvent = { type: 'ADDED' | 'DELETED' | 'MODIFIED'; object: K8sResourceKind };

export const updateListFromWS = (id: string, k8sObjects: K8sEvent[]) =>
action(ActionType.UpdateListFromWS, { id, k8sObjects });
export const loaded = (id: string, k8sObjects: K8sResourceKind | K8sResourceKind[]) =>
action(ActionType.Loaded, { id, k8sObjects });

export const bulkAddToList = (id: string, k8sObjects: K8sResourceKind[]) =>
action(ActionType.BulkAddToList, { id, k8sObjects });

export const setResourcesInFlight = () => action(ActionType.SetResourcesInFlight);
export const startWatchK8sObject = (id: string) => action(ActionType.StartWatchK8sObject, { id });
export const startWatchK8sList = (id: string, query: { [key: string]: string }) =>
action(ActionType.StartWatchK8sList, { id, query });
export const modifyObject = (id: string, k8sObjects: K8sResourceKind) =>
action(ActionType.ModifyObject, { id, k8sObjects });
export const stopWatchK8s = (id: string) => action(ActionType.StopWatchK8s, { id });

export const errored = (id: string, k8sObjects: any) =>
action(ActionType.Errored, { id, k8sObjects });
export const filterList = (id: string, name: string, value: FilterValue) =>
action(ActionType.FilterList, { id, name, value });

// dump
const WS = {} as { [id: string]: WebSocket & any };
const POLLs = {};
const REF_COUNTS = {};

const nop = () => {};
const paginationLimit = 250;

// eslint-disable-next-line consistent-return
export const stopK8sWatch = (id: string) => (dispatch: Dispatch) => {
REF_COUNTS[id] -= 1;
if (REF_COUNTS[id] > 0) {
return nop;
}

const ws = WS[id];
if (ws) {
ws.destroy();
delete WS[id];
}
const poller = POLLs[id];
clearInterval(poller);
delete POLLs[id];
delete REF_COUNTS[id];
dispatch(stopWatchK8s(id));
// return null;
};

export const watchK8sList = (
id: string,
query: { [key: string]: string },
k8skind: K8sModel,
extraAction?,
// eslint-disable-next-line consistent-return
) => (dispatch, getState) => {
// Only one watch per unique list ID
if (id in REF_COUNTS) {
REF_COUNTS[id] += 1;
return nop;
}

dispatch(startWatchK8sList(id, query));
REF_COUNTS[id] = 1;

const incrementallyLoad = async (continueToken = ''): Promise<string> => {
// the list may not still be around...
if (!REF_COUNTS[id]) {
// let .then handle the cleanup
return;
}

const response = await k8sList(
k8skind,
{
limit: paginationLimit,
...query,
...(continueToken ? { continue: continueToken } : {}),
},
true,
);

if (!REF_COUNTS[id]) {
return;
}

if (!continueToken) {
[loaded, extraAction].forEach((f) => f && dispatch(f(id, response.items)));
} else {
dispatch(bulkAddToList(id, response.items));
}

if (response.metadata.continue) {
// eslint-disable-next-line consistent-return
return incrementallyLoad(response.metadata.continue);
}
// eslint-disable-next-line consistent-return
return response.metadata.resourceVersion;
};
/**
* Incrementally fetch list (XHR) using k8s pagination then use its resourceVersion to
* start listening on a WS (?resourceVersion=$resourceVersion)
* start the process over when:
* 1. the WS closes abnormally
* 2. the WS can not establish a connection within $TIMEOUT
*/
const pollAndWatch = async () => {
delete POLLs[id];

try {
const resourceVersion = await incrementallyLoad();
// ensure this watch should still exist because pollAndWatch is recursiveish
if (!REF_COUNTS[id]) {
// eslint-disable-next-line no-console
console.log(`stopped watching ${id} before finishing incremental loading.`);
// call cleanup function out of abundance of caution...
dispatch(stopK8sWatch(id));
return;
}

if (WS[id]) {
// eslint-disable-next-line no-console
console.warn(`Attempted to create multiple websockets for ${id}.`);
return;
}

if (!_.get(k8skind, 'verbs', ['watch']).includes('watch')) {
// eslint-disable-next-line no-console
console.warn(
`${getReferenceForModel(k8skind)} does not support watching, falling back to polling.`,
);
if (!POLLs[id]) {
POLLs[id] = setTimeout(pollAndWatch, 15 * 1000);
}
return;
}

const { subprotocols } = getImpersonate(getState()) || {};
WS[id] = k8sWatch(
k8skind,
{ ...query, resourceVersion },
{ subprotocols, timeout: 60 * 1000 },
);
} catch (e) {
if (!REF_COUNTS[id]) {
// eslint-disable-next-line no-console
console.log(`stopped watching ${id} before finishing incremental loading with error ${e}!`);
// call cleanup function out of abundance of caution...
dispatch(stopK8sWatch(id));
return;
}

dispatch(errored(id, e));

if (!POLLs[id]) {
POLLs[id] = setTimeout(pollAndWatch, 15 * 1000);
}
return;
}

WS[id]
.onclose((event) => {
// Close Frame Status Codes: https://tools.ietf.org/html/rfc6455#section-7.4.1
if (event.code !== 1006) {
return;
}
// eslint-disable-next-line no-console
console.log('WS closed abnormally - starting polling loop over!');
const ws = WS[id];
// const timedOut = true;
ws && ws.destroy();
})
.ondestroy((timedOut) => {
if (!timedOut) {
return;
}
// If the WS is unsucessful for timeout duration, assume it is less work
// to update the entire list and then start the WS again

// eslint-disable-next-line no-console
console.log(`${id} timed out - restarting polling`);
delete WS[id];

if (POLLs[id]) {
return;
}

POLLs[id] = setTimeout(pollAndWatch, 15 * 1000);
})
.onbulkmessage((events) =>
[updateListFromWS, extraAction].forEach((f) => f && dispatch(f(id, events))),
);
};
pollAndWatch();
// return null;
};

// export { watchK8sObject } from '@console/internal/actions/k8s';
export const watchK8sObject = (
id: string,
name: string,
namespace: string,
query: { [key: string]: string },
k8sType: K8sModel,
// eslint-disable-next-line consistent-return
) => (dispatch: Dispatch, getState) => {
if (id in REF_COUNTS) {
REF_COUNTS[id] += 1;
return nop;
}
dispatch(startWatchK8sObject(id));
REF_COUNTS[id] = 1;

if (query.name) {
query.fieldSelector = `metadata.name=${query.name}`;
delete query.name;
}

const poller = () => {
k8sGet(k8sType, name, namespace)
.then(
(o) => dispatch(modifyObject(id, o)),
(e) => dispatch(errored(id, e)),
)
.catch((err) => {
// eslint-disable-next-line no-console
console.log(err);
});
};
POLLs[id] = setInterval(poller, 30 * 1000);
poller();

if (!_.get(k8sType, 'verbs', ['watch']).includes('watch')) {
// eslint-disable-next-line no-console
console.warn(`${getReferenceForModel(k8sType)} does not support watching`);
// eslint-disable-next-line consistent-return
return;
}

const { subprotocols } = getImpersonate(getState()) || {};

WS[id] = k8sWatch(k8sType, query, { subprotocols }).onbulkmessage((events) =>
events.forEach((e) => dispatch(modifyObject(id, e.object))),
);
// return null;
};
// dump

const k8sActions = {
setResourcesInFlight,
startWatchK8sObject,
startWatchK8sList,
modifyObject,
stopWatchK8s,
errored,
loaded,
bulkAddToList,
updateListFromWS,
filterList,
};

export type K8sAction = Action<typeof k8sActions>;

0 comments on commit a4014dd

Please sign in to comment.