Permalink
Fetching contributors…
Cannot retrieve contributors at this time
424 lines (365 sloc) 10.5 KB
local ffi = require('ffi')
local math = require('math')
local class = require('radio.core.class')
local platform = require('radio.core.platform')
---
-- Pipe input port. Instances are created in Block's add_type_signature().
-- A new port records its owner and name; its data type and connected pipe
-- are left unset (nil) by the constructor.
--
-- @local
-- @type
-- @tparam Block owner Block owner
-- @tparam string name Input name
local PipeInput = class.factory()

function PipeInput.new(owner, name)
    local port = setmetatable({}, PipeInput)

    port.owner = owner
    port.name = name
    -- Not known at construction time
    port.data_type = nil
    port.pipe = nil

    return port
end
---
-- Close the input (read) end of the pipe connected to this port.
--
-- @local
function PipeInput:close()
    local pipe = self.pipe
    pipe:close_input()
end
---
-- Collect the input file descriptor of the connected pipe into an array.
--
-- @local
-- @treturn array Array of file descriptors
function PipeInput:filenos()
    local pipe = self.pipe
    return { pipe:fileno_input() }
end
---
-- Pipe output port. Instances are created in Block's add_type_signature().
-- A new port records its owner and name, starts with an empty list of
-- connected pipes, and has no data type assigned yet.
--
-- @local
-- @type
-- @tparam Block owner Block owner
-- @tparam string name Output name
local PipeOutput = class.factory()

function PipeOutput.new(owner, name)
    local port = setmetatable({}, PipeOutput)

    port.owner = owner
    port.name = name
    -- Not known at construction time
    port.data_type = nil
    -- An output port keeps an array of pipes
    port.pipes = {}

    return port
end
---
-- Close the output (write) end of every pipe connected to this port.
--
-- @local
function PipeOutput:close()
    local pipes = self.pipes
    for idx = 1, #pipes do
        pipes[idx]:close_output()
    end
end
---
-- Collect the output file descriptors of all pipes connected to this port.
-- The returned array is index-aligned with the port's pipes array.
--
-- @local
-- @treturn array Array of file descriptors
function PipeOutput:filenos()
    local pipes = self.pipes
    local descriptors = {}

    for idx = 1, #pipes do
        descriptors[idx] = pipes[idx]:fileno_output()
    end

    return descriptors
end
---
-- Aliased pipe input port. These stand in for PipeInput objects and are
-- created in CompositeBlock's add_type_signature(). A new alias starts with
-- an empty list of real inputs.
--
-- @local
-- @type
-- @tparam Block owner Block owner
-- @tparam string name Input name
local AliasedPipeInput = class.factory()

function AliasedPipeInput.new(owner, name)
    local alias = setmetatable({}, AliasedPipeInput)

    alias.owner = owner
    alias.name = name
    -- One aliased input keeps an array of real inputs
    alias.real_inputs = {}

    return alias
end
---
-- Aliased pipe output port. These stand in for PipeOutput objects and are
-- created in CompositeBlock's add_type_signature(). A new alias has no real
-- output assigned yet.
--
-- @local
-- @type
-- @tparam Block owner Block owner
-- @tparam string name Output name
local AliasedPipeOutput = class.factory()

function AliasedPipeOutput.new(owner, name)
    local alias = setmetatable({}, AliasedPipeOutput)

    alias.owner = owner
    alias.name = name
    -- Not known at construction time
    alias.real_output = nil

    return alias
end
---
-- Pipe. Implements the serialization/deserialization of sample vectors
-- between blocks. The constructor only records the two ports; the socket
-- pair and read buffer are set up later by initialize().
--
-- @local
-- @type
-- @tparam PipeOutput pipe_output Pipe output port
-- @tparam PipeInput pipe_input Pipe input port
local Pipe = class.factory()

function Pipe.new(pipe_output, pipe_input)
    local pipe = setmetatable({}, Pipe)

    pipe.pipe_output = pipe_output
    pipe.pipe_input = pipe_input

    return pipe
end
---
-- Get the sample rate of the pipe, as reported by the block that owns the
-- pipe's output port.
--
-- @local
-- @treturn number Sample rate
function Pipe:get_rate()
    local source_block = self.pipe_output.owner
    return source_block:get_rate()
end
---
-- Get the data type of the pipe, taken from the pipe's output port.
--
-- @local
-- @treturn data_type Data type
function Pipe:get_data_type()
    local output_port = self.pipe_output
    return output_port.data_type
