Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
20 changes: 15 additions & 5 deletions cmd/flow_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var (
func runFlowCapture(_ *cobra.Command, _ []string) {
go scanner()

captureType = "Flow"
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i := range ports {
Expand Down Expand Up @@ -105,12 +106,14 @@ func runFlowCaptureOnAddr(port int, filename string) {
if stopReceived {
return
}
// parse and display flow async
go parseGenericMapAndDisplay(fp.GenericMap.Value)

// Write flows to sqlite DB
err = queryFlowDB(fp.GenericMap.Value, db)
if err != nil {
log.Error("Error while writing to DB:", err.Error())
}
go manageFlowsDisplay(fp.GenericMap.Value)
// append new line between each record to read file easily
bytes, err := f.Write(append(fp.GenericMap.Value, []byte(",\n")...))
if err != nil {
Expand All @@ -134,20 +137,27 @@ func runFlowCaptureOnAddr(port int, filename string) {
}
}

func manageFlowsDisplay(line []byte) {
// parseGenericMapAndDisplay unmarshals a raw JSON flow record into a
// config.GenericMap and forwards it to the table display.
// Records that fail to parse are logged and dropped.
func parseGenericMapAndDisplay(raw []byte) {
	genericMap := config.GenericMap{}
	if err := json.Unmarshal(raw, &genericMap); err != nil {
		log.Error("Error while parsing json", err)
		return
	}

	manageFlowsDisplay(genericMap)
}

func manageFlowsDisplay(genericMap config.GenericMap) {
// lock since we are updating lastFlows concurrently
mutex.Lock()

lastFlows = append(lastFlows, genericMap)
sort.Slice(lastFlows, func(i, j int) bool {
return lastFlows[i]["TimeFlowEndMs"].(float64) < lastFlows[j]["TimeFlowEndMs"].(float64)
if captureType == "Flow" {
return toFloat64(lastFlows[i], "TimeFlowEndMs") < toFloat64(lastFlows[j], "TimeFlowEndMs")
}
return toFloat64(lastFlows[i], "Time") < toFloat64(lastFlows[j], "Time")
})
if len(regexes) > 0 {
// regexes may change during the render so we make a copy first
Expand Down Expand Up @@ -208,7 +218,7 @@ func updateTable() {

duration := now.Sub(startupTime)
if outputBuffer == nil {
fmt.Print("Running network-observability-cli as Flow Capture\n")
fmt.Printf("Running network-observability-cli as %s Capture\n", captureType)
fmt.Printf("Log level: %s ", logLevel)
fmt.Printf("Duration: %s ", duration.Round(time.Second))
fmt.Printf("Capture size: %s\n", sizestr.ToString(totalBytes))
Expand Down
8 changes: 4 additions & 4 deletions cmd/flow_capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestFlowTableRefreshDelay(t *testing.T) {
buf := bytes.Buffer{}
setOutputBuffer(&buf)

manageFlowsDisplay([]byte(`{"TimeFlowEndMs": 1709741962017}`))
parseGenericMapAndDisplay([]byte(`{"TimeFlowEndMs": 1709741962017}`))

out := buf.String()
assert.Empty(t, out)
Expand All @@ -33,7 +33,7 @@ func TestFlowTableDefaultDisplay(t *testing.T) {
// add 1s to current time to avoid maxRefreshRate limit
tickTime()

manageFlowsDisplay([]byte(sampleFlow))
parseGenericMapAndDisplay([]byte(sampleFlow))

// get table output as string
rows := strings.Split(buf.String(), "\n")
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestFlowTableMultipleFlows(t *testing.T) {
bytes = bytes + 1000

// add flow to table
manageFlowsDisplay([]byte(fmt.Sprintf(`{
parseGenericMapAndDisplay([]byte(fmt.Sprintf(`{
"AgentIP":"10.0.1.1",
"Bytes":%d,
"DstAddr":"10.0.0.6",
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestFlowTableAdvancedDisplay(t *testing.T) {

// add one second to time and draw table
tickTime()
manageFlowsDisplay([]byte(sampleFlow))
parseGenericMapAndDisplay([]byte(sampleFlow))

// get table output per rows
return strings.Split(buf.String(), "\n")
Expand Down
20 changes: 18 additions & 2 deletions cmd/map_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,20 @@ func toText(genericMap config.GenericMap, fieldName string) interface{} {
return emptyText
}

// toFloat64 returns the float64 value stored under fieldName in genericMap,
// or 0 when the field is absent or holds a non-float64 value.
func toFloat64(genericMap config.GenericMap, fieldName string) float64 {
	if v, ok := genericMap[fieldName]; ok {
		// JSON numbers unmarshal as float64; use the comma-ok assertion so an
		// unexpected type yields 0 instead of panicking.
		if f, ok := v.(float64); ok {
			return f
		}
	}
	return 0
}

// toTimeString formats the millisecond epoch timestamp stored under fieldName
// as "15:04:05.000000". It returns emptyText when the field is absent or does
// not hold a float64 (JSON numbers unmarshal as float64).
func toTimeString(genericMap config.GenericMap, fieldName string) string {
	if v, ok := genericMap[fieldName]; ok {
		// Guard the assertion so a non-numeric value falls back to emptyText
		// instead of panicking.
		if ms, ok := v.(float64); ok {
			return time.UnixMilli(int64(ms)).Format("15:04:05.000000")
		}
	}
	return emptyText
}

func ToTableRow(genericMap config.GenericMap, cols []string) []interface{} {
Expand All @@ -411,7 +423,11 @@ func ToTableRow(genericMap config.GenericMap, cols []string) []interface{} {
// convert field name / value accordingly
switch col {
case "Time":
row = append(row, toTimeString(genericMap, "TimeFlowEndMs"))
if captureType == "Flow" {
row = append(row, toTimeString(genericMap, "TimeFlowEndMs"))
} else {
row = append(row, toTimeString(genericMap, "Time"))
}
case "SrcZone":
row = append(row, toText(genericMap, "SrcK8S_Zone"))
case "DstZone":
Expand Down
186 changes: 79 additions & 107 deletions cmd/packet_capture.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package cmd

import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/eiannone/keyboard"
"github.com/fatih/color"
"github.com/google/gopacket/layers"
"github.com/jpillora/sizestr"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/rodaine/table"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
"github.com/ryankurte/go-pcapng"
"github.com/ryankurte/go-pcapng/types"
"github.com/spf13/cobra"
)

Expand All @@ -32,14 +33,10 @@ type PcapResult struct {
ByteCount int64
}

var packets = []PcapResult{}

// Setting Snapshot length to 0 sets it to maximum packet size
var snapshotlen uint32

func runPacketCapture(_ *cobra.Command, _ []string) {
go packetCaptureScanner()
go scanner()

captureType = "Packet"
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i := range ports {
Expand Down Expand Up @@ -67,19 +64,23 @@ func runPacketCaptureOnAddr(port int, filename string) {
log.Errorf("Create directory failed: %v", err.Error())
log.Fatal(err)
}
f, err = os.Create("./output/pcap/" + filename + ".pcap")
pw, err := pcapng.NewFileWriter("./output/pcap/" + filename + ".pcapng")
if err != nil {
log.Errorf("Create file %s failed: %v", filename, err.Error())
log.Fatal(err)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
so := types.SectionHeaderOptions{
Comment: filename,
Application: "netobserv-cli",
}
err = pw.WriteSectionHeader(so)
if err != nil {
log.Fatal(err)
}
defer f.Close()

flowPackets := make(chan *pbpacket.Packet, 100)
flowPackets := make(chan *genericmap.Flow, 100)
collector, err := grpc.StartCollector(port, flowPackets)
if err != nil {
log.Error("StartCollector failed:", err.Error())
Expand All @@ -94,16 +95,46 @@ func runPacketCaptureOnAddr(port int, filename string) {
if stopReceived {
return
}

go managePacketsDisplay(PcapResult{Name: filename, ByteCount: int64(len(fp.Pcap.Value)), PacketCount: 1})
// append new line between each record to read file easily
bytes, err := f.Write(fp.Pcap.Value)
genericMap := config.GenericMap{}
err := json.Unmarshal(fp.GenericMap.Value, &genericMap)
if err != nil {
log.Fatal(err)
log.Error("Error while parsing json", err)
return
}

data, ok := genericMap["Data"]
if ok {
// clear generic map data
delete(genericMap, "Data")

// display as flow async
go manageFlowsDisplay(genericMap)

// Get capture timestamp
ts := time.Unix(int64(genericMap["Time"].(float64)), 0)

// Decode b64 encoded data
b, err := base64.StdEncoding.DecodeString(data.(string))
if err != nil {
log.Error("Error while decoding data", err)
return
}

// write enriched data as interface
writeEnrichedData(pw, &genericMap)

// then append packet to file
err = pw.WriteEnhancedPacketBlock(0, ts, b, types.EnhancedPacketOptions{})
if err != nil {
log.Fatal(err)
}
} else {
// display as flow async
go manageFlowsDisplay(genericMap)
}

// terminate capture if max bytes reached
totalBytes = totalBytes + int64(bytes)
totalBytes = totalBytes + int64(len(fp.GenericMap.Value))
if totalBytes > maxBytes {
log.Infof("Capture reached %s, exiting now...", sizestr.ToString(maxBytes))
return
Expand All @@ -119,94 +150,35 @@ func runPacketCaptureOnAddr(port int, filename string) {
}
}

func managePacketsDisplay(result PcapResult) {
// lock since we are updating results concurrently
mutex.Lock()

// find result in array
found := false
for i, r := range packets {
if r.Name == result.Name {
found = true
// update existing result
packets[i].PacketCount += result.PacketCount
packets[i].ByteCount += result.ByteCount
break
func writeEnrichedData(pw *pcapng.FileWriter, genericMap *config.GenericMap) {
var io types.InterfaceOptions
srcType := toText(*genericMap, "SrcK8S_Type").(string)
if srcType != emptyText {
io = types.InterfaceOptions{
Name: fmt.Sprintf(
"%s: %s -> %s: %s",
srcType,
toText(*genericMap, "SrcK8S_Name"),
toText(*genericMap, "DstK8S_Type"),
toText(*genericMap, "DstK8S_Name")),
Description: fmt.Sprintf(
"%s: %s Namespace: %s -> %s: %s Namespace: %s",
toText(*genericMap, "SrcK8S_OwnerType"),
toText(*genericMap, "SrcK8S_OwnerName"),
toText(*genericMap, "SrcK8S_Namespace"),
toText(*genericMap, "DstK8S_OwnerType"),
toText(*genericMap, "DstK8S_OwnerName"),
toText(*genericMap, "DstK8S_Namespace"),
),
}
}
if !found {
packets = append(packets, result)
}

// don't refresh terminal too often to avoid blinking
now := currentTime()
if int(now.Sub(lastRefresh)) > int(maxRefreshRate) {
lastRefresh = now
resetTerminal()

duration := now.Sub(startupTime)
if outputBuffer == nil {
fmt.Print("Running network-observability-cli as Packet Capture\n")
fmt.Printf("Log level: %s ", logLevel)
fmt.Printf("Duration: %s ", duration.Round(time.Second))
fmt.Printf("Capture size: %s\n", sizestr.ToString(totalBytes))
if len(strings.TrimSpace(filter)) > 0 {
fmt.Printf("Filters: %s\n", filter)
}
}

// recreate table from scratch
headerFmt := color.New(color.BgHiBlue, color.Bold).SprintfFunc()
columnFmt := color.New(color.FgHiYellow).SprintfFunc()
tbl := table.New(
"Name",
"Packets",
"Bytes",
)
if outputBuffer != nil {
tbl.WithWriter(outputBuffer)
}
tbl.WithHeaderFormatter(headerFmt).WithFirstColumnFormatter(columnFmt).WithPadding(10)

for _, result := range packets {
tbl.AddRow(
result.Name,
result.PacketCount,
sizestr.ToString(result.ByteCount),
)
} else {
io.Name = "Unknown resource"
io = types.InterfaceOptions{
Name: "Unknown kubernetes resource",
}

// print table
tbl.Print()
}

if len(keyboardError) > 0 {
fmt.Println(keyboardError)
}

// unlock
mutex.Unlock()
}

func packetCaptureScanner() {
if err := keyboard.Open(); err != nil {
keyboardError = fmt.Sprintf("Keyboard not supported %v", err)
return
}
defer func() {
_ = keyboard.Close()
}()

for {
_, key, err := keyboard.GetKey()
if err != nil {
panic(err)
}
if key == keyboard.KeyCtrlC || stopReceived {
log.Info("Ctrl-C pressed, exiting program.")

// exit program
os.Exit(0)
}
err := pw.WriteInterfaceDescription(uint16(layers.LinkTypeEthernet), io)
if err != nil {
log.Fatal(err)
}
}
Loading