Skip to content

Commit

Permalink
storagenode/orders: begin implementation of file store for order limits
Browse files Browse the repository at this point in the history
* Will replace order limits database.
* This change adds functionality for storing and listing unsent orders.
* The next change will add functionality for order archival after
submission.

Change-Id: Ic5e2abc63991513245b6851a968ff2f2e18ce48d
  • Loading branch information
mobyvb committed Jun 3, 2020
1 parent 6b447a4 commit c8c0a42
Show file tree
Hide file tree
Showing 2 changed files with 434 additions and 0 deletions.
279 changes: 279 additions & 0 deletions storagenode/orders/store.go
@@ -0,0 +1,279 @@
// Copyright (C) 2020 Storj Labs, Inc.
// See LICENSE for copying information.

package orders

import (
"encoding/binary"
"io"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"github.com/zeebo/errs"

"storj.io/common/pb"
"storj.io/common/storj"
)

const (
unsentStagingFileName = "unsent-orders-staging"
unsentReadyFilePrefix = "unsent-orders-ready-"
)

// FileStore implements the orders.Store interface by appending orders to flat files.
type FileStore struct {
ordersDir string
unsentDir string
archiveDir string
mu sync.Mutex
}

// NewFileStore creates a new orders file store.
func NewFileStore(ordersDir string) *FileStore {
return &FileStore{
ordersDir: ordersDir,
unsentDir: filepath.Join(ordersDir, "unsent"),
archiveDir: filepath.Join(ordersDir, "archive"),
}
}

// Enqueue inserts order to the list of orders needing to be sent to the satellite.
func (store *FileStore) Enqueue(info *Info) (err error) {
store.mu.Lock()
defer store.mu.Unlock()

f, err := store.getUnsentStagingFile()
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, f.Close())
}()

err = writeLimit(f, info.Limit)
if err != nil {
return err
}
err = writeOrder(f, info.Order)
if err != nil {
return err
}
return nil
}

// ListUnsentBySatellite returns orders that haven't been sent yet grouped by satellite.
// It copies the staging file to a read-only "ready to send" file first.
// It should never be called concurrently with DeleteReadyToSendFiles.
func (store *FileStore) ListUnsentBySatellite() (infoMap map[storj.NodeID][]*Info, err error) {
err = store.convertUnsentStagingToReady()
if err != nil {
return infoMap, err
}

var errList error
infoMap = make(map[storj.NodeID][]*Info)

err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
errList = errs.Combine(errList, err)
return nil
}
if info.IsDir() {
return nil
}
if info.Name() == unsentStagingFileName {
return nil
}

f, err := os.Open(path)
if err != nil {
return OrderError.Wrap(err)
}
defer func() {
err = errs.Combine(err, f.Close())
}()

for {
limit, err := readLimit(f)
if err != nil {
if errs.Is(err, io.EOF) {
break
}
return err
}
order, err := readOrder(f)
if err != nil {
return err
}

newInfo := &Info{
Limit: limit,
Order: order,
}
infoList := infoMap[limit.SatelliteId]
infoList = append(infoList, newInfo)
infoMap[limit.SatelliteId] = infoList
}
return nil
})
if err != nil {
errList = errs.Combine(errList, err)
}

return infoMap, errList
}

// DeleteReadyToSendFiles deletes all non-staging files in the "unsent" directory.
// It should be called after the order limits have been sent.
// It should never be called concurrently with ListUnsentBySatellite.
func (store *FileStore) DeleteReadyToSendFiles() (err error) {
var errList error

err = filepath.Walk(store.unsentDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
errList = errs.Combine(errList, OrderError.Wrap(err))
return nil
}
if info.IsDir() {
return nil
}
if info.Name() == unsentStagingFileName {
return nil
}
// delete all non-staging files
return OrderError.Wrap(os.Remove(path))
})
if err != nil {
errList = errs.Combine(errList, err)
}

return errList
}

