Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix go example for pulsar-examples #30

Merged
merged 8 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 77 additions & 14 deletions cloud/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,61 +25,110 @@ The following clients and Pulsar CLI tools are supported to connect to cluster t
- pulsar-client
- pulsar-perf

## How to get token options
Before starting, we assume that the `service account`, `pulsar instance` and `pulsar cluster` have been created in the current environment, and their `names` and `namespace` as follows:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add link of how these resources are created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is currently no link, please add an issue to add it later.


When you use Token to connect to Pulsar cluster, you need to provide the following options:
/ | name | namespace
---|---|---
service account | test-service-account-name | test-service-account-namespace
pulsar instance | test-pulsar-instance-name | test-pulsar-instance-namespace
pulsar cluster | test-pulsar-cluster-name | test-pulsar-cluster-namespace

After the above resources are created, you can get the expected output through the following command.

## How to get service URL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest we add a full pre-condition, e.g. created a instance like instance-Name; created cluster like cluster-name; create ns like ns-name ....
so all the shell command will contain a command as current, and also a real command with above pre-condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jiazhai done PTAL again, thanks


- `SERVICE_URL`
- `WEB_SERVICE_URL`
- `AUTH_PARAMS`

For the `SERVICE_URL` field, you can get the **hostname** through the following command:

```shell script
$ snctl get pulsarclusters [CLUSTER_NAME] -n [NAMESPACE] -o json | jq '.spec.serviceEndpoints[0].dnsName'
$ snctl get pulsarclusters [PULSAR_CLUSTER_NAME] -n [PULSAT_CLUSTER_NAMESPACE] -o json | jq '.spec.serviceEndpoints[0].dnsName'

# e.g:
$ snctl pulsarclusters get test-pulsar-cluster-name -n test-pulsar-cluster-namespace -o json | jq '.spec.serviceEndpoints[0].dnsName'
```

Output:

```text
api.test.cloud.xxx.streamnative.dev
cloud.streamnative.dev
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the pre-condition, it still not describe where cloud.streamnative.dev comes from.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cloud.streamnative.dev is obtained by the following command:

snctl pulsarclusters get test-pulsar-cluster-name -n test-pulsar-cluster-namespace -o json | jq '.spec.serviceEndpoints[0].dnsName'

```

A `SERVICE_URL` is a combination of protocol, hostname and port, so an example of a complete `SERVICE_URL` is as follows:


```text
pulsar://api.test.cloud.xxx.streamnative.dev:6650
pulsar://cloud.streamnative.dev:6650

# For tls
pulsar+ssl://api.test.cloud.xxx.streamnative.dev:6651
pulsar+ssl://cloud.streamnative.dev:6651
```

For the `WEB_SERVICE_URL` field, you can get the **hostname** through the following command:

```shell script
$ snctl get pulsarclusters [CLUSTER_NAME] -n [NAMESPACE] -o json | jq '.spec.serviceEndpoints[0].dnsName'
$ snctl get pulsarclusters [PULSAR_CLUSTER_NAME] -n [PULSAT_CLUSTER_NAMESPACE] -o json | jq '.spec.serviceEndpoints[0].dnsName'

Flags:
-h, --help help for get-token
-f, --key-file string Path to the private key file
--login Use an interactive login
--skip-open if the web browser should not be opened automatically

# e.g:
$ snctl get pulsarclusters test-pulsar-cluster-name -n test-pulsar-cluster-namespace -o json | jq '.spec.serviceEndpoints[0].dnsName'
```

Output:

```text
api.test.cloud.xxx.streamnative.dev
cloud.streamnative.dev
```

A `WEB_SERVICE_URL` is a combination of protocol, hostname and port, so an example of a complete `WEB_SERVICE_URL` is as follows:

```text
http://api.test.cloud.xxx.streamnative.dev:8080
http://cloud.streamnative.dev:8080

# For tls
https://api.test.cloud.xxx.streamnative.dev:8443
https://cloud.streamnative.dev:443
```

## How to get token options

When you use Token to connect to Pulsar cluster, you need to provide the following options:

- `AUTH_PARAMS`

For the `AUTH_PARAMS` field, you can get it through the following command:

```shell script
$ snctl auth get-token [INSTANCE] [flags]
$ snctl auth get-token [PULSAR_INSTANCE_NAME] -n [PULSAR_INSTANCE_NAMESPACE] [flags]

Flags:
-h, --help help for get-token
-f, --key-file string Path to the private key file
--login Use an interactive login
--skip-open if the web browser should not be opened automatically

# e.g:
$ snctl auth get-token test-pulsar-instance-name -n test-pulsar-instance-namespace --login
```

Output:

```text
We've launched your web browser to complete the login process.
Verification code: ABCD-EFGH

Waiting for login to complete...
Logged in as cloud@streamnative.io.
Welcome to Apache Pulsar!

Use the following access token to access Pulsar instance '[PULSAR_INSTANCE_NAMESPACE]/[PULSAR_INSTANCE_NAME]':

abcdefghijklmnopqrstuiwxyz0123456789
```

