Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-926: Added key to kafka message for connection tracking consistency #107

Merged

Conversation

OlivierCazade
Copy link
Contributor

@OlivierCazade OlivierCazade commented Apr 4, 2023

Add keys to each kafka message. Kafka then ensure that all messages with the same key will be consumed by the same client.

We use src and destination IP as key so all messages from a connection will be consumed by the same transformer so connection tracking metrics will be accurate.

@openshift-ci-robot
Copy link
Collaborator

openshift-ci-robot commented Apr 4, 2023

@OlivierCazade: This pull request references NETOBSERV-926 which is a valid jira issue.

In response to this:

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@OlivierCazade OlivierCazade force-pushed the kafka-connection-tracking branch 2 times, most recently from bfc7ccf to 915de15 Compare April 4, 2023 15:43
@codecov
Copy link

codecov bot commented Apr 4, 2023

Codecov Report

Merging #107 (b6ca6ad) into main (c62173a) will increase coverage by 0.20%.
The diff coverage is 90.00%.

@@            Coverage Diff             @@
##             main     #107      +/-   ##
==========================================
+ Coverage   41.59%   41.80%   +0.20%     
==========================================
  Files          30       30              
  Lines        2041     2050       +9     
==========================================
+ Hits          849      857       +8     
- Misses       1154     1155       +1     
  Partials       38       38              
Flag Coverage Δ
unittests 41.80% <90.00%> (+0.20%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
pkg/exporter/kafka_proto.go 73.07% <90.00%> (+8.37%) ⬆️

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@@ -28,6 +29,26 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
}
}

func getIPKey(ip [16]uint8) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we move this util to record.go where we keep other tools for conversion ?
also I didn't take a close look at net package to see if there is another ways to doing this but I assume u checked ? probably it can be called IPAddrToStriing() or something like that too ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that we are transforming from byte array to string and then back to byte array.

I reworked the PR to directly work with byte arrays.

Copy link
Contributor

@msherif1234 msherif1234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add to the PR the description to the issue and how this PR fix it too Thanks!!

@openshift-ci-robot
Copy link
Collaborator

openshift-ci-robot commented Apr 5, 2023

@OlivierCazade: This pull request references NETOBSERV-926 which is a valid jira issue.

In response to this:

Add keys to each kafka message. Kafka then ensure that all messages with the same key will be consumed by the same client.

We use src and destination IP as key so all messages from a connection will be consumed by the same transformer so connection tracking metrics will be accurate.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@OlivierCazade
Copy link
Contributor Author

pls add to the PR the description to the issue and how this PR fix it too Thanks!!

Done, thanks.

for k := range record.Id.SrcIp {
if record.Id.SrcIp[k] < record.Id.DstIp[k] {
return append(record.Id.SrcIp[:], record.Id.DstIp[:]...)
} else if record.Id.SrcIp[k] > record.Id.DstIp[k] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just check for > condition only since == and < both appends the same way so we can avoid if else ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if both bytes are equal we first want to iterate another time. But we return as soon as there is a difference.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok Thanks for the explaining it

@OlivierCazade OlivierCazade requested a review from jotak April 6, 2023 07:42
@@ -28,6 +28,17 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
}
}

func getFlowKey(record *flow.Record) []byte {
for k := range record.Id.SrcIp {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this assumes number of bytes in src and dest are equal .. what if there's an ipv4 and an ipv6 ?

Copy link
Member

@jotak jotak Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe check len first to return the smallest first (or last) if they aren't equal size?
Also, can you add a comment about why we need to sort this? I fear if we look at the code later we might hav forgotten :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SrcIp and DstIp are not slice but fixed length array:

And more general question, is it possible to have a src ipv4 and a dst ipv6? I would have said no but you made me wonder

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is probably impossible to have a pkt header "flow" with src and dst from different address family.
/lgtm

@jotak
Copy link
Member

jotak commented Apr 6, 2023

/lgtm

@openshift-ci openshift-ci bot removed the lgtm label Apr 6, 2023
@openshift-ci openshift-ci bot added the lgtm label Apr 6, 2023
@OlivierCazade
Copy link
Contributor Author

/approve

@openshift-ci
Copy link

openshift-ci bot commented Apr 6, 2023

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: OlivierCazade

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@openshift-ci openshift-ci bot added the approved label Apr 6, 2023
@openshift-merge-robot openshift-merge-robot merged commit 2dca243 into netobserv:main Apr 6, 2023
9 checks passed
dushyantbehl pushed a commit to dushyantbehl/netobserv-ebpf-agent that referenced this pull request Apr 28, 2023
…sistency (netobserv#107)

* Added key to kafka message for connection tracking consistency

* Added comment about why we are sorting IP address for generating kafka key
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants