Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

ztunnel logs with multiple nodes #7339

Merged
merged 7 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 128 additions & 113 deletions business/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func parseLogLine(line string, isProxy bool, engardeParser *parser.Parser) *LogE
return &entry
}

func parseZtunnelLine(line string) *LogEntry {
func parseZtunnelLine(line, name string) *LogEntry {
entry := LogEntry{
Message: "",
Timestamp: "",
Expand All @@ -587,16 +587,16 @@ func parseZtunnelLine(line string) *LogEntry {

msgSplit := strings.Split(line, "\t")

if len(msgSplit) < 4 {
if len(msgSplit) < 5 {
log.Debugf("Error splitting log line [%s]", line)
entry.Message = line
entry.Message = fmt.Sprintf("[%s] %s", name, line)
return &entry
}

entry.Message = msgSplit[4]
entry.Message = fmt.Sprintf("[%s] %s", name, msgSplit[4])
if entry.Message == "" {
log.Debugf("Skipping empty log line [%s]", line)
entry.Message = line
entry.Message = fmt.Sprintf("[%s] %s", name, line)
return &entry
}

Expand Down Expand Up @@ -2072,7 +2072,7 @@ func (in *WorkloadService) GetWorkloadAppName(ctx context.Context, cluster, name
// streamParsedLogs fetches logs from a container in a pod, parses and decorates each log line with some metadata (if possible) and
// sends the processed lines to the client in JSON format. Results are sent as processing is performed, so in case of any error when
// doing processing the JSON document will be truncated.
func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opts *LogOptions, w http.ResponseWriter) error {
func (in *WorkloadService) streamParsedLogs(cluster, namespace string, names []string, opts *LogOptions, w http.ResponseWriter) error {
userClient, ok := in.userClients[cluster]
if !ok {
return fmt.Errorf("user client for cluster [%s] not found", cluster)
Expand All @@ -2088,130 +2088,143 @@ func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opt
// discard the logs after sinceTime+duration
isBounded := opts.Duration != nil

logsReader, err := userClient.StreamPodLogs(namespace, name, &k8sOpts)
if err != nil {
return err
}

defer func() {
e := logsReader.Close()
if e != nil {
log.Errorf("Error when closing the connection streaming logs of a pod: %s", e.Error())
firstEntry := true
firstWritter := true
for i, name := range names {
logsReader, err := userClient.StreamPodLogs(namespace, name, &k8sOpts)
if err != nil {
return err
}
}()

bufferedReader := bufio.NewReader(logsReader)

var startTime *time.Time
var endTime *time.Time
if k8sOpts.SinceTime != nil {
startTime = &k8sOpts.SinceTime.Time
if isBounded {
end := startTime.Add(*opts.Duration)
endTime = &end
}
}
defer func() {
e := logsReader.Close()
if e != nil {
log.Errorf("Error when closing the connection streaming logs of a pod: %s", e.Error())
}
}()

// To avoid high memory usage, the JSON will be written
// to the HTTP Response as it's received from the cluster API.
// That is, each log line is parsed, decorated with Kiali's metadata,
// marshalled to JSON and immediately written to the HTTP Response.
// This means that it is needed to push HTTP headers and start writing
// the response body right now and any errors at the middle of the log
// processing can no longer be informed to the client. So, starting
// these lines, the best we can do if some error happens is to simply
// log the error and stop/truncate the response, which will have the
// effect of sending an incomplete JSON document that the browser will fail
// to parse. Hopefully, the client/UI can catch the parsing error and
// properly show an error message about the failure retrieving logs.
w.Header().Set("Content-Type", "application/json")
_, writeErr := w.Write([]byte("{\"entries\":[")) // This starts the JSON document
if writeErr != nil {
return writeErr
}
bufferedReader := bufio.NewReader(logsReader)

firstEntry := true
line, readErr := bufferedReader.ReadString('\n')
linesWritten := 0
for ; readErr == nil || (readErr == io.EOF && len(line) > 0); line, readErr = bufferedReader.ReadString('\n') {
// Abort if we already reached the requested max-lines limit
if opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
break
var startTime *time.Time
var endTime *time.Time
if k8sOpts.SinceTime != nil {
startTime = &k8sOpts.SinceTime.Time
if isBounded {
end := startTime.Add(*opts.Duration)
endTime = &end
}
}

var entry *LogEntry
if opts.LogType == models.LogTypeZtunnel {
entry = parseZtunnelLine(line)
} else {
entry = parseLogLine(line, opts.LogType == models.LogTypeProxy, engardeParser)
var writeErr error

if firstWritter {
// To avoid high memory usage, the JSON will be written
// to the HTTP Response as it's received from the cluster API.
// That is, each log line is parsed, decorated with Kiali's metadata,
// marshalled to JSON and immediately written to the HTTP Response.
// This means that it is needed to push HTTP headers and start writing
// the response body right now and any errors at the middle of the log
// processing can no longer be informed to the client. So, starting
// these lines, the best we can do if some error happens is to simply
// log the error and stop/truncate the response, which will have the
// effect of sending an incomplete JSON document that the browser will fail
// to parse. Hopefully, the client/UI can catch the parsing error and
// properly show an error message about the failure retrieving logs.
w.Header().Set("Content-Type", "application/json")
_, writeErr = w.Write([]byte("{\"entries\":[")) // This starts the JSON document
if writeErr != nil {
return writeErr
}
firstWritter = false
}

if entry == nil {
continue
}
line, readErr := bufferedReader.ReadString('\n')
linesWritten := 0
for ; readErr == nil || (readErr == io.EOF && len(line) > 0); line, readErr = bufferedReader.ReadString('\n') {
// Abort if we already reached the requested max-lines limit
if opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
break
}

if opts.LogType == models.LogTypeZtunnel && !filterMatches(entry.Message, opts.filter) {
continue
}
var entry *LogEntry
if opts.LogType == models.LogTypeZtunnel {
entry = parseZtunnelLine(line, name)
} else {
entry = parseLogLine(line, opts.LogType == models.LogTypeProxy, engardeParser)
}

// If we are past the requested time window then stop processing
if startTime == nil {
startTime = &entry.OriginalTime
}
if entry == nil {
continue
}

if isBounded {
if endTime == nil {
end := entry.OriginalTime.Add(*opts.Duration)
endTime = &end
if opts.LogType == models.LogTypeZtunnel && !filterMatches(entry.Message, opts.filter) {
continue
}

if entry.OriginalTime.After(*endTime) {
break
// If we are past the requested time window then stop processing
if startTime == nil {
startTime = &entry.OriginalTime
}
}

// Send to client the processed log line
if isBounded {
if endTime == nil {
end := entry.OriginalTime.Add(*opts.Duration)
endTime = &end
}

response, err := json.Marshal(entry)
if err != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when marshalling JSON while streaming pod logs: %s", err.Error())
return nil
}
if entry.OriginalTime.After(*endTime) {
break
}
}

if firstEntry {
firstEntry = false
} else {
_, writeErr = w.Write([]byte{','})
if writeErr != nil {
// Send to client the processed log line

response, err := json.Marshal(entry)
if err != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when writing log entries separator: %s", writeErr.Error())
log.Errorf("Error when marshalling JSON while streaming pod logs: %s", err.Error())
return nil
}
}

_, writeErr = w.Write(response)
if writeErr != nil {
log.Errorf("Error when writing a processed log entry while streaming pod logs: %s", writeErr.Error())
return nil
}
if firstEntry {
firstEntry = false
} else {
_, writeErr = w.Write([]byte{','})
if writeErr != nil {
// Remember that since the HTTP Response body is already being sent,
// it is not possible to change the response code. So, log the error
// and terminate early the response.
log.Errorf("Error when writing log entries separator: %s", writeErr.Error())
return nil
}
}

linesWritten++
}
_, writeErr = w.Write(response)
if writeErr != nil {
log.Errorf("Error when writing a processed log entry while streaming pod logs: %s", writeErr.Error())
return nil
}

if readErr == nil && opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
// End the JSON document, setting the max-lines truncated flag
_, writeErr = w.Write([]byte("], \"linesTruncated\": true}"))
} else {
// End the JSON document
_, writeErr = w.Write([]byte("]}"))
}
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
linesWritten++
}
if readErr == nil && opts.MaxLines != nil && linesWritten >= *opts.MaxLines {
// End the JSON document, setting the max-lines truncated flag
_, writeErr = w.Write([]byte("], \"linesTruncated\": true}"))
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
}
break
} else {
if i == len(names)-1 {
// End the JSON document
_, writeErr = w.Write([]byte("]}"))
if writeErr != nil {
log.Errorf("Error when writing the outro of the JSON document while streaming pod logs: %s", err.Error())
}
}
}
}

return nil
Expand All @@ -2220,26 +2233,28 @@ func (in *WorkloadService) streamParsedLogs(cluster, namespace, name string, opt
// StreamPodLogs streams pod logs to an HTTP Response given the provided options
func (in *WorkloadService) StreamPodLogs(cluster, namespace, name string, opts *LogOptions, w http.ResponseWriter) error {

names := []string{}
if opts.LogType == models.LogTypeZtunnel {
// First, get ztunnel namespace and containers
pods := in.cache.GetZtunnelPods(cluster)
// This is needed for the K8S client
opts.PodLogOptions.Container = models.IstioProxy
// The ztunnel line should include the pod and the namespace
fs := filterOpts{
destWk: fmt.Sprintf("dst.workload=\"%s\"", name),
destNs: fmt.Sprintf("dst.namespace=\"%s\"", namespace),
srcWk: fmt.Sprintf("src.workload=\"%s\"", name),
srcNs: fmt.Sprintf("src.namespace=\"%s\"", namespace),
destWk: fmt.Sprintf("dst.workload=%s", name),
destNs: fmt.Sprintf("dst.namespace=%s", namespace),
srcWk: fmt.Sprintf("src.workload=%s", name),
srcNs: fmt.Sprintf("src.namespace=%s", namespace),
}
opts.filter = fs
var streamErr error
for _, pod := range pods {
streamErr = in.streamParsedLogs(cluster, pod.Namespace, pod.Name, opts, w)
names = append(names, pod.Name)
}
return streamErr
// They should be all in the same ns
return in.streamParsedLogs(cluster, pods[0].Namespace, names, opts, w)
}
return in.streamParsedLogs(cluster, namespace, name, opts, w)
names = append(names, name)
return in.streamParsedLogs(cluster, namespace, names, opts, w)
}

// AND filter
Expand Down
2 changes: 1 addition & 1 deletion business/workloads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func TestGetZtunnelPodLogsProxy(t *testing.T) {
require.Equal(1, len(podLogs.Entries))
entry := podLogs.Entries[0]

assert.Equal("src.addr=10.244.0.16:51748 src.workload=\"productpage-v1-87d54dd59-fzflt\" src.namespace=\"bookinfo\" src.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-productpage\" dst.addr=10.244.0.11:15008 dst.service=\"details.bookinfo.svc.cluster.local\" dst.workload=\"details-v1-cf74bb974-wg44w\" dst.namespace=\"bookinfo\" dst.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-details\" direction=\"outbound\" bytes_sent=200 bytes_recv=358 duration=\"1ms\"\n", entry.Message)
assert.Equal("[ztunnel] src.addr=10.244.0.16:51748 src.workload=productpage-v1-87d54dd59-fzflt src.namespace=bookinfo src.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-productpage\" dst.addr=10.244.0.11:15008 dst.service=details.bookinfo.svc.cluster.local dst.workload=details-v1-cf74bb974-wg44w dst.namespace=bookinfo dst.identity=\"spiffe://cluster.local/ns/bookinfo/sa/bookinfo-details\" direction=\"outbound\" bytes_sent=200 bytes_recv=358 duration=\"1ms\"\n", entry.Message)
assert.Equal("2024-04-12 10:31:51.078", entry.Timestamp)
assert.NotNil(entry.AccessLog)
assert.Equal("358", entry.AccessLog.BytesReceived)
Expand Down
Loading