// convertUnsentStagingToReady converts the unsent staging file to be read only, and renames it.
func (store *FileStore) convertUnsentStagingToReady() error {
// lock mutex so no one tries to write to the file while we do this
store.mu.Lock()
defer store.mu.Unlock()

oldFileName := unsentStagingFileName
oldFilePath := filepath.Join(store.unsentDir, oldFileName)
if _, err := os.Stat(oldFilePath); os.IsNotExist(err) {
return nil
}

// set file to readonly
err := os.Chmod(oldFilePath, 0444)
if err != nil {
return err
}
// make new file suffix the current time in case there are other "ready" files already
timeStr := strconv.FormatInt(time.Now().UnixNano(), 10)
newFilePath := filepath.Join(store.unsentDir, unsentReadyFilePrefix+timeStr)
// rename file
return os.Rename(oldFilePath, newFilePath)
}

// getUnsentStagingFile creates or gets the order limit file for appending unsent orders to.
// it expects the caller to lock the store's mutex before calling, and to handle closing the returned file.
func (store *FileStore) getUnsentStagingFile() (*os.File, error) {
if _, err := os.Stat(store.unsentDir); os.IsNotExist(err) {
err = os.Mkdir(store.unsentDir, 0700)
if err != nil {
return nil, OrderError.Wrap(err)
}
}

fileName := unsentStagingFileName
filePath := filepath.Join(store.unsentDir, fileName)
// create file if not exists or append
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, OrderError.Wrap(err)
}
return f, nil
}

// writeLimit writes the size of the order limit bytes, followed by the order limit bytes.
// it expects the caller to have locked the mutex.
func writeLimit(f io.Writer, limit *pb.OrderLimit) error {
limitSerialized, err := pb.Marshal(limit)
if err != nil {
return OrderError.Wrap(err)
}

sizeBytes := [4]byte{}
binary.LittleEndian.PutUint32(sizeBytes[:], uint32(len(limitSerialized)))
if _, err = f.Write(sizeBytes[:]); err != nil {
return OrderError.New("Error writing serialized limit size: %w", err)
}

if _, err = f.Write(limitSerialized); err != nil {
return OrderError.New("Error writing serialized limit: %w", err)
}
return nil
}

// readLimit reads the size of the limit followed by the serialized limit, and returns the unmarshalled limit.
func readLimit(f io.Reader) (*pb.OrderLimit, error) {
sizeBytes := [4]byte{}
_, err := io.ReadFull(f, sizeBytes[:])
if err != nil {
return nil, OrderError.Wrap(err)
}
limitSize := binary.LittleEndian.Uint32(sizeBytes[:])
limitSerialized := make([]byte, limitSize)
_, err = io.ReadFull(f, limitSerialized)
if err != nil {
return nil, OrderError.Wrap(err)
}
limit := &pb.OrderLimit{}
err = pb.Unmarshal(limitSerialized, limit)
if err != nil {
return nil, OrderError.Wrap(err)
}
return limit, nil
}

// writeOrder writes the size of the order bytes, followed by the order bytes.
// it expects the caller to have locked the mutex.
func writeOrder(f io.Writer, order *pb.Order) error {
orderSerialized, err := pb.Marshal(order)
if err != nil {
return OrderError.Wrap(err)
}

sizeBytes := [4]byte{}
binary.LittleEndian.PutUint32(sizeBytes[:], uint32(len(orderSerialized)))
if _, err = f.Write(sizeBytes[:]); err != nil {
return OrderError.New("Error writing serialized order size: %w", err)
}
if _, err = f.Write(orderSerialized); err != nil {
return OrderError.New("Error writing serialized order: %w", err)
}
return nil
}

// readOrder reads the size of the order followed by the serialized order, and returns the unmarshalled order.
func readOrder(f io.Reader) (*pb.Order, error) {
sizeBytes := [4]byte{}
_, err := io.ReadFull(f, sizeBytes[:])
if err != nil {
return nil, OrderError.Wrap(err)
}
orderSize := binary.LittleEndian.Uint32(sizeBytes[:])
orderSerialized := make([]byte, orderSize)
_, err = io.ReadFull(f, orderSerialized)
if err != nil {
return nil, OrderError.Wrap(err)
}
order := &pb.Order{}
err = pb.Unmarshal(orderSerialized, order)
if err != nil {
return nil, OrderError.Wrap(err)
}
return order, nil
}

0 comments on commit c8c0a42

Please sign in to comment.