Skip to content

Commit

Permalink
replace flock syscall with package gofrs/flock
Browse files Browse the repository at this point in the history
  • Loading branch information
Kriechi committed Feb 21, 2020
1 parent 2ea7937 commit f1c0006
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
8 changes: 8 additions & 0 deletions go.mod
@@ -1,3 +1,11 @@
module github.com/joncrlsn/dque

require github.com/pkg/errors v0.9.1

require (
github.com/gofrs/flock v0.7.1
github.com/kr/pretty v0.2.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
)

go 1.13
11 changes: 9 additions & 2 deletions go.sum
@@ -1,4 +1,11 @@
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
48 changes: 20 additions & 28 deletions queue.go
Expand Up @@ -12,8 +12,8 @@ package dque
import (
"strconv"
"sync"
"syscall"

"github.com/gofrs/flock"
"github.com/pkg/errors"

"io/ioutil"
Expand Down Expand Up @@ -52,7 +52,7 @@ type DQue struct {
config config

fullPath string
lockfile *os.File // as long as this file is open the queue can be used
fileLock *flock.Flock
firstSegment *qSegment
lastSegment *qSegment
builder func() interface{} // builds a structure to load via gob
Expand Down Expand Up @@ -157,25 +157,17 @@ func NewOrOpen(name string, dirPath string, itemsPerSegment int, builder func()
// Close releases the lock on the queue rendering it unusable for further usage by this instance.
// Close will return an error if it has already been called.
func (q *DQue) Close() error {
if q.lockfile == nil {
if q.fileLock == nil {
return ErrQueueClosed
}

err := syscall.Flock(int(q.lockfile.Fd()), syscall.F_UNLCK)
if err != nil {
return err
}
err = q.lockfile.Close()
if err != nil {
return err
}
err = os.Remove(path.Join(q.DirPath, q.Name, LOCK_FILE))
err := q.fileLock.Close()
if err != nil {
return err
}

// Finally mark this instance as closed to prevent any further access
q.lockfile = nil
q.fileLock = nil

// Safe-guard ourself from accidentally using segments after closing the queue
q.firstSegment = nil
Expand All @@ -186,7 +178,7 @@ func (q *DQue) Close() error {

// Enqueue adds an item to the end of the queue
func (q *DQue) Enqueue(obj interface{}) error {
if q.lockfile == nil {
if q.fileLock == nil {
return ErrQueueClosed
}

Expand Down Expand Up @@ -228,7 +220,7 @@ func (q *DQue) Enqueue(obj interface{}) error {
// Dequeue removes and returns the first item in the queue.
// When the queue is empty, nil and dque.ErrEmpty are returned.
func (q *DQue) Dequeue() (interface{}, error) {
if q.lockfile == nil {
if q.fileLock == nil {
return nil, ErrQueueClosed
}

Expand Down Expand Up @@ -292,7 +284,7 @@ func (q *DQue) Dequeue() (interface{}, error) {
// When the queue is empty, nil and dque.ErrEmpty are returned.
// Do not use this method with multiple dequeueing threads or you may regret it.
func (q *DQue) Peek() (interface{}, error) {
if q.lockfile == nil {
if q.fileLock == nil {
return nil, ErrQueueClosed
}

Expand All @@ -317,7 +309,7 @@ func (q *DQue) Peek() (interface{}, error) {
// size... unless you have changed the itemsPerSegment value since the queue
// was last empty. Then it could be wildly inaccurate.
func (q *DQue) Size() int {
if q.lockfile == nil {
if q.fileLock == nil {
return 0
}

Expand All @@ -336,7 +328,7 @@ func (q *DQue) Size() int {
// Also, because this method is not synchronized, the size may change after
// entering this method.
func (q *DQue) SizeUnsafe() int {
if q.lockfile == nil {
if q.fileLock == nil {
return 0
}
if q.firstSegment.number == q.lastSegment.number {
Expand All @@ -349,7 +341,7 @@ func (q *DQue) SizeUnsafe() int {
// SegmentNumbers returns the number of both the first last segmment.
// There is likely no use for this information other than testing.
func (q *DQue) SegmentNumbers() (int, int) {
if q.lockfile == nil {
if q.fileLock == nil {
return 0, 0
}
return q.firstSegment.number, q.lastSegment.number
Expand All @@ -366,7 +358,7 @@ func (q *DQue) Turbo() bool {
// risk of losing data if a power-loss occurs.
// If turbo is already on an error is returned
func (q *DQue) TurboOn() error {
if q.lockfile == nil {
if q.fileLock == nil {
return ErrQueueClosed
}

Expand All @@ -383,7 +375,7 @@ func (q *DQue) TurboOn() error {
// they happen.
// If turbo is already off an error is returned
func (q *DQue) TurboOff() error {
if q.lockfile == nil {
if q.fileLock == nil {
return ErrQueueClosed
}

Expand All @@ -403,7 +395,7 @@ func (q *DQue) TurboOff() error {
// TurboSync allows you to fsync changes to disk, but only if turbo is on.
// If turbo is off an error is returned
func (q *DQue) TurboSync() error {
if q.lockfile == nil {
if q.fileLock == nil {
return ErrQueueClosed
}
if !q.turbo {
Expand Down Expand Up @@ -484,16 +476,16 @@ func (q *DQue) load() error {

func (q *DQue) lock() error {
l := path.Join(q.DirPath, q.Name, LOCK_FILE)
f, err := os.OpenFile(l, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
fileLock := flock.New(l)

err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
locked, err := fileLock.TryLock()
if err != nil {
return err
}
if !locked {
return errors.New("failed to acquire flock")
}

q.lockfile = f
q.fileLock = fileLock
return nil
}

0 comments on commit f1c0006

Please sign in to comment.