Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial version of OpenSergo Go SDK #2

Merged
merged 10 commits into from
Nov 26, 2022
Merged

Conversation

jnan806
Copy link
Collaborator

@jnan806 jnan806 commented Sep 30, 2022

初版 opensergo-go-sdk, 可配合 opensergo-control-plane 进行联调.

联调步骤:
详细可参考 README.md 或 main.go


OpenSergo Go SDK

How to use

scene 1 : subscribe config-data

package main

func main() {
    // add console logger (optional)
    //logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
    // add file logger (optional)
    //logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
    
    // instant OpenSergoClient
    openSergoClient := client.NewOpenSergoClient("33.1.33.1",10246)
    
    // register SubscribeInfo of FaultToleranceRule
    // 1. instant SubscribeKey
    faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
    // 2. construct SubscribeInfo
    faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
    // 3. register
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // start OpensergoClient
    openSergoClient.Start()
    
    // register after OpenSergoClient started
    // register SubscribeInfo of RateLimitStrategy
    rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
    rateLimitSubscribeInfo := client.NewSubscribeInfo(*rateLimitSubscribeKey)
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)

    select {}
}

scene 2 : subscribe config-data with custom-logic when config-data changed.

Add a subscriber by implements the function in subscribe.Subscriber.
There are some samples in sample directory : sample/main/sample_faulttolerance_rule_subscriber.go and sample_ratelimit_strategy_subscriber.go

    type SampleFaultToleranceRuleSubscriber struct {
    }
    
    func (sampleFaultToleranceRuleSubscriber SampleFaultToleranceRuleSubscriber) OnSubscribeDataUpdate(subscribeKey subscribe.SubscribeKey, dataSlice []protoreflect.ProtoMessage) bool {
        // TODO add custom-logic when config-data change
        // ......
        return true
    }

And then register it to subscriber.SubscriberRegistry.
Demo-code is in sample/main directory

package main

func main() {
    // add console logger (optional)
    //logging.NewConsoleLogger(logging.InfoLevel, logging.SeparateFormat)
    // add file logger (optional)
    //logging.NewFileLogger("/logs/opensergo/opensergo-universal-transport-service.log", logging.InfoLevel, logging.JsonFormat)
    
    // instant OpenSergoClient
    openSergoClient := client.NewOpenSergoClient("33.1.33.1",10246)
    
    // registry SubscribeInfo of FaultToleranceRule
    // 1. instant SubscribeKey
    faultToleranceSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefFaultToleranceRule{})
    // 2. instant Subscriber
    sampleFaultToleranceRuleSubscriber := new(SampleFaultToleranceRuleSubscriber)
    // 3. construct SubscribeInfo
    faultToleranceSubscribeInfo := client.NewSubscribeInfo(faultToleranceSubscribeKey)
    faultToleranceSubscribeInfo.AppendSubscriber(sampleFaultToleranceRuleSubscriber)
    // 4. register
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // register SubscribeInfo of RateLimitStrategy
    rateLimitSubscribeKey := subscribe.NewSubscribeKey("default", "foo-app", configkind.ConfigKindRefRateLimitStrategy{})
    sampleRateLimitStrategySubscriber := new(SampleRateLimitStrategySubscriber)
    rateLimitSubscribeInfo := client.NewSubscribeInfo(rateLimitSubscribeKey)
    rateLimitSubscribeInfo.AppendSubscriber(sampleRateLimitStrategySubscriber)
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
    
    // start OpensergoClient
    openSergoClient.Start()
    
    // register after OpenSergoClient started
    faultToleranceSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
    openSergoClient.RegisterSubscribeInfo(faultToleranceSubscribeInfo)
    
    // register after OpenSergoClient started
    rateLimitSubscribeInfo.AppendSubscriber(new(subscribe.DefaultSubscriber))
    openSergoClient.RegisterSubscribeInfo(rateLimitSubscribeInfo)
    
    select {}
}

@jnan806 jnan806 force-pushed the initial-version branch 2 times, most recently from 914c1c4 to 6accd10 Compare September 30, 2022 14:41
@sczyh30 sczyh30 added the kind/feature Category issues or PRs related to feature request label Oct 1, 2022
@sczyh30
Copy link
Member

sczyh30 commented Oct 9, 2022

Could you please reformat your code with goimports?

