This repository has been archived by the owner on Mar 1, 2024. It is now read-only.
forked from alpacahq/marketstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
wal.go
857 lines (797 loc) · 26 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
package executor
import (
"crypto/md5"
"fmt"
goio "io"
"os"
"time"
"bytes"
"io/ioutil"
"path/filepath"
"sort"
"github.com/alpacahq/marketstore/executor/buffile"
"github.com/alpacahq/marketstore/utils/io"
. "github.com/alpacahq/marketstore/utils/log"
"github.com/golang/glog"
)
/*
NOTE: Access to the WAL structures for a single WAL File is expected to be
single threaded (the original note ties this to the CommandChannel, which is
not defined in this file — confirm against its definition).
*/
// WALFileType holds the in-memory state of one write-ahead log file: the
// cached copy of its on-disk header plus the paths and handle used to access it.
type WALFileType struct {
	// These three fields plus the MID form the WAL Header, written at the beginning of the WAL File
	// by WriteStatus as: MID, FileStatus (int8), ReplayState (int8), OwningInstanceID (int64).
	FileStatus       FileStatusEnum
	ReplayState      ReplayStateEnum
	OwningInstanceID int64
	// End of WAL Header
	RootPath          string   // Path to the root directory, base of FilePath; WAL keys are relative to it
	FilePath          string   // WAL file full path
	lastCommittedTGID int64    // TGID to be checkpointed; 0 means nothing committed since the last checkpoint
	FilePtr           *os.File // Active file pointer to FilePath
}
// NewWALFile returns the WAL file handle used by this instance.
//
// With an empty existingFilePath a brand-new WAL file is created under rootDir
// and stamped OPEN/NOTREPLAYED. Otherwise the file at existingFilePath is taken
// over: its on-disk file/replay status is read back and re-written unchanged,
// which records this instance as the owner. Failures are reported through
// Log(FATAL); the returned error is always nil.
func NewWALFile(rootDir string, existingFilePath string) (wf *WALFileType, err error) {
	wf = &WALFileType{lastCommittedTGID: 0}

	if existingFilePath == "" {
		if err = wf.createFile(rootDir); err != nil {
			Log(FATAL, "%v: Can not create new WALFile - Error: %v", io.GetCallerFileContext(0), err)
		}
		wf.WriteStatus(OPEN, NOTREPLAYED)
		return wf, nil
	}

	if err = wf.takeOverFile(rootDir, existingFilePath); err != nil {
		Log(FATAL, "%v: Can not take over existing WALFile - Error: %v", io.GetCallerFileContext(0), err)
	}
	// Re-writing the current status stamps our instance ID into the header.
	status, replay, _ := wf.readStatus()
	wf.WriteStatus(status, replay)
	return wf, nil
}
// createFile points wf at a new WAL file named "WALFile.<unix-nanos>.walfile"
// inside rootDir and opens it, creating it on disk.
func (wf *WALFileType) createFile(rootDir string) error {
	wf.RootPath = rootDir
	stamp := time.Now().UTC().UnixNano()
	wf.FilePath = filepath.Join(rootDir, fmt.Sprintf("WALFile.%d.walfile", stamp))

	if openErr := wf.Open(); openErr != nil {
		return WALCreateError("CreateFile" + openErr.Error())
	}
	return nil
}
// takeOverFile opens an existing WAL file at existingPath so that it can be
// replayed. It fails with a WALTakeOverError when the file cannot be opened,
// or when the header on disk records the running instance as the file's
// owner.
func (wf *WALFileType) takeOverFile(rootDir string, existingPath string) error {
	wf.RootPath = rootDir
	wf.FilePath = existingPath
	if err := wf.Open(); err != nil {
		return WALTakeOverError("TakeOverFile" + err.Error())
	}
	// Load the header from disk before the ownership check: otherwise
	// OwningInstanceID still holds its zero value and the comparison in
	// callerOwnsFile says nothing about who actually owns this file.
	wf.syncStatusRead()
	if wf.callerOwnsFile() {
		return WALTakeOverError("TakeOver: File file is owned by calling process")
	}
	return nil
}
// Open opens the WAL file at wf.FilePath for reading and writing, creating it
// with mode 0600 if it does not exist, and stores the handle in wf.FilePtr
// (nil on failure). The returned error text is prefixed with
// io.GetCallerFileContext(0).
func (wf *WALFileType) Open() error {
	var err error
	wf.FilePtr, err = os.OpenFile(wf.FilePath, os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		// Use an explicit format string: the error text contains the file
		// path, and a '%' in it must not be interpreted as a verb.
		return fmt.Errorf("%s%v", io.GetCallerFileContext(0), err)
	}
	return nil
}
// Close records CLOSED together with the given replay state in the WAL header
// and then releases the file handle. Any error from closing is discarded.
func (wf *WALFileType) Close(ReplayStatus ReplayStateEnum) {
	wf.WriteStatus(CLOSED, ReplayStatus)
	_ = wf.FilePtr.Close() // best effort; nothing left to do with the handle
}
// Delete marks this WAL file CLOSED/REPLAYED, closes it and removes it from
// disk.
//
// It refuses, returning an error, when the file is not open (Close must be
// able to write the header), when it is still the active WAL of this
// instance, or when it still needs replay. A failure to remove the file is
// logged at FATAL level and also returned.
func (wf *WALFileType) Delete() (err error) {
	if !wf.IsOpen() {
		Log(WARNING, io.GetCallerFileContext(0)+": Can not delete WALFile that is not open")
		return fmt.Errorf("WAL File is not open")
	}
	if wf.isActive() {
		Log(WARNING, io.GetCallerFileContext(0)+": Can not delete active WALFile")
		return fmt.Errorf("WAL File is active")
	}
	if wf.NeedsReplay() {
		Log(WARNING, io.GetCallerFileContext(0)+": WALFile needs replay, can not delete")
		return fmt.Errorf("WAL File needs replay")
	}
	wf.Close(REPLAYED)
	if err = os.Remove(wf.FilePath); err != nil {
		Log(FATAL, "%v: Can not remove WALFile %s: %v", io.GetCallerFileContext(0), wf.FilePath, err)
		return err
	}
	return nil
}
// read fills buffer from the WAL file.
//
// targetOffset selects where to read: -1 reads from the current file
// position, any other value seeks to that absolute offset first. newOffset is
// the file position after the read. A read that cannot fill the whole buffer
// (including hitting end of file) is reported as a ShortReadError; seek
// failures and other read failures are logged at FATAL level. The returned
// slice is always buffer itself.
func (wf *WALFileType) read(targetOffset int64, buffer []byte) (result []byte, newOffset int64, err error) {
	offset, err := wf.FilePtr.Seek(0, goio.SeekCurrent)
	if err != nil {
		Log(FATAL, io.GetCallerFileContext(0)+": Unable to seek in WALFile")
	}
	if targetOffset != -1 && offset != targetOffset {
		if _, err = wf.FilePtr.Seek(targetOffset, goio.SeekStart); err != nil {
			Log(FATAL, io.GetCallerFileContext(0)+": Unable to seek in WALFile")
		}
		// The read starts at targetOffset, so newOffset must be based on it.
		offset = targetOffset
	}
	numToRead := len(buffer)
	// ReadFull retries until the buffer is full, so a short result really
	// means the file ended (or failed) before numToRead bytes were available.
	n, err := goio.ReadFull(wf.FilePtr, buffer)
	if n != numToRead {
		err = ShortReadError(fmt.Sprintf("Read: Expected: %d Got: %d", numToRead, n))
	} else if err != nil {
		Log(FATAL, io.GetCallerFileContext(0)+": Unable to read WALFile")
	}
	return buffer, offset + int64(n), err
}
// FullPathToWALKey turns a full primary-file path into the WAL key recorded in
// TG data: the path relative to wf.RootPath. The key still ends with the year
// filename. If the path cannot be expressed relative to RootPath, the key is
// the empty string (filepath.Rel's error is discarded).
func (wf *WALFileType) FullPathToWALKey(fullPath string) (keyPath string) {
	rel, _ := filepath.Rel(wf.RootPath, fullPath)
	return rel
}
// WALKeyToFullPath is the counterpart of FullPathToWALKey: it resolves a WAL
// key against wf.RootPath to obtain the primary file's full path.
func (wf *WALFileType) WALKeyToFullPath(keyPath string) (fullPath string) {
	fullPath = filepath.Join(wf.RootPath, keyPath)
	return fullPath
}
// offsetIndexBuffer is a view of one serialized write as laid out by
// flushToWAL: bytes [0:8] hold the write offset, bytes [8:16] the index, and
// the remainder is the record data.
type offsetIndexBuffer []byte

