Skip to content

Commit

Permalink
chore: use waku-org/waku-proto repository for protobuffer definitions (
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 7, 2023
1 parent 3226def commit 150ade6
Show file tree
Hide file tree
Showing 93 changed files with 2,580 additions and 2,482 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[submodule "libs/waku-rln-contract"]
path = libs/waku-rln-contract
url = https://github.com/waku-org/waku-rln-contract.git
[submodule "waku/v2/protocol/waku-proto"]
path = waku/v2/protocol/waku-proto
url = git@github.com:waku-org/waku-proto.git
3 changes: 2 additions & 1 deletion cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
Expand Down Expand Up @@ -241,7 +242,7 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
ind := 0
for _, entry := range result.Errors() {
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
s.log.Error("can't unsubscribe", logging.HostID("peer", entry.PeerID), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
&pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: contentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
0,
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/server/rest/lightpush_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestLightpushMessagev1(t *testing.T) {
Message: &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
},
}
Expand Down
17 changes: 7 additions & 10 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {
Expand All @@ -38,7 +39,6 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -70,16 +70,16 @@ func TestRelaySubscription(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test max messages in subscription
now := utils.GetUnixEpoch()
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+1)), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+2)), relay.WithPubSubTopic("test"))
require.NoError(t, err)

_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test"))
tests.CreateWakuMessage("test", proto.Int64(now+3)), relay.WithPubSubTopic("test"))
require.NoError(t, err)

// Wait for the messages to be processed
Expand Down Expand Up @@ -130,7 +130,6 @@ func TestRelayGetV1Messages(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -168,7 +167,6 @@ func TestPostAutoV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
Expand Down Expand Up @@ -201,9 +199,9 @@ func TestRelayAutoSubUnsub(t *testing.T) {
require.Equal(t, "true", rr.Body.String())

// Test publishing messages after subscription
now := utils.GetUnixEpoch()
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage(cTopic1, now+1))
tests.CreateWakuMessage(cTopic1, proto.Int64(now+1)))
require.NoError(t, err)

// Wait for the messages to be processed
Expand Down Expand Up @@ -267,7 +265,6 @@ func TestRelayGetV1AutoMessages(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: cTopic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
Expand Down
12 changes: 7 additions & 5 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type HistoryCursor struct {
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"`
Version int32 `json:"version"`
Version uint32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Meta []byte `json:"meta"`
}
Expand Down Expand Up @@ -83,18 +83,20 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store

startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
query.StartTime, err = strconv.ParseInt(startTimeStr, 10, 64)
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.StartTime = &startTime
}

endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
query.EndTime, err = strconv.ParseInt(endTimeStr, 10, 64)
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
query.EndTime = &endTime
}

var cursor *pb.Index
Expand Down Expand Up @@ -178,8 +180,8 @@ func toStoreResponse(result *store.Result) StoreResponse {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: int32(m.Version),
Timestamp: m.Timestamp,
Version: m.GetVersion(),
Timestamp: m.GetTimestamp(),
Meta: m.Meta,
})
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/waku/server/rest/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)

func TestGetMessages(t *testing.T) {
Expand All @@ -32,14 +33,14 @@ func TestGetMessages(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"

now := utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, now+1)
msg2 := tests.CreateWakuMessage(topic1, now+2)
msg3 := tests.CreateWakuMessage(topic1, now+3)
now := *utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, proto.Int64(now+1))
msg2 := tests.CreateWakuMessage(topic1, proto.Int64(now+2))
msg3 := tests.CreateWakuMessage(topic1, proto.Int64(now+3))

node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubsubTopic1))

n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)
Expand Down
6 changes: 4 additions & 2 deletions cmd/waku/server/rpc/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ func TestV1Peers(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
bcast := relay.NewBroadcaster(10)
relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
Expand Down
8 changes: 7 additions & 1 deletion cmd/waku/server/rpc/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,13 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs,
}

for i := range f.messages[args.ContentTopic] {
*reply = append(*reply, ProtoToRPC(f.messages[args.ContentTopic][i]))
msg := f.messages[args.ContentTopic][i]
rpcMsg, err := ProtoToRPC(msg)
if err != nil {
f.log.Warn("could not include message in response", zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
}

f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0)
Expand Down
25 changes: 18 additions & 7 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand Down Expand Up @@ -69,7 +70,10 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
topic = args.Topic
}

