/
fstorage_io.go
136 lines (125 loc) · 3.1 KB
/
fstorage_io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package queue
import (
"os"
"runtime"
"sync"
"time"
)
const (
handleInAction = iota
handleInChanel
)
type fileWrite struct {
*os.File
FileIdx StorageIdx
}
type filequeue struct {
sync.RWMutex
memMutex sync.Mutex
storage *fileStorage
state map[StorageIdx]int
toOut chan *fileWrite
}
func createIOQueue(storage *fileStorage) *filequeue {
return &filequeue{
state: make(map[StorageIdx]int, 16),
toOut: make(chan *fileWrite, storage.options.MaxOneTimeOpenedFiles),
storage: storage,
}
}
func (fq *filequeue) getHandle(recordSize uint32, idx StorageIdx, timeout time.Duration) (*fileWrite, error) {
fq.RLock()
defer fq.RUnlock()
var to <-chan time.Time
if timeout != 0 {
to = time.NewTimer(timeout).C
}
for {
select {
case wrk := <-fq.toOut:
if wrk.File == nil {
continue
}
offset, err := wrk.Seek(0, 1)
if err == nil && offset+int64(recordSize) < fq.storage.options.MaxDataFileSize {
fq.memMutex.Lock()
fq.state[wrk.FileIdx] = handleInAction
fq.memMutex.Unlock()
return wrk, nil
}
fq.storage.log.Trace("[FSQ:%s] Datafile {%d} is oversize. Will be closed...", fq.storage.name, idx)
fq.memMutex.Lock()
delete(fq.state, wrk.FileIdx)
fq.memMutex.Unlock()
wrk.Close()
case <-to:
return nil, &queueError{ErrorType: errorTimeOut}
default:
fq.memMutex.Lock()
if len(fq.state) < int(fq.storage.options.MaxOneTimeOpenedFiles) {
datFileName := fq.storage.folder + dataFileNameByID(idx)
file, err := os.Create(datFileName)
if err != nil {
fq.storage.log.Error("[FSQ:%s] Cannot create datafile {%d} Error: %s", fq.storage.name, idx, err.Error())
return nil, err
}
fq.storage.log.Trace("[FSQ:%s] Create datafile {%d}", fq.storage.name, idx)
saveDataFileHeader(file)
tmp := &fileWrite{
File: file,
FileIdx: idx,
}
fq.state[idx] = handleInAction
fq.memMutex.Unlock()
return tmp, nil
}
fq.memMutex.Unlock()
runtime.Gosched()
}
}
}
func (fq *filequeue) putHandle(handle *fileWrite, err error) {
fq.memMutex.Lock()
defer fq.memMutex.Unlock()
fq.storage.log.Trace("[FSQ:%s] Received {%d}", fq.storage.name, handle.FileIdx)
if err == nil {
fq.toOut <- handle
}
if err == nil && handle.File != nil {
fq.state[handle.FileIdx] = handleInChanel
} else {
delete(fq.state, handle.FileIdx)
if handle.File != nil {
handle.Close()
}
}
}
func (fq *filequeue) canClear(idx StorageIdx) bool {
fq.memMutex.Lock()
defer fq.memMutex.Unlock()
fq.storage.log.Trace("[FSQ:%s] Received request to delete {%d}", fq.storage.name, idx)
if _, ok := fq.state[idx]; !ok {
fq.storage.log.Error("[FSQ:%s] Not information about this file {%d}", fq.storage.name, idx)
return true
}
return false
}
func (fq *filequeue) free() {
fq.memMutex.Lock()
defer fq.memMutex.Unlock()
close(fq.toOut)
for wrk := range fq.toOut {
wrk.Close()
}
fq.state = nil
}
func (fq *filequeue) clear() {
fq.memMutex.Lock()
defer fq.memMutex.Unlock()
close(fq.toOut)
for handle := range fq.toOut {
handle.Close()
}
fq.state = make(map[StorageIdx]int, 16)
fq.toOut = make(chan *fileWrite, cap(fq.toOut))
}