// Offset decodes the int64 stored in the first 8 bytes.
func (b offsetIndexBuffer) Offset() int64 {
	offsetBytes := b[0:8]
	return io.ToInt64(offsetBytes)
}

// Index decodes the int64 stored in bytes 8 through 15.
func (b offsetIndexBuffer) Index() int64 {
	indexBytes := b[8:16]
	return io.ToInt64(indexBytes)
}

// IndexAndPayload returns everything after the offset field.
func (b offsetIndexBuffer) IndexAndPayload() []byte {
	tail := b[8:]
	return tail
}

// Payload returns the record data that follows the offset and index fields.
func (b offsetIndexBuffer) Payload() []byte {
	data := b[16:]
	return data
}
// CachedFP keeps at most one file open at a time, so consecutive operations
// on the same file reuse one handle instead of reopening it.
type CachedFP struct {
	fileName string
	fp       *os.File
}

// NewCachedFP returns an empty cache with no file open.
func NewCachedFP() *CachedFP {
	return new(CachedFP)
}

// GetFP returns an open read/write handle for fileName, which must already
// exist. The cached handle is reused when fileName is the cached file;
// otherwise the cached handle is closed and fileName is opened.
//
// If opening fails the cache is left empty, so a later call for any name,
// including the previously cached one, opens the file again instead of
// returning a stale or nil handle.
func (cfp *CachedFP) GetFP(fileName string) (fp *os.File, err error) {
	if cfp.fp != nil && fileName == cfp.fileName {
		return cfp.fp, nil
	}
	// Release the previous handle; its close error is not actionable here.
	_ = cfp.Close()
	fp, err = os.OpenFile(fileName, os.O_RDWR, 0700)
	if err != nil {
		return nil, err
	}
	cfp.fileName, cfp.fp = fileName, fp
	return fp, nil
}

