-
Notifications
You must be signed in to change notification settings - Fork 298
/
probe_ptree.lua
354 lines (311 loc) · 11.6 KB
/
probe_ptree.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
module(..., package.seeall)
local yang = require("lib.yang.yang")
local yang_util = require("lib.yang.util")
local path_data = require("lib.yang.path_data")
local mem = require("lib.stream.mem")
local ptree = require("lib.ptree.ptree")
local cpuset = require("lib.cpuset")
local pci = require("lib.hardware.pci")
local lib = require("core.lib")
local app_graph = require("core.config")
local probe = require("program.ipfix.lib")
local probe_schema = 'snabb-snabbflow-v1'
local probe_cpuset = cpuset.new()
-- Emit a formatted warning on stderr, flushing immediately so the
-- message is visible even if the process later crashes or is killed.
local function warn (msg, ...)
   local text = msg:format(...)
   io.stderr:write("Warning: "..text.."\n")
   io.stderr:flush()
end
-- Setup callback handed to the ptree manager: translate a parsed
-- snabbflow configuration into per-worker app graphs.
function setup_ipfix (conf)
   -- For debugging, dump the effective configuration to stdout:
   --   yang.print_config_for_schema_by_name(probe_schema, conf, io.stdout)
   return setup_workers(conf)
end
-- Load the configuration at confpath, reserve the configured CPU pool,
-- and return a ptree manager ready to supervise the probe workers.
function start (name, confpath)
   local conf = yang.load_configuration(confpath, {schema_name=probe_schema})
   update_cpuset(conf.snabbflow_config.rss.cpu_pool)
   -- JIT tuning applied to every worker: larger machine-code areas and
   -- trace limits than the LuaJIT defaults.
   local jit_options = {
      sizemcode = 256,
      maxmcode = 8192,
      maxtrace = 8000,
      maxrecord = 50000,
      maxsnap = 20000,
      maxside = 10000
   }
   return ptree.new_manager{
      name = name,
      log_level = 'INFO',
      setup_fn = setup_ipfix,
      initial_configuration = conf,
      schema_name = probe_schema,
      cpuset = probe_cpuset,
      worker_default_scheduling = {
         busywait = false,
         jit_opt = jit_options
      }
   }
end
-- Program entry point. Expects args[1] = process name and
-- args[2] = path to the configuration file; runs the manager loop
-- until terminated.
function run (args)
   local name, confpath = assert(args[1]), assert(args[2])
   start(name, confpath):main()
end
-- Keys of probe.probe_config that are filled in per ipfix-app instance
-- (or derived per exporter) in setup_workers; strip them from the
-- shared defaults so they are never inherited accidentally.
local instance_specific_keys = {
   "collector_ip", "collector_port", "observation_domain",
   "exporter_mac", "templates", "output_type", "output",
   "input_type", "input", "instance"
}
local ipfix_default_config = lib.deepcopy(probe.probe_config)
for _, key in ipairs(instance_specific_keys) do
   ipfix_default_config[key] = nil
end
-- Synchronize the module-wide probe_cpuset with the configured pool:
-- CPUs newly listed in cpu_pool.cpu are added, CPUs no longer listed
-- are removed. A nil cpu_pool leaves the set unchanged.
function update_cpuset (cpu_pool)
   if not cpu_pool then return end
   local wanted = {}
   for _, cpu in ipairs(cpu_pool.cpu) do
      wanted[cpu] = true
      if not probe_cpuset:contains(cpu) then
         probe_cpuset:add(cpu)
      end
   end
   for _, cpu in ipairs(probe_cpuset:list()) do
      if not wanted[cpu] then
         probe_cpuset:remove(cpu)
      end
   end