msg := args.Message.toProto()
msg, err := args.Message.toProto()
if err != nil {
return err
}

if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
Expand Down Expand Up @@ -117,10 +121,9 @@ func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsA

// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
var err error
msg := args.Message.toProto()
if msg == nil {
err := fmt.Errorf("invalid message format received")
msg, err := args.Message.toProto()
if err != nil {
err = fmt.Errorf("invalid message format received: %w", err)
r.log.Error("publishing message", zap.Error(err))
return err
}
Expand Down Expand Up @@ -148,7 +151,12 @@ func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, rep
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
rpcMsg, err := ProtoToRPC(msg.Message())
if err != nil {
r.log.Warn("could not include message in response", logging.HexString("hash", msg.Hash()), zap.Error(err))
} else {
*reply = append(*reply, rpcMsg)
}
default:
break
}
Expand Down Expand Up @@ -200,7 +208,10 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *
}
select {
case msg := <-sub.Ch:
*reply = append(*reply, ProtoToRPC(msg.Message()))
m, err := ProtoToRPC(msg.Message())
if err == nil {
*reply = append(*reply, m)
}
default:
break
}
Expand Down
32 changes: 24 additions & 8 deletions cmd/waku/server/rpc/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ func TestPostV1Message(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}

err := d.PostV1Message(
rpcWakuMsg, err := ProtoToRPC(msg)
require.NoError(t, err)

err = d.PostV1Message(
makeRequest(t),
&RelayMessageArgs{
Message: ProtoToRPC(msg),
Message: rpcWakuMsg,
},
&reply,
)
Expand Down Expand Up @@ -95,6 +97,17 @@ func TestRelayGetV1Messages(t *testing.T) {
time.Sleep(1 * time.Second)

args := &TopicsArgs{Topics: []string{"test"}}

// Subscribe A to topic
err = serviceA.PostV1Subscription(
makeRequest(t),
args,
&reply,
)
require.NoError(t, err)
require.True(t, reply)

// Subscribe B to topic
err = serviceB.PostV1Subscription(
makeRequest(t),
args,
Expand All @@ -106,14 +119,17 @@ func TestRelayGetV1Messages(t *testing.T) {
// Wait for the subscription to be started
time.Sleep(1 * time.Second)

rpcWakuMsg, err := ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
ContentTopic: "test",
})
require.NoError(t, err)

err = serviceA.PostV1Message(
makeRequest(t),
&RelayMessageArgs{
Topic: "test",
Message: ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
ContentTopic: "testContentTopic",
}),
Topic: "test",
Message: rpcWakuMsg,
},
&reply,
)
Expand Down
10 changes: 7 additions & 3 deletions cmd/waku/server/rpc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type StorePagingOptions struct {
type StoreMessagesArgs struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []string `json:"contentFilters,omitempty"`
StartTime int64 `json:"startTime,omitempty"`
EndTime int64 `json:"endTime,omitempty"`
StartTime *int64 `json:"startTime,omitempty"`
EndTime *int64 `json:"endTime,omitempty"`
PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"`
}

Expand Down Expand Up @@ -63,7 +63,11 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,

reply.Messages = make([]*RPCWakuMessage, len(res.Messages))
for i := range res.Messages {
reply.Messages[i] = ProtoToRPC(res.Messages[i])
msg, err := ProtoToRPC(res.Messages[i])
if err != nil {
return err
}
reply.Messages[i] = msg
}

reply.PagingInfo = StorePagingOptions{
Expand Down
6 changes: 4 additions & 2 deletions cmd/waku/server/rpc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ func makeRequest(t *testing.T) *http.Request {
func TestBase64Encoding(t *testing.T) {
input := "Hello World"

rpcMsg := ProtoToRPC(&pb.WakuMessage{
Payload: []byte(input),
rpcMsg, err := ProtoToRPC(&pb.WakuMessage{
Payload: []byte(input),
ContentTopic: "test",
})
require.NoError(t, err)

jsonBytes, err := json.Marshal(rpcMsg)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 150ade6

Please sign in to comment.