-
Notifications
You must be signed in to change notification settings - Fork 104
/
data_manager.go
169 lines (148 loc) · 6.06 KB
/
data_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Package datamanager contains a service type that can be used to capture data from a robot's components.
package datamanager
import (
"context"
"encoding/json"
"reflect"
"slices"
servicepb "go.viam.com/api/service/datamanager/v1"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
"go.viam.com/rdk/utils"
)
func init() {
resource.RegisterAPIWithAssociation(
API,
resource.APIRegistration[Service]{
RPCServiceServerConstructor: NewRPCServiceServer,
RPCServiceHandler: servicepb.RegisterDataManagerServiceHandlerFromEndpoint,
RPCServiceDesc: &servicepb.DataManagerService_ServiceDesc,
RPCClient: NewClientFromConn,
MaxInstance: resource.DefaultMaxInstance,
},
resource.AssociatedConfigRegistration[*AssociatedConfig]{
AttributeMapConverter: newAssociatedConfig,
},
)
}
// Service defines what a Data Manager Service should expose to the users.
//
// Sync example:
//
// // Sync data stored on the machine to the cloud.
// err := data.Sync(context.Background(), nil)
type Service interface {
resource.Resource
// Sync will sync data stored on the machine to the cloud.
Sync(ctx context.Context, extra map[string]interface{}) error
}
// SubtypeName is the name of the type of service.
const SubtypeName = "data_manager"
// API is a variable that identifies the data manager service resource API.
var API = resource.APINamespaceRDK.WithServiceType(SubtypeName)
// Named is a helper for getting the named datamanager's typed resource name.
func Named(name string) resource.Name {
return resource.NewName(API, name)
}
// FromDependencies is a helper for getting the named data manager service from a collection of dependencies.
func FromDependencies(deps resource.Dependencies, name string) (Service, error) {
return resource.FromDependencies[Service](deps, Named(name))
}
// FromRobot is a helper for getting the named data manager service from the given Robot.
func FromRobot(r robot.Robot, name string) (Service, error) {
return robot.ResourceFromRobot[Service](r, Named(name))
}
// NamesFromRobot is a helper for getting all data manager services from the given Robot.
func NamesFromRobot(r robot.Robot) []string {
return robot.NamesByAPI(r, API)
}
// AssociatedConfig specify a list of methods to capture on resources and implements the resource.AssociatedConfig interface.
type AssociatedConfig struct {
CaptureMethods []DataCaptureConfig `json:"capture_methods"`
}
func newAssociatedConfig(attributes utils.AttributeMap) (*AssociatedConfig, error) {
md, err := json.Marshal(attributes)
if err != nil {
return nil, err
}
var conf AssociatedConfig
if err := json.Unmarshal(md, &conf); err != nil {
return nil, err
}
return &conf, nil
}
// Equals describes if an DataCaptureConfigs is equal to a given AssociatedConfig.
func (ac *AssociatedConfig) Equals(other resource.AssociatedConfig) bool {
ac2, err := utils.AssertType[*AssociatedConfig](other)
if err != nil {
return false
}
if len(ac.CaptureMethods) != len(ac2.CaptureMethods) {
return false
}
// naively iterate over the list of capture methods and determine if they are the same
// note that two lists with capture methods [a, b] and [b, a] will not be equal as they are out of order
for i := 0; i < len(ac.CaptureMethods); i++ {
if !ac.CaptureMethods[i].Equals(&ac2.CaptureMethods[i]) {
return false
}
}
return true
}
// UpdateResourceNames allows the caller to modify the resource names of data capture in place.
func (ac *AssociatedConfig) UpdateResourceNames(updater func(old resource.Name) resource.Name) {
for idx := range ac.CaptureMethods {
ac.CaptureMethods[idx].Name = updater(ac.CaptureMethods[idx].Name)
}
}
// Link associates an AssociatedConfig to a specific resource model (e.g. builtin data capture).
func (ac *AssociatedConfig) Link(conf *resource.Config) {
if len(ac.CaptureMethods) == 0 {
return
}
// infer name from first index in CaptureMethods
name := ac.CaptureMethods[0].Name
captureMethodCopies := make([]DataCaptureConfig, 0, len(ac.CaptureMethods))
for _, method := range ac.CaptureMethods {
methodCopy := method
captureMethodCopies = append(captureMethodCopies, methodCopy)
}
if conf.AssociatedAttributes == nil {
conf.AssociatedAttributes = make(map[resource.Name]resource.AssociatedConfig)
}
conf.AssociatedAttributes[name] = &AssociatedConfig{CaptureMethods: captureMethodCopies}
}
// DataCaptureConfig is used to initialize a collector for a component or remote.
type DataCaptureConfig struct {
Name resource.Name `json:"name"`
Method string `json:"method"`
CaptureFrequencyHz float32 `json:"capture_frequency_hz"`
CaptureQueueSize int `json:"capture_queue_size"`
CaptureBufferSize int `json:"capture_buffer_size"`
AdditionalParams map[string]string `json:"additional_params"`
Disabled bool `json:"disabled"`
Tags []string `json:"tags,omitempty"`
CaptureDirectory string `json:"capture_directory"`
}
// Equals checks if one capture config is equal to another.
func (c *DataCaptureConfig) Equals(other *DataCaptureConfig) bool {
return c.Name.String() == other.Name.String() &&
c.Method == other.Method &&
c.CaptureFrequencyHz == other.CaptureFrequencyHz &&
c.CaptureQueueSize == other.CaptureQueueSize &&
c.CaptureBufferSize == other.CaptureBufferSize &&
c.Disabled == other.Disabled &&
slices.Compare(c.Tags, other.Tags) == 0 &&
reflect.DeepEqual(c.AdditionalParams, other.AdditionalParams) &&
c.CaptureDirectory == other.CaptureDirectory
}
// ShouldSyncKey is a special key we use within a modular sensor to pass a boolean
// that indicates to the datamanager whether or not we want to sync.
var ShouldSyncKey = "should_sync"
// CreateShouldSyncReading is a helper for creating the expected reading for a modular sensor
// that passes a bool to the datamanager to indicate whether or not we want to sync.
func CreateShouldSyncReading(toSync bool) map[string]interface{} {
readings := map[string]interface{}{}
readings[ShouldSyncKey] = toSync
return readings
}