end
-- Parser for a single /snabbflow-config/rss/software-scaling/exporter
-- entry of the snabb-snabbflow-v1 schema.
local software_scaling_parser = path_data.parser_for_schema_by_name(
probe_schema, '/snabbflow-config/rss/software-scaling/exporter[name=""]'
)
-- Parsing the empty string yields the schema defaults; used in
-- setup_workers for exporters without an explicit software-scaling entry.
local default_software_scaling =
software_scaling_parser(mem.open_input_string(''))
-- Build the worker set for the snabbflow probe from a parsed
-- snabb-snabbflow-v1 configuration.
-- Returns two tables keyed by worker name:
--   workers     - app graph for each worker (RSS workers, dedicated
--                 exporter workers, and an optional Mellanox control
--                 worker)
--   worker_opts - per-worker options (restart intensity/period for
--                 exporter workers, acquire_cpu=false for mlx_ctrl)
function setup_workers (config)
local interfaces = config.snabbflow_config.interface
local rss = config.snabbflow_config.rss
local flow_director = config.snabbflow_config.flow_director
local ipfix = config.snabbflow_config.ipfix
-- Materialize each collector pool as an array of {ip=<string>, port=N}
-- entries for round-robin assignment below.
local collector_pools = {}
for name, p in pairs(ipfix.collector_pool) do
local collectors = {}
for entry in p.collector:iterate() do
table.insert(collectors, {
ip = yang_util.ipv4_ntop(entry.key.ip),
port = entry.key.port
})
end
collector_pools[name] = collectors
end
local function select_collector (pool)
-- Select the collector ip and port from the front of the
-- pool and rotate the pool's elements by one
assert(collector_pools[pool] and #collector_pools[pool] > 0,
"Undefined or empty collector pool: "..pool)
local collector = table.remove(collector_pools[pool], 1)
table.insert(collector_pools[pool], collector)
return collector
end
-- The default-class exporter must not also be bound to an explicit
-- traffic class.
if flow_director.default_class.exporter then
assert(not flow_director.class[flow_director.default_class.exporter],
"Exporter for the default traffic class can not be the exporter for a defined class.")
end
-- Sort exporters by their configured class order (pairs() order is
-- unspecified, so an explicit sort is required for deterministic
-- class ordering in the RSS config).
local class_order = {}
for exporter in pairs(flow_director.class) do
table.insert(class_order, exporter)
end
table.sort(class_order, function (x, y)
return flow_director.class[x].order < flow_director.class[y].order
end)
local function class_name (exporter, class)
-- Including order in name to avoid name collision with 'default' class
return ("%s_%d"):format(exporter, class.order)
end
-- Allocate sequentially numbered link names per class
-- ("<class>_1", "<class>_2", ...); counters persist across RSS groups.
local rss_links = {}
local function rss_link_name (class)
if not rss_links[class] then
rss_links[class] = 0
end
local qno = rss_links[class] + 1
rss_links[class] = qno
return class.."_"..qno
end
-- Observation domains are handed out sequentially starting at the
-- configured base; each ipfix instance gets a unique one.
local observation_domain = ipfix.observation_domain_base
local function next_observation_domain ()
local ret = observation_domain
observation_domain = observation_domain + 1
return ret
end
local workers = {}
local worker_opts = {}
-- Per-device state for ConnectX NICs; see comment in the loop below.
local mellanox = {}
update_cpuset(rss.cpu_pool)
-- Reject two interface entries that resolve to the same PCI device
-- (e.g. the same device given in two spellings).
local function ensure_device_unique (device, interfaces)
for other in pairs(interfaces) do
if device ~= other then
if pci.qualified(device) == pci.qualified(other) then
error("Duplicate interfaces: "..device..", "..other..
"\nNot applying configuration. Remove one of them via"..
("\n snabb config remove <snabbflow> /snabbflow-config/interface[device=%q]")
:format(other))
end
end
end
end
-- One RSS worker per hardware RSS group; each reads queue rss_group-1
-- of every interface and feeds the exporters attached below.
for rss_group = 1, rss.hardware_scaling.rss_groups do
local inputs, outputs = {}, {}
for device, opt in pairs(interfaces) do
ensure_device_unique(device, interfaces)
local input = lib.deepcopy(opt)
input.device = device
input.rxq = rss_group - 1
table.insert(inputs, input)
-- The mellanox driver requires a master process that sets up
-- all queues for the interface. We collect all queues per
-- device of this type here.
local device_info = pci.device_info(device)
if device_info.driver == 'apps.mellanox.connectx' then
local spec = mellanox[device]
if not spec then
spec = { ifName = input.name,
ifAlias = input.description,
queues = {},
recvq_size = input.receive_queue_size }
mellanox[device] = spec
end
table.insert(spec.queues, { id = input.rxq })
else
-- Silently truncate receive-queue-size for other drivers.
-- (We are not sure what they can handle.)
input.receive_queue_size = math.min(input.receive_queue_size, 8192)
end
end
-- Instantiate each exporter's ipfix app(s) for this RSS group.
for name, exporter in pairs(ipfix.exporter) do
-- Start from the shared defaults (ipfix_default_config keys),
-- overlaid with this exporter's settings.
local config = {}
for key in pairs(ipfix_default_config) do
config[key] = ipfix[key]
end
config.exporter_ip = yang_util.ipv4_ntop(ipfix.exporter_ip)
config.collector_pool = exporter.collector_pool
config.templates = exporter.template
config.output_type = "tap_routed"
config.add_packet_metadata = false
config.maps = {}
-- NOTE(review): inner 'name' shadows the exporter name here; the
-- shadow ends with this loop, so 'name' below is the exporter again.
for name, map in pairs(ipfix.maps) do
config.maps[name] = map.file
end
-- Software scaling: explicit per-exporter setting if present,
-- otherwise the schema defaults (default_software_scaling).
local software_scaling = (rss.software_scaling.exporter and
rss.software_scaling.exporter[name])
or default_software_scaling
-- Embedded exporters run inside the RSS worker (one instance);
-- otherwise spawn the configured number of dedicated workers.
local num_instances = 1
if not software_scaling.embed then
num_instances = software_scaling.instances
end
for i = 1, num_instances do
-- Create a clone of the configuration for parameters
-- specific to the instance
local iconfig = lib.deepcopy(config)
-- This is used to disambiguate multiple instances of the
-- ipfix app (possibly using multiple instances of the same
-- template) within a single worker.
iconfig.instance = name.."_"..i
local rss_link
local class = flow_director.class[name]
if class then
rss_link = rss_link_name(class_name(name, class))
elseif name == flow_director.default_class.exporter then
rss_link = rss_link_name('default')
else
-- No traffic class configured for exporter, do not create
-- instances.
warn("No traffic class configured for exporter '%s'.", name)
break
end
-- Round-robin collector assignment across instances of this
-- exporter (select_collector rotates the pool).
local collector = select_collector(config.collector_pool)
iconfig.collector_ip = collector.ip
iconfig.collector_port = collector.port
iconfig.collector_pool = nil
iconfig.log_date = ipfix.log_date
local od = next_observation_domain()
iconfig.observation_domain = od
iconfig.output = "ipfixexport"..od
if ipfix.maps.log_directory then
iconfig.maps_logfile =
ipfix.maps.log_directory.."/"..od..".log"
end
-- Scale the scan protection parameters by the number of
-- ipfix instances in this RSS class
local scale_factor = rss.hardware_scaling.rss_groups * num_instances
iconfig.scan_protection = {
enable = ipfix.scan_protection.enable,
threshold_rate = ipfix.scan_protection.threshold_rate / scale_factor,
export_rate = ipfix.scan_protection.export_rate / scale_factor,
}
local output
if software_scaling.embed then
-- Embedded: the ipfix app lives in the RSS worker itself.
output = {
link_name = rss_link,
args = iconfig
}
else
-- Dedicated worker connected to the RSS worker via interlink.
output = { type = "interlink", link_name = rss_link }
iconfig.input_type = "interlink"
iconfig.input = rss_link
workers[rss_link] = probe.configure_graph(iconfig)
-- Dedicated exporter processes are restartable
worker_opts[rss_link] = {
restart_intensity = software_scaling.restart.intensity,
restart_period = software_scaling.restart.period
}
end
table.insert(outputs, output)
end
end
-- Flow-director configuration for this RSS worker: ordered traffic
-- classes plus whether a default class catches the rest.
local rss_config = {
default_class = flow_director.default_class.exporter ~= nil,
classes = {},
remove_extension_headers = flow_director.remove_ipv6_extension_headers
}
for _, exporter in ipairs(class_order) do
local class = flow_director.class[exporter]
if not ipfix.exporter[exporter] then
error(("Exporter '%s' referenced in traffic class %d is not defined.")
:format(exporter, class.order))
end
table.insert(rss_config.classes, {
name = class_name(exporter, class),
filter = class.filter,
continue = class.continue
})
end
workers["rss"..rss_group] = probe.configure_rss_graph(
rss_config, inputs, outputs, ipfix.log_date, rss_group
)
end
-- Create a trivial app graph that only contains the control apps
-- for the Mellanox driver, which sets up the queues and
-- maintains interface counters.
local ctrl_graph, need_ctrl = probe.configure_mlx_ctrl_graph(mellanox, ipfix.log_date)
if need_ctrl then
workers["mlx_ctrl"] = ctrl_graph
worker_opts["mlx_ctrl"] = {acquire_cpu=false}
end
if false then -- enable to debug
for name, graph in pairs(workers) do
print("worker", name)
print("", "apps:")
for name, _ in pairs(graph.apps) do
print("", "", name)
end
print("", "links:")
for spec in pairs(graph.links) do
print("", "", spec)
end
end
end
return workers, worker_opts
end