New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
flow: SYN/FIN detection #382
Conversation
|
ok to test |
flow/flow.go
Outdated
| @@ -240,7 +243,7 @@ func networkID(p *gopacket.Packet) int64 { | |||
| return id | |||
| } | |||
|
|
|||
| // NewFlow creates a new empty flow | |||
| // NewFlow returns extended with memory-only attributes Flow instance | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"// NewFlow creates a new empty flow" is enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
etc/skydive.yml.default
Outdated
| @@ -104,6 +104,8 @@ agent: | |||
| # stats_update: 1 | |||
| metadata: | |||
| info: This is compute node | |||
| tcp_len_by_seq: true | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand that your patch for tcp_len_by_seq, counting flow bytes based on the tcp len and avoid counting the retransmission. Why leaving the flow bytes as it's done today and add a TCPRetransmissionLen in TCPFlowMetric
as we can store these datas and see number of retransmitted packets and bytes in a separated TCP only statistics
I would rather prefer to have TCP stack counters like, overlap, retransmission that we could add in TCP Metric, and enable them by default. Basically add counter like wireshark TCP analysis.
Another interesting subject would be IP fragmentation
What do you think ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My patch doesn't deal so much with retransmissions. It refers more to the case when for example we capture only partial traffic (such as SYN/FIN) and then using sequence numbers we calculate traffic length.
Thing that you propose looks like good thing to have, but I see it as rather big and independent feature. So, I propose opening it as a new open issue and starting discussing it via our meetings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I understand your point
But if you don't take in account overlapping, retransmission, reordering packets, you flow tcp size metric would not be correct.
So let's do this patch step by step, and just focus on the syn/fin/rst flag and ttl only, this part sound good for me except few exception.
Could you remove the "unrelated" parts to tcp len calculation, I will prefer to see that in a separate patch
It's a pity that we have a big timezone difference, I will try catch you on IRC tonight (morning for you)
flow/flow.go
Outdated
| @@ -499,6 +545,167 @@ func (f *Flow) updateMetricsWithNetworkLayer(packet *gopacket.Packet) error { | |||
| return errors.New("Unable to decode the IP layer") | |||
| } | |||
|
|
|||
| func (f *Flow) updateMetricsSynCapture(packet *gopacket.Packet) error { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updateTCPMetrics() name will fit nicer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow.go
Outdated
| } | ||
|
|
||
| if transportPacket.FIN { | ||
| logging.GetLogger().Notice("FIN", f.state.aBExpectedSeq, f.state.bAExpectedSeq, f.Metric.ABBytes, f.Metric.BABytes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notice is too verbose, you will see 1 log per tcp flow
As you don't dump the flowid I think you can remove this line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow.go
Outdated
| @@ -528,6 +735,13 @@ func (f *Flow) newTransportLayer(packet *gopacket.Packet) error { | |||
| transportPacket, _ := transportLayer.(*layers.TCP) | |||
| f.Transport.A = strconv.Itoa(int(transportPacket.SrcPort)) | |||
| f.Transport.B = strconv.Itoa(int(transportPacket.DstPort)) | |||
| if config.GetConfig().GetBool("agent.capture_syn") { | |||
| f.TCPFlowMetric = &TCPMetric{ | |||
| ABSynStart: int64(0), | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need to initialize at 0, it's already the default in go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow.go
Outdated
| } | ||
|
|
||
| switch field { | ||
| case "ABSynData": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, u r right, I will remove it
flow/flow.go
Outdated
|
|
||
| length := uint32Distance(transportPacket.Seq, expectedSeq) | ||
|
|
||
| if transportPacket.Seq < TCPMaxSegmentSize { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer to see a logic that track the TCP segment, that will address the TCP partial segment retransmission, etc ...
Add this sliding window logic tracking in flowState
With these information, we could count number of restransmitted and overlaped packets and bytes easily and correctly
In the current version you only retransmission when the restransmit packet if old by TCPMaxSegmentSize, that it different than TCP window size.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like additional feature...
In this feature we don't detect retransmittings and with BPF it is even impossible to detect them this way, we just calculate length of data that passes in the session in a more exact way using sequence numbers, rather than just calculating passed bytes, because not always we look at all the packets that pass (only SYN/FIN/RST, if BPF is used).
Its my fault that I have added retransmit to the title, I have removed it.
I think that thing that you propose to calculate is more complex to do and I propose to open it as an issue (plan for the next feature).
flow/flow.go
Outdated
| logging.GetLogger().Notice("Overlap", transportPacket.Seq, expectedSeq) | ||
| } | ||
|
|
||
| if length > 0x80010000 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why 0x80010000 ? -TCPMaxSegmentSize ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wont pass compilation in go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the idea was have a signed distance
flow/flow.proto
Outdated
| @@ -131,7 +145,7 @@ message Flow { | |||
| string BNodeTID = 35; | |||
|
|
|||
| /* raw packets, will not be exported, see Makefile */ | |||
| repeated RawPacket LastRawPackets = 36; | |||
| repeated RawPacket LastRawPackets = 38; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't change the protobuf ID from other field
flow/flow_test.go
Outdated
| @@ -79,6 +79,12 @@ func TestFlowJSON(t *testing.T) { | |||
| }, | |||
| Start: 1111, | |||
| Last: 222, | |||
| LastUpdateMetric: &FlowMetric{ | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't declare field that are 0 or "", golang guarantee that by design, just LastUpdateMetric: &FlowMetric{}
| @@ -79,6 +79,12 @@ func TestFlowJSON(t *testing.T) { | |||
| }, | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should add a test based based on simple pcap trace like tests : TestFlowSimpleIPv4 and TestFlowSimpleIPv6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/flow_test.go
Outdated
| @@ -314,6 +339,12 @@ func TestPCAP1(t *testing.T) { | |||
| BAPackets: 1, | |||
| BABytes: 42, | |||
| }, | |||
| LastUpdateMetric: &FlowMetric{ | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
| @@ -81,6 +81,54 @@ const flowMapping = ` | |||
| "type": "date", "format": "epoch_millis" | |||
| } | |||
| } | |||
| }, | |||
| { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just declare these with simple regex like match by modify the "start"
like this :
{
"start": {
"match": "*Start",
"mapping": {
"type": "date", "format": "epoch_millis"
}
}
},
flow/storage/orientdb/orientdb.go
Outdated
| "LastUpdateMetric": lastMetricDoc, | ||
| "Metric": metricDoc, | ||
| "TCPFlowMetric": tcpMetric, | ||
| "Start": flow.Start, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here "TCPMetrics don't have Start and Last"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/storage/orientdb/orientdb.go
Outdated
| class := orient.ClassDefinition{ | ||
| Name: "TCPMetric", | ||
| Properties: []orient.Property{ | ||
| {Name: "Start", Type: "LONG", Mandatory: true, NotNull: true}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here "TCPMetrics don't have Start and Last"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, uploading change
flow/flow.go
Outdated
| if f.state.lenBySeq { | ||
| f.updateMetricsBySeq(packet) | ||
| } else if updated := f.updateMetricsWithLinkLayer(packet, length); !updated { | ||
| logging.GetLogger().Error("Update metrics with network\n") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove this log
etc/skydive.yml.default
Outdated
| @@ -104,6 +104,8 @@ agent: | |||
| # stats_update: 1 | |||
| metadata: | |||
| info: This is compute node | |||
| tcp_len_by_seq: true | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok I understand your point
But if you don't take in account overlapping, retransmission, reordering packets, you flow tcp size metric would not be correct.
So let's do this patch step by step, and just focus on the syn/fin/rst flag and ttl only, this part sound good for me except few exception.
Could you remove the "unrelated" parts to tcp len calculation, I will prefer to see that in a separate patch
It's a pity that we have a big timezone difference, I will try catch you on IRC tonight (morning for you)
flow/flow.go
Outdated
| logging.GetLogger().Notice("Overlap", transportPacket.Seq, expectedSeq) | ||
| } | ||
|
|
||
| if length > 0x80010000 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the idea was have a signed distance
62ce494
to
78d5b53
Compare
flow/flow.go
Outdated
|
|
||
| } | ||
|
|
||
| var captureTime int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you already test metadata != nil a the begining already, reuse it and replace the whole section by 👍
captureTime := common.UnixMillis(metadata.CaptureInfo.Timestamp)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow.go
Outdated
| func (f *Flow) updateTCPMetrics(packet *gopacket.Packet) error { | ||
| // capture content of SYN packets | ||
| //bypass if not TCP | ||
| if f.Transport == nil || f.Transport.Protocol != FlowProtocol_TCPPORT { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
may add "|| f.Network == nil"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow.go
Outdated
| @@ -607,6 +698,7 @@ func PacketsFromGoPacket(packet *gopacket.Packet, outerLength int64, t int64, bp | |||
|
|
|||
| if len(flowPackets.Packets) > 0 { | |||
| p := gopacket.NewPacket(packetData[start:], topLayer.LayerType(), gopacket.NoCopy) | |||
| p.Metadata().CaptureInfo = packet_capture_info | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do it all at once here (and check metadata)
if metadata := (*packet).Metadata(); metadata != nil {
p.Metadata().CaptureInfo = metadata.CaptureInfo
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do it here because, p is a packet copy that is created and scoped only here and NewPacket is not our function (we cant change it) and it doesn't copy the metadata that we need (the timestemps), so we loose here the data. But you are right, there is some mess in the code, I will fix it and also add check for metadata.
flow/flow.go
Outdated
| @@ -770,6 +890,10 @@ func (f *Flow) GetFieldInt64(field string) (_ int64, err error) { | |||
| return f.Metric.GetFieldInt64(fields[1]) | |||
| case "LastUpdateMetric": | |||
| return f.LastUpdateMetric.GetFieldInt64(fields[1]) | |||
| case "TCPFlowMetric": | |||
| if f.TCPFlowMetric != nil { | |||
| return f.TCPFlowMetric.GetFieldInt64(fields[1]) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can call directly the return f.TCPFlowMetric.GetFieldInt64( .... as you do the test already line 794
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/flow.proto
Outdated
| @@ -80,6 +80,17 @@ message RawPacket { | |||
| bytes Data = 3; | |||
| } | |||
|
|
|||
| message TCPMetric { | |||
| int64 ABSynStart = 7; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not start protobuf ID at 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/storage/orientdb/orientdb.go
Outdated
| "RawPacketsCaptured": flow.RawPacketsCaptured, | ||
| tcpMetric := flowTCPMetricToDocument(flow, flow.TCPFlowMetric) | ||
| var flowDoc orient.Document | ||
| if tcpMetric != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't do a big if here, just do after flowDoc declaration
if tcpMetric != nil {
flowDoc["TCPFlowMetric"] = tcpMetric
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/storage/orientdb/orientdb.go
Outdated
| class := orient.ClassDefinition{ | ||
| Name: "TCPMetric", | ||
| Properties: []orient.Property{ | ||
| {Name: "ABSynStart", Type: "LONG", Mandatory: false, NotNull: false}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put the NotNull to true for all TCPMetric
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/flow_test.go
Outdated
| @@ -273,9 +285,8 @@ func validateAllParentChains(t *testing.T, table *Table) { | |||
|
|
|||
| func flowsFromPCAP(t *testing.T, filename string, linkType layers.LinkType, bpf *BPF) []*Flow { | |||
| table := NewTable(nil, nil, NewEnhancerPipeline(), "", TableOpts{}) | |||
|
|
|||
| table.SetSynCapture(true) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as the flag is global to the agent, and will be set only by the user, set the config flag directly here for the purpose of the test, so place this code in your unit test (TestFlowSimpleSynFin)
config.GetConfig().Set("agent.capture_syn", true)
defer config.GetConfig().Set("agent.capture_syn", false)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
flow/table.go
Outdated
| @@ -349,6 +349,11 @@ func (ft *Table) flowPacketToFlow(packet *Packet, parentUUID string, t int64, L2 | |||
| return flow | |||
| } | |||
|
|
|||
| // Enable/Disable capturing SYN packets | |||
| func (ft *Table) SetSynCapture(enableSynCapture bool) { | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this class method, see comment above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I removed it.
The bug was that I used two different folders of config, now, everything works well.
Thanks!
flow/flow_test.go
Outdated
| if p.ErrorLayer() != nil { | ||
| t.Fatalf("Failed to decode packet with layer path '%s': %s\n", layerPathFromGoPacket(&p), p.ErrorLayer().Error()) | ||
| } | ||
| pcapPacketNB++ | ||
| if strings.Contains(layerPathFromGoPacket(&p), "DecodeFailure") { | ||
| t.Fatalf("GoPacket decode this pcap packet %d as DecodeFailure :\n%s", pcapPacketNB, p.Dump()) | ||
| } | ||
|
|
||
| config.GetConfig().Set("agent.capture_syn", true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why forcing the flag here, old debuging trace I assume ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right :)
8f8d3b7
to
4e01253
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except few typo/cosmetic glitch
flow/flow.proto
Outdated
| @@ -80,6 +80,17 @@ message RawPacket { | |||
| bytes Data = 3; | |||
| } | |||
|
|
|||
| message TCPMetric { | |||
| int64 ABSynStart = 1; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ident this block like others, prefix with 2 spaces
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/flow.proto
Outdated
| @@ -106,6 +117,9 @@ message Flow { | |||
| /* Total amount of data for the whole flow duration */ | |||
| FlowMetric Metric = 32; | |||
|
|
|||
| /* Floe metric specific to the TCP Protocol only and optional */ | |||
| TCPMetric TCPFlowMetric = 38; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ID 33
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NodeTID is 33, I think it is impossible in the same struct to have two elements with the same ID
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beforehand you asked not to change any existing IDs, and I tried to chenge ID to 33, as I thought it is impossible...
I also didnt want to move this filed down in the list of fileds, because it is its logical location...
flow/flow_test.go
Outdated
| func TestFlowSimpleSynFin(t *testing.T) { | ||
| flows := flowsFromPCAP(t, "pcaptraces/simple-tcpv4.pcap", layers.LinkTypeEthernet, nil) | ||
| // In test pcap SYNs happen at 2017-03-21 10:58:23.768977 +0200 IST | ||
| synTimestemp := int64(1490086703768) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/Timestemp/Timestamp/g
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/metrics.go
Outdated
| // Copy extended metric | ||
| func (fm *TCPMetric) Copy() *TCPMetric { | ||
| return &TCPMetric{ | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/storage/orientdb/orientdb.go
Outdated
| @@ -67,11 +67,32 @@ func flowMetricToDocument(flow *flow.Flow, metric *flow.FlowMetric) orient.Docum | |||
| } | |||
| } | |||
|
|
|||
| func flowTCPMetricToDocument(flow *flow.Flow, tcp_metric *flow.TCPMetric) orient.Document { | |||
|
|
|||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
flow/table.go
Outdated
| @@ -208,7 +208,6 @@ func (ft *Table) updateMetric(f *Flow, start, last int64) { | |||
| } else { | |||
| f.LastUpdateStart = f.Start | |||
| } | |||
|
|
|||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't remove this line, cosmetic
Capturing of SYN/FIN/RST timestamps
Additional detailes that are captured:
1. Add support for elastic search scheme in date format for ABSynStart
2. Use accurate clock from the packet info (and not from flow-table)
3. Make extended capturing configurable via agent paramewqters in .yml
4. Change tests to reflect changes in the tables that we create
Related issue: skydive-project#323
Added fixes after first round of reviews