Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impi #2

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/linkall-labs/sdk

go 1.18

replace (
github.com/linkall-labs/sdk/golang => ./golang
github.com/linkall-labs/sdk/proto => ./proto
)
Empty file added go.sum
Empty file.
15 changes: 0 additions & 15 deletions go/go.mod

This file was deleted.

6 changes: 3 additions & 3 deletions go/api.go → golang/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package vanus
package golang

import (
"github.com/cloudevents/sdk-go/v2"
v2 "github.com/cloudevents/sdk-go/v2"
)

type Client interface {
Send(eventbusName string, events ...*v2.Event) error
Subscribe(subscriptionID uint64) (<-chan Message, error)
Subscribe(subscriptionID string) (<-chan Message, error)
Close() error
}

Expand Down
182 changes: 182 additions & 0 deletions golang/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package golang

import (
"context"
"sync"

v2 "github.com/cloudevents/sdk-go/v2"
vanuspb "github.com/linkall-labs/sdk/proto/pkg/vanus"
"github.com/linkall-labs/vanus/proto/pkg/cloudevents"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Config struct {
Endpoint string
}

type streamState string

var (
stateRunning streamState = "running"
stateClosed streamState = "closed"
)

type streamCache struct {
subscribeStream vanuspb.Client_SubscribeClient
ackStream vanuspb.Client_AckClient
messagec chan Message
state streamState
}

func newStreamCache(
subscribeStream vanuspb.Client_SubscribeClient, ackStream vanuspb.Client_AckClient, ch chan Message,
) *streamCache {
return &streamCache{
subscribeStream: subscribeStream,
ackStream: ackStream,
messagec: ch,
state: stateRunning,
}
}

func (sc *streamCache) release() {
sc.subscribeStream.CloseSend()
sc.subscribeStream = nil
sc.ackStream.CloseSend()
sc.ackStream = nil
close(sc.messagec)
sc.state = stateClosed
}

type client struct {
Endpoint string
proxy vanuspb.ClientClient
streamCache sync.Map
mu sync.Mutex
}

func New(cfg *Config) Client {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(cfg.Endpoint, opts...)
if err != nil {
return nil
}
return &client{
Endpoint: cfg.Endpoint,
proxy: vanuspb.NewClientClient(conn),
}
}

func (c *client) Send(eventbusName string, events ...*v2.Event) error {
eventpb, err := ToProto(events[0])
if err != nil {
return err
}
in := &vanuspb.PublishRequest{
EventbusName: eventbusName,
Events: &cloudevents.CloudEventBatch{
Events: []*cloudevents.CloudEvent{eventpb},
},
}
_, err = c.proxy.Publish(context.Background(), in)
if err != nil {
return err
}
return nil
}

func (c *client) Subscribe(subscriptionID string) (<-chan Message, error) {
c.mu.Lock()
defer c.mu.Unlock()

value, ok := c.streamCache.Load(subscriptionID)
if ok && value.(*streamCache).state == stateRunning {
return value.(*streamCache).messagec, nil
}

in := &vanuspb.SubscribeRequest{
SubscriptionId: subscriptionID,
}
stream, err := c.proxy.Subscribe(context.Background(), in)
if err != nil {
return nil, err
}

ackStream, err := c.proxy.Ack(context.Background())
if err != nil {
stream.CloseSend()
return nil, err
}

messageC := make(chan Message, 32)
cache := newStreamCache(stream, ackStream, messageC)
c.streamCache.Store(subscriptionID, cache)

go func(cache *streamCache) {
for {
resp, err := cache.subscribeStream.Recv()
if err != nil {
cache.release()
return
}
ackFunc := func(result bool) error {
req := &vanuspb.AckRequest{
SequenceId: resp.SequenceId,
SubscriptionId: subscriptionID,
Success: result,
}
err = cache.ackStream.Send(req)
if err != nil {
cache.release()
return err
}
return nil
}
if batch := resp.GetEvents(); batch != nil {
if eventpbs := batch.GetEvents(); len(eventpbs) > 0 {
for _, eventpb := range eventpbs {
event, err2 := FromProto(eventpb)
if err2 != nil {
// TODO(jiangkai): check err
continue
}
cache.messagec <- newMessage(ackFunc, event)
}
}
}
}
}(cache)
return messageC, nil
}

func (c *client) Close() error {
return nil
}

type ackCallback func(result bool) error

type message struct {
event *v2.Event
ack ackCallback
}

func newMessage(cb ackCallback, e *v2.Event) Message {
return &message{
event: e,
ack: cb,
}
}

func (m *message) GetEvent() *v2.Event {
return m.event
}

func (m *message) Success() error {
return m.ack(true)
}

func (m *message) Failed(err error) error {
return m.ack(false)
}
28 changes: 28 additions & 0 deletions golang/example/publish/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"fmt"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
gocli "github.com/linkall-labs/sdk/golang"
)