// Close closes the cached handle, if any, and empties the cache. Calling it
// on an empty cache is a no-op that returns nil.
func (cfp *CachedFP) Close() error {
	if cfp.fp == nil {
		return nil
	}
	err := cfp.fp.Close()
	cfp.fp, cfp.fileName = nil, ""
	return err
}
// flushToWAL commits the writes currently queued in tgc as one transaction
// group (TG). A.k.a. Commit transaction.
//
// The queued writes are drained from tgc.writeChannel and serialized into a
// single TG buffer. Unless WAL bypass is enabled, the TG is appended to the
// WAL file between TXNINFO PREPARING and COMMITCOMPLETE records, followed by
// an MD5 checksum and a Sync, and its TGID is remembered in lastCommittedTGID
// for the next checkpoint. Only after that are the writes applied to the
// primary files (one writePrimary call per file) and the written indexes
// recorded; dispatchWrittenIndexes runs on every return.
//
// A nil tgc is a no-op. With nothing queued, the TGID is still refreshed so
// a requester can see that its flush went through. Returns the first error
// from writing a primary file; errors from the WAL writes are not checked.
// Panics if the WAL file is not writable (CanWrite fails).
func (wf *WALFileType) flushToWAL(tgc *TransactionPipe) (err error) {
	/*
		Here we flush the contents of the Mkts write cache to:
		- Primary storage via the OS write cache - data is visible to readers
		- WAL file with synchronization to physical storage - in case we need to recover from a crash
	*/
	defer dispatchWrittenIndexes()
	WALBypass := ThisInstance.WALBypass
	//WALBypass = true // Bypass all writing to the WAL File, leaving the writes to the primary
	if tgc == nil {
		return nil
	}
	// Count of WT Sets in this TG as of now: exactly this many writes are
	// received below; anything enqueued afterwards is left for a later flush.
	WTCount := len(tgc.writeChannel)
	if WTCount == 0 {
		// refresh TGID so requester can confirm it went through even if nothing is written
		tgc.NewTGID()
		return nil
	}
	if !WALBypass {
		if !wf.CanWrite("WriteTG") {
			panic("Failed attempt to write to WAL")
		}
		// WAL Transaction Preparing Message
		wf.WriteTransactionInfo(tgc.TGID(), WAL, PREPARING)
	}
	// Serialize all data to be written except for the size of this buffer.
	// TG layout: TGID (int64), WTCount (int64), then one record per write.
	var TG_Serialized, TGLen_Serialized []byte
	TG_Serialized, _ = io.Serialize(TG_Serialized, tgc.TGID())
	TG_Serialized, _ = io.Serialize(TG_Serialized, int64(WTCount))
	writesPerFile := map[string][]offsetIndexBuffer{}
	fileRecordTypes := map[string]io.EnumRecordType{}
	/*
		This loop serializes write transactions from the channel for writing to disk.
		Per-write record: RecordType (int8), len(WALKeyPath) (int16), WALKeyPath,
		len(Data) (int32), then Offset and Index (8 bytes each) and Data.
	*/
	for i := 0; i < WTCount; i++ {
		command := <-tgc.writeChannel
		TG_Serialized, _ = io.Serialize(TG_Serialized, int8(command.RecordType))
		TG_Serialized, _ = io.Serialize(TG_Serialized, int16(len(command.WALKeyPath)))
		TG_Serialized, _ = io.Serialize(TG_Serialized, command.WALKeyPath)
		TG_Serialized, _ = io.Serialize(TG_Serialized, int32(len(command.Data)))
		// Offset + Index + Data is the part later handed to the primary writers.
		oStart := len(TG_Serialized)
		bufferSize := 8 + 8 + len(command.Data)
		TG_Serialized, _ = io.Serialize(TG_Serialized, command.Offset)
		TG_Serialized, _ = io.Serialize(TG_Serialized, command.Index)
		TG_Serialized = append(TG_Serialized, command.Data...)
		keyPath := command.WALKeyPath
		// Store the data in a buffer for primary storage writes after WAL writes are done.
		// Later appends may move TG_Serialized to a new backing array, but this
		// sub-slice keeps pointing at bytes that are never modified again.
		writesPerFile[keyPath] = append(writesPerFile[keyPath],
			offsetIndexBuffer(TG_Serialized[oStart:oStart+bufferSize]))
		// The first record type seen for a file is used for all of its writes.
		if _, ok := fileRecordTypes[keyPath]; !ok {
			fileRecordTypes[keyPath] = command.RecordType
		}
	}
	if !WALBypass {
		// Serialize the size of the buffer into another buffer
		TGLen_Serialized, _ = io.Serialize(TGLen_Serialized, int64(len(TG_Serialized)))
		// Calculate the MD5 checksum, including the value of TGLen
		hash := md5.New()
		hash.Write(TGLen_Serialized)
		hash.Write(TG_Serialized)
		// On-disk TGDATA record: MID, TGLen (int64), TG bytes, MD5 checksum.
		// NOTE(review): the Write/Sync errors below are ignored, so a failed
		// WAL write is not detected here.
		wf.FilePtr.Write(wf.initMessage(TGDATA)) // Write the Message ID to identify TG Data
		// Write the TG Data and the checksum and Sync()
		wf.FilePtr.Write(TGLen_Serialized)
		wf.FilePtr.Write(TG_Serialized)
		cksum := hash.Sum(nil)
		wf.FilePtr.Write(cksum) // Checksum
		wf.FilePtr.Sync()       // Flush the OS buffer
		// WAL Transaction Commit Complete Message
		TGID := tgc.TGID()
		wf.WriteTransactionInfo(TGID, WAL, COMMITCOMPLETE)
		wf.lastCommittedTGID = TGID
		tgc.NewTGID()
	}
	// NOTE(review): with WALBypass the TGID is not refreshed and
	// lastCommittedTGID is not updated for this TG.
	/*
		Write the buffers to primary files (should happen after WAL writes)
	*/
	for keyPath, writes := range writesPerFile {
		recordType := fileRecordTypes[keyPath]
		if err := wf.writePrimary(keyPath, writes, recordType); err != nil {
			return err
		}
		for i, buffer := range writes {
			addWrittenIndex(keyPath, buffer.Index())
			writes[i] = nil // for GC
		}
		writesPerFile[keyPath] = nil // for GC
	}
	return nil
}
// writePrimary applies already-committed writes to the primary file that
// keyPath (a WAL key) refers to.
//
// FIXED-record files with at least batchThreshold writes go through a
// buffile.New writer; all other cases use a plain *os.File. VARIABLE records
// require the *os.File path, because WriteBufferToFileIndirect takes *os.File.
// Record types other than FIXED and VARIABLE are skipped without writing.
// An error from closing the file is also returned: for the buffered writer,
// Close presumably flushes pending data (TODO confirm), so ignoring it could
// silently lose committed writes.
func (wf *WALFileType) writePrimary(keyPath string, writes []offsetIndexBuffer, recordType io.EnumRecordType) (err error) {
	fullPath := wf.WALKeyToFullPath(keyPath)

	type WriteAtCloser interface {
		goio.WriterAt
		goio.Closer
	}

	const batchThreshold = 100

	var fp WriteAtCloser
	if recordType == io.FIXED && len(writes) >= batchThreshold {
		fp, err = buffile.New(fullPath)
	} else {
		fp, err = os.OpenFile(fullPath, os.O_RDWR, 0700)
	}
	if err != nil {
		// this is critical, in fact, since tx has been committed
		glog.Errorf("cannot open file %s for write: %v", fullPath, err)
		return err
	}
	defer func() {
		// Report a close failure unless an earlier error is already being returned.
		if cerr := fp.Close(); cerr != nil && err == nil {
			glog.Errorf("failed to close %s after writing committed data: %v", fullPath, cerr)
			err = cerr
		}
	}()

	for _, buffer := range writes {
		switch recordType {
		case io.FIXED:
			err = WriteBufferToFile(fp, buffer)
		case io.VARIABLE:
			err = WriteBufferToFileIndirect(fp.(*os.File), buffer)
		}
		if err != nil {
			glog.Errorf("failed to write committed data: %v", err)
			return err
		}
	}
	return nil
}
// createCheckpoint makes primary-store writes durable with io.Syncfs. When
// the WAL is not bypassed, the sync is bracketed by CHECKPOINT PREPARING and
// COMMITCOMPLETE records for the last committed TGID, closing out the earlier
// WAL state. It does nothing when lastCommittedTGID is 0 and resets it to 0
// afterwards. It always returns nil.
//
// Not goroutine-safe with respect to flushToWAL; callers must serialize the two.
//
// NOTE(review): lastCommittedTGID is only set by the non-bypass branch of
// flushToWAL and by replayTGData, so in WAL-bypass mode a periodic checkpoint
// appears to return before reaching io.Syncfs — confirm this is intended.
func (wf *WALFileType) createCheckpoint() error {
	tgid := wf.lastCommittedTGID
	if tgid == 0 {
		return nil
	}
	if !ThisInstance.WALBypass {
		wf.WriteTransactionInfo(tgid, CHECKPOINT, PREPARING)
		// Once Syncfs returns, the filesystem cache data is committed to disk.
		io.Syncfs()
		wf.WriteTransactionInfo(tgid, CHECKPOINT, COMMITCOMPLETE)
	} else {
		io.Syncfs()
	}
	wf.lastCommittedTGID = 0
	return nil
}
// TGIDlist is a slice of transaction group IDs that implements
// sort.Interface, ordering the IDs ascending.
type TGIDlist []int64

