Leaks memory #29

Closed
r58Playz opened this issue Mar 27, 2024 · 11 comments · Fixed by #33

Comments

@r58Playz
Contributor

r58Playz commented Mar 27, 2024

Discovered this issue while working on whisper, a program that uses this crate.

The demo leaks memory, as seen in this heaptrack output. (I restarted the nc command a couple of times because it occasionally stopped transferring data; I don't know if that's a separate issue.)
[Image: heaptrack]

It seems to be in ipstack::stream::tcb::Tcb::add_inflight_packet:
[Image: heaptrack-calltree]

To reproduce:

  • Run the tun2 example (I had to remove required-features = ["log"] from Cargo.toml to get it to compile)
  • Assign it an IP (I used sudo ip route add 10.0.10.0/32 dev tun0)
  • Run cat /dev/zero | nc 10.0.10.0 2048 > /dev/null to send some traffic through the TUN device
@SajjadPourali
Collaborator

SajjadPourali commented Mar 27, 2024

This is a known issue; thank you for the report. I believe the change_last_ack function is also responsible: that is where ipstack checks received acknowledgments and should deallocate the memory of acknowledged packets.

@SajjadPourali
Collaborator

SajjadPourali commented Mar 27, 2024

I currently don't have access to a profiler. Could you please try this patch?

diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..a191a05 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -178,13 +178,17 @@ impl Tcb {
         let distance = ack.wrapping_sub(self.last_ack);

         if matches!(self.state, TcpState::Established) {
-            if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
-                let mut inflight_packet = self.inflight_packets.remove(i);
-                let distance = ack.wrapping_sub(inflight_packet.seq);
-                if (distance as usize) < inflight_packet.payload.len() {
-                    inflight_packet.payload.drain(0..distance as usize);
-                    inflight_packet.seq = ack;
-                    self.inflight_packets.push(inflight_packet);
+            loop {
+                if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
+                    let mut inflight_packet = self.inflight_packets.remove(i);
+                    let distance = ack.wrapping_sub(inflight_packet.seq);
+                    if (distance as usize) < inflight_packet.payload.len() {
+                        inflight_packet.payload.drain(0..distance as usize);
+                        inflight_packet.seq = ack;
+                        self.inflight_packets.push(inflight_packet);
+                    }
+                } else {
+                    break;
                 }
             }
         }

@r58Playz
Contributor Author

Still seems to be leaking at the same place.

@SajjadPourali
Collaborator

Unable to reproduce.
[Image: Screenshot 2024-03-27 at 8:38:37 PM]

Could you please try with https://github.com/narrowlink/ipstack/tree/ManualShutdownIssue and let me know if you can reproduce it?

@r58Playz
Contributor Author

I've applied the patch and am on the ManualShutdownIssue branch on Linux:

$ git diff
diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..a191a05 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -178,13 +178,17 @@ impl Tcb {
         let distance = ack.wrapping_sub(self.last_ack);
 
         if matches!(self.state, TcpState::Established) {
-            if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
-                let mut inflight_packet = self.inflight_packets.remove(i);
-                let distance = ack.wrapping_sub(inflight_packet.seq);
-                if (distance as usize) < inflight_packet.payload.len() {
-                    inflight_packet.payload.drain(0..distance as usize);
-                    inflight_packet.seq = ack;
-                    self.inflight_packets.push(inflight_packet);
+            loop {
+                if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
+                    let mut inflight_packet = self.inflight_packets.remove(i);
+                    let distance = ack.wrapping_sub(inflight_packet.seq);
+                    if (distance as usize) < inflight_packet.payload.len() {
+                        inflight_packet.payload.drain(0..distance as usize);
+                        inflight_packet.seq = ack;
+                        self.inflight_packets.push(inflight_packet);
+                    }
+                } else {
+                    break;
                 }
             }
         }
$ git branch
* ManualShutdownIssue
  main
$ cargo b --example tun2
$ sudo heaptrack ./target/debug/examples/tun2 --server-addr 127.0.0.1:8080
starting application, this might take some time...
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
unknown transport - Ip Protocol 2 (IGMP - Internet Group Management)
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
==== New UDP connection ====
==== New UDP connection ====
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
==== end UDP connection ====
==== end UDP connection ====
unknown transport - Ip Protocol 2 (IGMP - Internet Group Management)
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
==== New UDP connection ====
==== end UDP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
==== New TCP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
==== New TCP connection ====
====== end tcp connection ======
==== New TCP connection ====
==== New TCP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
Terminated
heaptrack stats:
        allocations:            1222232
        leaked allocations:     2286
        temporary allocations:  439457
Heaptrack finished!

Here's the dump file:
heaptrack.tun2.1097005.zip

@r58Playz
Contributor Author

Re-ran with tun_wintun, making sure to pull and rebuild:

$ git pull
Already up to date.
$ git diff
diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..a191a05 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -178,13 +178,17 @@ impl Tcb {
         let distance = ack.wrapping_sub(self.last_ack);
 
         if matches!(self.state, TcpState::Established) {
-            if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
-                let mut inflight_packet = self.inflight_packets.remove(i);
-                let distance = ack.wrapping_sub(inflight_packet.seq);
-                if (distance as usize) < inflight_packet.payload.len() {
-                    inflight_packet.payload.drain(0..distance as usize);
-                    inflight_packet.seq = ack;
-                    self.inflight_packets.push(inflight_packet);
+            loop {
+                if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
+                    let mut inflight_packet = self.inflight_packets.remove(i);
+                    let distance = ack.wrapping_sub(inflight_packet.seq);
+                    if (distance as usize) < inflight_packet.payload.len() {
+                        inflight_packet.payload.drain(0..distance as usize);
+                        inflight_packet.seq = ack;
+                        self.inflight_packets.push(inflight_packet);
+                    }
+                } else {
+                    break;
                 }
             }
         }
$ git status
On branch main
Your branch is up to date with 'origin/main'.

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
        modified:   src/stream/tcb.rs

no changes added to commit (use "git add" and/or "git commit -a")
$ cargo b --example tun_wintun
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.02s
$ sudo heaptrack ./target/debug/examples/tun_wintun --server-addr 127.0.0.1:8080
heaptrack output will be written to "/home/r58playz/Documents/ipstack/heaptrack.tun_wintun.43488.zst"
starting application, this might take some time...
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
unknown transport - Ip Protocol 2 (IGMP - Internet Group Management)
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
==== New UDP connection ====
==== New UDP connection ====
unknown transport - Ip Protocol 2 (IGMP - Internet Group Management)
unknown transport - Ip Protocol 0 (HOPOPT - IPv6 Hop-by-Hop Option)
==== New UDP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
==== New TCP connection ====
==== New UDP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
==== New TCP connection ====
==== New TCP connection ====
unknown transport - Ip Protocol 58 (IPv6-ICMP - ICMP for IPv6)
==== New TCP connection ====
==== New TCP connection ====
==== end UDP connection ====
==== end UDP connection ====
==== New TCP connection ====
==== end UDP connection ====
==== New TCP connection ====
==== end UDP connection ====
Terminated
heaptrack stats:
        allocations:            2600923
        leaked allocations:     4454
        temporary allocations:  358344
Heaptrack finished!

Dump file: heaptrack.tun_wintun.43488.zip

@r58Playz
Contributor Author

r58Playz commented Mar 29, 2024

The underlying issue for me seems to be that change_last_ack is sometimes called before the packet gets pushed into self.inflight_packets, which means the packet is never removed from the Vec.

Here are some debug logs I added to illustrate this issue:

acked - ack: 30615 seqs: []
acked - ack: 40364 seqs: ["30615..38807"]
acked - ack: 47319 seqs: ["30615..38807"]
acked - ack: 63703 seqs: ["47319..55511", "30615..38807"]
acked - ack: 72151 seqs: ["63703..65532", "47319..55511", "30615..38807"]
acked - ack: 80343 seqs: ["80343..88535", "63703..65532", "47319..55511", "30615..38807"]
acked - ack: 88535 seqs: ["63703..65532", "47319..55511", "30615..38807"]
acked - ack: 88845 seqs: ["63703..65532", "47319..55511", "30615..38807"]
acked - ack: 97037 seqs: ["97037..97143", "63703..65532", "47319..55511", "30615..38807"]
acked - ack: 97143 seqs: ["63703..65532", "47319..55511", "30615..38807"]
acked - ack: 105131 seqs: ["63703..65532", "47319..55511", "30615..38807"]
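
The pattern in these logs can be reproduced in isolation. The following is a minimal sketch, not the crate's code: the `Inflight` struct, the `on_ack` function, and in particular the semantics of `contains` (here: `ack` falls inside the segment's byte range, with no wraparound handling) are assumptions made for illustration. Only the "touch the one segment that contains the ack" shape is taken from the original `change_last_ack`.

```rust
/// Hypothetical stand-in for an unacknowledged segment.
#[derive(Debug)]
struct Inflight {
    seq: u32,
    payload: Vec<u8>,
}

impl Inflight {
    /// Assumed semantics: true when `ack` falls inside this segment's byte range.
    /// No wraparound handling; this is only a model.
    fn contains(&self, ack: u32) -> bool {
        let end = self.seq + self.payload.len() as u32;
        self.seq < ack && ack <= end
    }
}

/// Same shape as the original logic: only the segment containing `ack` is touched.
fn on_ack(inflight: &mut Vec<Inflight>, ack: u32) {
    if let Some(i) = inflight.iter().position(|p| p.contains(ack)) {
        let mut p = inflight.remove(i);
        let distance = ack.wrapping_sub(p.seq) as usize;
        if distance < p.payload.len() {
            // Partially acknowledged: keep the unacknowledged tail.
            p.payload.drain(0..distance);
            p.seq = ack;
            inflight.push(p);
        }
    }
}

/// Feeds cumulative ACKs that all land beyond the segment's end (38807),
/// then reports how many segments are still held.
fn leaked_after_skipping_ack() -> usize {
    let mut inflight = vec![Inflight { seq: 30615, payload: vec![0; 8192] }];
    for ack in [40364, 47319, 63703] {
        on_ack(&mut inflight, ack);
    }
    inflight.len()
}

fn main() {
    println!("segments still held: {}", leaked_after_skipping_ack());
}
```

In this model, a segment is only ever released when an ACK lands inside its range, so any segment that an ACK skips over stays in the vector for the lifetime of the connection.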

I fixed this with this patch:

diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..7d7b94f 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -186,6 +186,9 @@ impl Tcb {
                     inflight_packet.seq = ack;
                     self.inflight_packets.push(inflight_packet);
                 }
+                if i != 0 {
+                    self.inflight_packets.drain(0..i);
+                }
             }
         }
 

I don't know if this is the best solution, however.

@SajjadPourali
Collaborator

SajjadPourali commented Mar 29, 2024

Thanks!
Your patch may cause some issues because inflight_packets isn't always ordered, especially after we push a trimmed packet back into the vector.
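
To make the ordering concern concrete, here is a minimal sketch. The `(seq, len)` tuples and the `drain_before` helper are hypothetical simplifications, not the crate's types; the point is only that `drain(0..i)` removes by position in the vector, not by sequence number.

```rust
/// Hypothetical model: each in-flight segment is (seq, len), end exclusive.
/// Simplified version of the `drain(0..i)` idea: drop everything stored
/// before the segment that contains `ack`.
fn drain_before(inflight: &mut Vec<(u32, u32)>, ack: u32) {
    let found = inflight
        .iter()
        .position(|&(seq, len)| seq <= ack && ack < seq + len);
    if let Some(i) = found {
        inflight.drain(0..i);
    }
}

fn demo() -> Vec<(u32, u32)> {
    // Segment 1000..2000 was partially acked at 1500, trimmed, and pushed
    // back, so it now sits behind the newer segment 2000..3000.
    let mut inflight = vec![(2000, 1000), (1500, 500)];
    drain_before(&mut inflight, 1600);
    inflight
}

fn main() {
    // The segment (2000, 1000) has not been acknowledged, yet it is gone.
    println!("{:?}", demo());
}
```

Filtering by sequence number, as the patch below does with `retain`, avoids depending on the order of the vector.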

diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..ac90820 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -176,6 +176,7 @@ impl Tcb {
     }
     pub(super) fn change_last_ack(&mut self, ack: u32) {
         let distance = ack.wrapping_sub(self.last_ack);
+        self.last_ack = self.last_ack.wrapping_add(distance);

         if matches!(self.state, TcpState::Established) {
             if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
@@ -187,9 +188,11 @@ impl Tcb {
                     self.inflight_packets.push(inflight_packet);
                 }
             }
+            self.inflight_packets.retain(|p| {
+                let last_byte = p.seq.wrapping_add(p.payload.len() as u32);
+                last_byte.wrapping_sub(self.last_ack) > 0 && self.seq.wrapping_sub(last_byte) > 0 // only keep packets that are not fully acked
+            });
         }
-
-        self.last_ack = self.last_ack.wrapping_add(distance);
     }
     pub fn is_send_buffer_full(&self) -> bool {
         self.seq.wrapping_sub(self.last_ack) >= MAX_UNACK

Could you please try this patch?

@r58Playz
Contributor Author

Doesn't seem to work.

acked: ack 5889 last 5 []
acked: ack 11453 last 5 []
acked: ack 19645 last 5 []
acked: ack 36029 last 5 ["19645..27837"]
acked: ack 40368 last 5 ["19645..27837"]
acked: ack 48560 last 5 ["19645..27837"]
acked: ack 56752 last 5 ["19645..27837"]
acked: ack 64944 last 5 ["19645..27837"]
acked: ack 73136 last 5 ["19645..27837"]
acked: ack 81328 last 5 ["19645..27837"]
acked: ack 89520 last 5 ["19645..27837"]
acked: ack 93185 last 5 ["19645..27837"]
acked: ack 101377 last 5 ["19645..27837"]
acked: ack 106583 last 5 ["19645..27837"]
acked: ack 114775 last 5 ["19645..27837"]
acked: ack 121625 last 5 ["19645..27837"]
acked: ack 129817 last 5 ["19645..27837"]
acked: ack 138009 last 5 ["19645..27837"]
acked: ack 142260 last 5 ["19645..27837"]
acked: ack 158644 last 5 ["142260..150452", "19645..27837"]
acked: ack 166836 last 5 ["142260..150452", "19645..27837"]
acked: ack 172543 last 5 ["142260..150452", "19645..27837"]

@SajjadPourali
Collaborator

Oops, I used the wrong function. This version replaces wrapping_sub with saturating_sub in the retain closure.

diff --git a/src/stream/tcb.rs b/src/stream/tcb.rs
index 0e7383b..e771123 100644
--- a/src/stream/tcb.rs
+++ b/src/stream/tcb.rs
@@ -176,6 +176,7 @@ impl Tcb {
     }
     pub(super) fn change_last_ack(&mut self, ack: u32) {
         let distance = ack.wrapping_sub(self.last_ack);
+        self.last_ack = self.last_ack.wrapping_add(distance);

         if matches!(self.state, TcpState::Established) {
             if let Some(i) = self.inflight_packets.iter().position(|p| p.contains(ack)) {
@@ -187,9 +188,11 @@ impl Tcb {
                     self.inflight_packets.push(inflight_packet);
                 }
             }
+            self.inflight_packets.retain(|p| {
+                let last_byte = p.seq.wrapping_add(p.payload.len() as u32);
+                last_byte.saturating_sub(self.last_ack) > 0 && self.seq.saturating_sub(last_byte) > 0
+            });
         }
-
-        self.last_ack = self.last_ack.wrapping_add(distance);
     }
     pub fn is_send_buffer_full(&self) -> bool {
         self.seq.wrapping_sub(self.last_ack) >= MAX_UNACK
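
The difference between the two `retain` versions comes down to unsigned arithmetic. For `u32`, `a.wrapping_sub(b) > 0` is true whenever `a != b`, so the first version kept nearly every packet, while `a.saturating_sub(b) > 0` is true only when `a > b`. A minimal sketch follows; the helper names are hypothetical, the values are picked from the log above for illustration, and the signed-difference variant is a common sequence-number comparison idiom that was not proposed in this thread.

```rust
/// True whenever a != b: the wrapped difference of two distinct u32 values is never 0.
fn gt_wrapping(a: u32, b: u32) -> bool {
    a.wrapping_sub(b) > 0
}

/// True only when a > b numerically; does not account for sequence-number wraparound.
fn gt_saturating(a: u32, b: u32) -> bool {
    a.saturating_sub(b) > 0
}

/// Alternative idiom (an assumption, not from the thread): interpret the wrapped
/// difference as signed, so "a is after b" also holds across u32 wraparound
/// as long as the two values are less than 2^31 apart.
fn seq_after(a: u32, b: u32) -> bool {
    (a.wrapping_sub(b) as i32) > 0
}

fn main() {
    let last_byte = 27_837u32; // end of the stuck packet "19645..27837"
    let ack = 36_029u32; // an ACK value already past that packet

    println!("wrapping:   keep = {}", gt_wrapping(last_byte, ack));
    println!("saturating: keep = {}", gt_saturating(last_byte, ack));
    println!("signed:     keep = {}", seq_after(last_byte, ack));

    // Near the wraparound point the saturating and signed versions disagree.
    println!("saturating across wrap: {}", gt_saturating(5, u32::MAX - 5));
    println!("signed across wrap:     {}", seq_after(5, u32::MAX - 5));
}
```

The saturating comparison matches the fix that resolved this issue; whether wraparound near `u32::MAX` needs separate handling is outside what this thread establishes.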

@r58Playz
Contributor Author

That fixes it! I'll close my PR.
