forked from aliyun/aliyun-log-go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
copy_data.go
67 lines (62 loc) · 2.02 KB
/
copy_data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main
import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/aliyun/aliyun-log-go-sdk"
	"github.com/aliyun/aliyun-log-go-sdk/consumer"
	"github.com/go-kit/kit/log/level"
)
// README :
// This is an E2E test, which creates another logstore under the same project to simulate consumption.
// If you don't want to use this method, you can comment out lines 31-45 and replace process with your own function.
// Be careful not to change the parameter types of the process function.
// Package-level state shared between main and the process callback.
var (
	// option is the consumer configuration; main fills it in before the
	// consumer worker is created.
	option consumerLibrary.LogHubConfig
	// client is the SLS client main uses to create the copy logstore and
	// process uses to write consumed log groups into it.
	client sls.Client
	// logStore describes the destination logstore that main creates.
	logStore *sls.LogStore
)
// main runs the end-to-end copy example: it fills in the consumer
// configuration, creates the destination logstore "copy-logstore" in the same
// project, starts a consumer worker that hands every consumed log group to
// process, and stops the worker once a termination signal arrives.
func main() {
	option = consumerLibrary.LogHubConfig{
		Endpoint:          "",
		AccessKeyID:       "",
		AccessKeySecret:   "",
		Project:           "",
		Logstore:          "",
		ConsumerGroupName: "",
		ConsumerName:      "",
		// This option is only used for initialization; it is ignored once the consumer group has been created and each shard has started to be consumed.
		// It can be "begin", "end", or a specific time given as a timestamp; it refers to the log receiving time.
		CursorPosition: consumerLibrary.BEGIN_CURSOR,
	}
	client = sls.Client{
		Endpoint:        option.Endpoint,
		AccessKeyID:     option.AccessKeyID,
		AccessKeySecret: option.AccessKeySecret,
	}
	logStore = &sls.LogStore{
		Name:       "copy-logstore",
		TTL:        1,
		ShardCount: 2,
	}

	// Creating the destination logstore is best effort: a failure is reported
	// and the example carries on.
	// NOTE(review): presumably this fails on reruns because the logstore
	// already exists — confirm against the SDK's error for that case.
	if err := client.CreateLogStoreV2(option.Project, logStore); err != nil {
		fmt.Println(err)
	}

	consumerWorker := consumerLibrary.InitConsumerWorker(option, process)

	// signal.Notify does not block when delivering, so the channel must be
	// buffered; otherwise a signal that arrives before the receive below is
	// silently dropped.
	ch := make(chan os.Signal, 1)
	// Subscribe to termination signals only. Calling Notify without a signal
	// list relays every incoming signal, including ones unrelated to shutdown
	// (for example SIGCHLD, or the SIGURG the Go runtime uses for preemption),
	// which would stop the worker spuriously.
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	consumerWorker.Start()

	// Block until a termination signal is received, then shut down gracefully.
	<-ch
	level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
	consumerWorker.StopAndWait()
}
// process is the consumer callback passed to InitConsumerWorker. It forwards
// every log group in logGroupList to the "copy-logstore" logstore of the
// configured project using the package-level client.
//
// shardId identifies the shard the log groups were read from and is only used
// in the progress output. A failed PutLogs call is printed and does not stop
// the remaining log groups from being forwarded.
//
// It always returns the empty string.
// NOTE(review): presumably the consumer library treats "" as "continue from
// the next checkpoint" — confirm against the consumer library documentation.
func process(shardId int, logGroupList *sls.LogGroupList) string {
	failed := 0
	for _, logGroup := range logGroupList.LogGroups {
		if err := client.PutLogs(option.Project, "copy-logstore", logGroup); err != nil {
			fmt.Println(err)
			failed++
		}
	}

	// Report success only when every log group was written; previously the
	// success line was printed even after failures, and through Println, which
	// does not expand the %v verb.
	if failed > 0 {
		fmt.Printf("shardId %v processing finished with %d failed log group(s)\n", shardId, failed)
		return ""
	}
	fmt.Printf("shardId %v processing works sucess\n", shardId)
	return ""
}