Skip to content

Commit

Permalink
storagenode/orders: Open "writable unsent orders file" less
Browse files Browse the repository at this point in the history
Prior to this change, every time a new order limit needed to be written
to disk, the "unsent orders" file was opened, the order was written, and
the "unsent orders" file was closed.

After this change, the "unsent orders" file is opened, and stays open
for all order limits until an order arrives that should be inserted into
a different file (e.g. an hour has elapsed). At this point, the old
"active unsent orders" file is closed and replaced with the next one.

A lifecycle item is added to the storagenode peer for the file store, so
that an open "active unsent orders" file can be closed when the
storagenode shuts down.

https: //github.com/storj/storj-private/issues/716
Change-Id: I1fb00431734932634c6b9573e530a22936cd9815
  • Loading branch information
mobyvb authored and Storj Robot committed May 6, 2024
1 parent b9fe1e7 commit f00bf1e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
46 changes: 41 additions & 5 deletions storagenode/orders/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ type FileStore struct {
active map[activeWindow]int

// mutex for unsent directory
unsentMu sync.Mutex
unsentMu sync.Mutex
unsentOrdersFileName string
unsentOrdersFile ordersfile.Writable
// mutex for archive directory
archiveMu sync.Mutex

Expand All @@ -71,6 +73,19 @@ func NewFileStore(log *zap.Logger, ordersDir string, orderLimitGracePeriod time.
return fs, nil
}

// Close closes the file store.
func (store *FileStore) Close() error {
store.unsentMu.Lock()
defer store.unsentMu.Unlock()
store.activeMu.Lock()
defer store.activeMu.Unlock()

if store.unsentOrdersFile != nil {
return OrderError.Wrap(store.unsentOrdersFile.Close())
}
return nil
}

// BeginEnqueue returns a function that can be called to enqueue the passed in Info. If the Info
// is too old to be enqueued, then an error is returned.
func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Time) (commit func(*ordersfile.Info) error, err error) {
Expand Down Expand Up @@ -110,13 +125,10 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti
}

// write out the data
of, err := ordersfile.OpenWritableUnsent(store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
of, err := store.getWritableUnsent(store.unsentDir, info.Limit.SatelliteId, info.Limit.OrderCreation)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, OrderError.Wrap(of.Close()))
}()

err = of.Append(info)
if err != nil {
Expand All @@ -127,6 +139,30 @@ func (store *FileStore) BeginEnqueue(satelliteID storj.NodeID, createdAt time.Ti
}, nil
}

// getWritableUnsent retrieves an already open "unsent orders" file, or otherwise opens a new one.
// Caller must guarantee to obtain the unset lock before calling this method.
func (store *FileStore) getWritableUnsent(unsentDir string, satelliteID storj.NodeID, creationTime time.Time) (of ordersfile.Writable, err error) {
fileName := ordersfile.UnsentFileName(satelliteID, creationTime, ordersfile.V1)
// check whether the active unsent orders file needs to be closed and a new one needs to be opened
if fileName != store.unsentOrdersFileName {
if store.unsentOrdersFile != nil {
err := store.unsentOrdersFile.Close()
if err != nil {
return nil, OrderError.Wrap(err)
}
}
filePath := filepath.Join(unsentDir, fileName)
store.unsentOrdersFile, err = ordersfile.OpenWritableV1(filePath, satelliteID, creationTime)
if err != nil {
return nil, OrderError.Wrap(err)
}
store.unsentOrdersFileName = fileName
}

// return the active unsent orders file
return store.unsentOrdersFile, nil
}

// enqueueStartedLocked records that there is an order pending to be written to the window.
func (store *FileStore) enqueueStartedLocked(satelliteID storj.NodeID, createdAt time.Time) {
store.active[activeWindow{
Expand Down
4 changes: 4 additions & 0 deletions storagenode/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,10 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
if err != nil {
return nil, errs.Combine(err, peer.Close())
}
peer.Services.Add(lifecycle.Item{
Name: "ordersfilestore",
Close: peer.OrdersStore.Close,
})

peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
process.NamedLog(peer.Log, "piecestore"),
Expand Down

0 comments on commit f00bf1e

Please sign in to comment.