Skip to content

Commit

Permalink
Merge PR #1472 (Use MPMC bounded queue for group freelist) into max-next
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Apr 6, 2022
2 parents 43ce0fa + 616e6e2 commit 2ff0924
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 38 deletions.
40 changes: 40 additions & 0 deletions src/apps/interlink/freelist_instrument.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local histogram = require("core.histogram")
local tsc = require("lib.tsc")

function instrument_freelist ()
local ts = tsc.new()
local rebalance_latency = histogram.create('engine/rebalance_latency.histogram', 1, 100e6)
local reclaim_latency = histogram.create('engine/reclaim_latency.histogram', 1, 100e6)

local rebalance_step, reclaim_step = packet.rebalance_step, packet.reclaim_step
packet.rebalance_step = function ()
local start = ts:stamp()
rebalance_step()
rebalance_latency:add(tonumber(ts:to_ns(ts:stamp()-start)))
end
packet.reclaim_step = function ()
local start = ts:stamp()
reclaim_step()
reclaim_latency:add(tonumber(ts:to_ns(ts:stamp()-start)))
end

return rebalance_latency, reclaim_latency
end

function histogram_csv_header (out)
out = out or io.stdout
out:write("histogram,lo,hi,count\n")
end

function histogram_csv (histogram, name, out)
out = out or io.stdout
name = name or 'untitled'
for count, lo, hi in histogram:iterate() do
out:write(("%s,%f,%f,%d\n"):format(name, lo, hi, tonumber(count)))
out:flush()
end
end
38 changes: 38 additions & 0 deletions src/apps/interlink/test_sink.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local Receiver = require("apps.interlink.receiver")
local Sink = require("apps.basic.basic_apps").Sink
local lib = require("core.lib")
local numa = require("lib.numa")

function configure (c, name)
config.app(c, name, Receiver)
config.app(c, "sink", Sink)
config.link(c, name..".output -> sink.input")
end

function start (name, duration)
local c = config.new()
configure(c, name)
engine.configure(c)
engine.main{duration=duration}
end

local instr = require("apps.interlink.freelist_instrument")

function start_instrument (name, duration, core)
numa.bind_to_cpu(core, 'skip')
local rebalance_latency = instr.instrument_freelist()
start(name, duration)
instr.histogram_csv(rebalance_latency, "rebalance")
local min, avg, max = rebalance_latency:summarize()
io.stderr:write(("(%d) rebalance latency (ns) min:%16s avg:%16s max:%16s\n")
:format(core,
lib.comma_value(math.floor(min)),
lib.comma_value(math.floor(avg)),
lib.comma_value(math.floor(max))))
io.stderr:flush()
end

52 changes: 48 additions & 4 deletions src/apps/interlink/test_source.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,56 @@ module(...,package.seeall)

local Transmitter = require("apps.interlink.transmitter")
local Source = require("apps.basic.basic_apps").Source
local lib = require("core.lib")
local numa = require("lib.numa")

function start (name)
local c = config.new()
function configure (c, name)
config.app(c, name, Transmitter)
config.app(c, "source", Source)
config.link(c, "source.output -> "..name..".input")
config.link(c, "source."..name.." -> "..name..".input")
end

function start (name, duration)
local c = config.new()
configure(c, name)
engine.configure(c)
engine.main()
engine.main{duration=duration}
end

function startn (name, duration, n)
local c = config.new()
for i=1,n do
configure(c, name..i)
end
engine.configure(c)
engine.main{duration=duration}
end

function txpackets ()
local txpackets = 0
for _, output in ipairs(engine.app_table["source"].output) do
txpackets = txpackets + link.stats(output).rxpackets
end
return txpackets
end

local instr = require("apps.interlink.freelist_instrument")

function startn_instrument (name, duration, n, core)
numa.bind_to_cpu(core, 'skip')
local _, reclaim_latency = instr.instrument_freelist()
startn(name, duration, n)
local txpackets = txpackets()
instr.histogram_csv(reclaim_latency, "reclaim")
local min, avg, max = reclaim_latency:summarize()
engine.main{duration=1, no_report=true}
io.stderr:write(("(%d) reclaim latency (ns) min:%16s avg:%16s max:%16s\n")
:format(core,
lib.comma_value(math.floor(min)),
lib.comma_value(math.floor(avg)),
lib.comma_value(math.floor(max))))
io.stderr:write(("%.3f Mpps\n"):format(txpackets / 1e6 / duration))
io.stderr:flush()

--engine.report_links()
end
36 changes: 36 additions & 0 deletions src/apps/interlink/wait_test.snabb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!snabb snsh

-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

local worker = require("core.worker")
local numa = require("lib.numa")

-- Test wait times caused by group freelist rebalancing
-- Synopsis: wait_test.snabb [duration] [nconsumers]
local DURATION = tonumber(main.parameters[1]) or 10
local NCONSUMERS = tonumber(main.parameters[2]) or 10
local CPUS = numa.parse_cpuset(main.parameters[3] or "")

local cores = {}
for core in pairs(CPUS) do
table.insert(cores, core)
table.sort(cores)
end

require("apps.interlink.freelist_instrument").histogram_csv_header()
io.stdout:flush()

