Skip to content

Commit

Permalink
Merge branch 'ipfix-yang-schema2' into ipfix-yang
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneia committed Oct 4, 2022
2 parents ebc7413 + f2870ac commit b26e2ea
Show file tree
Hide file tree
Showing 46 changed files with 803,738 additions and 1,773 deletions.
2 changes: 1 addition & 1 deletion src/apps/basic/basic_apps.lua
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ end
function Truncate:push ()
for _ = 1, link.nreadable(self.input.input) do
local p = receive(self.input.input)
ffi.fill(p.data, math.min(0, self.n - p.length))
ffi.fill(p.data, math.max(0, self.n - p.length))
p.length = self.n
transmit(self.output.output,p)
end
Expand Down
2 changes: 1 addition & 1 deletion src/apps/intel_mp/loadgen.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ local register = require("lib.hardware.register")
local receive, empty = link.receive, link.empty

local can_transmit, transmit
local num_descriptors = 1024
local num_descriptors = 4096

LoadGen = {}

Expand Down
1 change: 1 addition & 0 deletions src/apps/ipfix/ipfix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ function FlowSet:new (spec, args)
end
add_table_counters('table', o.table)
add_table_counters('rate_table', o.sp.table)
assert(not shm.exists(shm_name.."/packets_in"))
o.shm = shm.create_frame(shm_name, frame_init)

