-
Notifications
You must be signed in to change notification settings - Fork 66
/
datasets.go
101 lines (80 loc) · 2.28 KB
/
datasets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package p2p
// TODO (ramfox): relies on old `depQriProtocolID`
// Should have its own protocol & protobuf & not rely on the Message struct
import (
"context"
"encoding/json"
"fmt"
"github.com/qri-io/qri/base"
reporef "github.com/qri-io/qri/repo/ref"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// MtDatasets is a dataset list message
const MtDatasets = MsgType("list_datasets")
// listMax is the highest number of entries a list request should return
const listMax = 30
// DatasetsListParams encapsulates options for requesting datasets
type DatasetsListParams struct {
Term string
Limit int
Offset int
}
// RequestDatasetsList gets a list of a peer's datasets
func (n *QriNode) RequestDatasetsList(ctx context.Context, pid peer.ID, p DatasetsListParams) ([]reporef.DatasetRef, error) {
log.Debugf("%s RequestDatasetList: %s", n.ID, pid)
if pid == n.ID {
// requesting self isn't a network operation
return n.Repo.References(p.Offset, p.Limit)
}
if !n.Online {
return nil, fmt.Errorf("not connected to p2p network")
}
req, err := NewJSONBodyMessage(n.ID, MtDatasets, p)
if err != nil {
log.Debug(err.Error())
return nil, err
}
req = req.WithHeaders("phase", "request")
s, err := n.host.NewStream(ctx, pid, depQriProtocolID)
if err != nil {
return nil, fmt.Errorf("error opening stream: %s", err.Error())
}
defer s.Close()
ws := WrapStream(s)
if err := ws.sendMessage(req); err != nil {
return nil, err
}
res, err := ws.receiveMessage()
if err != nil {
return nil, err
}
ref := []reporef.DatasetRef{}
err = json.Unmarshal(res.Body, &ref)
return ref, err
}
func (n *QriNode) handleDatasetsList(ws *WrappedStream, msg Message) (hangup bool) {
hangup = true
switch msg.Header("phase") {
case "request":
dlp := DatasetsListParams{}
if err := json.Unmarshal(msg.Body, &dlp); err != nil {
log.Debugf("%s %s", n.ID, err.Error())
return
}
if dlp.Limit == 0 || dlp.Limit > listMax {
dlp.Limit = listMax
}
refs, err := base.ListDatasets(context.TODO(), n.Repo, dlp.Term, "", dlp.Offset, dlp.Limit, true, false)
if err != nil {
log.Error(err)
return
}
reply, err := msg.UpdateJSON(refs)
reply = reply.WithHeaders("phase", "response")
if err := ws.sendMessage(reply); err != nil {
log.Debug(err.Error())
return
}
}
return
}