> Tips: In code implementation, for safety and convenience, you can consider setting `AUTH_PARAMS` as an environment variable.
Expand All @@ -99,12 +148,15 @@ For the OAuth2 `type` field, currently you only support `client_credentials`. So
For the `privateKey` field, you need to get the path of a private key data file. The following example shows how to get this file:

```shell script
$ snctl auth export-service-account [NAME] [flags]
$ snctl auth export-service-account [SERVICE_ACCOUNT_NAME] -n [SERVICE_ACCOUNT_NAMESPACE] [flags]

Flags:
-h, --help help for export-service-account
-f, --key-file string Path to the private key file.
--no-wait Skip waiting for service account readiness.

#e.g:
$ snctl auth export-service-account test-service-account-name -n test-service-account-namespace -f [/path/to/key/file.txt]
```

Output:
Expand All @@ -125,4 +177,15 @@ For the `clientId` and `issuerUrl` fields, you can get the corresponding value f
}
```

For the `audience` field, is the address of the accessed service.
For the `audience` field, it is the combination of the name and namespace of pulsar instance and `urn:sn:pulsar`, the example as follows:

```text
urn:sn:pulsar:test-pulsar-instance-namespace:test-pulsar-instance-name
```

You can get all the pulsar instances in the current environment through the following command:

```shell script
$ snctl pulsarinstances get -A
```

61 changes: 35 additions & 26 deletions cloud/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ Since the `go mod` package management tool is used in this project, **Go 1.11 or

- Go 1.11+
- pulsar-client-go 0.1.1+(not include 0.1.1)
- Get the `serviceURL` of your StreamNative Cloud Pulsar cluster: [How to get service URL](#How to get service URL)
- Get the OAuth2 `privateKeyFile` of a service account to access your StreamNative Cloud Pulsar cluster: [How to get OAuth2 options](#How to get OAuth2 options)
- Get the `audience` of the your StreamNative Cloud Pulsar cluster: [How to get OAuth2 options](#How to get OAuth2 options)
- Get the `issuerUrl` of the your StreamNative Cloud Pulsar cluster: [How to get OAuth2 options](#How to get OAuth2 options)
- Get the `clientId` of the your StreamNative Cloud Pulsar cluster: [How to get OAuth2 options](#How to get OAuth2 options)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clientId is already contained in "privateKeyFile"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synced with @zymap, this is special in go for support different type


# Example

Expand All @@ -23,46 +28,50 @@ The consumer receives the message from the `topic-1` and `acknowledges` each rec

```bash
$ go build -o consumer sampleConsumer.go
$ ./consumer
$ ./consumer -serviceURL pulsar+ssl://cloud.streamnative.dev:6651 \
-privateKeyFile /path/to/private/key/file.txt\
-audience urn:sn:pulsar:pulsar-instance-ns:pulsar-instance-name\
-issuerUrl https://cloud.streamnative.dev\
-clientId abcdefghigk0123456789
```

Output:

```text
time="2020-08-03T09:11:16+08:00" level=info msg="Created consumer" name=yulhd subscription=my-sub topic="persistent://public/default/topic-1"
Received message msgId: {27 0 0 0 <nil> 0xc0002ce000 {13817596638641025024 7933499488 0x4cc9f80}} -- content: 'hello-0'
Received message msgId: {27 1 0 0 <nil> 0xc0002ce000 {13817596638653740024 7946215066 0x4cc9f80}} -- content: 'hello-1'
Received message msgId: {27 2 0 0 <nil> 0xc0002ce000 {13817596638653748024 7946222530 0x4cc9f80}} -- content: 'hello-2'
Received message msgId: {27 3 0 0 <nil> 0xc0002ce000 {13817596638653750024 7946224792 0x4cc9f80}} -- content: 'hello-3'
Received message msgId: {27 4 0 0 <nil> 0xc0002ce000 {13817596638653752024 7946226367 0x4cc9f80}} -- content: 'hello-4'
Received message msgId: {27 5 0 0 <nil> 0xc0002ce000 {13817596638666750024 7959224273 0x4cc9f80}} -- content: 'hello-5'
Received message msgId: {27 6 0 0 <nil> 0xc0002ce000 {13817596638666760024 7959233979 0x4cc9f80}} -- content: 'hello-6'
Received message msgId: {27 7 0 0 <nil> 0xc0002ce000 {13817596638666764024 7959238686 0x4cc9f80}} -- content: 'hello-7'
Received message msgId: {27 8 0 0 <nil> 0xc0002ce000 {13817596638666766024 7959240510 0x4cc9f80}} -- content: 'hello-8'
Received message msgId: {27 9 0 0 <nil> 0xc0002ce000 {13817596638677280024 7969754185 0x4cc9f80}} -- content: 'hello-9'
time="2020-08-03T09:11:24+08:00" level=info msg="The consumer[1] successfully unsubscribed" name=yulhd subscription=my-sub topic="persistent://public/default/topic-1"
Received message msgId: {{10 17 0 0} <nil> 0xc0000e0160 {13817980335716751128 17136978 0x4cf4080}} -- content: 'hello-7'
Received message msgId: {{10 18 0 0} <nil> 0xc0000e0160 {13817980335716772128 17157780 0x4cf4080}} -- content: 'hello-8'
Received message msgId: {{10 19 0 0} <nil> 0xc0000e0160 {13817980335716774128 17160202 0x4cf4080}} -- content: 'hello-9'
Received message msgId: {{10 20 0 0} <nil> 0xc0000e0160 {13817980335716776128 17162019 0x4cf4080}} -- content: 'hello-0'
Received message msgId: {{10 21 0 0} <nil> 0xc0000e0160 {13817980335716780128 17165615 0x4cf4080}} -- content: 'hello-1'
Received message msgId: {{10 22 0 0} <nil> 0xc0000e0160 {13817980335716781128 17167300 0x4cf4080}} -- content: 'hello-2'
Received message msgId: {{10 23 0 0} <nil> 0xc0000e0160 {13817980335716783128 17169197 0x4cf4080}} -- content: 'hello-3'
Received message msgId: {{10 24 0 0} <nil> 0xc0000e0160 {13817980335716788128 17174514 0x4cf4080}} -- content: 'hello-4'
Received message msgId: {{10 25 0 0} <nil> 0xc0000e0160 {13817980335716790128 17176145 0x4cf4080}} -- content: 'hello-5'
Received message msgId: {{10 26 0 0} <nil> 0xc0000e0160 {13817980335716792128 17177978 0x4cf4080}} -- content: 'hello-6'```
```

