/
disk.go
170 lines (142 loc) · 3.81 KB
/
disk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Package disk implements a rep-mgr informer which can provide different
// disk-related metrics from the IPFS daemon as an api.Metric.
package disk
import (
"context"
"fmt"
"github.com/mtdepin/rep-mgr/ipfsconn/knodemanager"
"github.com/mtdepin/rep-mgr/observations"
"go.opencensus.io/stats"
"sync"
logging "github.com/ipfs/go-log/v2"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/mtdepin/rep-mgr/api"
"go.opencensus.io/trace"
)
// MetricType selects which disk metric is fetched from the IPFS daemon.
type MetricType int

// Supported metric types.
const (
	// MetricFreeSpace is the available space reported by IPFS.
	MetricFreeSpace MetricType = iota
	// MetricRepoSize is the used space reported by IPFS.
	MetricRepoSize
)

// String returns the textual name of the MetricType: "freespace" for
// MetricFreeSpace, "reposize" for MetricRepoSize, and the empty string for
// any other value.
func (t MetricType) String() string {
	if t == MetricFreeSpace {
		return "freespace"
	}
	if t == MetricRepoSize {
		return "reposize"
	}
	return ""
}
// logger is the package-level logger, created under the "diskinfo" name.
var logger = logging.Logger("diskinfo")
// Informer is a small object meant to satisfy the ipfscluster.Informer and
// Component interfaces by reporting a disk-related metric.
type Informer struct {
	// config is assigned when the Informer is created and is treated as
	// read-only afterwards.
	config *Config
	// km is the KnodeManager handed to NewInformer; it may be nil.
	km *knodemanager.KnodeManager

	// mu guards access to the fields declared after it.
	mu        sync.Mutex
	rpcClient *rpc.Client
}
// NewInformer validates cfg and, when validation succeeds, returns an
// Informer initialized with that Config and the given KnodeManager. If
// validation fails, the validation error is returned unchanged together with
// a nil Informer.
func NewInformer(cfg *Config, manager *knodemanager.KnodeManager) (*Informer, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}

	informer := &Informer{config: cfg, km: manager}
	return informer, nil
}
// Name returns the name of the metric issued by this informer, which is the
// string form of the MetricType found in its configuration.
func (disk *Informer) Name() string {
	metricType := disk.config.MetricType
	return metricType.String()
}
// SetClient stores the rpc.Client that lets this informer contact other
// components in the cluster. The assignment is made while holding the
// informer's mutex.
func (disk *Informer) SetClient(c *rpc.Client) {
	disk.mu.Lock()
	disk.rpcClient = c
	disk.mu.Unlock()
}
// Shutdown is called on cluster shutdown. It clears the stored RPC client
// (under the mutex) so that metrics are invalidated from this point on, and
// it always returns nil.
func (disk *Informer) Shutdown(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "informer/disk/Shutdown")
	defer span.End()

	disk.mu.Lock()
	disk.rpcClient = nil
	disk.mu.Unlock()

	return nil
}
// GetMetrics returns the metric obtained by this Informer.
//
// When a KnodeManager was supplied at construction, the result of its
// GetMetric call is returned as-is. Otherwise an error is logged and the
// informer falls back to requesting RepoStat from the IPFSConnector component
// over RPC, producing exactly one metric:
//   - with no RPC client set, an invalid metric carrying only the name;
//   - if the RPC call fails, an invalid metric with zero weight and no value;
//   - for MetricFreeSpace, StorageMax minus RepoSize as weight and value
//     (marked invalid when no space is left);
//   - for MetricRepoSize, the repo size as value and its negation as weight.
//
// On the RPC path the metric gets the configured TTL and its weight is
// recorded to the observations.InformerDisk measure.
func (disk *Informer) GetMetrics(ctx context.Context) []api.Metric {
	ctx, span := trace.StartSpan(ctx, "informer/disk/GetMetric")
	defer span.End()

	if disk.km != nil {
		return disk.km.GetMetric()
	}
	logger.Errorf("disk GetMetrics: unexpected here, knodemanage is nil")

	// Take a snapshot of the client so the lock is not held during the RPC.
	disk.mu.Lock()
	client := disk.rpcClient
	disk.mu.Unlock()

	if client == nil {
		invalid := api.Metric{
			Name:  disk.Name(),
			Valid: false,
		}
		return []api.Metric{invalid}
	}

	var (
		stat   api.IPFSRepoStat
		weight uint64
		value  string
	)
	valid := true

	err := client.CallContext(
		ctx,
		"",
		"IPFSConnector",
		"RepoStat",
		struct{}{},
		&stat,
	)

	switch {
	case err != nil:
		logger.Error(err)
		valid = false
	case disk.config.MetricType == MetricFreeSpace:
		used, capacity := stat.RepoSize, stat.StorageMax
		if used >= capacity {
			// Avoid unsigned underflow and stop sending this metric
			// as valid once the space is exhausted.
			weight = 0
			valid = false
			logger.Warn("reported freespace is 0")
		} else {
			weight = capacity - used
		}
		value = fmt.Sprintf("%d", weight)
	case disk.config.MetricType == MetricRepoSize:
		// Smaller repositories have more priority. The negation wraps
		// around in uint64; the int64 conversion below turns it back
		// into a negative weight.
		weight = -stat.RepoSize
		value = fmt.Sprintf("%d", stat.RepoSize)
	}

	metric := api.Metric{
		Name:          disk.Name(),
		Value:         value,
		Valid:         valid,
		Weight:        int64(weight),
		Partitionable: false,
	}
	metric.SetTTL(disk.config.MetricTTL)

	stats.Record(ctx, observations.InformerDisk.M(metric.Weight))

	return []api.Metric{metric}
}