Skip to content

Commit

Permalink
storagenode/piecestore: switch usedserials db for in-memory usedseria…
Browse files Browse the repository at this point in the history
…ls store

Part 2 of moving usedserials in memory
* Drop usedserials table in storagenodedb
* Use in-memory usedserials store in place of db for order limit
verification
* Update order limit grace period to be only one hour - this means
uplinks must send their order limits to storagenodes within an hour of
receiving them

Change-Id: I37a0e1d2ca6cb80854a3ef495af2d1d1f92e9f03
  • Loading branch information
mobyvb committed May 28, 2020
1 parent 909d6d9 commit dc57640
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 287 deletions.
1 change: 1 addition & 0 deletions private/testplanet/storagenode.go
Expand Up @@ -153,6 +153,7 @@ func (planet *Planet) newStorageNodes(count int, whitelistedSatellites storj.Nod
CachePath: filepath.Join(storageDir, "trust-cache.json"),
RefreshInterval: defaultInterval,
},
MaxUsedSerialsSize: memory.MiB,
},
Pieces: pieces.DefaultConfig,
Filestore: filestore.DefaultConfig,
Expand Down
10 changes: 4 additions & 6 deletions storagenode/collector/service.go
Expand Up @@ -13,7 +13,7 @@ import (

"storj.io/common/sync2"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/piecestore/usedserials"
)

