/
remote_push.go
110 lines (90 loc) · 2.73 KB
/
remote_push.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"context"
"github.com/ipfs/testground/plans/qri/sim"
"github.com/qri-io/qri/event"
"github.com/qri-io/qri/repo/profile"
)
// RunPlanRemotePushPull runs the remote push/pull test plan: it sets up the
// network, constructs this instance's actor as either a pusher or a receiver,
// connects all nodes, and then blocks until the plan reports completion.
//
// The returned error is the first failure from network setup, actor
// construction, or node connection; otherwise it is whatever value arrives on
// the channel returned by plan.Finished.
func RunPlanRemotePushPull(ctx context.Context, plan *Plan) error {
	if setupErr := plan.SetupNetwork(ctx); setupErr != nil {
		return setupErr
	}

	// Role selection: an even count yields a pusher, anything else a receiver.
	// NOTE(review): TestInstanceCount looks like the run-wide instance total,
	// not a per-instance index, which would give every instance the same
	// role — confirm against the Plan/Runenv API.
	var build ActorConstructor
	switch plan.Runenv.TestInstanceCount % 2 {
	case 0:
		build = newPusher
	default:
		build = newReceiver
	}

	if buildErr := plan.ConstructActor(ctx, build); buildErr != nil {
		return buildErr
	}

	if connErr := plan.ConnectAllNodes(ctx); connErr != nil {
		return connErr
	}

	// Wait for the plan to signal that it is finished and hand back its result.
	done := plan.Finished(ctx)
	return <-done
}
// newPusher constructs the actor for the pushing role. The actor is created
// with its qri remote disabled, gets a generated "megajoules" dataset version,
// and is connected before being returned. A message naming the actor is
// recorded on success.
//
// Any error from actor creation, dataset generation, or connecting is
// returned with a nil actor.
func newPusher(ctx context.Context, plan *Plan) (*sim.Actor, error) {
	// Logs the profile of a qri peer once it connects; payloads of any other
	// type are ignored.
	onQriPeerConnected := func(payload interface{}) {
		pro, ok := payload.(*profile.Profile)
		if !ok {
			return
		}
		plan.Runenv.RecordMessage("qri peer connected! %#v", pro)
		// TODO (b5) - attempt to publish to peer just connected to
	}

	// Any peer connection is logged and then reported to the plan as this
	// actor being finished.
	onPeerConnected := func(_ interface{}) {
		plan.Runenv.RecordMessage("peer connected")
		plan.ActorFinished(ctx)
	}

	configure := func(cfg *sim.Config) {
		// pusher doesn't accept datasets
		cfg.QriConfig.Remote.Enabled = false
		cfg.EventHandlers = map[event.Topic]func(interface{}){
			event.ETP2PQriPeerConnected: onQriPeerConnected,
			event.ETP2PPeerConnected:    onPeerConnected,
		}
	}

	pusher, err := sim.NewActor(ctx, plan.Runenv, configure)
	if err != nil {
		return nil, err
	}

	// Give the pusher a dataset to work with.
	// NOTE(review): the meaning of the 1000 argument is not visible here —
	// confirm against sim.Actor.GenerateDatasetVersion.
	if genErr := pusher.GenerateDatasetVersion("megajoules", 1000); genErr != nil {
		return nil, genErr
	}

	if connErr := pusher.Inst.Connect(ctx); connErr != nil {
		return nil, connErr
	}

	// Disabled alternative that hooks connection notifications directly on the
	// network instead of going through the event handlers above:
	//
	// notifee := &net.NotifyBundle{
	// 	ConnectedF: func(_ net.Network, conn net.Conn) {
	// 		plan.Runenv.RecordMessage("peer connected in notifee! %#v", conn)
	// 		plan.ActorFinished(ctx)
	// 	},
	// }
	// act.Inst.Node().Host().Network().Notify(notifee)

	plan.Runenv.RecordMessage("I'm a Pusher named %s", pusher.Peername())
	return pusher, nil
}
// newReceiver constructs the actor for the receiving role. The actor is
// created with event handlers for peer connections and is connected before
// being returned. A message naming the actor is recorded on success.
//
// Any error from actor creation or connecting is returned with a nil actor.
func newReceiver(ctx context.Context, plan *Plan) (*sim.Actor, error) {
	// Logs the profile of a qri peer once it connects; payloads of any other
	// type are ignored.
	onQriPeerConnected := func(payload interface{}) {
		pro, ok := payload.(*profile.Profile)
		if !ok {
			return
		}
		plan.Runenv.RecordMessage("qri peer connected! %#v", pro)
		// TODO (b5) - attempt to publish to peer just connected to
	}

	// Any peer connection is logged and then reported to the plan as this
	// actor being finished.
	onPeerConnected := func(_ interface{}) {
		plan.Runenv.RecordMessage("peer connected")
		plan.ActorFinished(ctx)
	}

	configure := func(cfg *sim.Config) {
		cfg.EventHandlers = map[event.Topic]func(interface{}){
			event.ETP2PQriPeerConnected: onQriPeerConnected,
			event.ETP2PPeerConnected:    onPeerConnected,
		}
	}

	receiver, err := sim.NewActor(ctx, plan.Runenv, configure)
	if err != nil {
		return nil, err
	}

	if connErr := receiver.Inst.Connect(ctx); connErr != nil {
		return nil, connErr
	}

	plan.Runenv.RecordMessage("I'm a Receiver named %s", receiver.Peername())
	return receiver, nil
}