-- Template-specific counters
Expand Down
38 changes: 35 additions & 3 deletions src/apps/mellanox/connectx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ ConnectX.config = {
fc_tx_enable = { default = false },
queues = { required = true },
macvlan = { default = false },
sync_stats_interval = {default = 1}
}
local queue_config = {
id = { required = true },
Expand Down Expand Up @@ -315,7 +316,7 @@ function ConnectX:new (conf)
-- Lists of receive queues by macvlan (used if usemac=true)
local macvlan_rqlist = {}

for _, queue in ipairs(conf.queues) do
for _, queue in ipairs(queues) do
-- Create a shared memory object for controlling the queue pair
local shmpath = "group/pci/"..pciaddress.."/"..queue.id
local cxq = shm.create(shmpath, cxq_t)
Expand Down Expand Up @@ -610,7 +611,7 @@ function ConnectX:new (conf)
-- Post command
req.start_fn(req.hca, req.args)
end
self.sync_timer = lib.throttle(1)
self.sync_timer = lib.throttle(conf.sync_stats_interval)

function free_cxq (cxq)
-- Force CXQ state -> DEAD
Expand Down Expand Up @@ -644,6 +645,7 @@ function ConnectX:new (conf)
free_cxq(cxq)
shm.unlink(shmpath)
end
shm.delete_frame(self.stats)
end

function self:pull ()
Expand All @@ -653,6 +655,31 @@ function ConnectX:new (conf)
end
end

local last_stats = {
rxpackets = 0,
rxbytes = 0,
txpackets = 0,
txbytes = 0
}
function self:report ()
self:sync_stats()
local stats = self.stats
local txpackets = counter.read(stats.txpackets) - last_stats.txpackets
local txbytes = counter.read(stats.txbytes) - last_stats.txbytes
local rxpackets = counter.read(stats.rxpackets) - last_stats.rxpackets
local rxbytes = counter.read(stats.rxbytes) - last_stats.rxbytes
last_stats.txpackets = counter.read(stats.txpackets)
last_stats.txbytes = counter.read(stats.txbytes)
last_stats.rxpackets = counter.read(stats.rxpackets)
last_stats.rxbytes = counter.read(stats.rxbytes)
print(pciaddress,
"TX packets", lib.comma_value(tonumber(txpackets)),
"TX bytes", lib.comma_value(tonumber(txbytes)))
print(pciaddress,
"RX packets", lib.comma_value(tonumber(rxpackets)),
"RX bytes", lib.comma_value(tonumber(rxbytes)))
end

function self:sync_stats ()
for _, req in ipairs(self.stats_reqs) do
local hca = req.hca
Expand Down Expand Up @@ -1459,7 +1486,12 @@ function IO:new (conf)

-- Detach from the NIC.
function self:stop ()
close()
if cxq then
if not sync.cas(cxq.state, IDLE, FREE) then
assert(cxq.state[0] == DEAD, "illegal state detected")
end
close()
end
end

-- Configure self as packetblaster?
Expand Down
67 changes: 67 additions & 0 deletions src/apps/mellanox/connectx_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,72 @@ function basic_match (pci0, pci1)
print("selftest: done")
end

function drop (pci0, pci1)
print("selftest: connectx_test drop")

local basic = require("apps.basic.basic_apps")
local counter = require("core.counter")

local c = config.new()
config.app(c, "source", basic.Source)
local txqueues = {}
for i=1,16 do
table.insert(txqueues, {id="tx"..i})
end
config.app(c, "nic0", connectx.ConnectX, {
pciaddress=pci0,
queues=txqueues
})
for i, queue in ipairs(txqueues) do
config.app(c, "tx"..i, connectx.IO, {
pciaddress=pci0, queue="tx"..i, packetblaster=true
})
config.link(c, "source.output"..i.." -> tx"..i..".input")
end

config.app(c, "sink", basic.Sink)
config.app(c, "nic1", connectx.ConnectX, {
pciaddress=pci1,
queues={{id="rx1"}}
})
config.app(c, "rx1", connectx.IO, {pciaddress=pci1, queue="rx1"})
config.link(c, "rx1.output -> sink.input")

engine.configure(c)

print("waiting for linkup...")
lib.waitfor(function ()
return engine.app_table.nic0.hca:linkup()
and engine.app_table.nic1.hca:linkup()
end)

local stats0 = engine.app_table.nic0.stats
local stats1 = engine.app_table.nic1.stats

local function have_stats ()
return counter.read(stats0.txpackets) > 0
and counter.read(stats1.rxpackets) > 0
end

engine.main({done=have_stats})
engine.report_links()

engine.app_table.nic0:sync_stats()
engine.app_table.nic1:sync_stats()
print("nic0", "txpackets ", tonumber(counter.read(stats0.txpackets)))
print("nic1", "rxpackets ", tonumber(counter.read(stats1.rxpackets)))
print("nic1", "rxdrop ", tonumber(counter.read(stats1.rxdrop)))
print("nic1", "rxdrop_rx1", tonumber(counter.read(stats1.rxdrop_rx1)))
assert(counter.read(stats1.rxpackets) > 0,
"some packets should have been received")
assert(counter.read(stats1.rxdrop) > 0,
"some packets should be dropped")
assert(counter.read(stats1.rxdrop) == counter.read(stats1.rxdrop_rx1),
"Per-queue drop counter should match rxdrop")

print("selftest: done")
end

function selftest ()
local pci0 = os.getenv("SNABB_PCI_CONNECTX_0")
local pci1 = os.getenv("SNABB_PCI_CONNECTX_1")
Expand All @@ -347,5 +413,6 @@ function selftest ()
switch(pci0, pci1, 10e6, 1, 60, 1500, 100, 100, 2, 2, 4)
switch(pci0, pci1, 10e6, 1, 60, 1500, 100, 100, 1, 2, 8)
switch(pci0, pci1, 10e6, 1, 60, 1500, 100, 100, 4, 1, 4)
drop(pci0, pci1)
end

3 changes: 2 additions & 1 deletion src/lib/hardware/pci.lua
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,12 @@ function map_pci_memory (f)
-- unbind_device_from_linux() has already been called but it may take
-- some time for the driver to release the device.
if not mem and err.INVAL then
local filepath = S.readlink("/proc/self/fd/"..f:getfd())
lib.waitfor2("mmap of "..filepath,
function ()
mem, err = f:mmap(nil, st.size, "read, write", "shared", 0)
return mem ~= nil or not err.INVAL
end, 5, 1000000)
end, 10, 1000000)
end
assert(mem, err)
return ffi.cast("uint32_t *", mem)
Expand Down
6 changes: 5 additions & 1 deletion src/lib/ptree/ptree.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ local manager_config_spec = {
}

local worker_opt_spec = {
acquire_cpu = {default=true}, -- Needs dedicated CPU core?
restart_intensity = {default=0}, -- How many restarts are permitted...
restart_period = {default=0} -- ...within period seconds.
}
Expand Down Expand Up @@ -303,6 +304,7 @@ function Manager:compute_workers_aux (worker_app_graphs, worker_opts)
for id in pairs(worker_app_graphs) do
local worker_opt = lib.parse(worker_opts[id] or {}, worker_opt_spec)
local worker_aux = {
acquire_cpu = worker_opt.acquire_cpu,
restart = {
period = worker_opt.restart_period,
intensity = worker_opt.restart_intensity,
Expand Down Expand Up @@ -373,7 +375,9 @@ end
function Manager:compute_scheduling_for_worker(id, app_graph)
local ret = {}
for k, v in pairs(self.worker_default_scheduling) do ret[k] = v end
ret.cpu = self:acquire_cpu_for_worker(id, app_graph)
if self.workers_aux[id].acquire_cpu then
ret.cpu = self:acquire_cpu_for_worker(id, app_graph)
end
return ret
end

Expand Down

0 comments on commit b26e2ea

Please sign in to comment.