Client connect().on('data', .... terminates early when receiving large packets #11

Open
RileyRaschke opened this issue Jan 13, 2019 · 17 comments

Comments

@RileyRaschke

RileyRaschke commented Jan 13, 2019

I think this bug may be a feature inherited from EventEmitter, but I haven't had a chance to dig that far yet.

A working scenario that runs until terminated:

const NetcatClient = require('netcat/client');
const { spawn } = require('child_process');
// Bad example; made zombies - SORRY!!
//var child = spawn('bash', ['-c','cat /dev/urandom | base64 | nc -l -U /tmp/test.sock']); 
var nc = new NetcatClient();
/**
 * This works! Runs until interrupted as expected
 */
nc.enc('utf8').unixSocket('/tmp/test.sock').connect().pipe(process.stdout);

Reading chunked data via .on('data') terminates early without an error or reason:

const NetcatClient = require('netcat/client');
const { spawn } = require('child_process');
// Bad example; made zombies - SORRY!!
//var child = spawn('bash', ['-c','cat /dev/urandom | /usr/bin/base64 | /bin/nc -l -U /tmp/test.sock']); 
var nc = new NetcatClient();
/**
 * This terminates early?!? (and silently?)
 */
nc.enc('utf8').unixSocket('/tmp/test.sock').connect()
  .on('data', (chunk) => {
    process.stdout.write(chunk);
  });

Going to take a break and come back to this one. I will follow up if I find the issue or the correct way to consume an endless stream of chunked data. The child process above will hang, but you can always start an external listener/feed instead to remove that side effect from the test case.

@roccomuso
Owner

Have you seen and tested with the keepAlive flag?

@RileyRaschke
Author

RileyRaschke commented Jan 15, 2019

I'm working with the Client implementation. I've played with interval and retry without any change to my test results. I also tried using the Server implementation to read the "feed", but connect is missing on that side, although keepalive is there!

I've enhanced my (Unix-specific) test cases to simplify chunk size comparisons/interactions. They should run from the tests folder in this repo. Bash is using OpenBSD netcat (Debian patchlevel 1.187-1ubuntu0.1) under the hood.

I now suspect a buffer issue more than EventEmitter. Quick Google searches on EventEmitter don't turn up any endless-loop protections.

Please try this script with varying chunk counts and you'll see what I mean! I've walked through a bit of this library's code as well, but I'm still trying to wrap my head around where things could be going wrong with .on('data') at the EventEmitter inheritance level rather than in the stream/pipe event handlers.

tests/onDataUnixTest.js:

#!/usr/bin/env node

const Netcat = require('../')
const NetcatClient = Netcat.client

const { spawn } = require('child_process')

var CHUNK_COUNT = 100 // Breaks around chunk 34 for me!
// var CHUNK_COUNT = 10 // SUCCESSFUL with clean exit

var bashDataFeedServer = `
  for i in {1..${CHUNK_COUNT}}
  do
    cat /dev/urandom | base64 | dd bs=100 count=10
    sleep 0.1
  done | nc -l -q 1 -U /tmp/test.sock
`
// var child = spawn('bash', ['-c', bashDataFeedServer])
spawn('bash', ['-c', bashDataFeedServer])

var nc = new NetcatClient()

var chunkCount = 0
// nc.enc('utf8').retry(1).interval(1).unixSocket('/tmp/gemini-feed.sock').connect()
nc.enc('utf8').unixSocket('/tmp/test.sock').connect()
  .on('data', (chunk) => {
    ++chunkCount
    process.stdout.write(`Chunkcount: ${chunkCount} - chunkSize: ${chunk.length} \n`)
  })

I have the piped version of the test playing nicely with the internal workings and exiting cleanly.
Piping it to a byte count yields exactly the amount expected.

/var/git/netcat$ ./tests/onPipeUnixTest.js  | wc -c
100000

Successful piped implementation: tests/onPipeUnixTest.js

#!/usr/bin/env node

const Netcat = require('../')
const NetcatClient = Netcat.client

const { spawn } = require('child_process')

var bashDataFeedServer = `
  for i in {1..100}
  do
    cat /dev/urandom | base64 | dd bs=100 count=10
    sleep 0.1
  done | nc -l -q 1 -U /tmp/test.sock
`
spawn('bash', ['-c', bashDataFeedServer])

var nc = new NetcatClient()

nc.enc('utf8').unixSocket('/tmp/test.sock').connect().pipe(process.stdout)

Happy to help where I can!

@roccomuso
Owner

It could be just a race condition with the spawn. It doesn't care whether there's a listener on the other end. Once spawned, it keeps sending data to /tmp/test.sock either way. Moreover, spawn used that way is an async call.

Try a different approach.

@RileyRaschke
Author

Using spawn inside the test case for the server was only for demonstration/testing purposes. What I see looks closely synced to the sleep timings of the bash server. If that weren't the case, I'd expect buffers to take over and dump everything to the screen at once, but that's all moot IMO.

Small scale, the test passes, large scale/continuous data fails miserably.

I'll look into your existing tests and see if I can extend one of them to the point of failure from this side.
Your existing tests pass on my Linux host with their current parameters, but with possible EventEmitter memory leak warnings issued a handful of times:
(node:2489) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 unpipe listeners added. Use emitter.setMaxListeners() to increase limit

I can also reproduce it with the server running in an entirely separate process, as well as over IPv4 on localhost, which is how I got here in the first place!

Getting the demonstration working in a single file, while emulating chunked data and not leaving any zombies, without adding classes/threads, was quite the challenge on its own! The secret sauce was adding the -q 1 option to OpenBSD nc so that it terminates once dd is done writing 10x a second.

My real "server" is a separate Python process that, in its own testing, handles multiple clients well whether they are present or not. It massages a data stream and I want other processes to use it as they wish. For now, my real use case on the Node/JS side spawns /bin/nc and gets chunked data from STDIN for days at a time. Interestingly enough, it doesn't appear to be line buffered either. I'm receiving full multi-line pretty-printed JSON over the socket in stable chunks through localhost. Works great, but I'd prefer your native library's clean implementation! 💯

My apologies for the zombie procs if you tried the example from the initial issue submission. Super lame of me not to catch those prior to posting... I caught them while writing comment #2 :(

@roccomuso
Owner

That MaxListenersExceededWarning should be fine.
Thanks for your investigation and your time. The lib is being used in a lot of places, so finding the bug, if there is one, would be great.

Are you testing on Ubuntu? What Node.js version do you have?
Once we find something we'll write a unit test to cover it.

So you're telling me that basically any test we run with the Netcat client, a big chunk of data and an .on('data', ..) listener would fail. We do have a test like this, but the data chunks are small: https://github.com/roccomuso/netcat/blob/master/tests/tcp.js#L392

We could try to write a test transferring crypto.randomBytes(1e6) // 1MB and use the .on listener to collect it.

@RileyRaschke
Author

Got to play a bit tonight. I feel like crypto.randomBytes() is sending close connection/EOF signals in its random bytes and invalidating the test, as it fails randomly at small scale as well. It looks like we're going to need a synthetic chunk stream to get a valid test case.

My lack of experience with JavaScript async/await usage will probably show here as well. It took some fiddling to make it act mostly right. I couldn't get it to let me reuse the port in the loop.

Here's where I'm at:

test('Serving buffers of increasing size', function (t) {
  var testStartSize = 5 // bytes
  var testStopSize = 10 // bytes
  // Sizes testStartSize + 1 .. testStopSize, one server/client pair per size
  var testSizes = Array(testStopSize - testStartSize).fill(1).map((x, y) => x + y + testStartSize)
  t.plan(2 * testSizes.length)
  t.timeoutAfter(4000)
  testSizes.forEach(function (testSize) {
    var chunkCount = 1
    // Keep the raw chunks: `+=` would coerce each Buffer to a UTF-8 string,
    // so .length would count characters instead of bytes
    var recvChunks = []
    var port = 2595 + testSize - testStartSize // a separate port per size
    var nc = new NetcatServer()
    // The chain returns an emitter, not a promise, so there is nothing to await
    nc.port(port).listen().serve(crypto.randomBytes(testSize)).on('close', function () {
      var recvLength = Buffer.concat(recvChunks).length
      t.equal(recvLength, testSize, `Client unbuffered ${recvLength} of ${testSize} bytes from server`)
      t.ok(true, 'server closed (no keepalive)')
    })

    var nc2 = new NetcatClient()
    nc2.addr('127.0.0.1').port(port).connect().on('data', function (data) {
      recvChunks.push(Buffer.from(data))
      console.log('ChunkCount' + chunkCount)
      ++chunkCount
      // TODO: Will need deterministic chunk counts to set plan size when this works...
      // t.ok(data.length > 0, 'client got buffer with data')
    })
  })
})

My desktop install was Node 8.12, but I upgraded it to Node v10.15.0 and nothing seems to have changed. This is (X)ubuntu 18.04 with kernel 4.15.0-43-generic as well.

Also, any pointers on GitHub best practices or your personal preferences would be appreciated.
Should I fork to show you my test/debugging changes, or just post them here? Thanks in advance!

@roccomuso
Owner

OK, I'm asking for the Node version since it looks like Node 10 fixed some unit tests that were failing under macOS.

You can post here. But couldn't we just simplify that test?
Moreover, are we sure crypto.randomBytes could generate an EOF char? If so, what is it? Can't we filter it?
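
On the EOF question: at the socket level, TCP and Unix stream sockets have no in-band EOF byte, so bytes such as 0x04 or 0x00 in random data are ordinary payload. A separate effect can still make byte counts on random data look wrong: accumulating received Buffers with `+=` converts each one to a UTF-8 string, so `.length` then counts characters rather than bytes. A minimal sketch of that difference, using only Node's built-in Buffer (the helper names are made up for illustration and are not part of the netcat library):

```javascript
'use strict'

// Hypothetical helpers for illustration; not part of the netcat library.

// Mimics accumulating with `+=`: each Buffer is coerced to a UTF-8 string,
// so the resulting length is in characters (UTF-16 code units), not bytes.
function lengthViaStringConcat (chunks) {
  let received = ''
  for (const chunk of chunks) received += chunk
  return received.length
}

// Keeps the raw Buffers and joins them once, so the byte count is preserved.
function lengthViaBufferConcat (chunks) {
  return Buffer.concat(chunks).length
}

// 0xC3 0xA9 is the two-byte UTF-8 encoding of "é"; 0x04 (EOT) and 0x00 are
// plain payload bytes as far as the socket is concerned.
const chunks = [Buffer.from([0xc3, 0xa9]), Buffer.from([0x04, 0x00])]

console.log(lengthViaStringConcat(chunks)) // 3 characters
console.log(lengthViaBufferConcat(chunks)) // 4 bytes
```

Encoding the payload as base64 before sending, or comparing `Buffer.concat(...)` lengths on the receiving side, avoids the mismatch without filtering any bytes.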

@RileyRaschke
Author

RileyRaschke commented Jan 27, 2019

I think I've got the native Node test written up! It's long, but it shows the successful side (pipes/streams) against the same server implementation, apples to apples.

You can make the pipe test fail if you set the interval too low, because buffering merges chunks and shrinks the number of chunk assertions, but that's understood. The final received byte counts will still match.

Creating this test did show me how to work around the issue via the write stream implementation, but we still have a bug in my opinion!

Let me know if you think this is a valid test. It should drop into your tests folder and pass with the current config, but it will quickly fail rather than scale, per the comments.

File: tests/bigChunkStream.js

'use strict'

// NOTE: must be divisible by 3 for base64 math tests to work!
var CHUNK_SIZE = 3 * 300 // 3 * 1000 works with 10 chunks, but not 20
var CHUNK_COUNT = 20 // 20 works with chunksize: 3 * 300, but not 3 * 1000

const test = require('tape')
const crypto = require('crypto')
const stream = require('stream')
const Netcat = require('../')
const NetcatServer = Netcat.server
const NetcatClient = Netcat.client

var chunkInterval = 100
var testSize = 4 * ((CHUNK_SIZE * CHUNK_COUNT) / 3)

function chunkSever (t, port, cb) {
  var sentCount = 0
  var chunkedServer = new stream.PassThrough()

  setInterval(function () {
    if (++sentCount <= CHUNK_COUNT) {
      chunkedServer.write(crypto.randomBytes(CHUNK_SIZE).toString('base64'))
    } else {
      clearInterval(this)
      chunkedServer.end()
    }
  }, chunkInterval)

  var nc = new NetcatServer()
  nc.port(port).listen()
    .serve(chunkedServer).on('close', function () {
      t.ok(true, 'server closed (no keepalive)')
      cb()
    })
}

test('Client receives big chunks via writeStream by pipe', function (t) {
  t.plan(2 + CHUNK_COUNT)
  t.timeoutAfter(5000)
  var recvBytes = '' // accumulated as a string via += below
  chunkSever(t, 2595, function () {
    t.equal(recvBytes.length, testSize, `Client unbuffered ${recvBytes.length} of ${testSize} bytes from server`)
  })
  var resChunkCount = 1
  var writeStream = new stream.Writable({
    write: function (chunk, encoding, next) {
      var recv = chunk.toString()
      recvBytes += recv
      t.ok(recv.length >= 4 * (CHUNK_SIZE / 3),
        `ChunkCount: ${resChunkCount} with ` +
        `length: ${recv.length} matched or exceeded ${4 * (CHUNK_SIZE / 3)}`)
      ++resChunkCount
      next()
    }
  })
  var ncc = new NetcatClient()
  ncc.addr('127.0.0.1').port(2595).connect().pipe(writeStream)
})

test('Client receives big chunks via eventEmitter inheritance', function (t) {
  t.plan(2 + CHUNK_COUNT)
  t.timeoutAfter(5000)
  var recvBytes = '' // accumulated as a string via += below
  chunkSever(t, 2596, function () {
    t.equal(recvBytes.length, testSize, `Client unbuffered ${recvBytes.length} of ${testSize} bytes from server`)
  })
  var resChunkCount = 1
  var ncc = new NetcatClient()
  ncc.addr('127.0.0.1').port(2596).connect().on('data', function (chunk) {
    var recv = chunk.toString()
    recvBytes += recv
    t.ok(recv.length >= 4 * (CHUNK_SIZE / 3),
      `ChunkCount: ${resChunkCount} with ` +
      `length: ${recv.length} matched or exceeded ${4 * (CHUNK_SIZE / 3)}`)

    ++resChunkCount
  })
})

PS: My bash based chunk server was certainly the simplest yet ;-)

@roccomuso
Owner

I don't think it's a bug. Setting the time interval too low is not a valid test. If you think there's an issue, it should be reproducible somehow.
Try to transfer a big file (over ~15 MB) and check its integrity by comparing the shasum.
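
A minimal sketch of that integrity check done inside Node rather than with the shasum CLI. It uses only the built-in `crypto` module; `sha256Hex` and `sameDigest` are hypothetical helper names, and the "received" chunks are simulated here instead of coming from a real netcat transfer:

```javascript
'use strict'

const crypto = require('crypto')

// Hypothetical helper: hex SHA-256 digest of a Buffer or string.
function sha256Hex (data) {
  return crypto.createHash('sha256').update(data).digest('hex')
}

// Hypothetical check: does the concatenation of the received chunks hash to
// the same digest as the payload that was sent?
function sameDigest (sent, receivedChunks) {
  return sha256Hex(sent) === sha256Hex(Buffer.concat(receivedChunks))
}

// Simulated transfer: one complete delivery split into two chunks, and one
// delivery that stopped early.
const payload = crypto.randomBytes(1024)
const complete = [payload.subarray(0, 400), payload.subarray(400)]
const truncated = [payload.subarray(0, 400)]

console.log(sameDigest(payload, complete)) // true
console.log(sameDigest(payload, truncated)) // false
```

In a real test the chunks would be collected in the client's data handler and compared once the connection closes.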

@RileyRaschke
Author

RileyRaschke commented Jan 27, 2019

Okay. Test timeouts now scale with the test size in the updated code below. The test specs scale for chunk counts of 1-74 with a static chunk size of 333 bytes (444 base64 chars).
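
For reference, the 333 bytes to 444 characters figure follows from base64 encoding every 3 input bytes as 4 output characters. A small sketch that checks the arithmetic against Node's own encoder (`base64Length` is a hypothetical helper; the `Math.ceil` also covers sizes that are not multiples of 3, where padding is added):

```javascript
'use strict'

const crypto = require('crypto')

// Length of the padded base64 encoding of nBytes bytes:
// every group of up to 3 bytes becomes 4 characters.
function base64Length (nBytes) {
  return 4 * Math.ceil(nBytes / 3)
}

const CHUNK_SIZE = 333
const encoded = crypto.randomBytes(CHUNK_SIZE).toString('base64')

console.log(base64Length(CHUNK_SIZE), encoded.length) // 444 444
```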

The event emitter implementation fails at >= 75 chunks (second test block), but consuming the stream via pipe can handle ANY chunk count (first test block). The second test block is the exact issue I opened. It should act just like the first test, but doesn't.

Start with a chunk count of 50 and go up by one if 74 doesn't work for you?

Running with debugging on shows the client fails to "continue" after about 32 KB of the stream is consumed IF using the EventEmitter implementation. The server does keep "pushing" data, as you can see by watching the debugging output.

File: tests/bigChunkStream.js

'use strict'

var CHUNK_SIZE = 333 // NOTE: `CHUNK_SIZE` must be divisible by 3 for base64 math tests to work!
var CHUNK_COUNT = 74 // 1-74 work. Why does >= 75 (or ~32 KB) die if read via event emitter?
var CHUNKINTERVAL = 30 // ms

// Add 10 ms of overhead per chunk for crypto.randomBytes and STDOUT
var T_TIMEOUT = (CHUNKINTERVAL + 10) * CHUNK_COUNT

const test = require('tape')
const crypto = require('crypto')
const stream = require('stream')
const Netcat = require('../')
const NetcatServer = Netcat.server
const NetcatClient = Netcat.client

// Byte length to base 64length for tests
// base64Length = 4 * ( nBytes / 3)
var testSize = 4 * ((CHUNK_SIZE * CHUNK_COUNT) / 3)

function chunkSever (t, port, cb) {
  var sentCount = 0
  var chunkedServer = new stream.PassThrough()

  setInterval(function () {
    if (++sentCount <= CHUNK_COUNT) {
      chunkedServer.write(crypto.randomBytes(CHUNK_SIZE).toString('base64'))
    } else {
      clearInterval(this)
      chunkedServer.end()
    }
  }, CHUNKINTERVAL)

  var nc = new NetcatServer()
  nc.port(port).listen()
    .serve(chunkedServer).on('close', function () {
      t.ok(true, 'server closed (no keepalive)')
      cb()
    })
}

test('Client receives big chunks via writeStream by pipe', function (t) {
  t.plan(2 + CHUNK_COUNT)
  t.timeoutAfter(T_TIMEOUT)
  var recvBytes = '' // accumulated as a string via += below
  chunkSever(t, 2595, function () {
    t.equal(recvBytes.length, testSize, `Client unbuffered ${recvBytes.length} of ${testSize} bytes from server`)
  })
  var resChunkCount = 1
  var writeStream = new stream.Writable({
    write: function (chunk, encoding, next) {
      var recv = chunk.toString()
      recvBytes += recv
      t.ok(recv.length >= 4 * (CHUNK_SIZE / 3),
        `ChunkCount: ${resChunkCount} with ` +
        `length: ${recv.length} matched or exceeded ${4 * (CHUNK_SIZE / 3)}`)
      ++resChunkCount
      next()
    }
  })
  var ncc = new NetcatClient()
  ncc.addr('127.0.0.1').port(2595).connect().pipe(writeStream)
})

test('Client receives big chunks via eventEmitter inheritance', function (t) {
  t.plan(2 + CHUNK_COUNT)
  t.timeoutAfter(T_TIMEOUT)
  var recvBytes = '' // accumulated as a string via += below
  chunkSever(t, 2596, function () {
    t.equal(recvBytes.length, testSize, `Client unbuffered ${recvBytes.length} of ${testSize} bytes from server`)
  })
  var resChunkCount = 1
  var ncc = new NetcatClient()
  ncc.addr('127.0.0.1').port(2596).connect()
    .on('error', function (error) {
      console.log('ERROR: ' + error.toString())
    })
    .on('data', function (chunk) {
      var recv = chunk.toString()
      recvBytes += recv
      t.ok(recv.length >= 4 * (CHUNK_SIZE / 3),
        `ChunkCount: ${resChunkCount} with ` +
        `length: ${recv.length} matched or exceeded ${4 * (CHUNK_SIZE / 3)}`)
      ++resChunkCount
    })
})

@roccomuso
Owner

roccomuso commented Jan 27, 2019

Yeah, definitely looks like a bug. I still wasn't able to figure it out. I tried to check whether it is somehow related to Node's stream buffer highWaterMark (16384 by default).
By the way, I'm running it with DEBUG=netcat* and I'm not sure whether it is client or server related.
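
For context, a minimal sketch of how that limit behaves on a bare `stream.PassThrough`, the stream type the test's server is fed from. It uses only Node's built-in `stream` module and sets `highWaterMark` to 16384 explicitly, since the default may differ between Node versions. With nobody reading, a PassThrough buffers on both its readable and its writable side, so `write()` tends to start returning `false` only after roughly twice the high-water mark, which is in the same range as the ~32 KB (74 × 444 = 32856 characters) at which the test stalls. Whether this is actually the cause is an assumption; the thread does not establish it. `writesUntilBackpressure` is a hypothetical helper.

```javascript
'use strict'

const stream = require('stream')

// Hypothetical helper: write fixed-size chunks into a PassThrough that nobody
// reads from, and count the writes until write() first signals backpressure.
function writesUntilBackpressure (chunkLength, highWaterMark) {
  const passThrough = new stream.PassThrough({ highWaterMark })
  const chunk = 'A'.repeat(chunkLength)
  const maxWrites = 10000 // safety cap so the loop always terminates
  let writes = 0
  while (writes < maxWrites) {
    ++writes
    if (!passThrough.write(chunk)) break
  }
  return { writes, bytes: writes * chunkLength }
}

const result = writesUntilBackpressure(444, 16384)
console.log(`write() returned false on write ${result.writes} ` +
  `after ${result.bytes} bytes`)
```

The exact write count depends on the Node version's stream internals, so the sketch prints it rather than assuming a value.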

@roccomuso
Owner

roccomuso commented Mar 9, 2020

Any news on this?

Confirmed: it looks like a Server problem.

let nc = new NetcatServer()

nc.k().port(2389).listen().serve(process.stdin).on('data', (s, d) => console.log(d.toString())) // broken with big payload
// .pipe(process.stdout) // works with pipe!

@roccomuso
Owner

Fixed here: a2e095e
But that test still doesn't pass.

@junbujianwpl

It does not work. Just write an infinite ping-pong client and server and the problem will definitely appear.

@roccomuso
Owner

Not clear why. There should be a bug somewhere.

junbujianwpl pushed a commit to junbujianwpl/netcat that referenced this issue Nov 11, 2020
@junbujianwpl

Not clear why. There should be a bug somewhere.

junbujianwpl@f008c5d

There is a workaround in that commit as a temporary fix. With it, a long-running ping-pong works for hours. Maybe a clue.

@roccomuso
Owner

I can't tell why that should work. It will be worth investigating.


3 participants