From e1a82ec2c027b183ff4468697383daefec9a19fc Mon Sep 17 00:00:00 2001 From: ricoberger Date: Tue, 7 Sep 2021 21:21:47 +0200 Subject: [PATCH] Add option to stream (follow) logs via WebSockets A new option follow was added for the logs of a container. By selecting the "follow" option in the logs modal, we will establish a WebSocket connection. Via this connection we will then stream all incoming logs of the selected container and show them in the terminal. This is like the "-f" option for kubectl. --- CHANGELOG.md | 1 + pkg/api/clusters/cluster/cluster.go | 50 +++++++++++ plugins/resources/resources.go | 51 +++++++++++- .../components/panel/details/actions/Logs.tsx | 82 ++++++++++++++++++- .../panel/details/actions/Terminal.tsx | 4 +- 5 files changed, 182 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ed596d4d0..e77613297 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan - [#131](https://github.com/kobsio/kobs/pull/131): Add chart which shows the distribution of the logs lines in the selected time range for the ClickHouse plugin. - [#132](https://github.com/kobsio/kobs/pull/132): Support the download of log lines in their JSON representation in the ClickHouse and Elasticsearch plugin. - [#136](https://github.com/kobsio/kobs/pull/136): Allow custom order for the returned logs and add `!~` and `_exists_` operator for ClickHouse plugin. +- [#138](https://github.com/kobsio/kobs/pull/138): Add option to stream (follow) logs via WebSockets. ### Fixed diff --git a/pkg/api/clusters/cluster/cluster.go b/pkg/api/clusters/cluster/cluster.go index 1eb497ab3..283575c16 100644 --- a/pkg/api/clusters/cluster/cluster.go +++ b/pkg/api/clusters/cluster/cluster.go @@ -1,6 +1,7 @@ package cluster import ( + "bufio" "context" "encoding/json" "fmt" @@ -245,6 +246,55 @@ func (c *Cluster) GetLogs(ctx context.Context, namespace, name, container, regex return strings.Join(logs, "\n\r") + "\n\r", nil } +// StreamLogs can be used to stream the logs of the selected Container. For that we are using the passed in WebSocket +// connection an write each line returned by the Kubernetes API to this connection. +func (c *Cluster) StreamLogs(ctx context.Context, conn *websocket.Conn, namespace, name, container string, since, tail int64, follow bool) error { + options := &corev1.PodLogOptions{ + Container: container, + SinceSeconds: &since, + Follow: follow, + } + + if tail > 0 { + options.TailLines = &tail + } + + stream, err := c.clientset.CoreV1().Pods(namespace).GetLogs(name, options).Stream(ctx) + if err != nil { + return err + } + + defer stream.Close() + reader := bufio.NewReaderSize(stream, 16) + lastLine := "" + + for { + data, isPrefix, err := reader.ReadLine() + if err != nil { + return err + } + + lines := strings.Split(string(data), "\r") + length := len(lines) + + if len(lastLine) > 0 { + lines[0] = lastLine + lines[0] + lastLine = "" + } + + if isPrefix { + lastLine = lines[length-1] + lines = lines[:(length - 1)] + } + + for _, line := range lines { + if err := conn.WriteMessage(websocket.TextMessage, []byte(line)); err != nil { + return err + } + } + } +} + // GetTerminal starts a new terminal session via the given WebSocket connection. func (c *Cluster) GetTerminal(conn *websocket.Conn, namespace, name, container, shell string) error { reqURL, err := url.Parse(fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/exec?container=%s&command=%s&stdin=true&stdout=true&stderr=true&tty=true", c.config.Host, namespace, name, container, shell)) diff --git a/plugins/resources/resources.go b/plugins/resources/resources.go index 08eef7936..2476e55ce 100644 --- a/plugins/resources/resources.go +++ b/plugins/resources/resources.go @@ -279,8 +279,9 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) { since := r.URL.Query().Get("since") tail := r.URL.Query().Get("tail") previous := r.URL.Query().Get("previous") + follow := r.URL.Query().Get("follow") - log.WithFields(logrus.Fields{"cluster": clusterName, "namespace": namespace, "name": name, "container": container, "regex": regex, "since": since, "previous": previous}).Tracef("getLogs") + log.WithFields(logrus.Fields{"cluster": clusterName, "namespace": namespace, "name": name, "container": container, "regex": regex, "since": since, "previous": previous, "follow": follow}).Tracef("getLogs") cluster := router.clusters.GetCluster(clusterName) if cluster == nil { @@ -306,6 +307,54 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) { return } + parsedFollow, err := strconv.ParseBool(follow) + if err != nil { + errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse follow parameter") + return + } + + // If the parsedFollow parameter was set to true, we stream the logs via an WebSocket connection instead of + // returning a json response. + if parsedFollow { + var upgrader = websocket.Upgrader{} + + if router.config.WebSocket.AllowAllOrigins { + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + } + + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.WithError(err).Errorf("Could not upgrade connection") + return + } + defer c.Close() + + c.SetPongHandler(func(string) error { return nil }) + + go func() { + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } + }() + + err = cluster.StreamLogs(r.Context(), c, namespace, name, container, parsedSince, parsedTail, parsedFollow) + if err != nil { + c.WriteMessage(websocket.TextMessage, []byte("Could not stream logs: "+err.Error())) + return + } + + log.Tracef("Logs stream was closed") + return + } + logs, err := cluster.GetLogs(r.Context(), namespace, name, container, regex, parsedSince, parsedTail, parsedPrevious) if err != nil { errresponse.Render(w, r, err, http.StatusBadGateway, "Could not get logs") diff --git a/plugins/resources/src/components/panel/details/actions/Logs.tsx b/plugins/resources/src/components/panel/details/actions/Logs.tsx index 378c86de7..37cde11ca 100644 --- a/plugins/resources/src/components/panel/details/actions/Logs.tsx +++ b/plugins/resources/src/components/panel/details/actions/Logs.tsx @@ -15,7 +15,14 @@ import { IRow } from '@patternfly/react-table'; import { V1Pod } from '@kubernetes/client-node'; import { Terminal as xTerm } from 'xterm'; -import { IResource, ITerminalContext, TERMINAL_OPTIONS, TerminalsContext } from '@kobsio/plugin-core'; +import { + IPluginsContext, + IResource, + ITerminalContext, + PluginsContext, + TERMINAL_OPTIONS, + TerminalsContext, +} from '@kobsio/plugin-core'; // getContainers returns a list with all container names for the given Pod. It contains all specified init containers // and the "normal" containers. @@ -53,12 +60,65 @@ interface ILogsProps { const Logs: React.FunctionComponent = ({ request, resource, show, setShow }: ILogsProps) => { const containers = getContainers(resource.props); + const pluginsContext = useContext(PluginsContext); const terminalsContext = useContext(TerminalsContext); const [isLoading, setIsLoading] = useState(false); const [container, setContainer] = useState(containers[0]); const [since, setSince] = useState(900); const [regex, setRegex] = useState(''); const [previous, setPrevious] = useState(false); + const [follow, setFollow] = useState(false); + + const streamLogs = async (): Promise => { + setShow(false); + + const term = new xTerm(TERMINAL_OPTIONS); + + try { + const pluginDetails = pluginsContext.getPluginDetails('resources'); + const configuredWebSocketAddress = + pluginDetails && pluginDetails.options && pluginDetails.options && pluginDetails.options.webSocketAddress + ? pluginDetails.options.webSocketAddress + : undefined; + const host = configuredWebSocketAddress || `wss://${window.location.host}`; + + const ws = new WebSocket( + `${host}/api/plugins/resources/logs?cluster=${resource.cluster.title}${ + resource.namespace ? `&namespace=${resource.namespace.title}` : '' + }&name=${resource.name.title}&container=${container}&since=${since}&tail=${ + TERMINAL_OPTIONS.scrollback + }&previous=false&follow=true`, + ); + + term.reset(); + + term.onData((str) => { + if (str === '\r') { + term.write('\n\r'); + } else { + term.write(str); + } + }); + + ws.onmessage = (event): void => { + term.write(`${event.data}\n\r`); + }; + + terminalsContext.addTerminal({ + name: `${resource.name.title}: ${container}`, + terminal: term, + webSocket: ws, + }); + } catch (err) { + if (err.message) { + term.write(`${err.message}\n\r`); + terminalsContext.addTerminal({ + name: `${resource.name.title}: ${container}`, + terminal: term, + }); + } + } + }; const getLogs = async (): Promise => { setIsLoading(true); @@ -70,7 +130,7 @@ const Logs: React.FunctionComponent = ({ request, resource, show, se resource.namespace ? `&namespace=${resource.namespace.title}` : '' }&name=${resource.name.title}&container=${container}®ex=${encodeURIComponent(regex)}&since=${since}&tail=${ TERMINAL_OPTIONS.scrollback - }&previous=${previous}`, + }&previous=${previous}&follow=true`, { method: 'get' }, ); const json = await response.json(); @@ -110,7 +170,12 @@ const Logs: React.FunctionComponent = ({ request, resource, show, se isOpen={show} onClose={(): void => setShow(false)} actions={[ - ,