// Len returns the number of IDs in the list.
func (l TGIDlist) Len() int {
	return len(l)
}

// Less reports whether the ID at position a sorts before the ID at position b.
func (l TGIDlist) Less(a, b int) bool {
	return l[a] < l[b]
}

// Swap exchanges the IDs at positions a and b.
func (l TGIDlist) Swap(a, b int) {
	l[a], l[b] = l[b], l[a]
}
// Replay re-applies the transaction groups in this WAL file that are not
// covered by a completed checkpoint.
//
// It works in two passes:
//  1. Scan all records from the start of the file. TGDATA records are loaded
//     into memory keyed by TGID. TXNINFO records are tracked, and a
//     CHECKPOINT COMMITCOMPLETE for a TGID whose data is loaded discards all
//     loaded TG data with TGID <= that ID. STATUS records are read and their
//     values discarded. The scan stops at the first short/partial read.
//  2. Replay the remaining TG data in ascending TGID order with replayTGData.
//
// With writeData == false this is a dry run that only logs what would be
// replayed. With writeData == true the header is set to OPEN/REPLAYINPROCESS
// before replay and OPEN/REPLAYED once it completes.
//
// Returns an error if the file does not need replay or if replaying a TG
// fails. Other I/O errors during the scan and duplicate TG data for one TGID
// are logged at FATAL level.
func (wf *WALFileType) Replay(writeData bool) error {
	/*
		Replay this WAL File's unwritten transactions.
		We will do this in two passes, in the first pass we will collect the Transaction Group IDs that are
		not yet durably written to the primary store. In the second pass, we write the data into the
		Primary Store directly and then flush the results.
		Finally we close the WAL File and mark it completely written.
		1) First WAL Pass: Locate unwritten TGIDs
		2) Second WAL Pass: Load the open TG data into the Primary Data files
		3) Flush the TG Cache to primary and mark this WAL File completely processed
		Note that the TG Data for any given TGID should appear in the WAL only once. We verify it in the first
		pass.
	*/
	// Make sure this file needs replay
	if !wf.NeedsReplay() {
		err := fmt.Errorf("WALFileType.NeedsReplay No Replay Needed")
		Log(INFO, err.Error())
		return err
	}
	// Take control of this file and set the status
	if writeData {
		wf.WriteStatus(OPEN, REPLAYINPROCESS)
	}
	// First pass of WAL Replay: determine transaction states and record locations of TG data.
	// NOTE(review): txnStateWAL and txnStatePrimary are only written in this
	// function, never read; offsetTGDataInWAL is only used to detect duplicates.
	txnStateWAL := make(map[int64]TxnStatusEnum, 0)
	txnStatePrimary := make(map[int64]TxnStatusEnum, 0)
	offsetTGDataInWAL := make(map[int64]int64, 0)
	// fullRead reports whether the last read completed. A ShortReadError (end
	// of data or a partially written tail) yields false; any other error is
	// logged at FATAL level.
	fullRead := func(err error) bool {
		// Check to see if we have read only partial data
		if err != nil {
			if _, ok := err.(ShortReadError); ok {
				Log(INFO, "Partial Read")
				return false
			} else {
				Log(FATAL, io.GetCallerFileContext(0)+": Uncorrectable IO error in WAL Replay")
			}
		}
		return true
	}
	Log(INFO, "Beginning WAL Replay")
	if !writeData {
		Log(INFO, "Debugging mode enabled - no writes will be performed...")
	}
	// Create a map to store the TG Data prior to replay
	TGData := make(map[int64][]byte)
	// Scan from the very beginning; the STATUS header is read like any other record.
	wf.FilePtr.Seek(0, os.SEEK_SET)
	continueRead := true
	for continueRead {
		MID, err := wf.readMessageID()
		if continueRead = fullRead(err); !continueRead {
			break // Break out of read loop
		}
		switch MID {
		case TGDATA:
			// Read a TGData
			offset, _ := wf.FilePtr.Seek(0, os.SEEK_CUR)
			TGID, TG_Serialized, err := wf.readTGData()
			// NOTE(review): stored before err is checked; after a short read this
			// records TGData[0] = nil, which the replay loop below skips as nil.
			TGData[TGID] = TG_Serialized
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch; continueRead is false, so the loop ends too
			}
			// Throw FATAL if there is already a TG data location in this WAL
			if _, ok := offsetTGDataInWAL[TGID]; ok {
				Log(FATAL, io.GetCallerFileContext(0)+": Duplicate TG Data in WAL")
			}
			// Log(INFO, "Successfully read past TG data for TGID: %v", TGID)
			// Save the offset of this TG Data for the second pass
			offsetTGDataInWAL[TGID] = offset
		case TXNINFO:
			// Read a TXNInfo
			TGID, destination, txnStatus, err := wf.readTransactionInfo()
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch; continueRead is false, so the loop ends too
			}
			switch destination {
			case WAL:
				txnStateWAL[TGID] = txnStatus
			case CHECKPOINT:
				if _, ok := TGData[TGID]; ok && txnStatus == COMMITCOMPLETE {
					// A completed checkpoint covers this TG and every earlier one:
					// remove all TGData for TGID less than or equal to this one.
					for tgid, _ := range TGData {
						if tgid <= TGID {
							TGData[tgid] = nil
							delete(TGData, tgid)
						}
					}
				} else {
					// Record this txnStatus for later analysis
					txnStatePrimary[TGID] = txnStatus
				}
			}
		case STATUS:
			// Read the status - note that this message should only be at the file beginning
			_, _, _, err := wf.ReadStatus()
			if continueRead = fullRead(err); !continueRead {
				break // Break out of switch; continueRead is false, so the loop ends too
			}
		default:
			// readMessageID rejects unknown IDs with a non-short error, so this
			// is reached only if the FATAL log in fullRead returns.
			glog.Warningf("Unknown meessage id %d", MID)
		}
	}
	// Second Pass of WAL Replay: apply the TG data that survived the first pass
	Log(INFO, "Entering replay of TGData")
	// Replay TGs in ascending TGID order so later transactions are applied after earlier ones
	var sortedTGIDs TGIDlist
	for tgid := range TGData {
		sortedTGIDs = append(sortedTGIDs, tgid)
	}
	sort.Sort(sortedTGIDs)
	//for tgid, TG_Serialized := range TGData {
	for _, tgid := range sortedTGIDs {
		TG_Serialized := TGData[tgid]
		if TG_Serialized != nil {
			// TG data covered by a completed checkpoint was deleted in the first
			// pass; nil entries (from a short read) are skipped by the check above.
			if writeData {
				Log(INFO, "Replaying TGID: %d, data length is: %d bytes", tgid, len(TG_Serialized))
				if err := wf.replayTGData(TG_Serialized); err != nil {
					return err
				}
			} else {
				Log(INFO, "Replay for TGID: %d, data length is: %d bytes", tgid, len(TG_Serialized))
			}
		}
	}
	Log(INFO, "Replay of WAL file %s finished", wf.FilePath)
	if writeData {
		wf.WriteStatus(OPEN, REPLAYED)
	}
	Log(INFO, "Finished replay of TGData")
	return nil
}
// WriteStatus sets the file and replay state, claims the file for the running
// instance, and persists the header at offset 0.
//
// The header written here is: STATUS message ID, FileStatus (int8),
// ReplayState (int8), OwningInstanceID (int64). The file is synced after the
// header write, and the position is left at end of file so later appends go
// after existing content. I/O errors are not checked.
func (wf *WALFileType) WriteStatus(FileStatus FileStatusEnum, ReplayState ReplayStateEnum) {
	wf.FileStatus, wf.ReplayState = FileStatus, ReplayState
	wf.OwningInstanceID = ThisInstance.InstanceID // this process now owns the file

	header := wf.initMessage(STATUS)
	header, _ = io.Serialize(header, int8(wf.FileStatus))
	header, _ = io.Serialize(header, int8(wf.ReplayState))
	header, _ = io.Serialize(header, int64(wf.OwningInstanceID))

	f := wf.FilePtr
	f.Seek(0, os.SEEK_SET)
	f.Write(header)
	f.Sync()
	f.Seek(0, os.SEEK_END)
}
// write writes buffer at the current file position and syncs the file.
// Errors from Write and Sync are ignored.
func (wf *WALFileType) write(buffer []byte) {
	f := wf.FilePtr
	f.Write(buffer)
	f.Sync()
}
// WriteTransactionInfo writes a TXNINFO record carrying the transaction group
// ID, its destination and its status, and syncs it to disk via write.
func (wf *WALFileType) WriteTransactionInfo(tid int64, did DestEnum, txnStatus TxnStatusEnum) {
	msg := wf.initMessage(TXNINFO)
	for _, field := range []interface{}{tid, did, txnStatus} {
		msg, _ = io.Serialize(msg, field)
	}
	wf.write(msg)
}
// readTransactionInfo reads the body of a TXNINFO record from the current
// position: an 8-byte TGID, a 1-byte destination and a 1-byte status. Any read
// failure is reported as a ShortReadError; unknown destination or status
// values are rejected with a descriptive error.
func (wf *WALFileType) readTransactionInfo() (tgid int64, destination DestEnum, txnStatus TxnStatusEnum, err error) {
	var raw [10]byte
	buf, _, readErr := wf.read(-1, raw[:])
	if readErr != nil {
		return 0, 0, 0, ShortReadError("WALFileType.readTransactionInfo")
	}

	tgid = io.ToInt64(buf)
	destination = DestEnum(buf[8])
	txnStatus = TxnStatusEnum(buf[9])

	if destination != CHECKPOINT && destination != WAL {
		return 0, 0, 0, fmt.Errorf("WALFileType.readTransactionInfo Invalid destination ID: %d", destination)
	}
	if txnStatus != PREPARING && txnStatus != COMMITINTENDED && txnStatus != COMMITCOMPLETE {
		return 0, 0, 0, fmt.Errorf("WALFileType.readTransactionInfo Invalid Txn Status: %d", txnStatus)
	}
	return tgid, destination, txnStatus, nil
}
// initMessage starts a new WAL record buffer that holds only the serialized
// message ID.
func (wf *WALFileType) initMessage(mid MIDEnum) []byte {
	start := make([]byte, 0)
	msg, _ := io.Serialize(start, mid)
	return msg
}
// writeMessageID writes a record consisting of just the message ID and syncs.
func (wf *WALFileType) writeMessageID(mid MIDEnum) {
	msg := wf.initMessage(mid)
	wf.write(msg)
}
// readMessageID reads the 1-byte message ID at the current position. A failed
// read is reported as a ShortReadError. An ID other than TGDATA, TXNINFO or
// STATUS yields 99 together with an error.
func (wf *WALFileType) readMessageID() (mid MIDEnum, err error) {
	var one [1]byte
	buf, _, readErr := wf.read(-1, one[:])
	if readErr != nil {
		return 0, ShortReadError("WALFileType.ReadMessageID")
	}
	switch candidate := MIDEnum(buf[0]); candidate {
	case TGDATA, TXNINFO, STATUS:
		return candidate, nil
	default:
		return 99, fmt.Errorf("WALFileType.ReadMessageID Incorrect MID read, value: %d", candidate)
	}
}
// readTGData reads one TGDATA record body from the current position: an
// 8-byte length (TGLen), TGLen bytes of serialized transaction group, and an
// MD5 checksum computed over the length bytes followed by the TG bytes. The
// TG bytes begin with the int64 TGID (as written by flushToWAL), which is
// returned along with the whole serialized TG.
//
// Truncated input is reported as ShortReadError, so replay can stop at a
// partially written tail. An implausible length or a checksum mismatch is
// returned as a plain error.
func (wf *WALFileType) readTGData() (TGID int64, TG_Serialized []byte, err error) {
	TGLen_Serialized := make([]byte, 8)
	TGLen_Serialized, _, err = wf.read(-1, TGLen_Serialized)
	if err != nil {
		return 0, nil, ShortReadError(io.GetCallerFileContext(0))
	}
	TGLen := io.ToInt64(TGLen_Serialized)
	// Every TG carries at least its TGID and write count (two int64s). A
	// shorter, negative or implausibly large length means corruption, and it
	// must be rejected before it is used to size an allocation.
	const minTGLen = 16
	if TGLen < minTGLen || !wf.sanityCheckValue(TGLen) {
		return 0, nil, fmt.Errorf("%s: Insane TG Length: %d", io.GetCallerFileContext(0), TGLen)
	}
	// Read the data; ReadFull retries until the whole TG is read or the file ends.
	TG_Serialized = make([]byte, TGLen)
	if _, err = goio.ReadFull(wf.FilePtr, TG_Serialized); err != nil {
		return 0, nil, ShortReadError(io.GetCallerFileContext(0) + ":Reading Data")
	}
	// The TGID is the first int64 of the TG; replayTGData decodes it from [0:8] too.
	TGID = io.ToInt64(TG_Serialized[:8])
	// Compute the checksum
	hash := md5.New()
	hash.Write(TGLen_Serialized)
	hash.Write(TG_Serialized)
	cksum := hash.Sum(nil)
	// Read the checksum
	checkBuf := make([]byte, md5.Size)
	if _, err = goio.ReadFull(wf.FilePtr, checkBuf); err != nil {
		return 0, nil, ShortReadError(io.GetCallerFileContext(0) + ":Reading Checksum")
	}
	if !bytes.Equal(cksum, checkBuf) {
		return 0, nil, fmt.Errorf("%s:Checksum was: %v should be: %v", io.GetCallerFileContext(0), cksum, checkBuf)
	}
	return TGID, TG_Serialized, nil
}
// replayTGData re-applies one serialized transaction group (in the layout
// produced by flushToWAL) to the primary files and then checkpoints it.
//
// Layout: TGID (int64), write count (int64), then for each write: record type
// (int8), WAL key length (int16), WAL key, data length (int32), and a record of
// 8+8+dataLen bytes that is passed to WriteBufferToFile (FIXED) or
// WriteBufferToFileIndirect (VARIABLE). Consecutive writes to the same file
// share one handle via CachedFP. A TG with a write count of zero is skipped
// entirely, without a checkpoint.
func (wf *WALFileType) replayTGData(TG_Serialized []byte) (err error) {
	tgid := io.ToInt64(TG_Serialized[0:8])
	writeCount := int(io.ToInt64(TG_Serialized[8:16]))
	if writeCount == 0 {
		return nil
	}

	handles := NewCachedFP()
	defer handles.Close()

	pos := 16
	for w := 0; w < writeCount; w++ {
		recType := io.EnumRecordType(int(io.ToInt8(TG_Serialized[pos : pos+1])))
		pos++
		keyLen := int(io.ToInt16(TG_Serialized[pos : pos+2]))
		pos += 2
		walKey := bytes.NewBuffer(TG_Serialized[pos : pos+keyLen]).String()
		pos += keyLen
		dataLen := int(io.ToInt32(TG_Serialized[pos : pos+4]))
		pos += 4

		fp, openErr := handles.GetFP(wf.WALKeyToFullPath(walKey))
		if openErr != nil {
			return openErr
		}

		end := pos + 8 + 8 + dataLen
		switch recType {
		case io.FIXED:
			err = WriteBufferToFile(fp, TG_Serialized[pos:end])
		case io.VARIABLE:
			err = WriteBufferToFileIndirect(fp, TG_Serialized[pos:end])
		default:
			return fmt.Errorf("Error: Record Type is incorrect from WALFile, invalid/outdated WAL file?")
		}
		if err != nil {
			return err
		}
		pos = end
	}

	wf.lastCommittedTGID = tgid
	wf.createCheckpoint()
	return nil
}
// ReadStatus reads the 10-byte status payload (FileStatus, ReplayState,
// OwningInstanceID) at the current file position. The values are decoded
// even when the read fails, so callers must check err before using them.
func (wf *WALFileType) ReadStatus() (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, OwningInstanceID int64, err error) {
	var raw [10]byte
	buf, _, err := wf.read(-1, raw[:])
	fileStatus = FileStatusEnum(buf[0])
	replayStatus = ReplayStateEnum(buf[1])
	OwningInstanceID = io.ToInt64(buf[2:])
	return fileStatus, replayStatus, OwningInstanceID, err
}
// IsOpen reports whether the file handle still answers Stat and the in-memory
// FileStatus is OPEN. Each failing condition is logged at INFO level.
func (wf *WALFileType) IsOpen() bool {
	if _, statErr := wf.FilePtr.Stat(); statErr != nil {
		Log(INFO, io.GetCallerFileContext(0)+": File stat failed, file probably deleted: "+statErr.Error())
		return false
	}
	if wf.FileStatus == OPEN {
		return true
	}
	Log(INFO, io.GetCallerFileContext(0)+": File not opened")
	return false
}
// syncStatusRead refreshes FileStatus, ReplayState and OwningInstanceID from
// the header on disk. A failing Stat is logged at FATAL level.
func (wf *WALFileType) syncStatusRead() {
	if _, statErr := wf.FilePtr.Stat(); statErr != nil {
		Log(FATAL, io.GetCallerFileContext(0)+": File stat failed")
	}
	status, replay, owner := wf.readStatus()
	wf.FileStatus = status
	wf.ReplayState = replay
	wf.OwningInstanceID = owner
}
// readStatus returns the header values written by WriteStatus without
// modifying wf's fields. It skips the leading message-ID byte, reads the
// status payload, and leaves the file position at end of file. A read
// failure is logged at FATAL level.
func (wf *WALFileType) readStatus() (fileStatus FileStatusEnum, replayStatus ReplayStateEnum, owningInstanceID int64) {
	const midSize = 1 // the header begins with a one-byte message ID
	wf.FilePtr.Seek(midSize, os.SEEK_SET)
	fs, rs, owner, err := wf.ReadStatus()
	if err != nil {
		Log(FATAL, io.GetCallerFileContext(0)+": Unable to ReadStatus()")
	}
	wf.FilePtr.Seek(0, os.SEEK_END)
	return fs, rs, owner
}
// callerOwnsFile reports whether the running instance is the owner recorded in
// wf.OwningInstanceID. Call syncStatusRead first to compare against the
// on-disk header rather than a possibly stale in-memory value.
func (wf *WALFileType) callerOwnsFile() bool {
	owner := wf.OwningInstanceID
	return owner == ThisInstance.InstanceID
}
// isActive reports whether this is the live WAL of the running instance: open,
// owned by this instance, and in the NOTREPLAYED state. It uses the in-memory
// header fields, so call syncStatusRead first for an up-to-date answer.
func (wf *WALFileType) isActive() bool {
	if !wf.IsOpen() {
		return false
	}
	if !wf.callerOwnsFile() {
		return false
	}
	return wf.ReplayState == NOTREPLAYED
}
// NeedsReplay refreshes the header from disk and reports whether replay has
// not started (NOTREPLAYED) or did not finish (REPLAYINPROCESS).
func (wf *WALFileType) NeedsReplay() bool {
	wf.syncStatusRead()
	switch wf.ReplayState {
	case NOTREPLAYED, REPLAYINPROCESS:
		return true
	default:
		return false
	}
}
// CanWrite refreshes the header from disk and reports whether this WAL file is
// active (see isActive). An inactive file is logged at WARNING level. msg is
// currently unused.
func (wf *WALFileType) CanWrite(msg string) bool {
	wf.syncStatusRead()
	active := wf.isActive()
	if !active {
		Log(WARNING, io.GetCallerFileContext(0)+": Inactive WALFile")
	}
	return active
}
// CanDeleteSafely refreshes the header from disk and reports whether the file
// is neither the active WAL nor in need of replay. The blocking reason is
// logged at WARNING level.
func (wf *WALFileType) CanDeleteSafely() bool {
	wf.syncStatusRead()
	switch {
	case wf.isActive():
		Log(WARNING, io.GetCallerFileContext(0)+": WALFile is active, can not delete")
		return false
	case wf.NeedsReplay():
		Log(WARNING, io.GetCallerFileContext(0)+": WALFile needs replay, can not delete")
		return false
	default:
		return true
	}
}
// sanityCheckValue reports whether value is plausible as a length read from
// the WAL: non-negative and smaller than 1000 times the current WAL file size.
// It guards buffer allocations against corrupted length fields. If the file
// cannot be stat'ed, the value is treated as not sane rather than panicking.
func (wf *WALFileType) sanityCheckValue(value int64) (isSane bool) {
	fstat, err := wf.FilePtr.Stat()
	if err != nil {
		return false
	}
	sanityLen := 1000 * fstat.Size()
	return value >= 0 && value < sanityLen
}
// cleanupOldWALFiles replays and then deletes every "*.walfile" in rootDir
// other than wf's own file. Files smaller than the status header are removed
// without replay. Failures to read the directory, or to open, replay or
// safely delete a WAL file, are logged at FATAL level. A file that disappears
// between listing and stat is skipped with a warning.
func (wf *WALFileType) cleanupOldWALFiles(rootDir string) {
	// Size of the status header written by WriteStatus: message ID (1 byte),
	// FileStatus (int8), ReplayState (int8), OwningInstanceID (int64).
	const walHeaderSize = 11

	rootDir = filepath.Clean(rootDir)
	files, err := ioutil.ReadDir(rootDir)
	if err != nil {
		Log(FATAL, "Unable to read root directory %s\n%s", rootDir, err)
	}
	myFileBase := filepath.Base(wf.FilePath)
	Log(INFO, "My WALFILE: %s", myFileBase)
	for _, file := range files {
		filename := file.Name()
		if file.IsDir() || filepath.Ext(filename) != ".walfile" || filename == myFileBase {
			continue
		}
		Log(INFO, "Found a WALFILE: %s, entering replay...", filename)
		filePath := filepath.Join(rootDir, filename)
		fi, err := os.Stat(filePath)
		if err != nil {
			if os.IsNotExist(err) {
				// Removed since the directory listing; nothing left to replay.
				Log(WARNING, "WALFILE: %s disappeared before replay, skipping: %v", filename, err)
				continue
			}
			Log(FATAL, "Unable to stat %s\n%s", filename, err)
			continue
		}
		if fi.Size() < walHeaderSize {
			Log(INFO, "WALFILE: %s is empty, removing it...", filename)
			if err := os.Remove(filePath); err != nil {
				Log(WARNING, "Unable to remove %s: %v", filename, err)
			}
			continue
		}
		w, err := NewWALFile(rootDir, filePath)
		if err != nil {
			Log(FATAL, "Opening %s\n%s", filename, err)
		}
		if err = w.Replay(true); err != nil {
			Log(FATAL, "Unable to replay %s\n%s", filename, err)
		}
		if !w.CanDeleteSafely() {
			Log(FATAL, "Unable to delete %s after replay", filename)
		}
		if err = w.Delete(); err != nil {
			Log(WARNING, "Unable to delete %s after replay: %v", filename, err)
		}
	}
}
// StartupCacheAndWAL creates this instance's WAL file under rootDir, replays
// and removes WAL files left behind by earlier runs, and returns a fresh
// TransactionPipe together with the new WAL file.
func StartupCacheAndWAL(rootDir string) (tgc *TransactionPipe, wf *WALFileType, err error) {
	if wf, err = NewWALFile(rootDir, ""); err != nil {
		Log(ERROR, "%s", err.Error())
		return nil, nil, err
	}
	wf.cleanupOldWALFiles(rootDir)
	tgc = NewTransactionPipe()
	return tgc, wf, nil
}
// haveWALWriter is true while a SyncWAL loop is running; RequestFlush then
// hands flushes to that loop instead of flushing inline.
// NOTE(review): it is written by SyncWAL and read by RequestFlush without
// synchronization; if they run on different goroutines this is a data race.
var haveWALWriter = false

