-
Notifications
You must be signed in to change notification settings - Fork 110
/
local_robot.go
1362 lines (1230 loc) · 45.4 KB
/
local_robot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package robotimpl defines implementations of robot.Robot and robot.LocalRobot.
//
// It also provides a remote robot implementation that is aware that the robot.Robot
// it is working with is not on the same physical system.
package robotimpl
import (
"context"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Masterminds/semver/v3"
"github.com/pkg/errors"
"go.uber.org/multierr"
packagespb "go.viam.com/api/app/packages/v1"
modulepb "go.viam.com/api/module/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.viam.com/rdk/cloud"
"go.viam.com/rdk/config"
icloud "go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/operation"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/referenceframe"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
"go.viam.com/rdk/robot/client"
"go.viam.com/rdk/robot/framesystem"
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/robot/web"
weboptions "go.viam.com/rdk/robot/web/options"
"go.viam.com/rdk/session"
"go.viam.com/rdk/utils"
)
// Compile-time check that *localRobot implements robot.LocalRobot.
var _ robot.LocalRobot = &localRobot{}
// localRobot satisfies robot.LocalRobot and defers most
// logic to its manager.
type localRobot struct {
	// statusLock guards calls to the Status method.
	statusLock sync.Mutex
	// manager is the resource manager that most accessor methods delegate to.
	manager *resourceManager
	// mostRecentCfg holds a config.Config value; Config() loads it and overlays
	// the fields generated by the resource manager.
	mostRecentCfg atomic.Value // config.Config
	// operations is returned by OperationManager; StopAll cancels every
	// operation it reports.
	operations *operation.Manager
	// sessionManager is returned by SessionManager and closed by Close.
	sessionManager session.Manager
	// packageManager is returned by PackageManager and closed by Close.
	packageManager packages.ManagerSyncer
	// localPackages is created with packages.NewLocalManager during construction.
	localPackages packages.ManagerSyncer
	// cloudConnSvc is the cloud connection service; it is closed by Close.
	cloudConnSvc icloud.ConnectionService
	logger logging.Logger
	// activeBackgroundWorkers tracks background goroutines that Close waits on
	// after cancelling them.
	activeBackgroundWorkers sync.WaitGroup
	// reconfigureWorkers tracks goroutines spawned by reconfiguration functions. we only
	// wait on this group in tests to prevent goleak-related failures. however, we do not
	// wait on this group outside of testing, since the related goroutines may be running
	// outside code and have unexpected behavior.
	reconfigureWorkers sync.WaitGroup
	// cancelBackgroundWorkers cancels closeContext; Close calls it once and
	// then sets it to nil.
	cancelBackgroundWorkers func()
	// closeContext is cancelled when the robot is closed.
	closeContext context.Context
	// triggerConfig is a signalling channel used to request a configuration
	// attempt from the background goroutine (see sendTriggerConfig).
	triggerConfig chan struct{}
	// configTicker periodically wakes the background configuration goroutine;
	// it is stopped by Close.
	configTicker *time.Ticker
	// revealSensitiveConfigDiffs and shutdownCallback are copied from the
	// construction options.
	revealSensitiveConfigDiffs bool
	shutdownCallback func()
	// lastWeakDependentsRound stores the value of the resource graph's
	// logical clock when updateWeakDependents was called.
	lastWeakDependentsRound atomic.Int64
	// internal services that are in the graph but we also hold onto
	webSvc web.Service
	frameSvc framesystem.Service
	// map keyed by Module.Name. This is necessary to get the package manager to use a new folder
	// when a local tarball is updated.
	localModuleVersions map[string]semver.Version
}
// ExportResourcesAsDot asks the resource manager for the DOT representation of
// the resource graph snapshot at the given index, for visualization.
// DOT reference: https://graphviz.org/doc/info/lang.html
func (r *localRobot) ExportResourcesAsDot(index int) (resource.GetSnapshotInfo, error) {
	snapshot, err := r.manager.ExportDot(index)
	return snapshot, err
}
// RemoteByName looks up a remote robot by name through the resource manager.
// The boolean result is false when no such remote is known.
func (r *localRobot) RemoteByName(name string) (robot.Robot, bool) {
	remote, found := r.manager.RemoteByName(name)
	return remote, found
}
// ResourceByName looks up a resource by name through the resource manager.
// A non-nil error is returned when the manager cannot provide the resource.
func (r *localRobot) ResourceByName(name resource.Name) (resource.Resource, error) {
	res, err := r.manager.ResourceByName(name)
	return res, err
}
// RemoteNames reports the names of every remote robot known to the resource manager.
func (r *localRobot) RemoteNames() []string {
	names := r.manager.RemoteNames()
	return names
}
// ResourceNames reports the names of every resource known to the resource manager.
func (r *localRobot) ResourceNames() []resource.Name {
	names := r.manager.ResourceNames()
	return names
}
// ResourceRPCAPIs reports every resource RPC API the resource manager knows to be in use.
func (r *localRobot) ResourceRPCAPIs() []resource.RPCAPI {
	apis := r.manager.ResourceRPCAPIs()
	return apis
}
// ProcessManager exposes the process manager owned by the robot's resource manager.
func (r *localRobot) ProcessManager() pexec.ProcessManager {
	mgr := r.manager
	return mgr.processManager
}
// OperationManager exposes the robot's operation manager.
func (r *localRobot) OperationManager() *operation.Manager {
	ops := r.operations
	return ops
}
// SessionManager exposes the robot's session manager.
func (r *localRobot) SessionManager() session.Manager {
	sessions := r.sessionManager
	return sessions
}
// PackageManager exposes the robot's package manager.
func (r *localRobot) PackageManager() packages.Manager {
	var pkgs packages.Manager = r.packageManager
	return pkgs
}
// Close shuts the robot down: it stops the web service, cancels and waits for
// the background workers, closes the session manager, and then closes the
// cloud connection service, resource manager, package manager and web service
// (in that order), returning their combined errors. It deliberately does not
// wait on reconfigureWorkers, as they may be running outside code and have
// unexpected behavior.
func (r *localRobot) Close(ctx context.Context) error {
	// The web service is stopped here, before anything else, because modules
	// need it to be removed properly and in the right order; it may be nil if
	// the robot is being closed before the web service was created.
	if r.webSvc != nil {
		r.webSvc.Stop()
	}

	if cancel := r.cancelBackgroundWorkers; cancel != nil {
		cancel()
		r.cancelBackgroundWorkers = nil
		if ticker := r.configTicker; ticker != nil {
			ticker.Stop()
		}
	}
	r.activeBackgroundWorkers.Wait()
	r.sessionManager.Close()

	// Gather close errors in order; nil entries are ignored by multierr.Combine.
	var closeErrs []error
	if r.cloudConnSvc != nil {
		closeErrs = append(closeErrs, r.cloudConnSvc.Close(ctx))
	}
	if r.manager != nil {
		closeErrs = append(closeErrs, r.manager.Close(ctx))
	}
	if r.packageManager != nil {
		closeErrs = append(closeErrs, r.packageManager.Close(ctx))
	}
	if r.webSvc != nil {
		closeErrs = append(closeErrs, r.webSvc.Close(ctx))
	}
	return multierr.Combine(closeErrs...)
}
// StopAll cancels every operation known to the operation manager and then
// calls Stop on every resource that implements resource.Actuator, passing the
// per-resource entry of extra. Resources that cannot be looked up or fail to
// stop are collected by name and reported in a single error.
func (r *localRobot) StopAll(ctx context.Context, extra map[resource.Name]map[string]interface{}) error {
	// Cancel operations first.
	for _, op := range r.OperationManager().All() {
		op.Cancel()
	}

	// Then stop everything that can be stopped, remembering failures.
	var failed []string
	for _, name := range r.ResourceNames() {
		res, lookupErr := r.ResourceByName(name)
		if lookupErr != nil {
			failed = append(failed, name.Name)
			continue
		}
		actuator, isActuator := res.(resource.Actuator)
		if !isActuator {
			continue
		}
		if stopErr := actuator.Stop(ctx, extra[name]); stopErr != nil {
			failed = append(failed, name.Name)
		}
	}

	if len(failed) == 0 {
		return nil
	}
	return errors.Errorf("failed to stop components named %s", strings.Join(failed, ","))
}
// Config returns a config representing the current state of the robot. It
// starts from a copy of the most recently stored config and replaces the
// Modules, Remotes, Components, Processes and Services fields with the ones
// generated by the resource manager.
func (r *localRobot) Config() *config.Config {
	snapshot := r.mostRecentCfg.Load().(config.Config)

	// NOTE(benji): it would be great if the resource manager could somehow
	// generate Cloud, Packages, Network and Auth fields.
	generated := r.manager.createConfig()
	snapshot.Modules, snapshot.Remotes = generated.Modules, generated.Remotes
	snapshot.Components, snapshot.Processes = generated.Components, generated.Processes
	snapshot.Services = generated.Services

	return &snapshot
}
// Logger exposes the logger the robot was constructed with.
func (r *localRobot) Logger() logging.Logger {
	logger := r.logger
	return logger
}
// StartWeb starts the web server with the given options and returns whatever
// error the web service's Start reports (e.g. if the server is already up).
func (r *localRobot) StartWeb(ctx context.Context, o weboptions.Options) (err error) {
	err = r.webSvc.Start(ctx, o)
	return err
}
// StopWeb stops the web server; it is intended to be a no-op when the server
// is not up.
func (r *localRobot) StopWeb() {
	svc := r.webSvc
	svc.Stop()
}
// WebAddress returns the web service's address. The error is always nil.
func (r *localRobot) WebAddress() (string, error) {
	addr := r.webSvc.Address()
	return addr, nil
}
// ModuleAddress returns the module service's address as reported by the web
// service. The error is always nil.
func (r *localRobot) ModuleAddress() (string, error) {
	addr := r.webSvc.ModuleAddress()
	return addr, nil
}
// remoteNameByResource returns the first ":"-separated segment of the
// resource's Remote field, i.e. the remote the resource is pulled from.
// False can mean either the resource doesn't exist or is local to the robot.
func remoteNameByResource(resourceName resource.Name) (string, bool) {
	if resourceName.ContainsRemoteNames() {
		first, _, _ := strings.Cut(resourceName.Remote, ":")
		return first, true
	}
	return "", false
}
// Status returns the status of each requested resource, or of every resource
// known to the resource manager when resourceNames is empty. Duplicate names
// are collapsed. Statuses of remote resources are requested from the owning
// remote; statuses of local resources are produced by the Status function of
// the resource's API registration, or are an empty map when none is
// registered. The order of the returned statuses is unspecified because it
// follows map iteration order.
//
// An error is returned if a remote fails to report status, if a local
// resource cannot be found or is not configured, or if computing its status
// fails. Calls are serialized by statusLock.
func (r *localRobot) Status(ctx context.Context, resourceNames []resource.Name) ([]robot.Status, error) {
	r.statusLock.Lock()
	defer r.statusLock.Unlock()

	// If no resource names are specified, return status of all resources.
	// The manager's names are used directly rather than appended onto
	// resourceNames: appending to an empty slice that has spare capacity
	// would write into the caller's backing array.
	namesToDedupe := resourceNames
	if len(resourceNames) == 0 {
		namesToDedupe = r.manager.ResourceNames()
	}

	// Dedupe resources.
	resourceNameSet := make(map[resource.Name]struct{}, len(namesToDedupe))
	for _, name := range namesToDedupe {
		resourceNameSet[name] = struct{}{}
	}

	// Group remote resource names by owning remote and map those names to
	// corresponding name on the remote (without the remote prefix).
	remoteResources := make(map[string]map[resource.Name]resource.Name)
	for name := range resourceNameSet {
		remoteName, ok := remoteNameByResource(name)
		if !ok {
			continue
		}
		mappings, ok := remoteResources[remoteName]
		if !ok {
			mappings = make(map[resource.Name]resource.Name)
		}
		mappings[name.PopRemote()] = name
		remoteResources[remoteName] = mappings
	}

	// Loop through remotes and get remote resource statuses through remotes.
	combinedRemoteResourceStatuses := make(map[resource.Name]robot.Status)
	for remoteName, resourceNameMappings := range remoteResources {
		remote, ok := r.RemoteByName(remoteName)
		if !ok {
			// should never happen
			r.Logger().CErrorw(ctx, "remote robot not found in resource graph while creating status",
				"remote", remoteName)
			continue
		}
		remoteResourceNames := make([]resource.Name, 0, len(resourceNameMappings))
		for remoteResourceName := range resourceNameMappings {
			remoteResourceNames = append(remoteResourceNames, remoteResourceName)
		}
		// Request status of resources associated with the remote from the remote.
		remoteResourceStatuses, err := remote.Status(ctx, remoteResourceNames)
		if err != nil {
			return nil, err
		}
		for _, remoteResourceStatus := range remoteResourceStatuses {
			mappedName, ok := resourceNameMappings[remoteResourceStatus.Name]
			if !ok {
				// should never happen
				r.Logger().CErrorw(ctx,
					"failed to find corresponding resource name for remote resource name while creating status",
					"resource", remoteResourceStatus.Name,
				)
				continue
			}
			// Set name to have remote prefix and add to remoteStatuses.
			remoteResourceStatus.Name = mappedName
			combinedRemoteResourceStatuses[mappedName] = remoteResourceStatus
		}
	}

	// Loop through entire resourceNameSet and get status for any local resources.
	combinedResourceStatuses := make([]robot.Status, 0, len(resourceNameSet))
	for name := range resourceNameSet {
		// Just append status if it was a remote resource.
		resourceStatus, ok := combinedRemoteResourceStatuses[name]
		if !ok {
			res, err := r.manager.ResourceByName(name)
			if err != nil {
				return nil, err
			}
			// If resource API registration had an associated CreateStatus method,
			// call that method, otherwise return an empty status.
			var status interface{} = map[string]interface{}{}
			if apiReg, ok := resource.LookupGenericAPIRegistration(name.API); ok &&
				apiReg.Status != nil {
				status, err = apiReg.Status(ctx, res)
				if err != nil {
					return nil, errors.Wrapf(err, "failed to get status from %q", name)
				}
			}
			resNode, ok := r.manager.resources.Node(name)
			if !ok {
				return nil, resource.NewNotFoundError(name)
			}
			lastReconfigured := resNode.LastReconfigured()
			if lastReconfigured == nil {
				return nil, errors.Errorf("resource %s queried for status is not configured",
					name)
			}
			resourceStatus = robot.Status{
				Name:             name,
				LastReconfigured: *lastReconfigured,
				Status:           status,
			}
		}
		combinedResourceStatuses = append(combinedResourceStatuses, resourceStatus)
	}
	return combinedResourceStatuses, nil
}
// sendTriggerConfig requests a configuration attempt from the background
// goroutine without ever blocking. Nothing is sent once the robot's close
// context is done, and if a request is already queued on triggerConfig the new
// one is dropped with a debug log that names the caller.
func (r *localRobot) sendTriggerConfig(caller string) {
	closeCtx := r.closeContext
	if closeCtx.Err() != nil {
		return
	}

	select {
	case <-closeCtx.Done():
		// Robot is closing; nothing to trigger.
	case r.triggerConfig <- struct{}{}:
		// Request queued.
	default:
		// Channel is full, so a reconfiguration is already pending.
		r.Logger().CDebugw(
			closeCtx,
			"attempted to trigger reconfiguration, but there is already one queued.",
			"caller", caller,
		)
	}
}
// newWithResources builds a localRobot from cfg, applies opts, wires up its
// internal services (web, frame system, package manager, cloud connection),
// starts the module manager and the background configuration goroutine,
// reconfigures the robot with cfg, and finally adds the supplied pre-built
// resources to the resource graph. If any step fails, the partially built
// robot is closed before the error is returned.
func newWithResources(
	ctx context.Context,
	cfg *config.Config,
	resources map[resource.Name]resource.Resource,
	logger logging.Logger,
	opts ...Option,
) (robot.LocalRobot, error) {
	var rOpts options
	var err error
	for _, opt := range opts {
		opt.apply(&rOpts)
	}
	// closeCtx is cancelled by Close (via cancelBackgroundWorkers).
	closeCtx, cancel := context.WithCancel(ctx)
	r := &localRobot{
		manager: newResourceManager(
			resourceManagerOptions{
				debug:              cfg.Debug,
				fromCommand:        cfg.FromCommand,
				allowInsecureCreds: cfg.AllowInsecureCreds,
				untrustedEnv:       cfg.UntrustedEnv,
				tlsConfig:          cfg.Network.TLSConfig,
			},
			logger,
		),
		operations:              operation.NewManager(logger),
		logger:                  logger,
		closeContext:            closeCtx,
		cancelBackgroundWorkers: cancel,
		// triggerConfig buffers 1 message so that we can queue up to 1 reconfiguration attempt
		// (as long as there is 1 queued, further messages can be safely discarded).
		triggerConfig:              make(chan struct{}, 1),
		configTicker:               nil,
		revealSensitiveConfigDiffs: rOpts.revealSensitiveConfigDiffs,
		cloudConnSvc:               icloud.NewCloudConnectionService(cfg.Cloud, logger),
		shutdownCallback:           rOpts.shutdownCallback,
		localModuleVersions:        make(map[string]semver.Version),
	}
	// Store an empty config so that Config() can type-assert the loaded value
	// before the first reconfiguration.
	r.mostRecentCfg.Store(config.Config{})
	// Fall back to the default heartbeat window when none is configured.
	var heartbeatWindow time.Duration
	if cfg.Network.Sessions.HeartbeatWindow == 0 {
		heartbeatWindow = config.DefaultSessionHeartbeatWindow
	} else {
		heartbeatWindow = cfg.Network.Sessions.HeartbeatWindow
	}
	r.sessionManager = robot.NewSessionManager(r, heartbeatWindow)

	// From here on, any early return closes the partially constructed robot.
	var successful bool
	defer func() {
		if !successful {
			if err := r.Close(context.Background()); err != nil {
				logger.CErrorw(ctx, "failed to close robot down after startup failure", "error", err)
			}
		}
	}()

	// Use a deferred, cloud-backed package manager only when a cloud app
	// address is configured; otherwise use a no-op manager.
	if cfg.Cloud != nil && cfg.Cloud.AppAddress != "" {
		r.packageManager = packages.NewDeferredPackageManager(
			ctx,
			func(ctx context.Context) (packagespb.PackageServiceClient, error) {
				_, cloudConn, err := r.cloudConnSvc.AcquireConnection(ctx)
				return packagespb.NewPackageServiceClient(cloudConn), err
			},
			cfg.Cloud,
			cfg.PackagePath,
			logger,
		)
	} else {
		r.logger.CDebug(ctx, "Using no-op PackageManager when Cloud config is not available")
		r.packageManager = packages.NewNoopManager()
	}
	r.localPackages, err = packages.NewLocalManager(cfg, logger)
	if err != nil {
		return nil, err
	}

	// start process manager early
	if err := r.manager.processManager.Start(ctx); err != nil {
		return nil, err
	}

	// we assume these never appear in our configs and as such will not be removed from the
	// resource graph
	r.webSvc = web.New(r, logger, rOpts.webOptions...)
	r.frameSvc, err = framesystem.New(ctx, resource.Dependencies{}, logger)
	if err != nil {
		return nil, err
	}
	// Register the internal services as already-configured nodes in the graph.
	if err := r.manager.resources.AddNode(
		web.InternalServiceName,
		resource.NewConfiguredGraphNode(resource.Config{}, r.webSvc, builtinModel)); err != nil {
		return nil, err
	}
	if err := r.manager.resources.AddNode(
		framesystem.InternalServiceName,
		resource.NewConfiguredGraphNode(resource.Config{}, r.frameSvc, builtinModel)); err != nil {
		return nil, err
	}
	if err := r.manager.resources.AddNode(
		r.packageManager.Name(),
		resource.NewConfiguredGraphNode(resource.Config{}, r.packageManager, builtinModel)); err != nil {
		return nil, err
	}
	if err := r.manager.resources.AddNode(
		r.cloudConnSvc.Name(),
		resource.NewConfiguredGraphNode(resource.Config{}, r.cloudConnSvc, builtinModel)); err != nil {
		return nil, err
	}
	if err := r.webSvc.StartModule(ctx); err != nil {
		return nil, err
	}
	var cloudID string
	if cfg.Cloud != nil {
		cloudID = cfg.Cloud.ID
	}
	// The home directory option, when set, overrides the default.
	homeDir := config.ViamDotDir
	if rOpts.viamHomeDir != "" {
		homeDir = rOpts.viamHomeDir
	}
	// Once web service is started, start module manager
	r.manager.startModuleManager(
		closeCtx,
		r.webSvc.ModuleAddress(),
		r.removeOrphanedResources,
		cfg.UntrustedEnv,
		homeDir,
		cloudID,
		logger,
		cfg.PackagePath,
	)
	r.activeBackgroundWorkers.Add(1)
	r.configTicker = time.NewTicker(5 * time.Second)
	// This goroutine tries to complete the config and update weak dependencies
	// if any resources are not configured. It executes every 5 seconds or when
	// manually triggered. Manual triggers are sent when changes in remotes are
	// detected and in testing.
	goutils.ManagedGo(func() {
		for {
			if closeCtx.Err() != nil {
				return
			}
			var trigger string
			select {
			case <-closeCtx.Done():
				return
			case <-r.configTicker.C:
				trigger = "ticker"
			case <-r.triggerConfig:
				trigger = "remote"
				r.logger.CDebugw(ctx, "configuration attempt triggered by remote")
			}
			anyChanges := r.manager.updateRemotesResourceNames(closeCtx)
			if r.manager.anyResourcesNotConfigured() {
				anyChanges = true
				r.manager.completeConfig(closeCtx, r, false)
			}
			if anyChanges {
				r.updateWeakDependents(ctx)
				r.logger.CDebugw(ctx, "configuration attempt completed with changes", "trigger", trigger)
			}
		}
	}, r.activeBackgroundWorkers.Done)

	r.Reconfigure(ctx, cfg)

	// Add caller-supplied resources after the initial reconfiguration, as
	// already-configured nodes.
	for name, res := range resources {
		if err := r.manager.resources.AddNode(
			name, resource.NewConfiguredGraphNode(resource.Config{}, res, unknownModel)); err != nil {
			return nil, err
		}
	}
	if len(resources) != 0 {
		r.updateWeakDependents(ctx)
	}
	successful = true
	return r, nil
}
// New returns a new robot with parts sourced from the given config. No
// pre-built resources are injected.
func New(
	ctx context.Context,
	cfg *config.Config,
	logger logging.Logger,
	opts ...Option,
) (robot.LocalRobot, error) {
	var noResources map[resource.Name]resource.Resource
	return newWithResources(ctx, cfg, noResources, logger, opts...)
}
// removeOrphanedResources is the callback handed to the module manager for
// removing resources orphaned due to module crashes. It marks the named
// resources as removed, closes them (logging any error), and then refreshes
// weak dependents.
func (r *localRobot) removeOrphanedResources(ctx context.Context,
	rNames []resource.Name,
) {
	mgr := r.manager
	mgr.markResourcesRemoved(rNames, nil)
	closeErr := mgr.removeMarkedAndClose(ctx, nil)
	if closeErr != nil {
		r.logger.CErrorw(ctx, "error removing and closing marked resources",
			"error", closeErr)
	}
	r.updateWeakDependents(ctx)
}
// getDependencies derives a collection of dependencies from a robot for a given
// component's name. We don't use the resource manager for this information since
// it is not constructed at this point.
//
// The result holds every parent of rName in the resource graph plus any weak
// dependencies matched for the node's API and model. An error is returned if
// the node still has unresolved dependencies or if a parent is not ready.
func (r *localRobot) getDependencies(
	ctx context.Context,
	rName resource.Name,
	gNode *resource.GraphNode,
) (resource.Dependencies, error) {
	unresolved := gNode.UnresolvedDependencies()
	if len(unresolved) != 0 {
		return nil, errors.Errorf("resource has unresolved dependencies: %v", unresolved)
	}

	resolved := make(resource.Dependencies)
	weakUpdateNeeded := false
	for _, parent := range r.manager.resources.GetAllParentsOf(rName) {
		// A parent whose updatedAt value is "later" than (or equal to) the
		// clock value recorded by the last updateWeakDependents run means
		// updateWeakDependents must be run again before returning.
		if parentNode, found := r.manager.resources.Node(parent); found &&
			r.lastWeakDependentsRound.Load() <= parentNode.UpdatedAt() {
			weakUpdateNeeded = true
		}

		// Go through the robot's ResourceByName rather than the manager
		// directly, since this will only return fully configured and
		// available resources (not marked for removal and no last error).
		parentRes, err := r.ResourceByName(parent)
		if err != nil {
			return nil, &resource.DependencyNotReadyError{Name: parent.Name, Reason: err}
		}
		resolved[parent] = parentRes
	}

	// Weak dependencies never override an explicit parent.
	conf := gNode.Config()
	for weakName, weakRes := range r.getWeakDependencies(rName, conf.API, conf.Model) {
		if _, exists := resolved[weakName]; !exists {
			resolved[weakName] = weakRes
		}
	}

	if weakUpdateNeeded {
		r.updateWeakDependents(ctx)
	}
	return resolved, nil
}
// getWeakDependencyMatchers returns the weak dependency matchers registered
// for the given API and model, or nil when that pair has no registration.
func (r *localRobot) getWeakDependencyMatchers(api resource.API, model resource.Model) []resource.Matcher {
	if reg, registered := resource.LookupRegistration(api, model); registered {
		return reg.WeakDependencies
	}
	return nil
}
// getWeakDependencies collects every available component or service in the
// resource graph, other than resName itself, that is matched by at least one
// weak dependency matcher registered for the given API and model. Resources
// that cannot be looked up are skipped; unexpected lookup errors are logged
// at debug level.
func (r *localRobot) getWeakDependencies(resName resource.Name, api resource.API, model resource.Model) resource.Dependencies {
	matchers := r.getWeakDependencyMatchers(api, model)
	graphNames := r.manager.resources.Names()
	weakDeps := make(resource.Dependencies, len(graphNames))

	for _, candidate := range graphNames {
		isComponentOrService := candidate.API.IsComponent() || candidate.API.IsService()
		if !isComponentOrService || candidate == resName {
			continue
		}

		candidateRes, err := r.ResourceByName(candidate)
		if err != nil {
			// Not-ready and not-available errors are expected here and stay silent.
			expected := resource.IsDependencyNotReadyError(err) || resource.IsNotAvailableError(err)
			if !expected {
				r.Logger().Debugw("error finding resource while getting weak dependencies", "resource", candidate, "error", err)
			}
			continue
		}

		for _, m := range matchers {
			if m.IsMatch(candidateRes) {
				weakDeps[candidate] = candidateRes
			}
		}
	}
	return weakDeps
}
// newResource constructs the resource described by conf. It looks up the
// registration for the resource's API and model, resolves the resource's
// dependencies, enforces the API's MaxInstance limit (zero means unlimited),
// and then invokes the registered constructor, falling back to the deprecated
// robot-based constructor. A panic raised while creating the resource is
// recovered and returned as an error.
func (r *localRobot) newResource(
	ctx context.Context,
	gNode *resource.GraphNode,
	conf resource.Config,
) (res resource.Resource, err error) {
	defer func() {
		if panicked := recover(); panicked != nil {
			err = errors.Wrap(errors.Errorf("%v", panicked), "panic creating resource")
		}
	}()

	resName := conf.ResourceName()
	registration, registered := resource.LookupRegistration(resName.API, conf.Model)
	if !registered {
		return nil, errors.Errorf("unknown resource type: API %q with model %q not registered", resName.API, conf.Model)
	}

	deps, err := r.getDependencies(ctx, resName, gNode)
	if err != nil {
		return nil, err
	}

	// If MaxInstance equals zero then there is not a limit on the number of resources
	if apiReg, found := resource.LookupGenericAPIRegistration(resName.API); found && apiReg.MaxInstance != 0 {
		if limitErr := r.checkMaxInstance(resName.API, apiReg.MaxInstance); limitErr != nil {
			return nil, limitErr
		}
	}

	switch {
	case registration.Constructor != nil:
		return registration.Constructor(ctx, deps, conf, gNode.Logger())
	case registration.DeprecatedRobotConstructor != nil:
		return registration.DeprecatedRobotConstructor(ctx, r, conf, gNode.Logger())
	default:
		return nil, errors.Errorf("invariant: no constructor for %q", conf.API)
	}
}
// updateWeakDependents reconfigures everything that consumes "weak"
// dependencies. It first records the resource graph's logical clock, then
// reconfigures the internal services (web with all resources, frame system
// with all components and the current frame system config), and finally
// reconfigures every configured component and service whose registration
// declares weak dependency matchers.
//
// Each reconfiguration runs in its own goroutine and is waited on for at most
// the resource configuration timeout; on timeout a warning is logged and the
// next resource is processed while the goroutine keeps running. The function
// returns early once ctx is done. Failures are logged, never returned.
func (r *localRobot) updateWeakDependents(ctx context.Context) {
	// Track the current value of the resource graph's logical clock. This will
	// later be used to determine if updateWeakDependents should be called during
	// getDependencies.
	r.lastWeakDependentsRound.Store(r.manager.resources.CurrLogicalClockValue())

	allResources := map[resource.Name]resource.Resource{}
	internalResources := map[resource.Name]resource.Resource{}
	components := map[resource.Name]resource.Resource{}
	for _, n := range r.manager.resources.Names() {
		if !(n.API.IsComponent() || n.API.IsService()) {
			continue
		}
		res, err := r.ResourceByName(n)
		if err != nil {
			if !resource.IsDependencyNotReadyError(err) && !resource.IsNotAvailableError(err) {
				r.Logger().CDebugw(ctx, "error finding resource during weak dependent update", "resource", n, "error", err)
			}
			continue
		}
		allResources[n] = res
		switch {
		case n.API.IsComponent():
			components[n] = res
		case n.API.Type.Namespace == resource.APINamespaceRDKInternal:
			internalResources[n] = res
		}
	}

	timeout := utils.GetResourceConfigurationTimeout(r.logger)

	// NOTE(erd): this is intentionally hard coded since these services are treated specially with
	// how they request dependencies or consume the robot's config. We should make an effort to
	// formalize these as services that while internal, obey the reconfigure lifecycle.
	// For example, the framesystem should depend on all input enabled components while the web
	// service depends on all resources.
	// For now, we pass all resources and empty configs.
	processInternalResources := func(resName resource.Name, res resource.Resource, resChan chan struct{}) {
		ctxWithTimeout, timeoutCancel := context.WithTimeout(ctx, timeout)
		defer timeoutCancel()

		cleanup := utils.SlowStartupLogger(
			ctx, "Waiting for internal resource to complete reconfiguration during weak dependencies update", "resource", resName.String(), r.logger)
		defer cleanup()

		r.reconfigureWorkers.Add(1)
		goutils.PanicCapturingGo(func() {
			defer func() {
				resChan <- struct{}{}
				r.reconfigureWorkers.Done()
			}()
			switch resName {
			case web.InternalServiceName:
				if err := res.Reconfigure(ctxWithTimeout, allResources, resource.Config{}); err != nil {
					r.Logger().CErrorw(ctx, "failed to reconfigure internal service during weak dependencies update", "service", resName, "error", err)
				}
			case framesystem.InternalServiceName:
				fsCfg, err := r.FrameSystemConfig(ctxWithTimeout)
				if err != nil {
					r.Logger().CErrorw(ctx, "failed to reconfigure internal service during weak dependencies update", "service", resName, "error", err)
					break
				}
				if err := res.Reconfigure(ctxWithTimeout, components, resource.Config{ConvertedAttributes: fsCfg}); err != nil {
					r.Logger().CErrorw(ctx, "failed to reconfigure internal service during weak dependencies update", "service", resName, "error", err)
				}
			case packages.InternalServiceName, packages.DeferredServiceName, icloud.InternalServiceName:
				// These internal services need no reconfiguration here.
			default:
				r.logger.CWarnw(ctx, "do not know how to reconfigure internal service during weak dependencies update", "service", resName)
			}
		})

		select {
		case <-resChan:
		case <-ctxWithTimeout.Done():
			if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) {
				r.logger.CWarn(ctx, utils.NewWeakDependenciesUpdateTimeoutError(resName.String()))
			}
		case <-ctx.Done():
			return
		}
	}

	for resName, res := range internalResources {
		select {
		case <-ctx.Done():
			return
		default:
		}
		// Buffered so the worker goroutine can always deliver its completion
		// signal, even if we stopped waiting for it.
		resChan := make(chan struct{}, 1)
		processInternalResources(resName, res, resChan)
	}

	// updateResourceWeakDependents reconfigures a single resource with freshly
	// computed dependencies, provided it exists, is usable, and declares weak
	// dependency matchers.
	updateResourceWeakDependents := func(ctx context.Context, conf resource.Config) {
		resName := conf.ResourceName()
		resNode, ok := r.manager.resources.Node(resName)
		if !ok {
			return
		}
		res, err := resNode.Resource()
		if err != nil {
			return
		}
		if len(r.getWeakDependencyMatchers(conf.API, conf.Model)) == 0 {
			return
		}
		r.Logger().CDebugw(ctx, "handling weak update for resource", "resource", resName)
		deps, err := r.getDependencies(ctx, resName, resNode)
		if err != nil {
			r.Logger().CErrorw(ctx, "failed to get dependencies during weak dependencies update; skipping", "resource", resName, "error", err)
			return
		}
		if err := res.Reconfigure(ctx, deps, conf); err != nil {
			r.Logger().CErrorw(ctx, "failed to reconfigure resource during weak dependencies update", "resource", resName, "error", err)
		}
	}

	// processResource runs updateResourceWeakDependents for one resource in a
	// worker goroutine and waits for it, bounded by the configuration timeout.
	// It is a separate closure so the timeout context is released as soon as
	// this resource is done rather than when updateWeakDependents returns
	// (a defer placed directly in the loop would pile up one live timer per
	// resource). It reports false when ctx was cancelled and the caller
	// should stop.
	processResource := func(conf resource.Config) bool {
		ctxWithTimeout, timeoutCancel := context.WithTimeout(ctx, timeout)
		defer timeoutCancel()

		cleanup := utils.SlowStartupLogger(
			ctx,
			"Waiting for resource to complete reconfiguration during weak dependencies update",
			"resource",
			conf.ResourceName().String(),
			r.logger,
		)

		resChan := make(chan struct{}, 1)
		r.reconfigureWorkers.Add(1)
		goutils.PanicCapturingGo(func() {
			defer func() {
				cleanup()
				resChan <- struct{}{}
				r.reconfigureWorkers.Done()
			}()
			updateResourceWeakDependents(ctxWithTimeout, conf)
		})

		select {
		case <-resChan:
		case <-ctxWithTimeout.Done():
			if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) {
				r.logger.CWarn(ctx, utils.NewWeakDependenciesUpdateTimeoutError(conf.ResourceName().String()))
			}
		case <-ctx.Done():
			return false
		}
		return true
	}

	cfg := r.Config()
	for _, conf := range append(cfg.Components, cfg.Services...) {
		select {
		case <-ctx.Done():
			return
		default:
		}
		if !processResource(conf) {
			return
		}
	}
}
// FrameSystemConfig returns the info of each individual part that makes up the
// frame system. The output of this function is to be sent over GRPC to the
// client, so the client can build its frame system. Local parts come from the
// robot's own config; remote parts are requested from each remote's frame
// system config.
func (r *localRobot) FrameSystemConfig(ctx context.Context) (*framesystem.Config, error) {
	local, err := r.getLocalFrameSystemParts()
	if err != nil {
		return nil, err
	}

	remote, err := r.getRemoteFrameSystemParts(ctx)
	if err != nil {
		return nil, err
	}

	allParts := append(local, remote...)
	return &framesystem.Config{Parts: allParts}, nil
}
// getLocalFrameSystemParts builds a frame system part for every component in
// the robot's config that carries frame info; remote robots, services, etc.
// are excluded. A component may not be named after the world frame and must
// specify a parent. Components whose resource is not currently available are
// left out.
func (r *localRobot) getLocalFrameSystemParts() ([]*referenceframe.FrameSystemPart, error) {
	localParts := []*referenceframe.FrameSystemPart{}
	for _, comp := range r.Config().Components {
		frameCfg := comp.Frame
		switch {
		case frameCfg == nil:
			// no Frame means dont include in frame system.
			continue
		case comp.Name == referenceframe.World:
			return nil, errors.Errorf("cannot give frame system part the name %s", referenceframe.World)
		case frameCfg.Parent == "":
			return nil, errors.Errorf("parent field in frame config for part %q is empty", comp.Name)
		}

		// Work on a copy so the ID can default to the component name without
		// touching the config.
		link := &referenceframe.LinkConfig{
			ID:          frameCfg.ID,
			Translation: frameCfg.Translation,
			Orientation: frameCfg.Orientation,
			Geometry:    frameCfg.Geometry,
			Parent:      frameCfg.Parent,
		}
		if link.ID == "" {
			link.ID = comp.Name
		}

		modelFrame, modelErr := r.extractModelFrameJSON(comp.ResourceName())
		if modelErr != nil && !errors.Is(modelErr, referenceframe.ErrNoModelInformation) {
			// Any error other than "no model information" means the resource
			// is not yet available, so it is excluded from the frame system
			// for now. When it becomes available, it will be included.
			continue
		}

		linkInFrame, err := link.ParseConfig()
		if err != nil {
			return nil, err
		}
		localParts = append(localParts, &referenceframe.FrameSystemPart{FrameConfig: linkInFrame, ModelFrame: modelFrame})
	}
	return localParts, nil
}
// getRemoteFrameSystemParts collects frame system parts for every configured
// remote that is currently known, has frame info and is connected. For each
// such remote it emits one part linking the remote's world frame (named
// "<remote>_world") into this robot's frame system, followed by the remote's
// own parts prefixed with the remote's name.
func (r *localRobot) getRemoteFrameSystemParts(ctx context.Context) ([]*referenceframe.FrameSystemPart, error) {
	cfg := r.Config()

	// Set of remotes the resource manager currently knows about.
	known := r.RemoteNames()
	available := make(map[string]struct{}, len(known))
	for _, remoteName := range known {
		available[remoteName] = struct{}{}
	}

	collected := []*referenceframe.FrameSystemPart{}
	for _, remoteCfg := range cfg.Remotes {
		remoteName := remoteCfg.Name

		// remote could be in config without being available (remotes could be down or otherwise unavailable)
		if _, isAvailable := available[remoteName]; !isAvailable {
			r.logger.CDebugf(ctx, "remote %q is not available, skipping", remoteName)
			continue
		}
		// Without frame info there is nothing to connect the remote's world
		// to the base world with.
		if remoteCfg.Frame == nil {
			r.logger.CDebugf(ctx, "remote %q has no frame config info, skipping", remoteName)
			continue
		}

		remoteRobot, found := r.RemoteByName(remoteName)
		if !found {
			return nil, errors.Errorf("cannot find remote robot %q", remoteName)
		}
		remote, err := utils.AssertType[robot.RemoteRobot](remoteRobot)
		if err != nil {
			// should never happen
			return nil, err
		}
		if !remote.Connected() {
			r.logger.CDebugf(ctx, "remote %q is not connected, skipping", remoteName)
			continue
		}

		// Part that connects the remote's world frame to the base world.
		linkInFrame, err := remoteCfg.Frame.ParseConfig()
		if err != nil {
			return nil, err
		}
		remoteWorld := remoteName + "_" + referenceframe.World
		linkInFrame.SetName(remoteWorld)
		collected = append(collected, &referenceframe.FrameSystemPart{FrameConfig: linkInFrame})

		// get the parts from the remote itself
		remoteFsCfg, err := remote.FrameSystemConfig(ctx)
		if err != nil {
			return nil, errors.Wrapf(err, "error from remote %q", remoteName)
		}
		framesystem.PrefixRemoteParts(remoteFsCfg.Parts, remoteName, remoteWorld)
		collected = append(collected, remoteFsCfg.Parts...)
	}
	return collected, nil
}
// extractModelFrameJSON finds the robot part with a given name and, if it
// implements referenceframe.ModelFramer, returns its model frame. It returns
// the lookup error when the part cannot be found and
// referenceframe.ErrNoModelInformation when the part is not a ModelFramer.
func (r *localRobot) extractModelFrameJSON(name resource.Name) (referenceframe.Model, error) {
	part, err := r.ResourceByName(name)
	if err != nil {
		return nil, err
	}

	framer, isFramer := part.(referenceframe.ModelFramer)
	if !isFramer {
		return nil, referenceframe.ErrNoModelInformation
	}
	return framer.ModelFrame(), nil
}
// TransformPose will transform the pose of the requested poseInFrame to the desired frame in the robot's frame system.