Skip to content

Commit

Permalink
all: remove code related to PointerDB
Browse files Browse the repository at this point in the history
Change-Id: I6675c9597f87019020f6233b83ab2f1119d2bc46
  • Loading branch information
kaloyan-raev authored and egonelbre committed Apr 21, 2021
1 parent 785adfb commit 2ee3030
Show file tree
Hide file tree
Showing 21 changed files with 76 additions and 819 deletions.
20 changes: 4 additions & 16 deletions cmd/satellite/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()

pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Config.Metainfo.DatabaseURL, "satellite-api")
if err != nil {
return errs.New("Error creating metainfodb connection on satellite api: %+v", err)
}
defer func() {
err = errs.Combine(err, pointerDB.Close())
}()

metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Config.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection on satellite api: %+v", err)
Expand Down Expand Up @@ -87,7 +79,7 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()

peer, err := satellite.NewAPI(log, identity, db, pointerDB, metabaseDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build, process.AtomicLevel(cmd))
peer, err := satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, accountingCache, rollupsWriteCache, &runCfg.Config, version.Build, process.AtomicLevel(cmd))
if err != nil {
return err
}
Expand All @@ -101,14 +93,10 @@ func cmdAPIRun(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on satellite api", zap.Error(err))
}

err = pointerDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating metainfodb tables on satellite api: %+v", err)
}

err = metabaseDB.MigrateToLatest(ctx)
err = metabaseDB.CheckVersion(ctx)
if err != nil {
return errs.New("Error creating metabase tables on satellite api: %+v", err)
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}

err = db.CheckVersion(ctx)
Expand Down
5 changes: 0 additions & 5 deletions cmd/satellite/entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@ set -euo pipefail

SETUP_PARAMS=""

if [[ -n "${STORJ_DATABASE:-}" ]]; then
export STORJ_POINTER_DB_DATABASE_URL="${STORJ_DATABASE}"
fi


if [[ -n "${IDENTITY_ADDR:-}" ]]; then
export STORJ_SERVER_ADDRESS="${IDENTITY_ADDR}"
fi
Expand Down
15 changes: 4 additions & 11 deletions cmd/satellite/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()

pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-gc")
if err != nil {
return errs.New("Error creating pointerDB connection GC: %+v", err)
}
defer func() {
err = errs.Combine(err, pointerDB.Close())
}()

metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
Expand All @@ -60,7 +52,7 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, revocationDB.Close())
}()

peer, err := satellite.NewGarbageCollection(log, identity, db, pointerDB, metabaseDB, revocationDB, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
peer, err := satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
if err != nil {
return err
}
Expand All @@ -74,9 +66,10 @@ func cmdGCRun(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on satellite GC", zap.Error(err))
}

err = pointerDB.MigrateToLatest(ctx)
err = metabaseDB.CheckVersion(ctx)
if err != nil {
return errs.New("Error creating pointerDB tables GC: %+v", err)
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}

err = db.CheckVersion(ctx)
Expand Down
32 changes: 4 additions & 28 deletions cmd/satellite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,14 +376,6 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()

pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-core")
if err != nil {
return errs.New("Error creating metainfodb connection: %+v", err)
}
defer func() {
err = errs.Combine(err, pointerDB.Close())
}()

metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
Expand Down Expand Up @@ -419,7 +411,7 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, rollupsWriteCache.CloseAndFlush(context2.WithoutCancellation(ctx)))
}()

peer, err := satellite.New(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, version.Build, &runCfg.Config, process.AtomicLevel(cmd))
if err != nil {
return err
}
Expand All @@ -434,14 +426,10 @@ func cmdRun(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher", zap.Error(err))
}

err = pointerDB.MigrateToLatest(ctx)
err = metabaseDB.CheckVersion(ctx)
if err != nil {
return errs.New("Error creating metainfodb tables: %+v", err)
}

err = metabaseDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating metabase tables: %+v", err)
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}

err = db.CheckVersion(ctx)
Expand Down Expand Up @@ -472,18 +460,6 @@ func cmdMigrationRun(cmd *cobra.Command, args []string) (err error) {
return errs.New("Error creating tables for master database on satellite: %+v", err)
}

pdb, err := metainfo.OpenStore(ctx, log.Named("migration"), runCfg.Metainfo.DatabaseURL, "satellite-migration")
if err != nil {
return errs.New("Error creating pointer database connection on satellite: %+v", err)
}
defer func() {
err = errs.Combine(err, pdb.Close())
}()
err = pdb.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating tables for pointer database on satellite: %+v", err)
}

metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
Expand Down
19 changes: 3 additions & 16 deletions cmd/satellite/repairer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
err = errs.Combine(err, db.Close())
}()

pointerDB, err := metainfo.OpenStore(ctx, log.Named("pointerdb"), runCfg.Metainfo.DatabaseURL, "satellite-repairer")
if err != nil {
return errs.New("Error creating metainfo database connection: %+v", err)
}
defer func() {
err = errs.Combine(err, pointerDB.Close())
}()

