Skip to content

Commit

Permalink
Merge #1349 branch 'snabbco/wingo-next' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
lukego committed Jun 19, 2018
2 parents 7c36e3a + 3120d7c commit 0186ada
Show file tree
Hide file tree
Showing 24 changed files with 2,022 additions and 48 deletions.
2 changes: 1 addition & 1 deletion lib/ljsyscall/syscall/linux/c.lua
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ else -- 64 bit
function C.fstatfs(fd, buf) return syscall(sys.fstatfs, int(fd), void(buf)) end
function C.preadv(fd, iov, iovcnt, offset) return syscall_long(sys.preadv, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.pwritev(fd, iov, iovcnt, offset) return syscall_long(sys.pwritev, int(fd), void(iov), long(iovcnt), ulong(offset)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), ulong(offset), int(whence)) end
function C.lseek(fd, offset, whence) return syscall_off(sys.lseek, int(fd), long(offset), int(whence)) end
function C.sendfile(outfd, infd, offset, count)
return syscall_long(sys.sendfile, int(outfd), int(infd), void(offset), ulong(count))
end
Expand Down
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ TESTMODS = $(shell find . -regex '[^\#]*\.\(lua\|dasl\)' -printf '%P ' | \

# TESTSCRIPTS expands to:
# lib/watchdog/selftest.sh ...
# for each executable selftext.sh script in src.
TESTSCRIPTS = $(shell find . -name "selftest.sh" -executable | xargs)
# for each executable selftext.* script in src.
TESTSCRIPTS = $(shell find . -name "selftest.*" -executable | xargs)

PATH := ../lib/luajit/usr/local/bin:$(PATH)

Expand Down
65 changes: 65 additions & 0 deletions src/apps/interlink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Inter-process links (apps.interlink.*)

The “interlink” transmitter and receiver apps allow for efficient exchange
of packets between Snabb processes within the same process group (see
[Multiprocess operation (core.worker)](#multiprocess-operation-core.worker)).

DIAGRAM: Transmitter and Receiver
+-------------+ +-------------+
| | | |
input | | | |
----* Transmitter | | Reciever *----
| | | | output
| | | |
+-------------+ +-------------+

To make packets from an output port available to other processes, configure a
transmitter app, and link the appropriate output port to its `input` port.

```lua
local Transmitter = require("apps.interlink.transmitter")

config.app(c, "interlink", Transmitter)
config.link(c, "myapp.output -> interlink.input")
```

Then, in the process that should receive the packets, configure a receiver app
with the same name, and link its `output` port as suitable.

```lua
local Receiver = require("apps.interlink.receiver")

config.app(c, "interlink", Receiver)
config.link(c, "interlink.output -> otherapp.input")
```

Subsequently, packets transmitted to the transmitter’s `input` port will appear
on the receiver’s `output` port.

Alternatively, a name can be supplied as a configuration argument to be used
instead of the app’s name:

```lua
config.app(c, "mylink", Receiver, "interlink")
config.link(c, "mylink.output -> otherapp.input")
```

## Configuration

The configured app names denote globally unique queues within the process
group. Alternativelyy, the receiver and transmitter apps can instead be passed
a string that names the shared queue to which to attach to.

Starting either the transmitter or receiver app attaches them to a shared
packet queue visible to the process group under the name that was given to the
app. When the queue identified by the name is unavailable, because it is
already in use by a pair of processes within the group, configuration of the
app network will block until the queue becomes available. Once the transmitter
or receiver apps are stopped they detach from the queue.

Only two processes (one receiver and one transmitter) can be attached to an
interlink queue at the same time, but during the lifetime of the queue (e.g.,
from when the first process attached to when the last process detaches) it can
be shared by any number of receivers and transmitters. Meaning, either process
attached to the queue can be restarted or replaced by another process without
packet loss.
58 changes: 58 additions & 0 deletions src/apps/interlink/receiver.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Receiver = {name="apps.interlink.Receiver"}

function Receiver:new (queue)
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Receiver})
end

function Receiver:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/receiver/"..queue..".interlink"
self.interlink = interlink.attach_receiver(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Receiver:pull ()
local o, r, n = self.output.output, self.interlink, 0
if not o then return end -- don’t forward packets until connected
while not interlink.empty(r) and n < engine.pull_npackets do
link.transmit(o, interlink.extract(r))
n = n + 1
end
interlink.pull(r)
end

function Receiver:stop ()
if self.attached then
interlink.detach_receiver(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach receivers to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Receiver.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/receiver")) do
local backlink = "/"..pid.."/interlink/receiver/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_receiver(r, shm_name) end
shm.unlink(backlink)
end
end

return Receiver
33 changes: 33 additions & 0 deletions src/apps/interlink/selftest.snabb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!snabb snsh

-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

local worker = require("core.worker")
local interlink = require("lib.interlink")
local Receiver = require("apps.interlink.receiver")
local Sink = require("apps.basic.basic_apps").Sink

-- Synopsis: selftest.snabb [duration]
local DURATION = tonumber(main.parameters[1]) or 10

worker.start("source", [[require("apps.interlink.test_source").start("test")]])

local c = config.new()

config.app(c, "test", Receiver)
config.app(c, "sink", Sink)
config.link(c, "test.output->sink.input")

engine.configure(c)
engine.main({duration=DURATION, report={showlinks=true}})

for w, s in pairs(worker.status()) do
print(("worker %s: pid=%s alive=%s status=%s"):format(
w, s.pid, s.alive, s.status))
end
local stats = link.stats(engine.app_table["sink"].input.input)
print(stats.txpackets / 1e6 / DURATION .. " Mpps")

-- test teardown
engine.configure(config.new())
engine.main({duration=0.1})
15 changes: 15 additions & 0 deletions src/apps/interlink/test_source.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local Transmitter = require("apps.interlink.transmitter")
local Source = require("apps.basic.basic_apps").Source

function start (name)
local c = config.new()
config.app(c, name, Transmitter)
config.app(c, "source", Source)
config.link(c, "source.output -> "..name..".input")
engine.configure(c)
engine.main()
end
58 changes: 58 additions & 0 deletions src/apps/interlink/transmitter.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

module(...,package.seeall)

local shm = require("core.shm")
local interlink = require("lib.interlink")

local Transmitter = {name="apps.interlink.Transmitter"}

function Transmitter:new (queue)
packet.enable_group_freelist()
return setmetatable({attached=false, queue=queue}, {__index=Transmitter})
end

function Transmitter:link ()
local queue = self.queue or self.appname
if not self.attached then
self.shm_name = "group/interlink/"..queue..".interlink"
self.backlink = "interlink/transmitter/"..queue..".interlink"
self.interlink = interlink.attach_transmitter(self.shm_name)
shm.alias(self.backlink, self.shm_name)
self.attached = true
end
end

function Transmitter:push ()
local i, r = self.input.input, self.interlink
while not (interlink.full(r) or link.empty(i)) do
local p = link.receive(i)
packet.account_free(p) -- stimulate breathing
interlink.insert(r, p)
end
interlink.push(r)
end

function Transmitter:stop ()
if self.attached then
interlink.detach_transmitter(self.interlink, self.shm_name)
shm.unlink(self.backlink)
end
end

-- Detach transmitters to prevent leaking interlinks opened by pid.
--
-- This is an internal API function provided for cleanup during
-- process termination.
function Transmitter.shutdown (pid)
for _, queue in ipairs(shm.children("/"..pid.."/interlink/transmitter")) do
local backlink = "/"..pid.."/interlink/transmitter/"..queue..".interlink"
local shm_name = "/"..pid.."/group/interlink/"..queue..".interlink"
-- Call protected in case /<pid>/group is already unlinked.
local ok, r = pcall(interlink.open, shm_name)
if ok then interlink.detach_transmitter(r, shm_name) end
shm.unlink(backlink)
end
end

return Transmitter
7 changes: 5 additions & 2 deletions src/core/app.lua
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,11 @@ function breathe ()
end
end
counter.add(breaths)
-- Commit counters at a reasonable frequency
if counter.read(breaths) % 100 == 0 then counter.commit() end
-- Commit counters and rebalance freelists at a reasonable frequency
if counter.read(breaths) % 100 == 0 then
counter.commit()
packet.rebalance_freelists()
end
running = false
end

Expand Down
10 changes: 7 additions & 3 deletions src/core/lib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ function load_conf (file)
end

-- Store Lua representation of value in file.
function store_conf (file, value)
function print_object (value, stream)
stream = stream or io.stdout
local indent = 0
local function print_indent (stream)
for i = 1, indent do stream:write(" ") end
Expand Down Expand Up @@ -151,10 +152,13 @@ function store_conf (file, value)
stream:write(("%s"):format(value))
end
end
local stream = assert(io.open(file, "w"))
stream:write("return ")
print_value(value, stream)
stream:write("\n")
end
function store_conf (file, value)
local stream = assert(io.open(file, "w"))
stream:write("return ")
print_object(value, stream)
stream:close()
end

Expand Down
14 changes: 8 additions & 6 deletions src/core/main.lua
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,19 @@ end

-- Cleanup after Snabb process.
function shutdown (pid)
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("apps.interlink.receiver").shutdown(pid) end)
safely(function () require("apps.interlink.transmitter").shutdown(pid) end)
-- Parent process performs additional cleanup steps.
-- (Parent is the process whose 'group' folder is not a symlink.)
local st, err = S.lstat(shm.root.."/"..pid.."/group")
local is_parent = st and st.isdir
if is_parent then
-- simple pcall helper to print error and continue
local function safely (f)
local ok, err = pcall(f)
if not ok then print(err) end
end
-- Run cleanup hooks
safely(function () require("lib.hardware.pci").shutdown(pid) end)
safely(function () require("core.memory").shutdown(pid) end)
end
Expand Down
Loading

0 comments on commit 0186ada

Please sign in to comment.