Skip to content

Commit

Permalink
Merge remote-tracking branch 'eugeneia/mcpring-group-freelist' into i…
Browse files Browse the repository at this point in the history
…pfix-rss
  • Loading branch information
alexandergall committed Mar 15, 2018
2 parents 7f6c241 + 154d63c commit c666e4d
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 21 deletions.
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ TESTMODS = $(shell find . -regex '[^\#]*\.\(lua\|dasl\)' -printf '%P ' | \

# TESTSCRIPTS expands to:
# lib/watchdog/selftest.sh ...
# for each executable selftext.sh script in src.
TESTSCRIPTS = $(shell find . -name "selftest.sh" -executable | xargs)
# for each executable selftext.* script in src.
TESTSCRIPTS = $(shell find . -name "selftest.*" -executable | xargs)

PATH := ../lib/luajit/usr/local/bin:$(PATH)

Expand Down
49 changes: 49 additions & 0 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}

function Receiver:new (_, name)
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/receiver/"..name..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Receiver})
end

function Receiver:pull ()
local o, r, n = self.output.output, self.interlink, 0
if not o then return end -- don’t forward packets until connected
while not interlink.empty(r) and n < engine.pull_npackets do
link.transmit(o, interlink.extract(r))
n = n + 1
end
interlink.pull(r)
end

function Receiver:stop ()
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end

-- Detach receivers to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Receiver.shutdown (pid)
for _, name in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..name..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..name..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_receiver(r, shm_name) end
shm.unlink(backlink)
end
end

return Receiver
33 changes: 33 additions & 0 deletions src/apps/interlink/selftest.snabb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!snabb snsh

-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

local worker = require("core.worker")
local interlink = require("lib.interlink")
local Receiver = require("apps.interlink.receiver")
local Sink = require("apps.basic.basic_apps").Sink

-- Synopsis: selftest.snabb [duration]
local DURATION = tonumber(main.parameters[1]) or 10

worker.start("source", [[require("apps.interlink.test_source").start("test")]])

local c = config.new()

config.app(c, "test", Receiver)
config.app(c, "sink", Sink)
config.link(c, "test.output->sink.input")

engine.configure(c)
engine.main({duration=DURATION, report={showlinks=true}})

for w, s in pairs(worker.status()) do
print(("worker %s: pid=%s alive=%s status=%s"):format(
w, s.pid, s.alive, s.status))
end
local stats = link.stats(engine.app_table["sink"].input.input)
print(stats.txpackets / 1e6 / DURATION .. " Mpps")

-- test teardown
engine.configure(config.new())
engine.main({duration=0.1})
15 changes: 15 additions & 0 deletions src/apps/interlink/test_source.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local Transmitter = require("apps.interlink.transmitter")
local Source = require("apps.basic.basic_apps").Source

function start (name)
local c = config.new()
config.app(c, name, Transmitter)
config.app(c, "source", Source)
config.link(c, "source.output -> "..name..".input")
engine.configure(c)
engine.main()
end
49 changes: 49 additions & 0 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}

function Transmitter:new (_, name)
local self = {}
self.shm_name = "group/interlink/"..name..".interlink"
self.backlink = "interlink/transmitter/"..name..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
return setmetatable(self, {__index=Transmitter})
end

function Transmitter:push ()
local i, r = self.input.input, self.interlink
while not (interlink.full(r) or link.empty(i)) do
local p = link.receive(i)
packet.account_free(p) -- stimulate breathing
interlink.insert(r, p)
end
interlink.push(r)
end

function Transmitter:stop ()
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end

-- Detach transmitters to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Transmitter.shutdown (pid)
for _, name in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..name..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..name..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_transmitter(r, shm_name) end
shm.unlink(backlink)
end
end

return Transmitter
9 changes: 6 additions & 3 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ function apply_config_actions (actions)
configuration.apps[name] = nil
end
function ops.start_app (name, class, arg)
local app = class:new(arg)
local app = class:new(arg, name)
if type(app) ~= 'table' then
error(("bad return value from app '%s' start() method: %s"):format(
name, tostring(app)))
Expand Down Expand Up @@ -535,8 +535,11 @@ function breathe ()
end
end
counter.add(breaths)
-- Commit counters at a reasonable frequency
if counter.read(breaths) % 100 == 0 then counter.commit() end
-- Commit counters and rebalance freelists at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
packet.rebalance_freelists()
end
running = false
end