metabaseDB, err := metainfo.OpenMetabase(ctx, log.Named("metabase"), runCfg.Metainfo.DatabaseURL)
if err != nil {
return errs.New("Error creating metabase connection: %+v", err)
Expand All @@ -71,7 +63,6 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
peer, err := satellite.NewRepairer(
log,
identity,
pointerDB,
metabaseDB,
revocationDB,
db.RepairQueue(),
Expand All @@ -96,14 +87,10 @@ func cmdRepairerRun(cmd *cobra.Command, args []string) (err error) {
log.Warn("Failed to initialize telemetry batcher on repairer", zap.Error(err))
}

err = pointerDB.MigrateToLatest(ctx)
if err != nil {
return errs.New("Error creating tables for metainfo database: %+v", err)
}

err = metabaseDB.MigrateToLatest(ctx)
err = metabaseDB.CheckVersion(ctx)
if err != nil {
return errs.New("Error creating tables for metabase: %+v", err)
log.Error("Failed metabase database version check.", zap.Error(err))
return errs.New("failed metabase version check: %+v", err)
}

err = db.CheckVersion(ctx)
Expand Down
3 changes: 1 addition & 2 deletions private/testplanet/planet.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ type Config struct {

// DatabaseConfig defines connection strings for database.
type DatabaseConfig struct {
SatelliteDB string
SatellitePointerDB string
SatelliteDB string
}

// Planet is a full storj system setup.
Expand Down
1 change: 0 additions & 1 deletion private/testplanet/reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// Reconfigure allows to change node configurations.
type Reconfigure struct {
SatelliteDB func(log *zap.Logger, index int, db satellite.DB) (satellite.DB, error)
SatellitePointerDB func(log *zap.Logger, index int, db metainfo.PointerDB) (metainfo.PointerDB, error)
SatelliteMetabaseDB func(log *zap.Logger, index int, db metainfo.MetabaseDB) (metainfo.MetabaseDB, error)
Satellite func(log *zap.Logger, index int, config *satellite.Config)

Expand Down
37 changes: 10 additions & 27 deletions private/testplanet/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ type Satellite struct {
}

Metainfo struct {
Database metainfo.PointerDB
Metabase metainfo.MetabaseDB
Service *metainfo.Service
Endpoint2 *metainfo.Endpoint
Expand Down Expand Up @@ -356,21 +355,6 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
}
planet.databases = append(planet.databases, db)

pointerDB, err := satellitedbtest.CreatePointerDB(ctx, log.Named("pointerdb"), planet.config.Name, "P", index, databases.PointerDB)
if err != nil {
return nil, err
}

if planet.config.Reconfigure.SatellitePointerDB != nil {
var newPointerDB metainfo.PointerDB
newPointerDB, err = planet.config.Reconfigure.SatellitePointerDB(log.Named("pointerdb"), index, pointerDB)
if err != nil {
return nil, errs.Combine(err, pointerDB.Close())
}
pointerDB = newPointerDB
}
planet.databases = append(planet.databases, pointerDB)

metabaseDB, err := satellitedbtest.CreateMetabaseDB(context.TODO(), log.Named("metabase"), planet.config.Name, "M", index, databases.MetabaseDB)
if err != nil {
return nil, err
Expand Down Expand Up @@ -646,7 +630,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})

peer, err := satellite.New(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
peer, err := satellite.New(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, versionInfo, &config, nil)
if err != nil {
return nil, err
}
Expand All @@ -661,7 +645,7 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}

api, err := planet.newAPI(ctx, index, identity, db, pointerDB, metabaseDB, config, versionInfo)
api, err := planet.newAPI(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
Expand All @@ -671,12 +655,12 @@ func (planet *Planet) newSatellite(ctx context.Context, prefix string, index int
return nil, err
}

repairerPeer, err := planet.newRepairer(ctx, index, identity, db, pointerDB, metabaseDB, config, versionInfo)
repairerPeer, err := planet.newRepairer(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}

gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, pointerDB, metabaseDB, config, versionInfo)
gcPeer, err := planet.newGarbageCollection(ctx, index, identity, db, metabaseDB, config, versionInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -711,7 +695,6 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
system.Overlay.Service = api.Overlay.Service
system.Overlay.DQStrayNodes = peer.Overlay.DQStrayNodes

system.Metainfo.Database = api.Metainfo.Database
system.Metainfo.Metabase = api.Metainfo.Metabase
system.Metainfo.Service = peer.Metainfo.Service
system.Metainfo.Endpoint2 = api.Metainfo.Endpoint2
Expand Down Expand Up @@ -756,7 +739,7 @@ func createNewSystem(name string, log *zap.Logger, config satellite.Config, peer
return system
}

func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.API, error) {
prefix := "satellite-api" + strconv.Itoa(index)
log := planet.log.Named(prefix)
var err error
Expand All @@ -776,7 +759,7 @@ func (planet *Planet) newAPI(ctx context.Context, index int, identity *identity.
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})

return satellite.NewAPI(log, identity, db, pointerDB, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
return satellite.NewAPI(log, identity, db, metabaseDB, revocationDB, liveAccounting, rollupsWriteCache, &config, versionInfo, nil)
}

func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, config satellite.Config, versionInfo version.Info) (*satellite.Admin, error) {
Expand All @@ -786,7 +769,7 @@ func (planet *Planet) newAdmin(ctx context.Context, index int, identity *identit
return satellite.NewAdmin(log, identity, db, versionInfo, &config, nil)
}

func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
func (planet *Planet) newRepairer(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.Repairer, error) {
prefix := "satellite-repairer" + strconv.Itoa(index)
log := planet.log.Named(prefix)

Expand All @@ -799,7 +782,7 @@ func (planet *Planet) newRepairer(ctx context.Context, index int, identity *iden
rollupsWriteCache := orders.NewRollupsWriteCache(log.Named("orders-write-cache"), db.Orders(), config.Orders.FlushBatchSize)
planet.databases = append(planet.databases, rollupsWriteCacheCloser{rollupsWriteCache})

return satellite.NewRepairer(log, identity, pointerDB, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), rollupsWriteCache, db.Irreparable(), versionInfo, &config, nil)
return satellite.NewRepairer(log, identity, metabaseDB, revocationDB, db.RepairQueue(), db.Buckets(), db.OverlayCache(), rollupsWriteCache, db.Irreparable(), versionInfo, &config, nil)
}

type rollupsWriteCacheCloser struct {
Expand All @@ -810,7 +793,7 @@ func (cache rollupsWriteCacheCloser) Close() error {
return cache.RollupsWriteCache.CloseAndFlush(context.TODO())
}

func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, pointerDB metainfo.PointerDB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
func (planet *Planet) newGarbageCollection(ctx context.Context, index int, identity *identity.FullIdentity, db satellite.DB, metabaseDB metainfo.MetabaseDB, config satellite.Config, versionInfo version.Info) (*satellite.GarbageCollection, error) {
prefix := "satellite-gc" + strconv.Itoa(index)
log := planet.log.Named(prefix)

Expand All @@ -819,7 +802,7 @@ func (planet *Planet) newGarbageCollection(ctx context.Context, index int, ident
return nil, errs.Wrap(err)
}
planet.databases = append(planet.databases, revocationDB)
return satellite.NewGarbageCollection(log, identity, db, pointerDB, metabaseDB, revocationDB, versionInfo, &config, nil)
return satellite.NewGarbageCollection(log, identity, db, metabaseDB, revocationDB, versionInfo, &config, nil)
}

// atLeastOne returns 1 if value < 1, or value otherwise.
Expand Down
25 changes: 6 additions & 19 deletions private/testplanet/uplink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"storj.io/storj/pkg/revocation"
"storj.io/storj/pkg/server"
"storj.io/storj/private/testplanet"
"storj.io/storj/satellite/metainfo/metabase"
"storj.io/uplink"
"storj.io/uplink/private/metainfo"
)
Expand Down Expand Up @@ -88,31 +87,19 @@ func TestDownloadWithSomeNodesOffline(t *testing.T) {
err := ul.Upload(ctx, satellite, "testbucket", "test/path", testData)
require.NoError(t, err)

// get a remote segment from pointerdb
pdb := satellite.Metainfo.Service
listResponse, _, err := pdb.List(ctx, metabase.SegmentKey{}, "", true, 0, 0)
// get a remote segment
segments, err := satellite.Metainfo.Metabase.TestingAllSegments(ctx)
require.NoError(t, err)

var path string
var pointer *pb.Pointer
for _, v := range listResponse {
path = v.GetPath()
pointer, err = pdb.Get(ctx, metabase.SegmentKey(path))
require.NoError(t, err)
if pointer.GetType() == pb.Pointer_REMOTE {
break
}
}

// calculate how many storagenodes to kill
redundancy := pointer.GetRemote().GetRedundancy()
remotePieces := pointer.GetRemote().GetRemotePieces()
minReq := redundancy.GetMinReq()
redundancy := segments[0].Redundancy
remotePieces := segments[0].Pieces
minReq := redundancy.RequiredShares
numPieces := len(remotePieces)
toKill := numPieces - int(minReq)

for _, piece := range remotePieces[:toKill] {
err := planet.StopNodeAndUpdate(ctx, planet.FindNode(piece.NodeId))
err := planet.StopNodeAndUpdate(ctx, planet.FindNode(piece.StorageNode))
require.NoError(t, err)
}

Expand Down

0 comments on commit 2ee3030

Please sign in to comment.