// SyncWAL runs the WAL writer loop until ThisInstance.ShutdownPending is set.
//
// Example: SyncWAL(500*time.Millisecond, 15*time.Minute, 5)
//
// Pending writes are flushed to the WAL and the primary files every
// WALRefresh, on request through flushChannel (see RequestFlush), and whenever
// the write channel is at least 80% full (checked every WALRefresh/100).
// A checkpoint is taken every PrimaryRefresh. Every walRotateInterval
// checkpoints, the WAL file is truncated and its header rewritten; a
// walRotateInterval of 0 disables rotation. On shutdown the remaining writes
// are flushed, a final checkpoint is taken, and ThisInstance.WALWg.Done is
// called.
func (wf *WALFileType) SyncWAL(WALRefresh, PrimaryRefresh time.Duration, walRotateInterval int) {
	haveWALWriter = true

	tickerWAL := time.NewTicker(WALRefresh)
	defer tickerWAL.Stop()
	tickerPrimary := time.NewTicker(PrimaryRefresh)
	defer tickerPrimary.Stop()
	// time.NewTicker panics on a non-positive interval, so fall back to
	// WALRefresh when WALRefresh/100 rounds down to zero.
	checkInterval := WALRefresh / 100
	if checkInterval <= 0 {
		checkInterval = WALRefresh
	}
	tickerCheck := time.NewTicker(checkInterval)
	defer tickerCheck.Stop()

	primaryFlushCounter := 0
	chanCap := cap(ThisInstance.TXNPipe.writeChannel)
	for {
		if ThisInstance.ShutdownPending {
			haveWALWriter = false
			glog.Info("Flushing to WAL...")
			if err := wf.flushToWAL(ThisInstance.TXNPipe); err != nil {
				// Keep going: the final checkpoint and WaitGroup release must still happen.
				Log(ERROR, "%v", err)
			}
			glog.Info("Flushing to disk...")
			wf.createCheckpoint()
			ThisInstance.WALWg.Done()
			return
		}

		select {
		case <-tickerWAL.C:
			if err := wf.flushToWAL(ThisInstance.TXNPipe); err != nil {
				Log(FATAL, "%v", err)
			}
		case f := <-ThisInstance.TXNPipe.flushChannel:
			if err := wf.flushToWAL(ThisInstance.TXNPipe); err != nil {
				Log(FATAL, "%v", err)
			}
			f <- struct{}{}
		case <-tickerCheck.C:
			queued := len(ThisInstance.TXNPipe.writeChannel)
			if float64(queued)/float64(chanCap) >= 0.8 {
				if err := wf.flushToWAL(ThisInstance.TXNPipe); err != nil {
					Log(FATAL, "%v", err)
				}
			}
		case <-tickerPrimary.C:
			wf.createCheckpoint()
			primaryFlushCounter++
			// Guard the modulo: a zero interval would panic with a division by zero.
			if walRotateInterval != 0 && primaryFlushCounter%walRotateInterval == 0 {
				Log(INFO, "Truncating WAL file...")
				if err := wf.FilePtr.Truncate(0); err != nil {
					Log(ERROR, "Unable to truncate WAL file %s: %v", wf.FilePath, err)
				}
				wf.WriteStatus(OPEN, NOTREPLAYED)
				primaryFlushCounter = 0
			}
		}
	}
}
// RequestFlush asks the WAL writer goroutine (see SyncWAL) to flush, or
// performs the flush in the calling goroutine when no writer is running.
// When handing off, it returns immediately if a flush request is already
// queued, because that flush will pick up whatever is in the write channel.
// Otherwise it queues a request and blocks until the writer has completed it.
func (wf *WALFileType) RequestFlush() {
	if !haveWALWriter {
		// flushToWAL only fails when writing a primary file after the WAL
		// commit; report it instead of dropping it.
		if err := wf.flushToWAL(ThisInstance.TXNPipe); err != nil {
			Log(ERROR, "%v", err)
		}
		return
	}
	// if there's already a queued flush, no need to queue another
	if len(ThisInstance.TXNPipe.flushChannel) > 0 {
		return
	}
	done := make(chan struct{})
	ThisInstance.TXNPipe.flushChannel <- done
	<-done
}