2. Run the producer and publish messages to the `topic-1`:

```bash
$ go build -o producer sampleProdcer.go
$ ./producer
$ ./producer -serviceURL pulsar+ssl://cloud.streamnative.dev:6651 \
-privateKeyFile /path/to/private/key/file.txt\
-audience urn:sn:pulsar:pulsar-instance-ns:pulsar-instance-name\
-issuerUrl https://cloud.streamnative.dev\
-clientId abcdefghigk0123456789
```

Output:

```text
2020/08/03 09:11:24 Published message: {27 0 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 1 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 2 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 3 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 4 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 5 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 6 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 7 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 8 0 0 <nil> <nil> {0 0 <nil>}}
2020/08/03 09:11:24 Published message: {27 9 0 0 <nil> <nil> {0 0 <nil>}}
time="2020-08-03T09:11:24+08:00" level=info msg="Closing producer" producer_name=standalone-1-1 topic="persistent://public/default/topic-1"
time="2020-08-03T09:11:24+08:00" level=info msg="Closed producer" producer_name=standalone-1-1 topic="persistent://public/default/topic-1"
Published message: {10 20 0 0}
Published message: {10 21 0 0}
Published message: {10 22 0 0}
Published message: {10 23 0 0}
Published message: {10 24 0 0}
Published message: {10 25 0 0}
Published message: {10 26 0 0}
Published message: {10 27 0 0}
Published message: {10 28 0 0}
Published message: {10 29 0 0}
```
68 changes: 68 additions & 0 deletions cloud/go/ccloud/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ccloud

import (
"flag"
"log"

"github.com/apache/pulsar-client-go/pulsar"
)

var (
help bool
issuerUrl string
audience string
privateKey string
clientId string
serviceURL string
)

func CreateClient() pulsar.Client {
flag.Parse()

if help {
flag.Usage()
}

oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": issuerUrl,
"audience": audience,
"privateKey": privateKey,
"clientId": clientId,
})

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: serviceURL,
Authentication: oauth,
})
if err != nil {
log.Fatal(err)
}
return client
}

func init() {
flag.StringVar(&issuerUrl, "issuerUrl", "", "issuerUrl is a named external system that provides identity and API access by issuing OAuth access tokens")
flag.StringVar(&audience, "audience", "", "audience is the address of the accessed service")
flag.StringVar(&privateKey, "privateKey", "", "privateKey is the path of a service account to access your StreamNative Cloud Pulsar cluster")
flag.StringVar(&clientId, "clientId", "", "clientId is a public identifier for apps")
flag.StringVar(&serviceURL, "serviceURL", "", "serviceURL is the address of the accessed broker")
flag.BoolVar(&help, "help", false, "help cmd")
}
23 changes: 5 additions & 18 deletions cloud/go/sampleConsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,14 @@ package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/streamnative/pulsar-examples/cloud/go/ccloud"
)

func main() {
oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
"type": "client_credentials",
"issuerUrl": "",
"audience": "",
"privateKey": "",
"clientId": "",
})

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar+ssl://broker.example.com:6651/",
Authentication: oauth,
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
client := ccloud.CreateClient()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
Expand All @@ -58,7 +45,7 @@ func main() {
log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
fmt.Printf("Received message msgId: %v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
Expand Down
Loading