Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use MPMC bounded queue for group freelist #1472

Merged
merged 13 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/apps/interlink/freelist_instrument.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local histogram = require("core.histogram")
local tsc = require("lib.tsc")

-- Replace packet.rebalance_step and packet.reclaim_step with wrappers that
-- time every call and add the elapsed time (converted with tsc's to_ns) to
-- a dedicated histogram.
--
-- Returns two histograms: rebalance latency, reclaim latency.
function instrument_freelist ()
   local clock = tsc.new()

   -- Build a zero-argument replacement for step() that records the
   -- duration of each call in latency.
   local function timed (step, latency)
      return function ()
         local t0 = clock:stamp()
         step()
         latency:add(tonumber(clock:to_ns(clock:stamp()-t0)))
      end
   end

   local rebalance_latency =
      histogram.create('engine/rebalance_latency.histogram', 1, 100e6)
   local reclaim_latency =
      histogram.create('engine/reclaim_latency.histogram', 1, 100e6)

   -- Capture the original implementations before overwriting them.
   local original_rebalance = packet.rebalance_step
   local original_reclaim = packet.reclaim_step
   packet.rebalance_step = timed(original_rebalance, rebalance_latency)
   packet.reclaim_step = timed(original_reclaim, reclaim_latency)

   return rebalance_latency, reclaim_latency
end

-- Write the CSV column header line to out (io.stdout when out is omitted).
function histogram_csv_header (out)
   local sink = out or io.stdout
   sink:write("histogram,lo,hi,count\n")
end

-- Write one CSV row "name,lo,hi,count" for every (count, lo, hi) triple
-- yielded by histogram:iterate(), flushing out after each row.
--
-- name defaults to 'untitled'; out defaults to io.stdout.
function histogram_csv (histogram, name, out)
   local sink = out or io.stdout
   local label = name or 'untitled'
   local row = "%s,%f,%f,%d\n"
   for count, lo, hi in histogram:iterate() do
      sink:write(row:format(label, lo, hi, tonumber(count)))
      sink:flush()
   end
end
38 changes: 38 additions & 0 deletions src/apps/interlink/test_sink.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local Receiver = require("apps.interlink.receiver")
local Sink = require("apps.basic.basic_apps").Sink
local lib = require("core.lib")
local numa = require("lib.numa")

-- Add a Receiver app called name and a Sink app called "sink" to
-- configuration c, then link the receiver's output to the sink's input.
function configure (c, name)
   config.app(c, name, Receiver)
   config.app(c, "sink", Sink)
   local linkspec = name..".output -> sink.input"
   config.link(c, linkspec)
end

-- Create a fresh configuration, populate it through configure(c, name),
-- apply it to the engine and run the engine with the given duration.
function start (name, duration)
   local conf = config.new()
   configure(conf, name)
   engine.configure(conf)
   engine.main({duration = duration})
end

local instr = require("apps.interlink.freelist_instrument")

-- Run the sink for duration with freelist instrumentation enabled.
--
-- Binds to core via numa.bind_to_cpu, instruments the freelist, runs
-- start(name, duration), then writes the rebalance latency histogram as
-- CSV (stdout by default) and a one-line min/avg/max summary to stderr.
--
-- core may be nil: wait_test.snabb passes cores[1+i] without checking that
-- such an entry exists. The summary therefore formats core with %s and
-- tostring() rather than %d, which would raise on nil only after the
-- entire run has completed.
function start_instrument (name, duration, core)
   numa.bind_to_cpu(core, 'skip')
   local rebalance_latency = instr.instrument_freelist()
   start(name, duration)
   instr.histogram_csv(rebalance_latency, "rebalance")
   local min, avg, max = rebalance_latency:summarize()
   io.stderr:write(("(%s) rebalance latency (ns) min:%16s avg:%16s max:%16s\n")
      :format(tostring(core),
              lib.comma_value(math.floor(min)),
              lib.comma_value(math.floor(avg)),
              lib.comma_value(math.floor(max))))
   io.stderr:flush()
end

52 changes: 48 additions & 4 deletions src/apps/interlink/test_source.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,56 @@ module(...,package.seeall)

local Transmitter = require("apps.interlink.transmitter")
local Source = require("apps.basic.basic_apps").Source
local lib = require("core.lib")
local numa = require("lib.numa")

function start (name)
local c = config.new()
function configure (c, name)
config.app(c, name, Transmitter)
config.app(c, "source", Source)
config.link(c, "source.output -> "..name..".input")
config.link(c, "source."..name.." -> "..name..".input")
end

function start (name, duration)
local c = config.new()
configure(c, name)
engine.configure(c)
engine.main()
engine.main{duration=duration}
end

-- Like start, but calls configure n times on one configuration, using the
-- names name..1 through name..n, before running the engine for duration.
function startn (name, duration, n)
   local conf = config.new()
   for index = 1, n do
      local app_name = name..index
      configure(conf, app_name)
   end
   engine.configure(conf)
   engine.main({duration = duration})
end

-- Sum the rxpackets statistic over the output links of the "source" app
-- (as enumerated by ipairs) and return the total.
function txpackets ()
   local outputs = engine.app_table["source"].output
   local total = 0
   for _, output_link in ipairs(outputs) do
      local stats = link.stats(output_link)
      total = total + stats.rxpackets
   end
   return total
end

local instr = require("apps.interlink.freelist_instrument")

-- Run the source with n transmitters for duration with freelist
-- instrumentation enabled.
--
-- Binds to core, instruments the freelist, runs startn(name, duration, n),
-- samples the transmitted packet count, writes the reclaim latency
-- histogram as CSV, runs the engine for one more unit of duration with
-- reporting disabled, and finally writes a min/avg/max summary and the
-- packet rate to stderr.
function startn_instrument (name, duration, n, core)
   -- Render a latency value as a floored, comma-separated string.
   local function pretty (ns)
      return lib.comma_value(math.floor(ns))
   end

   numa.bind_to_cpu(core, 'skip')
   local _, reclaim_latency = instr.instrument_freelist()
   startn(name, duration, n)

   -- Sample the counters right after the timed run.
   local sent = txpackets()
   instr.histogram_csv(reclaim_latency, "reclaim")
   local min, avg, max = reclaim_latency:summarize()
   engine.main{duration=1, no_report=true}

   local err = io.stderr
   local summary = "(%d) reclaim latency (ns) min:%16s avg:%16s max:%16s\n"
   err:write(summary:format(core, pretty(min), pretty(avg), pretty(max)))
   err:write(("%.3f Mpps\n"):format(sent / 1e6 / duration))
   err:flush()

   --engine.report_links()
end
36 changes: 36 additions & 0 deletions src/apps/interlink/wait_test.snabb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!snabb snsh

-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

local worker = require("core.worker")
local numa = require("lib.numa")

-- Test wait times caused by group freelist rebalancing
-- Synopsis: wait_test.snabb [duration] [nconsumers] [cpuset]
--
--   duration    run duration handed to every worker (default 10)
--   nconsumers  number of sink workers to start (default 10)
--   cpuset      CPU set parsed by numa.parse_cpuset; the lowest CPU goes
--               to the source worker, the following ones to the sinks in
--               ascending order. The source asserts that cores[1] exists,
--               so the set is expected to name at least one CPU.
local DURATION = tonumber(main.parameters[1]) or 10
local NCONSUMERS = tonumber(main.parameters[2]) or 10
local CPUS = numa.parse_cpuset(main.parameters[3] or "")

-- Collect the members of the CPU set, then sort once (sorting inside the
-- loop would redo the work for every inserted element).
local cores = {}
for core in pairs(CPUS) do
   table.insert(cores, core)
end
table.sort(cores)

require("apps.interlink.freelist_instrument").histogram_csv_header()
io.stdout:flush()

-- One sink worker per consumer.
-- NOTE(review): when fewer than NCONSUMERS+1 CPUs are given, cores[1+i] is
-- nil for the remaining sinks -- confirm that start_instrument and
-- numa.bind_to_cpu are meant to accept a nil core.
for i=1,NCONSUMERS do
   worker.start("sink"..i, ([[require("apps.interlink.test_sink").start_instrument(%q, %d, %s)]])
      :format("test"..i, DURATION, cores[1+i]))
end

worker.start("source", ([[require("apps.interlink.test_source").startn_instrument(%q, %d, %d, %s)]])
   :format("test", DURATION, NCONSUMERS, assert(cores[1])))

-- Keep the supervisor running until no worker is reported alive.
engine.main{done = function ()
   for _, s in pairs(worker.status()) do
      if s.alive then return false end
   end
   return true
end}
3 changes: 1 addition & 2 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,9 @@ function breathe ()
end
::PUSH_EXIT::
counter.add(breaths)
-- Commit counters and rebalance freelists at a reasonable frequency
-- Commit counters at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
packet.rebalance_freelists()
end
running = false
end
Expand Down
163 changes: 163 additions & 0 deletions src/core/group_freelist.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local sync = require("core.sync")
local shm = require("core.shm")
local lib = require("core.lib")
local ffi = require("ffi")

local waitfor, compiler_barrier = lib.waitfor, lib.compiler_barrier
local band = bit.band

-- Group freelist: lock-free multi-producer multi-consumer ring buffer
-- (mpmc queue)
--
-- https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
--
-- NB: assumes 32-bit wide loads/stores are atomic (as is the case on x86_64)!

-- Group freelist holds up to SIZE chunks of chunksize packets each.
-- chunksize is assigned without `local`, so it is visible outside this
-- file -- presumably so that users of the freelist can read it; verify
-- against callers.
chunksize = 2048

-- (SIZE=1024)*(chunksize=2048) == roughly two million packets
local SIZE = 1024 -- must be a power of two
-- Largest chunk index; also the bit mask used to wrap positions (see mask).
local MAX = SIZE - 1

local CACHELINE = 64 -- XXX - make dynamic
local INT = ffi.sizeof("uint32_t")

-- One ring slot: a sequence number (written by finish, read by
-- start_add/start_remove), an nfree field and an array of chunksize
-- packet pointers. nfree and list are not touched in this file.
ffi.cdef([[
struct group_freelist_chunk {
uint32_t sequence[1], nfree;
struct packet *list[]]..chunksize..[[];
} __attribute__((packed))]])

-- The shared queue: enqueue and dequeue positions, the SIZE ring slots and
-- an initialization state word (see CREATE/INIT/READY below).
--
-- Note on the pad sizes: by operator precedence CACHELINE-1*INT is
-- CACHELINE - (1*INT), and arithmetic binds tighter than `..`, so each pad
-- is CACHELINE minus one uint32_t. Each position counter together with its
-- pad therefore occupies exactly CACHELINE bytes.
ffi.cdef([[
struct group_freelist {
uint32_t enqueue_pos[1];
uint8_t pad_enqueue_pos[]]..CACHELINE-1*INT..[[];

uint32_t dequeue_pos[1];
uint8_t pad_dequeue_pos[]]..CACHELINE-1*INT..[[];

struct group_freelist_chunk chunk[]]..SIZE..[[];

uint32_t state[1];
} __attribute__((packed, aligned(]]..CACHELINE..[[)))]])

-- Group freelists states: freelist_create moves state from CREATE to INIT
-- via compare-and-swap, initializes the slots, then stores READY.
local CREATE, INIT, READY = 0, 1, 2

-- Create (or attach to) the group freelist shm object called name and
-- return it once it is READY.
--
-- Whoever wins the CREATE -> INIT compare-and-swap on fl.state sets every
-- slot's sequence number to its own index and then marks the freelist
-- READY; everybody else waits until state reads READY.
function freelist_create (name)
   local fl = shm.create(name, "struct group_freelist")
   if not sync.cas(fl.state, CREATE, INIT) then
      -- Somebody else is (or was) initializing: wait for them to finish.
      waitfor(function () return fl.state[0] == READY end)
      return fl
   end
   for index = 0, MAX do
      fl.chunk[index].sequence[0] = index
   end
   fl.state[0] = READY
   return fl
end

-- Open the existing group freelist shm object called name (passing
-- readonly through to shm.open), wait until its state reads READY and
-- return it.
function freelist_open (name, readonly)
   local fl = shm.open(name, "struct group_freelist", readonly)
   local function is_ready ()
      return fl.state[0] == READY
   end
   waitfor(is_ready)
   return fl
end

-- Reduce a queue position to a ring slot index by AND-ing it with MAX
-- (MAX is SIZE-1 and SIZE is a power of two).
local function mask (pos)
   local index = band(pos, MAX)
   return index
end

-- Begin adding a chunk to the group freelist fl (producer side).
--
-- On success returns the claimed chunk and the sequence value that must be
-- handed to finish(chunk, seq) once the chunk has been filled; the chunk
-- only becomes visible to start_remove after that call. Returns nothing
-- when the slot at the current enqueue position has not been released yet
-- (the "full" case exercised by selftest).
function start_add (fl)
local pos = fl.enqueue_pos[0]
while true do
local chunk = fl.chunk[mask(pos)]
local seq = chunk.sequence[0]
-- dif compares the slot's sequence number with the position we try to claim.
local dif = seq - pos
if dif == 0 then
-- Slot is free for this position: claim it by advancing enqueue_pos.
-- If the CAS fails no branch reloads pos here; the loop re-reads the
-- slot's sequence with the same pos on the next iteration.
if sync.cas(fl.enqueue_pos, pos, pos+1) then
-- pos+1 is the sequence value start_remove expects for position pos.
return chunk, pos+1
end
elseif dif < 0 then
-- Slot's sequence is behind pos: it has not been released, give up.
return
else
-- Slot's sequence is ahead of pos: our pos is stale, reload it.
compiler_barrier() -- ensure fresh load of enqueue_pos
pos = fl.enqueue_pos[0]
end
end
end

-- Begin removing a chunk from the group freelist fl (consumer side).
--
-- On success returns the claimed chunk and the sequence value that must be
-- handed to finish(chunk, seq) once the chunk has been emptied. Returns
-- nothing when the slot at the current dequeue position has not been
-- published yet (the "empty" case exercised by selftest, which includes
-- slots claimed by start_add but not yet passed to finish).
function start_remove (fl)
local pos = fl.dequeue_pos[0]
while true do
local chunk = fl.chunk[mask(pos)]
local seq = chunk.sequence[0]
-- A slot published for position pos carries sequence pos+1 (see start_add).
local dif = seq - (pos+1)
if dif == 0 then
-- Slot holds a published chunk: claim it by advancing dequeue_pos.
-- If the CAS fails no branch reloads pos here; the loop re-reads the
-- slot's sequence with the same pos on the next iteration.
if sync.cas(fl.dequeue_pos, pos, pos+1) then
-- pos+MAX+1 == pos+SIZE: the enqueue position that maps onto this same
-- slot one lap later, i.e. the value start_add compares against.
return chunk, pos+MAX+1
end
elseif dif < 0 then
-- Slot's sequence is behind pos+1: nothing published here yet, give up.
return
else
-- Slot's sequence is ahead of pos+1: our pos is stale, reload it.
compiler_barrier() -- ensure fresh load of dequeue_pos
pos = fl.dequeue_pos[0]
end
end
end

-- Complete an operation begun with start_add or start_remove by storing
-- seq (the second value returned by that call) as the chunk's sequence
-- number.
function finish (chunk, seq)
   local sequence = chunk.sequence
   sequence[0] = seq
end

-- Single-threaded exercise of the queue: ordering of add/remove/finish,
-- the full and empty conditions, and a randomized fill/drain sequence that
-- checks chunks come back in FIFO order.
function selftest ()
   local fl = freelist_create("test_freelist")

   local function assert_empty ()
      assert(not start_remove(fl))
   end

   assert_empty()

   -- Two adds in flight: nothing can be removed until the first one is
   -- finished, even if the second one finishes earlier.
   local first, first_seq = start_add(fl)
   local second, second_seq = start_add(fl)
   assert_empty()
   finish(second, second_seq)
   assert_empty()
   finish(first, first_seq)

   -- Chunks come out in the order they were added.
   local got1, got1_seq = start_remove(fl)
   assert(got1 == first)
   local got2, got2_seq = start_remove(fl)
   assert(got2 == second)
   assert_empty()
   finish(got1, got1_seq)
   finish(got2, got2_seq)
   assert_empty()

   -- Fill the queue completely, then drain it completely.
   for _ = 1, SIZE do
      local chunk, seq = start_add(fl)
      assert(chunk)
      finish(chunk, seq)
   end
   assert(not start_add(fl)) -- full
   for _ = 1, SIZE do
      local chunk, seq = start_remove(fl)
      assert(chunk)
      finish(chunk, seq)
   end
   assert_empty()

   -- Randomized rounds: add a random number of chunks (stopping early when
   -- full), then remove a random number of the queued chunks and check
   -- each against the expected FIFO order.
   local queued = {}
   for _ = 1, 10000 do
      for _ = 1, math.random(SIZE) do
         local chunk, seq = start_add(fl)
         if not chunk then break end
         finish(chunk, seq)
         queued[#queued + 1] = chunk
      end
      for _ = 1, math.random(#queued) do
         local chunk, seq = start_remove(fl)
         local expected = table.remove(queued, 1)
         assert(chunk == expected)
         finish(chunk, seq)
      end
   end
end