-
Notifications
You must be signed in to change notification settings - Fork 31
/
configure.go
102 lines (91 loc) · 4.08 KB
/
configure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main
import (
"context"
"log"
"strconv"
"time"
"github.com/replicase/pgcapture/pkg/decode"
"github.com/replicase/pgcapture/pkg/pb"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/structpb"
)
// Values of the configure sub-command's flags. Each variable is bound to its
// flag in init and read when the command runs.

// Agent endpoint settings.
var (
	// AgentAddr is the connection address of the pgcapture agent.
	AgentAddr string
	// AgentCommand is the agent command to configure.
	AgentCommand string
)

// PostgreSQL source settings.
var (
	// ConfigPGConnURL is the connection URL used to install the pg extension
	// and fetch schema information.
	ConfigPGConnURL string
	// ConfigPGReplURL is the connection URL used to fetch logs from the
	// logical replication slot.
	ConfigPGReplURL string
	// ConfigPGLogPath is the pg log path used for finding the last checkpoint
	// LSN.
	ConfigPGLogPath string
	// ConfigStartLSN is the optional LSN position at which the pg2pulsar
	// process starts.
	ConfigStartLSN string
	// ConfigDecodePlugin is the name of the logical decoding plugin.
	ConfigDecodePlugin string
)

// Pulsar sink settings.
var (
	// ConfigPulsarURL is the connection URL of the sink pulsar cluster.
	ConfigPulsarURL string
	// ConfigPulsarTopic is the sink pulsar topic name; it is also used as the
	// logical replication slot name.
	ConfigPulsarTopic string
	// ConfigPulsarTracker is the optional tracker type for pg2pulsar.
	ConfigPulsarTracker string
	// ConfigPulsarTrackerInterval is the optional commit interval for
	// pg2pulsar, kept as the raw string given on the command line.
	ConfigPulsarTrackerInterval string
	// ConfigPulsarTrackerReplicateState is the optional replicate-state switch
	// for pg2pulsar.
	ConfigPulsarTrackerReplicateState bool
)

// Pipeline tuning.
var (
	// ConfigBatchTXSize is the maximum number of transactions in a pipeline.
	ConfigBatchTXSize int
)
// init attaches the configure sub-command to rootCmd, binds each command-line
// flag to its package-level variable, and marks the flags that must always be
// supplied.
func init() {
	rootCmd.AddCommand(configure)

	flags := configure.Flags()

	// Agent endpoint and the command sent to it.
	flags.StringVar(&AgentAddr, "AgentAddr", "", "connection addr to pgcapture agent")
	flags.StringVar(&AgentCommand, "AgentCommand", "", "agent command to configure")

	// Source and sink connection settings.
	flags.StringVar(&ConfigPGConnURL, "PGConnURL", "", "connection url to install pg extension and fetching schema information")
	flags.StringVar(&ConfigPGReplURL, "PGReplURL", "", "connection url to fetching logs from logical replication slot")
	flags.StringVar(&ConfigPulsarURL, "PulsarURL", "", "connection url to sink pulsar cluster")
	flags.StringVar(&ConfigPulsarTopic, "PulsarTopic", "", "the sink pulsar topic name and as well as the logical replication slot name")
	flags.StringVar(&ConfigPGLogPath, "PGLogPath", "", "pg log path for finding last checkpoint lsn")

	// Optional tuning knobs.
	flags.StringVar(&ConfigStartLSN, "StartLSN", "", "the LSN position to start the pg2pulsar process, optional")
	flags.StringVar(&ConfigPulsarTracker, "PulsarTracker", "", "the tracker type for pg2pulsar, optional")
	flags.StringVar(&ConfigPulsarTrackerInterval, "PulsarTrackerInterval", "", "the commit interval for the pg2pulsar, optional")
	flags.BoolVar(&ConfigPulsarTrackerReplicateState, "PulsarTrackerReplicateState", false, "the replicate state for the pg2pulsar, optional")
	flags.StringVar(&ConfigDecodePlugin, "DecodePlugin", decode.PGOutputPlugin, "the logical decoding plugin name")
	flags.IntVar(&ConfigBatchTXSize, "BatchTxSize", 100, "the max number of tx in a pipeline")

	// Flags the command refuses to run without.
	required := []string{
		"AgentAddr",
		"AgentCommand",
		"PGConnURL",
		"PulsarURL",
		"PulsarTopic",
	}
	for _, name := range required {
		// The result is deliberately discarded; every name listed here is
		// registered just above.
		_ = configure.MarkFlagRequired(name)
	}
}
// configure is the "configure" sub-command. It packs the flag values into a
// protobuf Struct once, then sends that Struct to the agent's Configure
// endpoint every five seconds, logging each failed attempt and retrying.
//
// The command only returns when the parameters cannot be encoded; once the
// poke loop has started it runs until the process is terminated.
var configure = &cobra.Command{
	Use:   "configure",
	Short: "Poke agent's Configure endpoint repeatedly",
	RunE: func(cmd *cobra.Command, args []string) error {
		params, err := structpb.NewStruct(map[string]interface{}{
			"Command":                     AgentCommand,
			"PGConnURL":                   ConfigPGConnURL,
			"PGReplURL":                   ConfigPGReplURL,
			"PulsarURL":                   ConfigPulsarURL,
			"PulsarTopic":                 ConfigPulsarTopic,
			"PGLogPath":                   ConfigPGLogPath,
			"StartLSN":                    ConfigStartLSN,
			"PulsarTracker":               ConfigPulsarTracker,
			"PulsarTrackerInterval":       ConfigPulsarTrackerInterval,
			// The boolean flag is sent in its string form ("true"/"false").
			"PulsarTrackerReplicateState": strconv.FormatBool(ConfigPulsarTrackerReplicateState),
			"DecodePlugin":                ConfigDecodePlugin,
			"BatchTxSize":                 ConfigBatchTXSize,
		})
		if err != nil {
			// The values come straight from user-supplied flags (a string
			// that is not valid UTF-8 is rejected here), so hand the failure
			// back to cobra as a normal command error instead of panicking.
			return err
		}

		// A failed poke is logged and retried on the next round rather than
		// ending the command.
		for {
			if err := poke(AgentAddr, params); err != nil {
				log.Println("Err", err)
			}
			time.Sleep(5 * time.Second)
		}
	},
}
// poke performs one Configure round-trip against the agent at addr.
//
// It opens an insecure (plaintext) gRPC connection, waits for it to come up,
// sends params in an AgentConfigRequest, logs the response on success, and
// closes the connection before returning.
//
// The connection attempt is bounded by a timeout: with grpc.WithBlock and no
// deadline the dial would wait indefinitely for an unreachable agent, so the
// caller would never get an error to log or a chance to retry.
//
// It returns the dial error (including a deadline-exceeded error when the
// agent cannot be reached in time) or the error from the Configure call.
func poke(addr string, params *structpb.Struct) error {
	// Upper bound on how long a single connection attempt may block.
	const dialTimeout = 10 * time.Second

	dialCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
	defer cancel()

	conn, err := grpc.DialContext(dialCtx, addr, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return err
	}
	defer conn.Close()

	client := pb.NewAgentClient(conn)
	// The RPC itself keeps an un-timed context, exactly as before; only the
	// connection phase is bounded.
	resp, err := client.Configure(context.Background(), &pb.AgentConfigRequest{Parameters: params})
	if err != nil {
		return err
	}

	log.Println("Success", resp.String())
	return nil
}