for i=1,NCONSUMERS do
worker.start("sink"..i, ([[require("apps.interlink.test_sink").start_instrument(%q, %d, %s)]])
:format("test"..i, DURATION, cores[1+i]))
end

worker.start("source", ([[require("apps.interlink.test_source").startn_instrument(%q, %d, %d, %s)]])
:format("test", DURATION, NCONSUMERS, assert(cores[1])))

engine.main{done = function ()
for w, s in pairs(worker.status()) do
if s.alive then return false end
end
return true
end}
3 changes: 1 addition & 2 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,9 @@ function breathe ()
end
::PUSH_EXIT::
counter.add(breaths)
-- Commit counters and rebalance freelists at a reasonable frequency
-- Commit counters at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
packet.rebalance_freelists()
end
running = false
end
Expand Down
163 changes: 163 additions & 0 deletions src/core/group_freelist.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local sync = require("core.sync")
local shm = require("core.shm")
local lib = require("core.lib")
local ffi = require("ffi")

local waitfor, compiler_barrier = lib.waitfor, lib.compiler_barrier
local band = bit.band

-- Group freelist: lock-free multi-producer multi-consumer ring buffer
-- (mpmc queue)
--
-- https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
--
-- NB: assumes 32-bit wide loads/stores are atomic (as is the fact on x86_64)!

-- Group freelist holds up to SIZE chunks of chunksize packets each
chunksize = 2048

-- (SIZE=1024)*(chunksize=2048) == roughly two million packets
local SIZE = 1024 -- must be a power of two
local MAX = SIZE - 1

local CACHELINE = 64 -- XXX - make dynamic
local INT = ffi.sizeof("uint32_t")

ffi.cdef([[
struct group_freelist_chunk {
uint32_t sequence[1], nfree;
struct packet *list[]]..chunksize..[[];
} __attribute__((packed))]])

ffi.cdef([[
struct group_freelist {
uint32_t enqueue_pos[1];
uint8_t pad_enqueue_pos[]]..CACHELINE-1*INT..[[];
uint32_t dequeue_pos[1];
uint8_t pad_dequeue_pos[]]..CACHELINE-1*INT..[[];
struct group_freelist_chunk chunk[]]..SIZE..[[];
uint32_t state[1];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))]])

-- Group freelists states
local CREATE, INIT, READY = 0, 1, 2

function freelist_create (name)
local fl = shm.create(name, "struct group_freelist")
if sync.cas(fl.state, CREATE, INIT) then
for i = 0, MAX do
fl.chunk[i].sequence[0] = i
end
fl.state[0] = READY
else
waitfor(function () return fl.state[0] == READY end)
end
return fl
end

function freelist_open (name, readonly)
local fl = shm.open(name, "struct group_freelist", readonly)
waitfor(function () return fl.state[0] == READY end)
return fl
end

local function mask (i)
return band(i, MAX)
end

function start_add (fl)
local pos = fl.enqueue_pos[0]
while true do
local chunk = fl.chunk[mask(pos)]
local seq = chunk.sequence[0]
local dif = seq - pos
if dif == 0 then
if sync.cas(fl.enqueue_pos, pos, pos+1) then
return chunk, pos+1
end
elseif dif < 0 then
return
else
compiler_barrier() -- ensure fresh load of enqueue_pos
pos = fl.enqueue_pos[0]
end
end
end

function start_remove (fl)
local pos = fl.dequeue_pos[0]
while true do
local chunk = fl.chunk[mask(pos)]
local seq = chunk.sequence[0]
local dif = seq - (pos+1)
if dif == 0 then
if sync.cas(fl.dequeue_pos, pos, pos+1) then
return chunk, pos+MAX+1
end
elseif dif < 0 then
return
else
compiler_barrier() -- ensure fresh load of dequeue_pos
pos = fl.dequeue_pos[0]
end
end
end

function finish (chunk, seq)
chunk.sequence[0] = seq
end

function selftest ()
local fl = freelist_create("test_freelist")
assert(not start_remove(fl)) -- empty

local w1, sw1 = start_add(fl)
local w2, sw2 = start_add(fl)
assert(not start_remove(fl)) -- empty
finish(w2, sw2)
assert(not start_remove(fl)) -- empty
finish(w1, sw1)
local r1, sr1 = start_remove(fl)
assert(r1 == w1)
local r2, sr2 = start_remove(fl)
assert(r2 == w2)
assert(not start_remove(fl)) -- empty
finish(r1, sr1)
finish(r2, sr2)
assert(not start_remove(fl)) -- empty

for i=1,SIZE do
local w, sw = start_add(fl)
assert(w)
finish(w, sw)
end
assert(not start_add(fl)) -- full
for i=1,SIZE do
local r, sr = start_remove(fl)
assert(r)
finish(r, sr)
end
assert(not start_remove(fl)) -- empty

local w = {}
for _=1,10000 do
for _=1,math.random(SIZE) do
local w1, sw = start_add(fl)
if not w1 then break end
finish(w1, sw)
table.insert(w, w1)
end
for _=1,math.random(#w) do
local r, sr = start_remove(fl)
assert(r == table.remove(w, 1))
finish(r, sr)
end
end
end

0 comments on commit 2ff0924

Please sign in to comment.