Expand Down
14 changes: 8 additions & 6 deletions src/core/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,19 @@ end

-- Cleanup after Snabb process.
function shutdown (pid)
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("apps.interlink.receiver").shutdown(pid) end)
safely(function () require("apps.interlink.transmitter").shutdown(pid) end)
-- Parent process performs additional cleanup steps.
-- (Parent is the process whose 'group' folder is not a symlink.)
local st, err = S.lstat(shm.root.."/"..pid.."/group")
local is_parent = st and st.isdir
if is_parent then
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("lib.hardware.pci").shutdown(pid) end)
safely(function () require("core.memory").shutdown(pid) end)
end
Expand Down
63 changes: 55 additions & 8 deletions src/core/packet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ local C = ffi.C

local lib = require("core.lib")
local memory = require("core.memory")
local shm = require("core.shm")
local counter = require("core.counter")
local sync = require("core.sync")

require("core.packet_h")

Expand Down Expand Up @@ -45,18 +47,25 @@ end

-- Freelist containing empty packets ready for use.

ffi.cdef[[
local max_packets = 1e6

ffi.cdef([[
struct freelist {
int32_t lock[1];
uint64_t nfree;
uint64_t max;
struct packet *list[?];
struct packet *list[]]..max_packets..[[];
};
]]
]])

local function freelist_full(freelist)
return freelist.nfree == freelist.max
end

local function freelist_add(freelist, element)
-- Safety check
if _G.developer_debug then
assert(freelist.nfree < freelist.max, "freelist overflow")
assert(not freelist_full(freelist), "freelist overflow")
end
freelist.list[freelist.nfree] = element
freelist.nfree = freelist.nfree + 1
Expand All @@ -75,15 +84,49 @@ local function freelist_nfree(freelist)
return freelist.nfree
end

local max_packets = 1e6
local function freelist_lock(freelist)
sync.lock(freelist.lock)
end

local function freelist_unlock(freelist)
sync.unlock(freelist.lock)
end

local packet_allocation_step = 1000
local packets_allocated = 0
local packets_fl = ffi.new("struct freelist", max_packets, 0, max_packets)
local packets_fl = ffi.new("struct freelist")
packets_fl.max = max_packets
local group_fl
if not shm.exists("group/packets.freelist") then
group_fl = shm.create("group/packets.freelist", "struct freelist")
group_fl.max = max_packets
else
group_fl = shm.open("group/packets.freelist", "struct freelist")
end

function rebalance_freelists ()
if freelist_nfree(packets_fl) > packets_allocated then
freelist_lock(group_fl)
while freelist_nfree(packets_fl) > packets_allocated
and not freelist_full(group_fl) do
freelist_add(group_fl, freelist_remove(packets_fl))
end
freelist_unlock(group_fl)
end
end

-- Return an empty packet.
function allocate ()
if freelist_nfree(packets_fl) == 0 then
preallocate_step()
freelist_lock(group_fl)
while freelist_nfree(group_fl) > 0
and freelist_nfree(packets_fl) < packets_allocated do
freelist_add(packets_fl, freelist_remove(group_fl))
end
freelist_unlock(group_fl)
if freelist_nfree(packets_fl) == 0 then
preallocate_step()
end
end
return freelist_remove(packets_fl)
end
Expand Down Expand Up @@ -173,12 +216,16 @@ local function free_internal (p)
freelist_add(packets_fl, p)
end

function free (p)
function account_free (p)
counter.add(engine.frees)
counter.add(engine.freebytes, p.length)
-- Calculate bits of physical capacity required for packet on 10GbE
-- Account for minimum data size and overhead of CRC and inter-packet gap
counter.add(engine.freebits, (math.max(p.length, 46) + 4 + 5) * 8)
end

function free (p)
account_free(p)
free_internal(p)
end

Expand Down
2 changes: 1 addition & 1 deletion src/core/shm.lua
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ function open_frame (path)
local module = types[type]
if module then
assert(frame[name] == nil, "shm: duplicate name: "..name)
frame[name] = module.open(frame.path..file)
frame[name] = module.open(frame.path..file, 'readonly')
frame.specs[name] = {module}
end
end
Expand Down

0 comments on commit c666e4d

Please sign in to comment.