Skip to content

Commit

Permalink
allow receiving MEMIF_BUFFER_FLAG_NEXT
Browse files Browse the repository at this point in the history
  • Loading branch information
yoursunny committed Jul 15, 2023
1 parent 2c57239 commit 553c2b9
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 43 deletions.
3 changes: 1 addition & 2 deletions README.md
Expand Up @@ -22,6 +22,7 @@ const memif = new Memif({
// Readable side of the stream gives access to received packets.
memif.on("data", (pkt) => {
// pkt is a Uint8Array containing received packet.
// Fragmented messages with MEMIF_BUFFER_FLAG_NEXT are concatenated.
});

// Writable side of the stream allows transmitting packets.
Expand All @@ -35,5 +36,3 @@ memif.close();
## Limitations

Each `Memif` instance must have a distinct `socketName`.

`MEMIF_BUFFER_FLAG_NEXT` is not supported.
41 changes: 28 additions & 13 deletions lib/memif.ts
Expand Up @@ -8,7 +8,7 @@ const require = createRequire(import.meta.url);

interface NativeMemif {
readonly counters: Memif.Counters;
send: (b: Uint8Array) => void;
send: (buffer: ArrayBuffer, offset: number, len: number, hasNext: boolean) => void;
close: () => void;
}

Expand All @@ -18,7 +18,7 @@ interface NativeMemifOptions {
dataroom: number;
ringCapacityLog2: number;
isServer: boolean;
rx: (b: Uint8Array) => void;
rx: (b: Uint8Array, hasNext: boolean) => void;
state: (up: boolean) => void;
}

Expand Down Expand Up @@ -120,20 +120,24 @@ export class Memif extends Duplex {
override _write(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
void encoding;

let u8: Uint8Array;
if (chunk instanceof Uint8Array) {
u8 = chunk;
let buffer: ArrayBuffer;
let offset: number;
let length: number;
if (ArrayBuffer.isView(chunk)) {
buffer = chunk.buffer;
offset = chunk.byteOffset;
length = chunk.byteLength;
} else if (chunk instanceof ArrayBuffer) {
u8 = new Uint8Array(chunk);
} else if (ArrayBuffer.isView(chunk)) {
u8 = new Uint8Array(chunk.buffer, chunk.byteOffset, chunk.byteLength);
buffer = chunk;
offset = 0;
length = chunk.byteLength;
} else {
callback(new TypeError("chunk must be ArrayBufferView or ArrayBuffer"));
return;
}

try {
this.native.send(u8);
this.native.send(buffer, offset, length, false);
} catch (err: unknown) {
callback(err as Error);
return;
Expand All @@ -150,8 +154,18 @@ export class Memif extends Duplex {
private readonly native: NativeMemif;
private readonly socketName: string;
private connected_ = false;
private rxChunks: Uint8Array[] = [];

private readonly handleRx = (b: Uint8Array) => {
private readonly handleRx = (b: Uint8Array, hasNext: boolean) => {
if (hasNext) {
this.rxChunks.push(b);
return;
}

if (this.rxChunks.length > 0) {
this.rxChunks.push(b);
b = Buffer.concat(this.rxChunks.splice(0, Infinity));
}
this.push(b);
};

Expand Down Expand Up @@ -191,9 +205,10 @@ export namespace Memif {
}

export interface Counters {
nRxDelivered: bigint;
nRxDropped: bigint;
nTxDelivered: bigint;
nRxPackets: bigint;
nRxFragments: bigint;
nTxPackets: bigint;
nTxFragments: bigint;
nTxDropped: bigint;
}
}
50 changes: 31 additions & 19 deletions src/memif.cc
Expand Up @@ -97,31 +97,38 @@ class Memif : public Napi::ObjectWrap<Memif>
{
auto env = info.Env();
auto cnt = Napi::Object::New(env);
cnt.Set("nRxDelivered", Napi::BigInt::New(env, m_nRxDelivered));
cnt.Set("nRxDropped", Napi::BigInt::New(env, m_nRxDropped));
cnt.Set("nTxDelivered", Napi::BigInt::New(env, m_nTxDelivered));
cnt.Set("nRxPackets", Napi::BigInt::New(env, m_nRxPackets));
cnt.Set("nRxFragments", Napi::BigInt::New(env, m_nRxFragments));
cnt.Set("nTxPackets", Napi::BigInt::New(env, m_nTxPackets));
cnt.Set("nTxFragments", Napi::BigInt::New(env, m_nTxFragments));
cnt.Set("nTxDropped", Napi::BigInt::New(env, m_nTxDropped));
return cnt;
}

void send(const Napi::CallbackInfo& info)
{
auto u8 = info[0].As<Napi::Uint8Array>();
size_t len = u8.ByteLength();
auto buffer = info[0].As<Napi::ArrayBuffer>();
uint32_t offset = info[1].ToNumber();
uint32_t length = info[2].ToNumber();
bool hasNext = info[3].ToBoolean();

if (!m_connected || len > m_dataroom) {
if (!m_connected || length > m_dataroom) {
++m_nTxDropped;
return;
}

memif_buffer_t b{};
uint16_t nAlloc = 0;
int err = memif_buffer_alloc(m_conn, 0, &b, 1, &nAlloc, len);
int err = memif_buffer_alloc(m_conn, 0, &b, 1, &nAlloc, length);
if (err != MEMIF_ERR_SUCCESS) {
++m_nTxDropped;
return;
}
std::copy_n(u8.Data(), len, reinterpret_cast<uint8_t*>(b.data));
std::copy_n(reinterpret_cast<const uint8_t*>(buffer.Data()) + offset, length,
reinterpret_cast<uint8_t*>(b.data));
if (hasNext) {
b.flags |= MEMIF_BUFFER_FLAG_NEXT;
}

uint16_t nTx = 0;
err = memif_tx_burst(m_conn, 0, &b, 1, &nTx);
Expand All @@ -130,7 +137,10 @@ class Memif : public Napi::ObjectWrap<Memif>
return;
}

++m_nTxDelivered;
if (!hasNext) {
++m_nTxPackets;
}
++m_nTxFragments;
}

void close(const Napi::CallbackInfo& info)
Expand Down Expand Up @@ -217,17 +227,18 @@ class Memif : public Napi::ObjectWrap<Memif>

void receive(const memif_buffer_t& b)
{
if ((b.flags & MEMIF_BUFFER_FLAG_NEXT) != 0) {
++m_nRxDropped;
return;
}

auto env = Env();
Napi::HandleScope scope(env);
auto u8 = Napi::Uint8Array::New(env, b.len);
std::memcpy(u8.Data(), b.data, b.len);
m_rx.Call({ u8 });
++m_nRxDelivered;
bool hasNext = (b.flags & MEMIF_BUFFER_FLAG_NEXT) != 0;
auto hasNextB = Napi::Boolean::New(env, hasNext);
m_rx.Call({ u8, hasNextB });

if (!hasNext) {
++m_nRxPackets;
}
++m_nRxFragments;
}

void setState(bool up)
Expand Down Expand Up @@ -306,9 +317,10 @@ class Memif : public Napi::ObjectWrap<Memif>
Napi::FunctionReference m_state;
size_t m_dataroom = 0;

uint64_t m_nRxDelivered = 0;
uint64_t m_nRxDropped = 0;
uint64_t m_nTxDelivered = 0;
uint64_t m_nRxPackets = 0;
uint64_t m_nRxFragments = 0;
uint64_t m_nTxPackets = 0;
uint64_t m_nTxFragments = 0;
uint64_t m_nTxDropped = 0;
bool m_running = false;
bool m_connected = false;
Expand Down
40 changes: 31 additions & 9 deletions test/main.js
Expand Up @@ -2,10 +2,11 @@
import assert from "node:assert/strict";
import crypto from "node:crypto";
import path from "node:path";
import { setTimeout as delay } from "node:timers/promises";
import { fileURLToPath } from "node:url";

import { execaNode } from "execa";
import { pEvent } from "p-event";
import { pEvent, pEventMultiple } from "p-event";
import tmp from "tmp";

import { Memif } from "../dist/memif.js";
Expand All @@ -26,18 +27,32 @@ assert(!memif.connected);
await pEvent(memif, "memif:up");
assert(memif.connected);

const c2s = crypto.randomBytes(1024);
const s2c = crypto.randomBytes(1024);
const msg0 = crypto.randomBytes(1500);
const msg1 = crypto.randomBytes(2000);
const msg2 = crypto.randomBytes(1000);

memif.write(c2s);
helper.send(s2c);
const [c2sR, s2cR] = await Promise.all([
pEvent(helper, "message"),
setTimeout(async () => {
memif.write(msg0);
await delay(10);

// @ts-expect-error
const native = memif.native;
const chunk0 = msg1.subarray(0, 1200);
const chunk1 = msg1.subarray(1200, 2000);
native.send(chunk0.buffer, chunk0.byteOffset, chunk0.byteLength, true);
native.send(chunk1.buffer, chunk1.byteOffset, chunk1.byteLength, false);
}, 0);
setTimeout(() => {
helper.send(msg2);
}, 0);
const [[rcv0, rcv1], rcv2] = await Promise.all([
pEventMultiple(helper, "message", { count: 2 }),
pEvent(memif, "data"),
]);

assert(c2s.equals(c2sR));
assert(s2c.equals(s2cR));
assert(msg0.equals(rcv0));
assert(msg1.equals(rcv1));
assert(msg2.equals(rcv2));

assert(memif.connected);
helper.disconnect();
Expand All @@ -47,4 +62,11 @@ await Promise.all([
]);
assert(!memif.connected);

const cnt = memif.counters;
assert.equal(cnt.nRxPackets, 1n);
assert.equal(cnt.nRxFragments, 1n);
assert.equal(cnt.nTxPackets, 2n);
assert.equal(cnt.nTxFragments, 3n);
assert.equal(cnt.nTxDropped, 0n);

memif.destroy();

0 comments on commit 553c2b9

Please sign in to comment.