end
-- C declarations for the calls Pipe makes through ffi.C: socketpair(),
-- close(), read() and write(), plus the constants passed to socketpair() and
-- the errno values that Pipe:write() checks for.
-- NOTE(review): the numeric values of AF_UNIX, SOCK_STREAM, EPIPE and
-- ECONNRESET are hard-coded here -- confirm they match the target platform.
-- NOTE(review): strerror() and memmove() are also called through ffi.C in
-- this file but are not declared in this cdef; presumably another module
-- (e.g. radio.core.platform) declares them -- confirm.
ffi.cdef[[
enum { AF_UNIX = 1 };
enum { SOCK_STREAM = 1 };
int socketpair(int domain, int type, int protocol, int socket_vector[2]);
int close(int fildes);
enum { EPIPE = 32, ECONNRESET = 104 };
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
]]
---
-- Initialize the pipe: cache its data type, create the UNIX socket pair that
-- carries the serialized samples, and allocate the read buffer.
--
-- Raises an error if socketpair() fails.
--
-- @local
function Pipe:initialize()
    -- Cache the data type carried by this pipe
    self.data_type = self:get_data_type()

    -- Open a connected pair of UNIX stream sockets
    local fd_pair = ffi.new("int[2]")
    local status = ffi.C.socketpair(ffi.C.AF_UNIX, ffi.C.SOCK_STREAM, 0, fd_pair)
    if status ~= 0 then
        error("socketpair(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
    end

    -- First descriptor is used for reading, second for writing
    self._rfd = fd_pair[0]
    self._wfd = fd_pair[1]
    self._eof = false

    -- Allocate the read buffer up front (1048576 bytes) and mark it empty
    local capacity = 1048576
    self._buf_capacity = capacity
    self._buf = platform.alloc(capacity)
    self._buf_size = 0
    self._buf_read_offset = 0
end
---
-- Refill the Pipe's internal read buffer. Any unread bytes are moved to the
-- start of the buffer, then a single read() is issued on the read descriptor
-- for the remaining free space. EOF is flagged only when the buffer held no
-- unread bytes and read() returned zero bytes.
--
-- Raises an error if read() fails.
--
-- @local
function Pipe:_read_buffer_update()
    local base = ffi.cast("char *", self._buf)
    local pending = self._buf_size - self._buf_read_offset

    -- Compact: move the unread tail down to the front of the buffer
    if pending > 0 then
        ffi.C.memmove(self._buf, base + self._buf_read_offset, pending)
    end

    -- Append newly available bytes after the unread tail
    local free_space = self._buf_capacity - pending
    local received = tonumber(ffi.C.read(self._rfd, base + pending, free_space))
    if received < 0 then
        error("read(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
    end

    -- Nothing buffered and nothing read means the stream has ended
    if received == 0 and pending == 0 then
        self._eof = true
    end

    -- Buffer now starts at offset zero and holds old tail plus new bytes
    self._buf_size = pending + received
    self._buf_read_offset = 0
end
---
-- Get the number of elements available in the Pipe's internal read buffer,
-- as computed by the data type's deserialize_count() over the unread bytes.
--
-- @local
-- @treturn int|nil Count, or nil once EOF has been flagged
function Pipe:_read_buffer_count()
    if not self._eof then
        -- Ask the data type how many items the unread region holds
        local offset = self._buf_read_offset
        local head = ffi.cast("char *", self._buf) + offset
        return self.data_type.deserialize_count(head, self._buf_size - offset)
    end

    -- EOF
    return nil
end
---
-- Test whether the Pipe's internal read buffer is full, i.e. whether the
-- number of unread bytes equals the buffer capacity.
--
-- @local
-- @treturn bool Full
function Pipe:_read_buffer_full()
    local unread = self._buf_size - self._buf_read_offset
    return unread == self._buf_capacity
end
---
-- Deserialize elements from the Pipe's internal read buffer into a vector.
-- Unread bytes are first moved to the start of the buffer, so the data type's
-- deserialize_partial() always receives the buffer's base address. The read
-- offset is then advanced by the size that deserialize_partial() reports.
--
-- @local
-- @tparam int num Number of elements to deserialize
-- @treturn Vector Vector
function Pipe:_read_buffer_deserialize(num)
    -- Compact the buffer if part of it has already been consumed
    local offset = self._buf_read_offset
    if offset > 0 then
        local remaining = self._buf_size - offset
        ffi.C.memmove(self._buf, ffi.cast("char *", self._buf) + offset, remaining)
        self._buf_size = remaining
        self._buf_read_offset = 0
    end

    -- Build the vector from the front of the buffer
    local vec, consumed = self.data_type.deserialize_partial(ffi.cast("char *", self._buf), num)

    -- Skip past the bytes that were just consumed
    self._buf_read_offset = self._buf_read_offset + consumed

    return vec
end
---
-- Read a sample vector from the Pipe. Refills the read buffer, then
-- deserializes every element currently available in it.
--
-- @local
-- @treturn Vector|nil Sample vector or nil on EOF
function Pipe:read()
    -- Pull in whatever is available on the read descriptor
    self:_read_buffer_update()

    -- A nil count signals EOF
    local available = self:_read_buffer_count()
    if available ~= nil then
        return self:_read_buffer_deserialize(available)
    end

    return nil
end
---
-- Write a sample vector to the Pipe. The vector is serialized by the pipe's
-- data type and write() is repeated until the whole buffer has been sent.
--
-- Raises an error if write() returns zero or a negative value. When errno is
-- EPIPE or ECONNRESET, a diagnostic naming the upstream and downstream blocks
-- is printed to stderr first.
--
-- @local
-- @tparam Vector vec Sample vector
function Pipe:write(vec)
    -- Serialize the vector into a raw buffer and byte size
    local data, size = self.data_type.serialize(vec)

    -- Keep writing until every byte has been accepted
    local sent = 0
    while sent < size do
        local cursor = ffi.cast("char *", data) + sent
        local n = tonumber(ffi.C.write(self._wfd, cursor, size - sent))

        if n <= 0 then
            local errno = ffi.errno()
            local peer_gone = (errno == ffi.C.EPIPE) or (errno == ffi.C.ECONNRESET)
            if peer_gone then
                io.stderr:write(string.format("[%s] Downstream block %s terminated unexpectedly.\n", self.pipe_output.owner.name, self.pipe_input.owner.name))
            end
            error("write(): " .. ffi.string(ffi.C.strerror(errno)))
        end

        sent = sent + n
    end
end
---
-- Close the input (read) end of the pipe. Does nothing if the read
-- descriptor is already unset. The descriptor is cleared only after a
-- successful close().
--
-- Raises an error if close() fails.
--
-- @local
function Pipe:close_input()
    local fd = self._rfd
    if not fd then
        return
    end

    if ffi.C.close(fd) ~= 0 then
        error("close(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
    end

    self._rfd = nil
end
---
-- Close the output (write) end of the pipe. Does nothing if the write
-- descriptor is already unset. The descriptor is cleared only after a
-- successful close().
--
-- Raises an error if close() fails.
--
-- @local
function Pipe:close_output()
    local fd = self._wfd
    if not fd then
        return
    end

    if ffi.C.close(fd) ~= 0 then
        error("close(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
    end

    self._wfd = nil
end
---
-- Get the file descriptor of the input (read) end of the Pipe.
--
-- @local
-- @treturn int File descriptor
function Pipe:fileno_input()
    local read_fd = self._rfd
    return read_fd
end
---
-- Get the file descriptor of the output (write) end of the Pipe.
--
-- @local
-- @treturn int File descriptor
function Pipe:fileno_output()
    local write_fd = self._wfd
    return write_fd
end
-- C declarations for poll(), which the read_synchronous() helper below uses
-- to check which pipes are ready before refilling their read buffers.
-- NOTE(review): the POLLIN/POLLOUT/POLLHUP values and the nfds_t typedef are
-- hard-coded here -- confirm they match the target platform.
ffi.cdef[[
struct pollfd {
int fd;
short events;
short revents;
};
typedef unsigned long int nfds_t;
enum { POLLIN = 0x1, POLLOUT = 0x4, POLLHUP = 0x10 };
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
]]
-- Event mask requested for pipes that still have room in their read buffer:
-- POLLIN combined with POLLHUP.
local POLL_READ_EVENTS = bit.bor(ffi.C.POLLIN, ffi.C.POLLHUP)
---
-- Read synchronously from a set of pipes. The vectors returned will all be
-- of the same length: the smallest element count available across the pipes.
--
-- Raises an error if poll() fails.
--
-- @local
-- @tparam array pipes Array of Pipe objects
-- @treturn array|nil Array of sample vectors or nil on EOF
local function read_synchronous(pipes)
    local num_pipes = #pipes

    -- Build one pollfd entry per pipe; pipes whose read buffer is already
    -- full request no events
    local pollfds = ffi.new("struct pollfd[?]", num_pipes)
    for idx = 1, num_pipes do
        local pipe = pipes[idx]
        local slot = idx - 1
        pollfds[slot].fd = pipe:fileno_input()
        if pipe:_read_buffer_full() then
            pollfds[slot].events = 0
        else
            pollfds[slot].events = POLL_READ_EVENTS
        end
    end

    -- Zero timeout: poll() returns immediately
    local ret = ffi.C.poll(pollfds, num_pipes, 0)
    if ret < 0 then
        error("poll(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
    end

    -- Find the smallest available item count across all pipes
    local num_elems = math.huge
    for idx = 1, num_pipes do
        local pipe = pipes[idx]

        -- Refill the read buffer of any pipe that poll() reported events for
        if pollfds[idx - 1].revents ~= 0 then
            pipe:_read_buffer_update()
        end

        local count = pipe:_read_buffer_count()

        -- No complete item buffered: refill again (this read() may block)
        if count == 0 then
            pipe:_read_buffer_update()
            count = pipe:_read_buffer_count()
        end

        -- A nil count means this pipe hit EOF; stop right away
        if count == nil then
            return nil
        end

        if count < num_elems then
            num_elems = count
        end
    end

    -- Deserialize the same number of items from every pipe
    local vectors = {}
    for idx = 1, num_pipes do
        vectors[idx] = pipes[idx]:_read_buffer_deserialize(num_elems)
    end

    return vectors
end
-- Exported module: the port classes, the Pipe class, and the synchronous
-- multi-pipe read helper
return {
    PipeInput = PipeInput,
    PipeOutput = PipeOutput,
    AliasedPipeInput = AliasedPipeInput,
    AliasedPipeOutput = AliasedPipeOutput,
    Pipe = Pipe,
    read_synchronous = read_synchronous,
}