/
shard.go
152 lines (131 loc) · 3.25 KB
/
shard.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright 2022 Tatris Project Authors. Licensed under Apache-2.0.
package core
import (
"fmt"
"sync"
"time"
"github.com/tatris-io/tatris/internal/common/utils"
"github.com/tatris-io/tatris/internal/core/wal/log"
"go.uber.org/zap"
"github.com/tatris-io/tatris/internal/common/log/logger"
)
// Shard is a logical split of an index. It owns an ordered list of segments
// together with aggregated statistics for the data it holds.
type Shard struct {
	// Index is the owning index; it is excluded from JSON output.
	Index *Index `json:"-"`
	// ShardID identifies this shard; GetName combines it with the index name.
	ShardID int
	// Segments holds the shard's segments in creation order. New segments are
	// appended, so the last element is the latest one.
	Segments []*Segment
	// Stat carries the shard-level statistics maintained by UpdateStat.
	Stat ShardStat
	// Wal is the write-ahead log attached to this shard; it is excluded from
	// JSON output.
	Wal log.WalLog `json:"-"`
	// lock is taken by CheckSegments, ForceAddSegment and UpdateStat while
	// they modify Segments or Stat.
	lock sync.RWMutex
}
// GetName returns the shard's display name in the form
// "<index name>/<shard id>". It dereferences shard.Index, so the shard must
// be attached to an index.
func (shard *Shard) GetName() string {
	indexName := shard.Index.Name
	shardID := shard.ShardID
	return fmt.Sprintf("%s/%d", indexName, shardID)
}
// GetSegmentNum returns how many segments the shard currently holds.
func (shard *Shard) GetSegmentNum() int {
	segments := shard.Segments
	return len(segments)
}
// GetSegments returns the shard's segment slice. The slice is returned as-is
// (not copied), so it shares its backing array with the shard.
func (shard *Shard) GetSegments() []*Segment {
	segments := shard.Segments
	return segments
}
// GetSegment returns the segment stored at position idx. No bounds check is
// performed here: an idx outside [0, GetSegmentNum()) panics.
func (shard *Shard) GetSegment(idx int) *Segment {
	segments := shard.Segments
	return segments[idx]
}
// GetLatestSegmentID returns the position of the most recently added segment,
// or -1 when the shard has no segments.
func (shard *Shard) GetLatestSegmentID() int {
	count := shard.GetSegmentNum()
	return count - 1
}
// GetLatestSegment returns the most recently added segment, or nil when the
// shard has no segments yet.
func (shard *Shard) GetLatestSegment() *Segment {
	if id := shard.GetLatestSegmentID(); id >= 0 {
		return shard.Segments[id]
	}
	return nil
}
// CheckSegments makes sure the shard has a latest segment that is not mature.
// When the shard has no segment, or the latest one reports IsMature, a new
// segment is appended, OnMature is invoked on the previous latest segment (if
// any) and the addition is logged. Otherwise it does nothing.
//
// The check is performed twice: once cheaply up front, and once more under
// the write lock so that only one of several concurrent callers adds the
// segment.
func (shard *Shard) CheckSegments() {
	// Fast path. The segment slice is appended to under shard.lock, so it must
	// be read under at least the read lock; an unguarded read here would race
	// with addSegment running in another goroutine.
	shard.lock.RLock()
	latest := shard.GetLatestSegment()
	shard.lock.RUnlock()
	if latest != nil && !latest.IsMature() {
		return
	}

	shard.lock.Lock()
	defer shard.lock.Unlock()

	// Re-check: another caller may have added a segment between releasing the
	// read lock and acquiring the write lock.
	latest = shard.GetLatestSegment()
	if latest != nil && !latest.IsMature() {
		return
	}

	newID := shard.GetSegmentNum()
	shard.addSegment(newID)
	if latest != nil {
		latest.OnMature()
	}
	logger.Info(
		"add segment",
		zap.String("index", shard.Index.Name),
		zap.Int("shard", shard.ShardID),
		zap.Int("segment", newID),
	)
}
// ForceAddSegment unconditionally appends a new segment to the shard while
// holding the shard's write lock. If the shard already had a latest segment,
// OnMature is invoked on it after the new segment has been added. The
// addition is logged.
func (shard *Shard) ForceAddSegment() {
	shard.lock.Lock()
	defer shard.lock.Unlock()

	// Capture the current tail before appending so it can be notified below.
	previous := shard.GetLatestSegment()
	segmentID := shard.GetSegmentNum()

	shard.addSegment(segmentID)
	if previous != nil {
		previous.OnMature()
	}

	logger.Info(
		"add segment",
		zap.String("index", shard.Index.Name),
		zap.Int("shard", shard.ShardID),
		zap.Int("segment", segmentID),
	)
}
// UpdateStat folds a batch of written data into the shard statistics under
// the shard's write lock and logs the resulting values.
//
//   - min and max are converted to Unix milliseconds. A stored MinTime or
//     MaxTime of 0 is treated as "not set yet" and is overwritten directly;
//     otherwise the stored bound is only widened, and a converted value of 0
//     never widens it.
//   - docs is added to DocNum.
//   - wals replaces WalIndex unless it is 0.
func (shard *Shard) UpdateStat(min, max time.Time, docs int64, wals uint64) {
	lower, upper := min.UnixMilli(), max.UnixMilli()

	shard.lock.Lock()
	defer shard.lock.Unlock()

	stat := &shard.Stat

	// Take the new lower bound when none is recorded yet, or when it is a
	// non-zero value earlier than the recorded one.
	if stat.MinTime == 0 || (lower != 0 && lower < stat.MinTime) {
		stat.MinTime = lower
	}
	// Same rule for the upper bound, mirrored.
	if stat.MaxTime == 0 || (upper != 0 && upper > stat.MaxTime) {
		stat.MaxTime = upper
	}

	stat.DocNum += docs
	if wals != 0 {
		stat.WalIndex = wals
	}

	logger.Info(
		"update shard stat",
		zap.Int64("minTime", stat.MinTime),
		zap.Int64("maxTime", stat.MaxTime),
		zap.Int64("docNum", stat.DocNum),
		zap.Uint64("walIndex", stat.WalIndex),
	)
}
// Destroy calls Destroy on every segment of the shard, in order, and always
// returns nil. The function returned by utils.Timerf is deferred so it runs
// when Destroy returns (presumably reporting the elapsed time with the given
// message; confirm against utils.Timerf).
func (shard *Shard) Destroy() error {
	done := utils.Timerf("close shard finish, name:%s", shard.GetName())
	defer done()

	segments := shard.Segments
	for i := 0; i < len(segments); i++ {
		segments[i].Destroy()
	}
	return nil
}
// addSegment appends a new segment with the given ID to the shard. The
// segment points back at this shard, records the current time in Unix
// milliseconds as its creation time and starts with status
// SegmentStatusWritable.
//
// It takes no lock itself; CheckSegments and ForceAddSegment call it while
// holding shard.lock.
func (shard *Shard) addSegment(segmentID int) {
	created := Stat{CreateTime: time.Now().UnixMilli()}
	segment := &Segment{
		Shard:         shard,
		SegmentID:     segmentID,
		Stat:          SegmentStat{Stat: created},
		SegmentStatus: SegmentStatusWritable,
	}
	shard.Segments = append(shard.Segments, segment)
}