-
Notifications
You must be signed in to change notification settings - Fork 25
/
disk.go
164 lines (139 loc) · 4.03 KB
/
disk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package fwdp
/*
#include "../../csrc/fwdp/disk.h"
*/
import "C"
import (
"errors"
"fmt"
"io"
"math"
"unsafe"
"github.com/usnistgov/ndn-dpdk/container/disk"
"github.com/usnistgov/ndn-dpdk/dpdk/bdev"
"github.com/usnistgov/ndn-dpdk/dpdk/eal"
"github.com/usnistgov/ndn-dpdk/dpdk/ealthread"
"github.com/usnistgov/ndn-dpdk/dpdk/spdkenv"
"github.com/usnistgov/ndn-dpdk/iface"
"github.com/usnistgov/ndn-dpdk/ndni"
"go4.org/must"
)
// DiskConfig contains disk service thread configuration.
type DiskConfig struct {
// Locator describes where to create or attach a block device.
// It is only consulted by createDevice when Bdev is nil.
bdev.Locator
// Overprovision is the ratio of block device size divided by CS disk capacity.
// Setting this above 1.00 can reduce disk full errors due to some slots still occupied by async I/O.
// Default is 1.05, which createDevice also substitutes for any value that is not at least 1.00.
Overprovision float64 `json:"overprovision"`
// Bdev specifies the block device.
// If set, Locator and Overprovision are ignored.
Bdev bdev.Device `json:"-"`
// BdevCloser allows closing the block device.
// createDevice returns it alongside Bdev, and Disk.Close invokes it when it is non-nil.
BdevCloser io.Closer `json:"-"`
// csDiskCapacity is passed as NPackets to disk.SizeCalc in newDisk.
// NOTE(review): presumably filled in by package code from the CS configuration; the assignment is not visible here.
csDiskCapacity int
}
// createDevice returns the block device for the disk thread together with its closer.
//
// A preconfigured cfg.Bdev is returned as is, paired with cfg.BdevCloser.
// Otherwise nBlocks is multiplied by cfg.Overprovision, rounded up, and a device of
// that many blocks is requested from cfg.Locator. An Overprovision that is not at
// least 1.0 is overwritten in cfg with 1.05 before the multiplication.
func (cfg *DiskConfig) createDevice(nBlocks int64) (bdev.Device, io.Closer, error) {
	if dev := cfg.Bdev; dev != nil {
		return dev, cfg.BdevCloser, nil
	}

	// NaN fails every ordered comparison, so it needs its own check to reach the fallback.
	if cfg.Overprovision < 1.0 || math.IsNaN(cfg.Overprovision) {
		cfg.Overprovision = 1.05
	}

	scaled := math.Ceil(float64(nBlocks) * cfg.Overprovision)
	return cfg.Locator.Create(int64(scaled))
}
// Disk represents a disk helper thread.
type Disk struct {
// Thread is obtained from spdkenv.NewThread in newDisk; Close closes it.
*spdkenv.Thread
// id is the value returned by DispatchThreadID and embedded in String.
id int
// c points to the C FwDisk struct, allocated with eal.ZmallocAligned in newDisk and released with eal.Free in Close.
c *C.FwDisk
// bdev and bdevCloser are the block device and its closer, as returned by DiskConfig.createDevice.
bdev bdev.Device
bdevCloser io.Closer
// store is the disk.Store that newDisk creates on bdev.
store *disk.Store
// allocs holds one disk.Alloc per entry of demuxPreparer.Fwds, keyed by that entry's id.
allocs map[int]*disk.Alloc
}
// Compile-time checks that *Disk satisfies the interfaces it is expected to implement.
var (
_ ealthread.ThreadWithRole = (*Disk)(nil)
_ ealthread.ThreadWithLoadStat = (*Disk)(nil)
_ DispatchThread = (*Disk)(nil)
)
// DispatchThreadID implements DispatchThread interface.
// It returns the id given to newDisk when this Disk was created.
func (fwdisk *Disk) DispatchThreadID() int {
return fwdisk.id
}
// String returns the thread name: "disk" followed by the thread id, such as "disk0".
func (fwdisk *Disk) String() string {
return fmt.Sprintf("disk%d", fwdisk.id)
}
// DemuxOf implements DispatchThread interface.
//
// The disk thread only has a demultiplexer for Interest packets, which is the
// output member of the C FwDisk struct. Any other packet type yields nil.
func (fwdisk *Disk) DemuxOf(t ndni.PktType) *iface.InputDemux {
	if t != ndni.PktInterest {
		return nil
	}

	output := unsafe.Pointer(&fwdisk.c.output)
	return iface.InputDemuxFromPtr(output)
}
// Close stops and releases the thread.
//
// Resources are released in the reverse order of their creation in newDisk:
// the per-forwarder allocs, the disk store, the block device (only when a
// closer was provided), the SPDK thread, and finally the C FwDisk struct.
// Every step runs even when an earlier one reports an error; all errors are
// combined with errors.Join, which yields nil when none occurred.
//
// Each field is cleared once released, so Close may be invoked on a partially
// constructed Disk and invoking it again does not release anything twice.
func (fwdisk *Disk) Close() error {
	var errs []error

	for id, alloc := range fwdisk.allocs {
		errs = append(errs, alloc.Close())
		delete(fwdisk.allocs, id)
	}

	if fwdisk.store != nil {
		errs = append(errs, fwdisk.store.Close())
		fwdisk.store = nil
	}

	if fwdisk.bdevCloser != nil {
		errs = append(errs, fwdisk.bdevCloser.Close())
	}
	// Drop the device reference even when there is no closer to call.
	fwdisk.bdev, fwdisk.bdevCloser = nil, nil

	if fwdisk.Thread != nil {
		errs = append(errs, fwdisk.Thread.Close())
		fwdisk.Thread = nil
	}

	// Clear the pointer after freeing; otherwise a repeated Close would hand
	// the same address to eal.Free a second time.
	if fwdisk.c != nil {
		eal.Free(fwdisk.c)
		fwdisk.c = nil
	}

	return errors.Join(errs...)
}
// ThreadRole implements ealthread.ThreadWithRole interface.
// It always returns RoleDisk.
func (Disk) ThreadRole() string {
return RoleDisk
}
// newDisk creates a disk service thread.
//
// Steps, in order:
//  1. create the SPDK thread;
//  2. size the block device from the number of forwarders, cfg.csDiskCapacity and
//     the packet mempool dataroom, then obtain it through cfg.createDevice;
//  3. allocate the C FwDisk struct on the NUMA socket of lc and assign lc to the thread;
//  4. create the disk store on the block device, with FwDisk_GotData as its callback;
//  5. create one disk.Alloc per forwarder and attach store and alloc to that forwarder's CS;
//  6. hand the new Disk to demuxPrep.Prepare.
//
// If any step fails, the partially built Disk is passed to must.Close and
// (nil, error) is returned.
func newDisk(id int, lc eal.LCore, demuxPrep *demuxPreparer, cfg DiskConfig) (fwdisk *Disk, e error) {
	fwdisk = &Disk{id: id}

	// The failure paths return a nil *Disk, which overwrites the named result,
	// so the cleanup closure is given its own copy of the pointer up front.
	defer func(created *Disk) {
		if e != nil {
			must.Close(created)
		}
	}(fwdisk)

	fwdisk.Thread, e = spdkenv.NewThread()
	if e != nil {
		return nil, e
	}

	nFwds := len(demuxPrep.Fwds)
	calc := disk.SizeCalc{
		NThreads:   nFwds,
		NPackets:   cfg.csDiskCapacity,
		PacketSize: ndni.PacketMempool.Config().Dataroom,
	}
	fwdisk.bdev, fwdisk.bdevCloser, e = cfg.createDevice(calc.MinBlocks())
	if e != nil {
		return nil, e
	}

	socket := lc.NumaSocket()
	fwdisk.c = eal.ZmallocAligned[C.FwDisk]("FwDisk", C.sizeof_FwDisk, 1, socket)
	fwdisk.SetLCore(lc)

	blocksPerSlot := calc.BlocksPerSlot()
	gotData := disk.StoreGetDataCallback.C(C.FwDisk_GotData, fwdisk.c)
	fwdisk.store, e = disk.NewStore(fwdisk.bdev, fwdisk.Thread, blocksPerSlot, gotData)
	if e != nil {
		return nil, e
	}

	fwdisk.allocs = make(map[int]*disk.Alloc)
	for index, fwd := range demuxPrep.Fwds {
		alloc := disk.NewAllocIn(fwdisk.store, index, nFwds, fwd.NumaSocket())
		// Record the alloc before SetDisk so that Close releases it if SetDisk fails.
		fwdisk.allocs[fwd.id] = alloc

		e = fwd.Cs().SetDisk(fwdisk.store, alloc)
		if e != nil {
			return nil, fmt.Errorf("Cs[%d].SetDisk: %w", fwd.id, e)
		}
	}

	demuxPrep.Prepare(fwdisk, socket)
	return fwdisk, nil
}