@jnan806
Copy link
Collaborator Author

jnan806 commented Oct 9, 2022

Could you please reformat your code with goimports?

fixed ! thanks for your review ~

@jnan806 jnan806 requested review from sczyh30 and removed request for robberphex October 9, 2022 15:55
@jnan806 jnan806 force-pushed the initial-version branch 10 times, most recently from 4027c1a to c312329 Compare October 10, 2022 12:14
@sczyh30 sczyh30 changed the title initial-version Initial version of OpenSergo Go SDK Oct 11, 2022
@jnan806 jnan806 force-pushed the initial-version branch 4 times, most recently from b4da979 to fa863b7 Compare October 13, 2022 01:21
@iamharvey
Copy link

是否可以对项目结构的设计原则做一下说明?

项目结构:

  • api: proto 文件及存根
  • cmd: 如果有cli工具(通过go install可以安装的)
  • docs: 文档
  • internal: 不想暴露给SDK用户的代码

对于可暴露的代码,建议取消pkg,直接暴露出来,比如
client、transport ...

pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
@sczyh30
Copy link
Member

sczyh30 commented Nov 21, 2022

==================
WARNING: DATA RACE
Read at 0x00c00008c6f0 by goroutine 17:
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).SubscribeConfig()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:161 +0x8b
  main.main.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:72 +0x144

Previous write at 0x00c00008c6f0 by goroutine 14:
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:91 +0x275
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).observeReceive()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:100 +0x3d1
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start.func1()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0x39

Goroutine 17 (running) created at:
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:68 +0x3d6

Goroutine 14 (finished) created at:
  github.com/opensergo/opensergo-go/pkg/client.(*SubscribeConfigStreamObserver).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client_observer.go:54 +0xe4
  github.com/opensergo/opensergo-go/pkg/client.(*OpenSergoClient).Start()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/pkg/client/opensergo_client.go:137 +0x3b5
  main.main()
      /Users/sczyh30/temp/to-review/opensergo-go-sdk-jnan806/samples/main/main.go:56 +0x2a5
==================

pkg/client/opensergo_client.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client.go Outdated Show resolved Hide resolved
pkg/client/opensergo_client_observer.go Outdated Show resolved Hide resolved
@sczyh30
Copy link
Member

sczyh30 commented Nov 21, 2022

As discussed and debugged with @jnan806, these weird race conditions were mainly caused by non-thread-safe access of subscribeConfigStream of OpenSergoClient.

How to reproduce it: when a client has established a connection to the control plane, we restart the control plane; in the meantime we initiate Subscribe or Unsubscribe command concurrently.

pkg/transport/subscribe/subscriber_registry.go Outdated Show resolved Hide resolved
pkg/transport/subscribe/subscriber_registry.go Outdated Show resolved Hide resolved
Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
@jnan806 jnan806 force-pushed the initial-version branch 2 times, most recently from 601764a to 88c3aa7 Compare November 22, 2022 04:04
@jnan806
Copy link
Collaborator Author

jnan806 commented Nov 22, 2022

As discussed and debugged with @jnan806, these weird race conditions were mainly caused by non-thread-safe access of subscribeConfigStream of OpenSergoClient.

How to reproduce it: when a client has established a connection to the control plane, we restart the control plane; in the meantime we initiate Subscribe or Unsubscribe command concurrently.

Yeah, and has resolve concurrent-safety with several atomic.Value, and replacing recursion by for-loop or for-select.

Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
Copy link
Member

@sczyh30 sczyh30 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@sczyh30
Copy link
Member

sczyh30 commented Nov 26, 2022

Nice work. Since this is a large PR containing the initial implementation of the SDK, I'll create a merge commit for this PR.

@sczyh30 sczyh30 merged commit 3e0f44b into opensergo:main Nov 26, 2022
@sczyh30
Copy link
Member

sczyh30 commented Nov 26, 2022

Fabulous! Thanks for contributing!

PS: We may improve the SDK implementation later :)

@jnan806
Copy link
Collaborator Author

jnan806 commented Nov 26, 2022

Fabulous! Thanks for contributing!

PS: We may improve the SDK implementation later :)

It's my pleasure ~ And I will keep following and improving it. 😃

@jnan806 jnan806 deleted the initial-version branch November 26, 2022 14:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/feature Category issues or PRs related to feature request
Projects
None yet
5 participants