Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FlowTracker: high-level interface #20

Open
emmericp opened this issue Jul 24, 2017 · 0 comments
Open

FlowTracker: high-level interface #20

emmericp opened this issue Jul 24, 2017 · 0 comments

Comments

@emmericp
Copy link

rough sketch of my idea of a high-level API in pseudo-code:

user-defined module

-- Declares the C layout of the per-flow state record that the tracker stores
-- for every flow.
-- must be done on top level to be available/defined in all threads
ffi.cdef[[
	struct my_flow_state {
		uint64_t packet_counter;
		uint64_t byte_counter;
		uint64_t first_seen;
		uint64_t last_seen;
		uint8_t some_flags;
		uint16_t some_fancy_data[20];
	};
]]

-- Build the tracker. The state type and both handlers are passed as strings:
-- the ctype name declared above and the names of global functions (the
-- analyzer looks the IPv4 handler up in _G by this name).
local tracker = flowtracker:new{
	struct = "struct my_flow_state",
	ip4Handler = "handleIp4Packet",
	ip4TimeoutHandler = "handleIp4Timeout",
	-- optional initial value for new flow states:
	-- default = ffi.new("struct my_flow_state", { other defaults go here })
}

-- this part should be wrapped by flowscope and exposed via CLI arguments
-- NOTE(review): the device configuration is elided in this sketch
local dev = device.config{...}
-- start one analyzer task per RX queue, for queues 0 through 3
for i = 0, 3 do
	-- get from QQ or from a device queue
	lm.startTask(flowtracker.analyzerTask, tracker, dev:getRxQueue(i))
end
-- end wrapped part

-- state starts out empty if it doesn't exist yet; buf is whatever the device queue or QQ gives us
--- Per-packet callback for IPv4 flows: updates the flow's counters and timestamps.
-- Must stay a global function because the tracker refers to it by name.
-- @param tuple the flow key extracted from the packet (unused here)
-- @param state the flow's state record; starts out empty if the flow is new
-- @param buf the packet buffer, whatever the device queue or QQ gives us
-- @param isFirstPacket true when this packet created the flow entry
function handleIp4Packet(tuple, state, buf, isFirstPacket)
	-- implicit lock by TBB
	local packetsSoFar = state.packet_counter
	state.packet_counter = packetsSoFar + 1

	local bytesSoFar = state.byte_counter
	state.byte_counter = bytesSoFar + buf:getSize()

	-- only the packet that created the flow stamps first_seen
	if isFirstPacket then
		state.first_seen = time()
	end
	state.last_seen = time()
	-- can add custom "active timeout" (like ipfix) here
end

--- Timeout callback for IPv4 flows: reports the final state of an expired flow.
-- Must stay a global function because the tracker refers to it by name.
-- @param tuple the flow key of the expired flow (unused here)
-- @param state the flow's last state; assumed to have a reasonable __tostring
function handleIp4Timeout(tuple, state)
	-- print() does not interpret format strings, so format explicitly;
	-- tostring() is applied up front because %s only accepts non-string
	-- arguments on some Lua versions
	print(("flow died, state was: %s"):format(tostring(state)))
end

flowtracker looks like this

--- Create a flow tracker from a user-supplied configuration table.
-- The passed table itself becomes the tracker object (it is not copied).
-- @param args configuration table; fields read here:
--   struct (or stateType): ctype name of the per-flow state,
--   default (optional): initial value for new flow states
-- @return the configured tracker object
function flowtracker:new(args)
	-- get size of stateType and round up to something
	-- in C++: force template instantiation of several hashtable types (4,8,16,32,64,....?) bytes value?
	-- get appropriate hashtables
	-- check parameters here
	local obj = setmetatable(args, flowtracker)
	-- users pass the ctype name as `struct`; `stateType` is accepted as an alias
	obj.stateType = obj.stateType or obj.struct
	if obj.stateType == nil then
		error("flowtracker:new requires the state ctype in 'struct'", 2)
	end
	obj.table4 = C.get_table(ffi.sizeof(obj.stateType))
	-- a zero-initialized state is the default for newly seen flows
	obj.default = obj.default or ffi.new(obj.stateType)
	lm.startTask("__FLOWTRACKER_SWAPPER", obj)
	return obj
end

--- Analyzer task body: receives packets and feeds them to the user's IPv4 handler.
-- Runs until lm.running() returns false.
-- @param queue the packet source handed to the task (a device queue or QQ)
function flowtracker:analyzer(queue)
	-- handlers are configured by name, so resolve the global function here
	local handler4 = _G[self.ip4Handler]
	assert(handler4, "no global IPv4 handler named " .. tostring(self.ip4Handler))
	while lm.running() do
		-- pseudo-code placeholder from the original sketch; the receive call is elided
		local bufs = -- perform the usual DPDK incantations to get packets
		-- ipairs yields (index, value); the packet buffer is the value
		for _, buf in ipairs(bufs) do
			-- also handle IPv4/6/whatever
			local tuple = extractTuple(buf)
			-- copy-constructed
			-- method call, so the table is passed as receiver like the accessor calls below
			local accessor, new = self.table4:insertWithDefault(tuple, self.default)
			handler4(tuple, accessor:getValue(), buf, new)
			accessor:release()
		end
	end
end
-- usual libmoon threading magic: the task is identified by a string, and the
-- analyzer is published as a global under that same name
flowtracker.analyzerTask = "__FLOWTRACKER_ANALYZER"
_G[flowtracker.analyzerTask] = flowtracker.analyzer

-- don't forget the usual magic in __serialize for thread-stuff


-- swapper goes here
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant