Skip to content

Commit

Permalink
core.app: support link-specific push methods
Browse files Browse the repository at this point in the history
An app's link() method is now called with four arguments: mode,
direction, name and link. Mode is either 'link' or 'unlink', direction
is either 'input' or 'output', name is the name of the link link and
link is the link object itself. This allows the app to perform
link-specific actions.

The breathe_push_order array is organized such that the dispatcher in
the breathe() loop calls each app once for each of its input links.
Since the link is known when constructing the array, it can be passed
to the push() method, which may use it to narrow the processing of
incoming packets to that specific link.

If the mode is 'link' and direction is 'input', the link() method may
return two optional objects, a method and an arbitrary argument.  If a
method is given, it is called by the dispatcher in the breathe() loop
instead of the regular push() method for this particular link. Apart
from the link in question, the dispatcher also passes the argument to
the method.  The argument is private to the app and is treated as an
opaque object by the dispatcher.

An app may also define link-specific push methods by using the naming
convention 'push_<linkname>', where <linkname> is the name of the
input link.  If such a method exists, it is preferred over the regular
push() method for that link by the dispatcher.

The method returned by link(), if supplied, takes precedence over the
name-based push method.
  • Loading branch information
alexandergall committed Jan 11, 2019
1 parent 395a4f8 commit 2c7929c
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions src/core/app.lua
Expand Up @@ -81,19 +81,19 @@ end

-- Run app:methodname() in protected mode (pcall). If it throws an
-- error app will be marked as dead and restarted eventually.
function with_restart (app, method)
function with_restart (app, method, link, arg)
local status, result
if use_restart then
-- Run fn in protected mode using pcall.
status, result = pcall(method, app)
status, result = pcall(method, app, link, arg)

-- If pcall caught an error mark app as "dead" (record time and cause
-- of death).
if not status then
app.dead = { error = result, time = now() }
end
else
status, result = true, method(app)
status, result = true, method(app, link, arg)
end
return status, result
end
Expand Down Expand Up @@ -305,14 +305,14 @@ function apply_config_actions (actions)
local link = app.output[linkname]
app.output[linkname] = nil
remove_link_from_array(app.output, link)
if app.link then app:link() end
if app.link then app:link('unlink', 'output', linkname) end
end
function ops.unlink_input (appname, linkname)
local app = app_table[appname]
local link = app.input[linkname]
app.input[linkname] = nil
remove_link_from_array(app.input, link)
if app.link then app:link() end
if app.link then app:link('unlink', 'input', linkname) end
end
function ops.free_link (linkspec)
link.free(link_table[linkspec], linkspec)
Expand All @@ -330,7 +330,7 @@ function apply_config_actions (actions)
appname..": duplicate output link "..linkname)
app.output[linkname] = link
table.insert(app.output, link)
if app.link then app:link() end
if app.link then app:link('link', 'output', linkname, link) end
end
function ops.link_input (appname, linkname, linkspec)
local app = app_table[appname]
Expand All @@ -339,7 +339,10 @@ function apply_config_actions (actions)
appname..": duplicate input link "..linkname)
app.input[linkname] = link
table.insert(app.input, link)
if app.link then app:link() end
if app.link then
local method, arg = app:link('link', 'input', linkname, link)
app.push_link[linkname] = { method = method, arg = arg }
end
end
function ops.stop_app (name)
local app = app_table[name]
Expand All @@ -358,6 +361,7 @@ function apply_config_actions (actions)
app.appname = name
app.output = {}
app.input = {}
app.push_link = {}
app_table[name] = app
app.zone = zone
if app.shm then
Expand Down Expand Up @@ -430,11 +434,17 @@ function compute_breathe_order ()
for linkname,link in pairs(app.input) do
if type(linkname) == "string" then
linknames[link] = appname..'.'..linkname
inputs[link] = app
local method, arg = app['push_'..linkname] or app.push, nil
if app.push_link[linkname] then
method = app.push_link[linkname].method or method
arg = app.push_link[linkname].arg
end
inputs[link] = { app = app, method = method, arg = arg, link = link }
end
end
end
for link,app in pairs(inputs) do
for link,spec in pairs(inputs) do
local app = spec.app
successors[link] = {}
if not app.pull then
for _,succ in pairs(app.output) do
Expand Down Expand Up @@ -464,7 +474,7 @@ function compute_breathe_order ()
local link_order = tsort(nodes, entry_nodes, successors)
local i = 1
for _,link in ipairs(link_order) do
if breathe_push_order[#breathe_push_order] ~= inputs[link] then
if breathe_push_order[#breathe_push_order] ~= inputs[link].app then
table.insert(breathe_push_order, inputs[link])
end
end
Expand Down Expand Up @@ -539,10 +549,11 @@ function breathe ()
end
-- Exhale: push work out through the app network
for i = 1, #breathe_push_order do
local app = breathe_push_order[i]
if app.push and not app.dead then
local spec = breathe_push_order[i]
local app = spec.app
if spec.method and not app.dead then
zone(app.zone)
with_restart(app, app.push)
with_restart(app, spec.method, spec.link, spec.arg)
zone()
end
end
Expand Down

0 comments on commit 2c7929c

Please sign in to comment.