var mon = monkit.Package()
Expand All @@ -29,13 +29,13 @@ type Config struct {
type Service struct {
log *zap.Logger
pieces *pieces.Store
usedSerials piecestore.UsedSerials
usedSerials *usedserials.Table

Loop *sync2.Cycle
}

// NewService creates a new collector service.
func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials piecestore.UsedSerials, config Config) *Service {
func NewService(log *zap.Logger, pieces *pieces.Store, usedSerials *usedserials.Table, config Config) *Service {
return &Service{
log: log,
pieces: pieces,
Expand Down Expand Up @@ -70,9 +70,7 @@ func (service *Service) Close() (err error) {
func (service *Service) Collect(ctx context.Context, now time.Time) (err error) {
defer mon.Task()(&ctx)(&err)

if deleteErr := service.usedSerials.DeleteExpired(ctx, now); err != nil {
service.log.Error("unable to delete expired used serials", zap.Error(deleteErr))
}
service.usedSerials.DeleteExpired(now)

const maxBatches = 100
const batchSize = 1000
Expand Down
28 changes: 9 additions & 19 deletions storagenode/collector/service_test.go
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"

"storj.io/common/memory"
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/storj/private/testplanet"
Expand Down Expand Up @@ -45,7 +44,7 @@ func TestCollector(t *testing.T) {
// imagine we are 30 minutes in the future
for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.DB.UsedSerials()
usedSerials := storageNode.UsedSerials

// verify that we actually have some data on storage nodes
used, err := pieceStore.SpaceUsedForBlobs(ctx)
Expand All @@ -59,43 +58,37 @@ func TestCollector(t *testing.T) {
err = storageNode.Collector.Collect(ctx, time.Now().Add(30*time.Minute))
require.NoError(t, err)

// ensure we haven't deleted used serials
err = usedSerials.IterateAll(ctx, func(_ storj.NodeID, _ storj.SerialNumber, _ time.Time) {
serialsPresent++
})
require.NoError(t, err)
serialsPresent += usedSerials.Count()

collections++
}

require.NotZero(t, collections)
// ensure we haven't deleted used serials
require.Equal(t, 2, serialsPresent)

serialsPresent = 0

// imagine we are 2 hours in the future
for _, storageNode := range planet.StorageNodes {
usedSerials := storageNode.DB.UsedSerials()
usedSerials := storageNode.UsedSerials

// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(2*time.Hour))
require.NoError(t, err)

// ensure we have deleted used serials
err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) {
serialsPresent++
})
require.NoError(t, err)
serialsPresent += usedSerials.Count()

collections++
}

// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent)

// imagine we are 10 days in the future
for _, storageNode := range planet.StorageNodes {
pieceStore := storageNode.DB.Pieces()
usedSerials := storageNode.DB.UsedSerials()
usedSerials := storageNode.UsedSerials

// collect all the data
err = storageNode.Collector.Collect(ctx, time.Now().Add(10*24*time.Hour))
Expand All @@ -106,15 +99,12 @@ func TestCollector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), used)

// ensure we have deleted used serials
err = usedSerials.IterateAll(ctx, func(id storj.NodeID, serial storj.SerialNumber, expiration time.Time) {
serialsPresent++
})
require.NoError(t, err)
serialsPresent += usedSerials.Count()

collections++
}

// ensure we have deleted used serials
require.Equal(t, 0, serialsPresent)
})
}
15 changes: 9 additions & 6 deletions storagenode/peer.go
Expand Up @@ -46,6 +46,7 @@ import (
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore"
"storj.io/storj/storagenode/piecestore/usedserials"
"storj.io/storj/storagenode/preflight"
"storj.io/storj/storagenode/pricing"
"storj.io/storj/storagenode/reputation"
Expand Down Expand Up @@ -77,7 +78,6 @@ type DB interface {
PieceExpirationDB() pieces.PieceExpirationDB
PieceSpaceUsedDB() pieces.PieceSpaceUsedDB
Bandwidth() bandwidth.DB
UsedSerials() piecestore.UsedSerials
Reputation() reputation.DB
StorageUsage() storageusage.DB
Satellites() satellites.DB
Expand Down Expand Up @@ -181,9 +181,10 @@ func isAddressValid(addrstring string) error {
// architecture: Peer
type Peer struct {
// core dependencies
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
Log *zap.Logger
Identity *identity.FullIdentity
DB DB
UsedSerials *usedserials.Table

Servers *lifecycle.Group
Services *lifecycle.Group
Expand Down Expand Up @@ -471,6 +472,8 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
Close: peer.Storage2.RetainService.Close,
})

peer.UsedSerials = usedserials.NewTable(config.Storage2.MaxUsedSerialsSize)

peer.Storage2.Endpoint, err = piecestore.NewEndpoint(
peer.Log.Named("piecestore"),
signing.SignerFromFullIdentity(peer.Identity),
Expand All @@ -482,7 +485,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
peer.Storage2.PieceDeleter,
peer.DB.Orders(),
peer.DB.Bandwidth(),
peer.DB.UsedSerials(),
peer.UsedSerials,
config.Storage2,
)
if err != nil {
Expand Down Expand Up @@ -655,7 +658,7 @@ func New(log *zap.Logger, full *identity.FullIdentity, db DB, revocationDB exten
debug.Cycle("Graceful Exit", peer.GracefulExit.Chore.Loop))
}

peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.DB.UsedSerials(), config.Collector)
peer.Collector = collector.NewService(peer.Log.Named("collector"), peer.Storage2.Store, peer.UsedSerials, config.Collector)
peer.Services.Add(lifecycle.Item{
Name: "collector",
Run: peer.Collector.Run,
Expand Down
8 changes: 5 additions & 3 deletions storagenode/piecestore/endpoint.go
Expand Up @@ -31,6 +31,7 @@ import (
"storj.io/storj/storagenode/monitor"
"storj.io/storj/storagenode/orders"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/piecestore/usedserials"
"storj.io/storj/storagenode/retain"
"storj.io/storj/storagenode/trust"
)
Expand All @@ -55,11 +56,12 @@ type Config struct {
MaxConcurrentRequests int `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
DeleteWorkers int `help:"how many piece delete workers" default:"1"`
DeleteQueueSize int `help:"size of the piece delete queue" default:"10000"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"24h0m0s"`
OrderLimitGracePeriod time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
CacheSyncInterval time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
StreamOperationTimeout time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"`
RetainTimeBuffer time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s"`
ReportCapacityThreshold memory.Size `help:"threshold below which to immediately notify satellite of capacity" default:"500MB" hidden:"true"`
MaxUsedSerialsSize memory.Size `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"`

Trust trust.Config

Expand Down Expand Up @@ -87,14 +89,14 @@ type Endpoint struct {
store *pieces.Store
orders orders.DB
usage bandwidth.DB
usedSerials UsedSerials
usedSerials *usedserials.Table
pieceDeleter *pieces.Deleter

liveRequests int32
}

// NewEndpoint creates a new piecestore endpoint.
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials UsedSerials, config Config) (*Endpoint, error) {
func NewEndpoint(log *zap.Logger, signer signing.Signer, trust *trust.Pool, monitor *monitor.Service, retain *retain.Service, pingStats pingStatsSource, store *pieces.Store, pieceDeleter *pieces.Deleter, orders orders.DB, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error) {
return &Endpoint{
log: log,
config: config,
Expand Down
29 changes: 0 additions & 29 deletions storagenode/piecestore/serials.go

This file was deleted.

121 changes: 0 additions & 121 deletions storagenode/piecestore/serials_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion storagenode/piecestore/verification.go
Expand Up @@ -73,7 +73,7 @@ func (endpoint *Endpoint) verifyOrderLimit(ctx context.Context, limit *pb.OrderL
serialExpiration = graceExpiration
}

if err := endpoint.usedSerials.Add(ctx, limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil {
if err := endpoint.usedSerials.Add(limit.SatelliteId, limit.SerialNumber, serialExpiration); err != nil {
return rpcstatus.Wrap(rpcstatus.Unauthenticated, err)
}

Expand Down

0 comments on commit dc57640

Please sign in to comment.