/
svr.go
90 lines (80 loc) · 2.59 KB
/
svr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/*
* Copyright (c) 2021 yedf. All rights reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package dtmsvr
import (
"fmt"
"net"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli/dtmimp"
"github.com/yedf/dtm/dtmgrpc/dtmgimp"
"github.com/yedf/dtmdriver"
"google.golang.org/grpc"
)
// StartSvr starts the dtm server.
//
// It performs the following steps in order:
//  1. builds the HTTP app (common.GetGinApp wrapped by httpMetrics), adds the
//     routes and serves it in a background goroutine on config.HttpPort;
//  2. listens on config.GrpcPort and serves a gRPC server, with the
//     grpcMetrics and dtmgimp.GrpcServerLog unary interceptors chained, in a
//     second background goroutine;
//  3. starts the background branch-status flusher (updateBranchAsync);
//  4. selects the micro-service driver named by config.MicroService.Driver and
//     registers the gRPC service with it.
//
// Errors from listening, serving gRPC, selecting the driver and registering
// the service are passed to dtmimp.FatalIfError. An error returned by the HTTP
// server is logged instead of being silently dropped.
func StartSvr() {
	dtmimp.Logf("start dtmsvr")
	app := common.GetGinApp()
	app = httpMetrics(app)
	addRoute(app)
	dtmimp.Logf("dtmsvr listen at: %d", config.HttpPort)
	httpAddr := fmt.Sprintf(":%d", config.HttpPort)
	go func() {
		// Run only returns when the HTTP server could not start or has
		// stopped; surface that instead of discarding the error.
		if err := app.Run(httpAddr); err != nil {
			dtmimp.LogRedf("http server at %s stopped: %v", httpAddr, err)
		}
	}()

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", config.GrpcPort))
	dtmimp.FatalIfError(err)
	s := grpc.NewServer(
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			grpc.UnaryServerInterceptor(grpcMetrics), grpc.UnaryServerInterceptor(dtmgimp.GrpcServerLog)),
		))
	dtmgimp.RegisterDtmServer(s, &dtmServer{})
	dtmimp.Logf("grpc listening at %v", lis.Addr())
	go func() {
		err := s.Serve(lis)
		dtmimp.FatalIfError(err)
	}()

	go updateBranchAsync()

	// NOTE(review): presumably gives the background servers a moment to come
	// up before the service is registered with the driver — confirm.
	time.Sleep(100 * time.Millisecond)
	err = dtmdriver.Use(config.MicroService.Driver)
	dtmimp.FatalIfError(err)
	err = dtmdriver.GetDriver().RegisterGrpcService(config.MicroService.Target, config.MicroService.EndPoint)
	dtmimp.FatalIfError(err)
}
// PopulateDB sets up the data of the store returned by GetStore.
// skipDrop is forwarded unchanged to the store's PopulateData method.
func PopulateDB(skipDrop bool) {
	store := GetStore()
	store.PopulateData(skipDrop)
}
var (
	// UpdateBranchAsyncInterval is the interval at which updateBranchAsync
	// flushes queued branch status updates to the store.
	UpdateBranchAsyncInterval = 200 * time.Millisecond

	// updateBranchAsyncChan queues branch status updates (buffered, capacity
	// 1000) until updateBranchAsync picks them up.
	updateBranchAsyncChan = make(chan branchStatus, 1000)
)
// updateBranchAsync runs forever, draining updateBranchAsyncChan and writing
// the queued branch statuses to the store in batches.
//
// Each batch is processed inside its own function call so that the deferred
// common.RecoverPanic runs when that batch finishes. Deferring directly inside
// the endless loop would queue one more deferred call per iteration without
// ever running any of them, and a panic would unwind out of the loop and stop
// the flusher for good.
func updateBranchAsync() {
	// maxBatch caps how many branch updates are written in one statement.
	const maxBatch = 20
	// checkInterval bounds how long a single wait on the channel may block.
	const checkInterval = 20 * time.Millisecond

	flushBatch := func() {
		defer common.RecoverPanic(nil)

		// Collect updates until the batch is full or the collection window
		// (UpdateBranchAsyncInterval minus one checkInterval) has elapsed.
		updates := make([]TransBranch, 0, maxBatch)
		started := time.Now()
		for time.Since(started) < UpdateBranchAsyncInterval-checkInterval && len(updates) < maxBatch {
			select {
			case updateBranch := <-updateBranchAsyncChan:
				updates = append(updates, TransBranch{
					ModelBase:  common.ModelBase{ID: updateBranch.id},
					Status:     updateBranch.status,
					FinishTime: updateBranch.finishTime,
				})
			case <-time.After(checkInterval):
			}
		}

		// Retry the same batch once per second until the store accepts it.
		for len(updates) > 0 {
			dbr := GetStore().UpdateBranchesSql(updates, []string{"status", "finish_time", "update_time"})
			dtmimp.Logf("flushed %d branch status to db. affected: %d", len(updates), dbr.RowsAffected)
			if dbr.Error != nil {
				dtmimp.LogRedf("async update branch status error: %v", dbr.Error)
				time.Sleep(1 * time.Second)
			} else {
				updates = updates[:0]
			}
		}
	}

	for { // flush branches roughly every UpdateBranchAsyncInterval
		flushBatch()
	}
}