/
index_file_writer.go
115 lines (103 loc) · 3.26 KB
/
index_file_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package data
import (
"encoding/binary"
"fmt"
"hash/crc32"
"os"
"path/filepath"
"github.com/polarstreams/polar/internal/conf"
"github.com/polarstreams/polar/internal/utils"
"github.com/rs/zerolog/log"
)
// indexFileWriter represents a writer for index & offset files.
//
// Work is queued on the items channel and processed by writeLoop, which is
// started as a goroutine by newIndexFileWriter.
type indexFileWriter struct {
// Queue of pending items; writeLoop ranges over it until it is closed.
items chan indexFileItem
// Path under which index files are created; also passed to offsetWriter.create.
basePath string
// Provides IndexFilePeriodBytes(), which writeLoop uses as its write threshold.
config conf.DatalogConfig
// Receives true once writeLoop has finished.
closed chan bool
// Receives the tailOffset of every queued item.
offsetWriter *offsetFileWriter
}
// newIndexFileWriter builds an indexFileWriter for the given base path and
// configuration and starts its background write loop before returning it.
func newIndexFileWriter(basePath string, config conf.DatalogConfig) *indexFileWriter {
	writer := new(indexFileWriter)

	// A buffer of one: try not to block when sending.
	writer.items = make(chan indexFileItem, 1)
	writer.config = config
	writer.basePath = basePath
	writer.closed = make(chan bool, 1)
	writer.offsetWriter = newOffsetFileWriter()

	go writer.writeLoop()
	return writer
}
// writeLoop writes the index files and the producer offset file.
//
// It consumes w.items until the channel is closed. For every item it hands
// item.tailOffset to the offset writer. Items flagged toClose close the
// current index file. For any other item the index file of the segment is
// opened on first use, and an entry (offset, fileOffset, CRC32 of both) is
// appended whenever the file offset advanced by at least
// config.IndexFilePeriodBytes() since the last stored entry.
//
// When the channel is closed, any index file still open is closed, the offset
// writer is closed and true is sent on w.closed.
func (w *indexFileWriter) writeLoop() {
	var segmentId *int64
	var file *os.File
	lastStoredFileOffset := int64(0)
	buffer := utils.NewBufferCap(16)
	writeThreshold := int64(w.config.IndexFilePeriodBytes())

	// closeIndexFile closes the open index file, if any, and resets the
	// per-segment state so the next item opens a fresh file.
	closeIndexFile := func() {
		if file == nil {
			return
		}
		if err := file.Close(); err != nil {
			log.Err(err).Msgf("Index file closed with error on path %s", w.basePath)
		} else {
			log.Debug().Msgf("Index file closed on path %s", w.basePath)
		}
		file = nil
		segmentId = nil
		lastStoredFileOffset = 0
	}

	w.offsetWriter.create(w.basePath)

	for item := range w.items {
		// Always store the producer.offset file
		w.offsetWriter.write(item.tailOffset)

		if item.toClose {
			closeIndexFile()
			continue
		}

		if segmentId == nil {
			name := fmt.Sprintf("%020d.%s", item.segmentId, conf.IndexFileExtension)
			f, err := os.OpenFile(filepath.Join(w.basePath, name), conf.IndexFileWriteFlags, FilePermissions)
			if err != nil {
				// Arguments follow the order of the verbs: file name first, then path.
				log.Err(err).Msgf("Index file %s could not be created on path %s", name, w.basePath)
				// segmentId stays nil, so the next item retries the open.
				continue
			}
			log.Debug().Msgf("Index file created on path %s", w.basePath)
			file = f
			id := item.segmentId
			segmentId = &id
		}

		if item.fileOffset-lastStoredFileOffset >= writeThreshold {
			buffer.Reset()
			utils.PanicIfErr(binary.Write(buffer, conf.Endianness, item.offset), "Error writing item.offset")
			utils.PanicIfErr(binary.Write(buffer, conf.Endianness, item.fileOffset), "Error writing item.fileOffset")
			// The checksum covers the offset and fileOffset bytes already in the buffer.
			utils.PanicIfErr(binary.Write(buffer, conf.Endianness, crc32.ChecksumIEEE(buffer.Bytes())),
				"Error writing item.checksum")
			if _, err := file.Write(buffer.Bytes()); err != nil {
				log.Err(err).Msgf("There was an error writing to the index file on path %s", w.basePath)
			} else {
				log.Debug().Msgf("Written to %d index file on path %s", *segmentId, w.basePath)
			}
			// Advanced even when the write failed, matching the logged-and-continue policy above.
			lastStoredFileOffset = item.fileOffset
		}
	}

	// The channel was closed: do not leak an index file that was never
	// explicitly closed through a toClose item.
	closeIndexFile()
	w.offsetWriter.close()
	w.closed <- true
}
// append queues an index entry for the background write loop, which decides
// whether a line mapping the message offset to the file offset is actually
// written to the index file. The send blocks while the items channel is full.
func (w *indexFileWriter) append(segmentId int64, offset int64, fileOffset int64, tailOffset int64) {
	var entry indexFileItem
	entry.segmentId = segmentId
	entry.offset = offset
	entry.fileOffset = fileOffset
	entry.tailOffset = tailOffset

	w.items <- entry
}
// closeFile asks the background write loop to close the current index file by
// queueing an item flagged toClose. The tail offset travels with the request.
// The send blocks while the items channel is full.
func (w *indexFileWriter) closeFile(segmentId int64, tailOffset int64) {
	var request indexFileItem
	request.toClose = true
	request.segmentId = segmentId
	request.tailOffset = tailOffset

	w.items <- request
}