/
shard.go
73 lines (60 loc) · 1.65 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package gosql
import (
"bytes"
"database/sql"
"fmt"
)
// DBHandler is a callback that is handed a database handle, an integer node
// value and a table buffer, and reports failure through its error result.
type DBHandler func(db *sql.DB, node int, table *bytes.Buffer) error
// DefaultDBExecutor is the default DbExecutor. It calls the supplied handler
// directly, exactly once, forwarding its other three arguments unchanged.
var DefaultDBExecutor DbExecutor = func(conn *sql.DB, run DBHandler, idx int, buf *bytes.Buffer) {
	// This function has no result, so the handler's error cannot be
	// propagated; it is dropped on purpose.
	_ = run(conn, idx, buf)
}
// StaticShard groups the database handles of one shard: a master handle and
// zero or more replica handles. Values are built by NewShard.
type StaticShard struct {
// Master is returned by GetMaster, and by GetReplica when Replica is empty.
Master *sql.DB
// Replica holds the handles GetReplica selects from.
Replica []*sql.DB
// lb supplies the index into Replica used by GetReplica.
lb Replication
}
// GetMaster returns the shard's Master handle.
func (s *StaticShard) GetMaster() *sql.DB {
return s.Master
}
// GetReplica returns one of the shard's replica handles, chosen by the
// shard's Replication balancer. When the shard has no replicas it returns the
// Master handle instead.
//
// The balancer is called with the number of replicas. Whatever index it
// returns is folded into the range [0, total), so an out-of-range value
// (too large or negative) never causes an index panic.
func (s *StaticShard) GetReplica() *sql.DB {
	total := s.GetReplicaTotal()
	if total == 0 {
		return s.Master
	}

	i := s.lb.Replicate(total) % total
	// Go's % keeps the sign of the dividend, so a negative balancer result
	// is still negative here; shift it back into range.
	if i < 0 {
		i += total
	}
	return s.Replica[i]
}
// GetReplicaTotal returns the number of entries in Replica.
func (s *StaticShard) GetReplicaTotal() int {
return len(s.Replica)
}
// ProvideStaticShards builds one shard (via NewShard) for every entry in
// conf.GetShardsConfig().
//
// It returns an error, and no shards, when the configured sharding type is not
// "static". When the configured sharding total is 1, every shard uses
// sharding.GetDbname() as its database name; otherwise the name for entry i is
// sharding.Allocation(i, sharding.GetDbname()). lb is passed to every shard.
//
// The returned slice is nil when there are no shard entries.
func ProvideStaticShards(conf *Config, sharding Sharding, lb Replication) ([]Shard, error) {
	if typ := conf.GetShardingConfig().GetType(); typ != "static" {
		return nil, fmt.Errorf("sharding type must be %q, got %q", "static", typ)
	}

	// The total does not depend on the loop variable, so evaluate it once.
	single := conf.GetShardingConfig().GetTotal() == 1

	var shards []Shard
	for i, c := range conf.GetShardsConfig() {
		dbname := sharding.GetDbname()
		if !single {
			dbname = sharding.Allocation(i, dbname)
		}
		shards = append(shards, NewShard(c, dbname, lb))
	}
	return shards, nil
}
// NewShard builds a StaticShard from conf for the database named dbname.
//
// The master handle and each replica handle are created with NewDB from the
// driver, URL (for dbname) and connection settings of the corresponding
// config entry. lb is stored on the shard for replica selection. Replica is
// left nil when conf lists no replicas.
func NewShard(conf ShardConfig, dbname string, lb Replication) *StaticShard {
	// Fetch the master config once instead of once per field.
	mc := conf.GetMasterConfig()
	master := NewDB(mc.GetDriver(), mc.GetUrl(dbname), mc.GetConn())

	var replicas []*sql.DB
	for _, rc := range conf.GetReplicasConfig() {
		replicas = append(replicas, NewDB(rc.GetDriver(), rc.GetUrl(dbname), rc.GetConn()))
	}

	return &StaticShard{
		Master:  master,
		Replica: replicas,
		lb:      lb,
	}
}