local memory		= require "memory"
local device		= require "device"
local stats			= require "stats"
local log			= require "log"
local timer			= require "timer"
local phobos		= require "libmoon"
local pipe			= require "pipe"
local ffi			= require "ffi"
local namespaces	= require "namespaces"

-- Message/counter record: a single 64-bit packet count.
-- Note: the trailing identifier also declares a C variable named `counter_t`;
-- nothing in this script accesses it.
ffi.cdef[[
	struct counter_t { uint64_t c; } counter_t;
]]

-- Namespace table holding the global counter under the key `skt`.
local sketch = namespaces:get()

--- Entry point: configures the RX device and starts the counting tasks.
-- Starts one "counterSlave" task that receives packets on RX queue 0 and one
-- "sketchUpdate" task that accumulates the per-batch counts sent over a pipe.
-- @param rxPort port id passed to device.config; if missing, a usage message
--               is logged and nothing is started.
function master(rxPort)
	if not rxPort then
		return log:info("usage: rxPort")
	end
	local rxDev = device.config{ port = rxPort, dropEnable = false }
	device.waitForLinks()
	-- Named differently from the `pipe` module so the module is not shadowed.
	local counterPipe = pipe:newFastPipe()
	sketchInit()
	phobos.startTask("counterSlave", rxDev:getRxQueue(0), counterPipe)
	phobos.startTask("sketchUpdate", counterPipe)
	phobos.waitForTasks()
end

--- Allocates the global counter, stores it in the namespace and zeroes it.
-- @return the newly allocated counter (`sketch.skt`)
function sketchInit()
	sketch.skt = memory.alloc("struct counter_t *", ffi.sizeof("struct counter_t"))
	sketch.skt.c = 0
	return sketch.skt
end

--- Consumer task: adds every count received over the pipe to the global counter.
-- Each message is a `struct counter_t *` allocated by the sender; this task
-- takes ownership of it and frees it once its value has been read.
-- @param pipe pipe object the messages are received from
function sketchUpdate(pipe)
	while phobos.running() do
		local data = ffi.cast("struct counter_t *", pipe:recv())
		-- A nil result from recv() is cast to a NULL pointer, which compares
		-- equal to nil; skip it instead of dereferencing.
		if data ~= nil then
			sketch.skt.c = sketch.skt.c + data.c
			memory.free(data)
		end
	end
end

--- Returns the current value of the global counter.
-- @return the counter value (uint64_t cdata)
function sketchCount()
	return sketch.skt.c
end

--- Producer task: receives packets and reports the size of each batch.
-- For every non-empty batch, a freshly allocated `struct counter_t` carrying
-- the batch size is sent over the pipe; the receiver is responsible for
-- freeing it. A fresh buffer per message is required because a single shared
-- buffer could be overwritten before the receiver has read it.
-- @param queue RX queue to receive from
-- @param pipe pipe object the per-batch counts are sent to
function counterSlave(queue, pipe)
	local bufs = memory.bufArray()
	local rxCtr = stats:newDevRxCounter(queue.dev) -- to track if we lose packets on the NIC
	local pktCtr = stats:newPktRxCounter("Packets counted", "plain")
	local msgSize = ffi.sizeof("struct counter_t")
	while phobos.running() do
		local rx = queue:tryRecv(bufs, 100)
		for i = 1, rx do
			pktCtr:countPacket(bufs[i])
		end
		if rx > 0 then
			local msg = memory.alloc("struct counter_t *", msgSize)
			msg.c = rx
			pipe:send(msg)
		end
		bufs:free(rx)
		rxCtr:update()
		pktCtr:update()
	end
	rxCtr:finalize()
	pktCtr:finalize()
	-- NOTE(review): messages still queued in the pipe at this point are not
	-- included in the printed value.
	print("Pipe-relying function counted: " .. tonumber(sketchCount()))
	-- TODO: check the queue's overflow counter to detect lost packets
end

-- vim:ts=4:sw=4:noexpandtab:ai