Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
- [#131](https://github.com/kobsio/kobs/pull/131): Add chart which shows the distribution of the logs lines in the selected time range for the ClickHouse plugin.
- [#132](https://github.com/kobsio/kobs/pull/132): Support the download of log lines in their JSON representation in the ClickHouse and Elasticsearch plugin.
- [#136](https://github.com/kobsio/kobs/pull/136): Allow custom order for the returned logs and add `!~` and `_exists_` operator for ClickHouse plugin.
- [#138](https://github.com/kobsio/kobs/pull/138): Add option to stream (follow) logs via WebSockets.

### Fixed

Expand Down
50 changes: 50 additions & 0 deletions pkg/api/clusters/cluster/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"bufio"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -245,6 +246,55 @@ func (c *Cluster) GetLogs(ctx context.Context, namespace, name, container, regex
return strings.Join(logs, "\n\r") + "\n\r", nil
}

// StreamLogs can be used to stream the logs of the selected Container. For that we are using the passed in WebSocket
// connection an write each line returned by the Kubernetes API to this connection.
func (c *Cluster) StreamLogs(ctx context.Context, conn *websocket.Conn, namespace, name, container string, since, tail int64, follow bool) error {
options := &corev1.PodLogOptions{
Container: container,
SinceSeconds: &since,
Follow: follow,
}

if tail > 0 {
options.TailLines = &tail
}

stream, err := c.clientset.CoreV1().Pods(namespace).GetLogs(name, options).Stream(ctx)
if err != nil {
return err
}

defer stream.Close()
reader := bufio.NewReaderSize(stream, 16)
lastLine := ""

for {
data, isPrefix, err := reader.ReadLine()
if err != nil {
return err
}

lines := strings.Split(string(data), "\r")
length := len(lines)

if len(lastLine) > 0 {
lines[0] = lastLine + lines[0]
lastLine = ""
}

if isPrefix {
lastLine = lines[length-1]
lines = lines[:(length - 1)]
}

for _, line := range lines {
if err := conn.WriteMessage(websocket.TextMessage, []byte(line)); err != nil {
return err
}
}
}
}

// GetTerminal starts a new terminal session via the given WebSocket connection.
func (c *Cluster) GetTerminal(conn *websocket.Conn, namespace, name, container, shell string) error {
reqURL, err := url.Parse(fmt.Sprintf("%s/api/v1/namespaces/%s/pods/%s/exec?container=%s&command=%s&stdin=true&stdout=true&stderr=true&tty=true", c.config.Host, namespace, name, container, shell))
Expand Down
51 changes: 50 additions & 1 deletion plugins/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,9 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
since := r.URL.Query().Get("since")
tail := r.URL.Query().Get("tail")
previous := r.URL.Query().Get("previous")
follow := r.URL.Query().Get("follow")

log.WithFields(logrus.Fields{"cluster": clusterName, "namespace": namespace, "name": name, "container": container, "regex": regex, "since": since, "previous": previous}).Tracef("getLogs")
log.WithFields(logrus.Fields{"cluster": clusterName, "namespace": namespace, "name": name, "container": container, "regex": regex, "since": since, "previous": previous, "follow": follow}).Tracef("getLogs")

cluster := router.clusters.GetCluster(clusterName)
if cluster == nil {
Expand All @@ -306,6 +307,54 @@ func (router *Router) getLogs(w http.ResponseWriter, r *http.Request) {
return
}

parsedFollow, err := strconv.ParseBool(follow)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadRequest, "Could not parse follow parameter")
return
}

// If the parsedFollow parameter was set to true, we stream the logs via an WebSocket connection instead of
// returning a json response.
if parsedFollow {
var upgrader = websocket.Upgrader{}

if router.config.WebSocket.AllowAllOrigins {
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
}

c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.WithError(err).Errorf("Could not upgrade connection")
return
}
defer c.Close()

c.SetPongHandler(func(string) error { return nil })

go func() {
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := c.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}()

err = cluster.StreamLogs(r.Context(), c, namespace, name, container, parsedSince, parsedTail, parsedFollow)
if err != nil {
c.WriteMessage(websocket.TextMessage, []byte("Could not stream logs: "+err.Error()))
return
}

log.Tracef("Logs stream was closed")
return
}

logs, err := cluster.GetLogs(r.Context(), namespace, name, container, regex, parsedSince, parsedTail, parsedPrevious)
if err != nil {
errresponse.Render(w, r, err, http.StatusBadGateway, "Could not get logs")
Expand Down
82 changes: 79 additions & 3 deletions plugins/resources/src/components/panel/details/actions/Logs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ import { IRow } from '@patternfly/react-table';
import { V1Pod } from '@kubernetes/client-node';
import { Terminal as xTerm } from 'xterm';

import { IResource, ITerminalContext, TERMINAL_OPTIONS, TerminalsContext } from '@kobsio/plugin-core';
import {
IPluginsContext,
IResource,
ITerminalContext,
PluginsContext,
TERMINAL_OPTIONS,
TerminalsContext,
} from '@kobsio/plugin-core';

// getContainers returns a list with all container names for the given Pod. It contains all specified init containers
// and the "normal" containers.
Expand Down Expand Up @@ -53,12 +60,65 @@ interface ILogsProps {
const Logs: React.FunctionComponent<ILogsProps> = ({ request, resource, show, setShow }: ILogsProps) => {
const containers = getContainers(resource.props);

const pluginsContext = useContext<IPluginsContext>(PluginsContext);
const terminalsContext = useContext<ITerminalContext>(TerminalsContext);
const [isLoading, setIsLoading] = useState<boolean>(false);
const [container, setContainer] = useState<string>(containers[0]);
const [since, setSince] = useState<number>(900);
const [regex, setRegex] = useState<string>('');
const [previous, setPrevious] = useState<boolean>(false);
const [follow, setFollow] = useState<boolean>(false);

const streamLogs = async (): Promise<void> => {
setShow(false);

const term = new xTerm(TERMINAL_OPTIONS);

try {
const pluginDetails = pluginsContext.getPluginDetails('resources');
const configuredWebSocketAddress =
pluginDetails && pluginDetails.options && pluginDetails.options && pluginDetails.options.webSocketAddress
? pluginDetails.options.webSocketAddress
: undefined;
const host = configuredWebSocketAddress || `wss://${window.location.host}`;

const ws = new WebSocket(
`${host}/api/plugins/resources/logs?cluster=${resource.cluster.title}${
resource.namespace ? `&namespace=${resource.namespace.title}` : ''
}&name=${resource.name.title}&container=${container}&since=${since}&tail=${
TERMINAL_OPTIONS.scrollback
}&previous=false&follow=true`,
);

term.reset();

term.onData((str) => {
if (str === '\r') {
term.write('\n\r');
} else {
term.write(str);
}
});

ws.onmessage = (event): void => {
term.write(`${event.data}\n\r`);
};

terminalsContext.addTerminal({
name: `${resource.name.title}: ${container}`,
terminal: term,
webSocket: ws,
});
} catch (err) {
if (err.message) {
term.write(`${err.message}\n\r`);
terminalsContext.addTerminal({
name: `${resource.name.title}: ${container}`,
terminal: term,
});
}
}
};

const getLogs = async (): Promise<void> => {
setIsLoading(true);
Expand All @@ -70,7 +130,7 @@ const Logs: React.FunctionComponent<ILogsProps> = ({ request, resource, show, se
resource.namespace ? `&namespace=${resource.namespace.title}` : ''
}&name=${resource.name.title}&container=${container}&regex=${encodeURIComponent(regex)}&since=${since}&tail=${
TERMINAL_OPTIONS.scrollback
}&previous=${previous}`,
}&previous=${previous}&follow=true`,
{ method: 'get' },
);
const json = await response.json();
Expand Down Expand Up @@ -110,7 +170,12 @@ const Logs: React.FunctionComponent<ILogsProps> = ({ request, resource, show, se
isOpen={show}
onClose={(): void => setShow(false)}
actions={[
<Button key="getLogs" variant={ButtonVariant.primary} isLoading={isLoading} onClick={getLogs}>
<Button
key="getLogs"
variant={ButtonVariant.primary}
isLoading={isLoading}
onClick={(): Promise<void> => (follow ? streamLogs() : getLogs())}
>
Show Logs
</Button>,
<Button key="cancel" variant={ButtonVariant.link} onClick={(): void => setShow(false)}>
Expand Down Expand Up @@ -174,6 +239,17 @@ const Logs: React.FunctionComponent<ILogsProps> = ({ request, resource, show, se
name="logs-form-previous"
/>
</FormGroup>

<FormGroup label="Follow" fieldId="logs-form-follow">
<Checkbox
label="Follow"
isChecked={follow}
onChange={setFollow}
aria-label="Follow"
id="logs-form-follow"
name="logs-form-follow"
/>
</FormGroup>
</Form>
</Modal>
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,15 @@ const Terminal: React.FunctionComponent<ITerminalProps> = ({ request, resource,
};

terminalsContext.addTerminal({
name: `${resource.namespace.title}: ${container} (${shell})`,
name: `${resource.name.title}: ${container} (${shell})`,
terminal: term,
webSocket: ws,
});
} catch (err) {
if (err.message) {
term.write(`${err.message}\n\r`);
terminalsContext.addTerminal({
name: `${resource.namespace.title}: ${container} (${shell})`,
name: `${resource.name.title}: ${container} (${shell})`,
terminal: term,
});
}
Expand Down