func main() {
cfg := gocli.Config{
Endpoint: "172.17.0.2:30001",
}
c := gocli.New(&cfg)

event := v2.NewEvent()
event.SetID(uuid.New().String())
event.SetSource("event-source")
event.SetType("event-type")
event.SetData(v2.ApplicationJSON, map[string]string{"hello": "world"})
err := c.Send("quick-start", &event)
if err != nil {
fmt.Printf("send event failed, err: %s\n", err.Error())
return
}
fmt.Printf("send event success\n")
}
33 changes: 33 additions & 0 deletions golang/example/subscribe/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
"fmt"

gocli "github.com/linkall-labs/sdk/golang"
)

func main() {
cfg := gocli.Config{
Endpoint: "172.17.0.2:30001",
}
c := gocli.New(&cfg)

ctx := context.Background()

messagec, err := c.Subscribe("000018FD28000011")
if err != nil {
fmt.Printf("subscribe failed, err: %s\n", err.Error())
return
}
for {
select {
case msg := <-messagec:
fmt.Printf("received a message, event: %s\n", msg.GetEvent().String())
case <-ctx.Done():
fmt.Println("ctx exit")
return
}
}
fmt.Printf("send event success\n")
}
28 changes: 28 additions & 0 deletions golang/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module github.com/linkall-labs/sdk/golang

go 1.18

replace github.com/linkall-labs/sdk/proto => ../proto

require (
github.com/cloudevents/sdk-go/v2 v2.12.0
github.com/google/uuid v1.3.0
github.com/linkall-labs/sdk/proto v0.0.0-00010101000000-000000000000
github.com/linkall-labs/vanus/proto v0.0.0-20230105121006-1395ee889925
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
)

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/text v0.4.0 // indirect
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c // indirect
)
28 changes: 24 additions & 4 deletions go/go.sum → golang/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ github.com/cloudevents/sdk-go/v2 v2.12.0/go.mod h1:xDmKfzNjM8gBvjaF8ijFjM1VYOVUE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/linkall-labs/vanus/proto v0.0.0-20230105121006-1395ee889925 h1:yUhEKfZiwQIfLuQXHSlZo8bXY523m8PrZux5q+rDOS8=
github.com/linkall-labs/vanus/proto v0.0.0-20230105121006-1395ee889925/go.mod h1:QhoJhDCnwHIVTrkGXtaRyyGenuNt7a7tx8MLdUSCGok=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
Expand All @@ -26,6 +32,20 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b h1:tvrvnPFcdzp294diPnrdZZZ8XUt2Tyj7svb7X52iDuU=
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c h1:QgY/XxIAIeccR+Ca/rDdKubLIU9rcJ3xfy1DC/Wd2Oo=
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c/go.mod h1:CGI5F/G+E5bKwmfYo09AXuVN4dD894kIKUFmVbP2/Fo=
google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Binary file added golang/main
Binary file not shown.
Loading