Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add node merge tool #5685

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/dockerhub.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ jobs:
uses: actions/checkout@v4
- name: Build docker images
run: |
export VERSION="${{ github.ref_name }}"
make dockerbuild-go
make dockerbuild-bs

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ jobs:
run: |
make install
make build VERSION=${{ github.ref_name }}
- name: Build node-merge
shell: bash
run: |
make node-merge VERSION=${{ github.ref_name }}

- name: Create release archive
shell: bash
Expand Down
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM ubuntu:22.04 AS linux
ENV DEBIAN_FRONTEND noninteractive
ENV SHELL /bin/bash
ARG TZ=Etc/UTC
ENV TZ $TZ
ENV TZ=${TZ}
USER root
RUN set -ex \
&& apt-get update --fix-missing \
Expand All @@ -27,6 +27,8 @@ ENV LANGUAGE en_US.UTF-8
ENV LC_ALL en_US.UTF-8

FROM golang:1.21 as builder
ARG VERSION=""
ENV VERSION=${VERSION}
RUN set -ex \
&& apt-get update --fix-missing \
&& apt-get install -qy --no-install-recommends \
Expand All @@ -48,8 +50,9 @@ RUN go mod download
COPY . .

# And compile the project
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make build
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make build VERSION=${VERSION}
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make gen-p2p-identity
RUN --mount=type=cache,id=build,target=/root/.cache/go-build make merge-nodes VERSION=${VERSION}

# In this last stage, we start from a fresh image, to reduce the image size and not ship the Go compiler in our production artifacts.
FROM linux AS spacemesh
Expand All @@ -59,6 +62,7 @@ COPY --from=builder /src/build/go-spacemesh /bin/
COPY --from=builder /src/build/service /bin/
COPY --from=builder /src/build/libpost.so /bin/
COPY --from=builder /src/build/gen-p2p-identity /bin/
COPY --from=builder /src/build/merge-nodes /bin/

ENTRYPOINT ["/bin/go-spacemesh"]
EXPOSE 7513
Expand Down
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ get-libs: get-postrs-lib get-postrs-service

get-profiler: get-postrs-profiler

merge-nodes:
cd cmd/merge-nodes ; go build -o $(BIN_DIR)$@$(EXE) -ldflags "-X main.version=${VERSION}" .
.PHONY: merge-nodes

gen-p2p-identity:
cd cmd/gen-p2p-identity ; go build -o $(BIN_DIR)$@$(EXE) .
.PHONY: gen-p2p-identity
Expand Down Expand Up @@ -149,7 +153,10 @@ list-versions:
.PHONY: list-versions

dockerbuild-go:
DOCKER_BUILDKIT=1 docker build -t go-spacemesh:$(SHA) -t $(DOCKER_HUB)/$(DOCKER_IMAGE_REPO):$(DOCKER_IMAGE_VERSION) .
DOCKER_BUILDKIT=1 docker build \
--build-arg VERSION=${VERSION} \
-t go-spacemesh:$(SHA) \
-t $(DOCKER_HUB)/$(DOCKER_IMAGE_REPO):$(DOCKER_IMAGE_VERSION) .
.PHONY: dockerbuild-go

dockerpush: dockerbuild-go dockerpush-only
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestNewLocalServer(t *testing.T) {
svc := NewNodeService(peerCounter, meshApi, genTime, syncer, "v0.0.0", "cafebabe")
grpcService, err := NewWithServices(cfg.PostListener, logger, cfg, []ServiceAPI{svc})
if tc.warn {
require.Equal(t, observedLogs.Len(), 1, "Expected a warning log")
require.Equal(t, 1, observedLogs.Len(), "Expected a warning log")
require.Equal(t, observedLogs.All()[0].Message, "unsecured grpc server is listening on a public IP address")
require.Equal(t, observedLogs.All()[0].ContextMap()["address"], tc.listener)
return
Expand Down
10 changes: 10 additions & 0 deletions cmd/merge-nodes/internal/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package internal

import (
"errors"
)

var (
ErrSupervisedNode = errors.New("merging of supervised smeshing nodes is not supported")
ErrInvalidSchema = errors.New("database has an invalid schema version")
)
204 changes: 204 additions & 0 deletions cmd/merge-nodes/internal/merge_action.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package internal

import (
"context"
"encoding/hex"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"slices"

"go.uber.org/zap"

"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
)

const (
localDbFile = "local.sql"

keyDir = "identities"
supervisedIDKeyFileName = "local.key"
)

func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error {
// Open the target database
var dstDB *localsql.Database
var err error
dstDB, err = openDB(dbLog, to)
switch {
case errors.Is(err, fs.ErrNotExist):
// target database does not exist, create it
dbLog.Info("target database does not exist, creating it", zap.String("path", to))
if err := os.MkdirAll(to, 0o700); err != nil {
return fmt.Errorf("create target directory: %w", err)
}
if err := os.MkdirAll(filepath.Join(to, keyDir), 0o700); err != nil {
return fmt.Errorf("create target key directory: %w", err)
}

dstDB, err = localsql.Open("file:"+filepath.Join(to, localDbFile),
sql.WithLogger(dbLog),
)
if err != nil {
return err
}
defer dstDB.Close()
case err != nil:
fasmat marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("open target database: %w", err)
default:
defer dstDB.Close()
// target database exists, check if there is at least one key in the target key directory
// not named supervisedIDKeyFileName
if err := checkIdentities(dbLog, to); err != nil {
dbLog.Error("target appears to be a supervised node - only merging of remote smeshers is supported")
return err
}
}

// Open the source database
srcDB, err := openDB(dbLog, from)
if err != nil {
return fmt.Errorf("open source database: %w", err)
}
if err := srcDB.Close(); err != nil {
return fmt.Errorf("close source database: %w", err)
}

if err := checkIdentities(dbLog, from); err != nil {
dbLog.Error("source appears to be a supervised node - only merging of remote smeshers is supported")
return err
}

// copy files from `from` to `to`
dir := filepath.Join(from, keyDir)
err = filepath.WalkDir(dir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("failed to walk directory at %s: %w", path, err)
}

// skip subdirectories and files in them
if d.IsDir() && path != dir {
return fs.SkipDir
}

// skip files that are not identity files
if filepath.Ext(path) != ".key" {
return nil
}

signer, err := signing.NewEdSigner(
signing.FromFile(path),
)
if err != nil {
return fmt.Errorf("not a valid key file %s: %w", d.Name(), err)
}

dstPath := filepath.Join(to, keyDir, d.Name())
if _, err := os.Stat(dstPath); err == nil {
return fmt.Errorf("identity file %s already exists: %w", d.Name(), fs.ErrExist)
}

dst := make([]byte, hex.EncodedLen(len(signer.PrivateKey())))
hex.Encode(dst, signer.PrivateKey())
err = os.WriteFile(dstPath, dst, 0o600)
if err != nil {
return fmt.Errorf("failed to write identity file: %w", err)
}

dbLog.Info("copied identity",
zap.String("name", d.Name()),
)
return nil
})
if err != nil {
return err
}

dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to))
err = dstDB.WithTx(ctx, func(tx *sql.Tx) error {
enc := func(stmt *sql.Statement) {
stmt.BindText(1, filepath.Join(from, localDbFile))
}
if _, err := tx.Exec("ATTACH DATABASE ?1 AS srcDB;", enc, nil); err != nil {
return fmt.Errorf("attach source database: %w", err)
}
if _, err := tx.Exec("INSERT INTO main.initial_post SELECT * FROM srcDB.initial_post;", nil, nil); err != nil {
return fmt.Errorf("merge initial_post: %w", err)
}
if _, err := tx.Exec("INSERT INTO main.challenge SELECT * FROM srcDB.challenge;", nil, nil); err != nil {
return fmt.Errorf("merge challenge: %w", err)
}
if _, err := tx.Exec(
"INSERT INTO main.poet_registration SELECT * FROM srcDB.poet_registration;", nil, nil,
); err != nil {
return fmt.Errorf("merge poet_registration: %w", err)
}
if _, err := tx.Exec("INSERT INTO main.nipost SELECT * FROM srcDB.nipost;", nil, nil); err != nil {
return fmt.Errorf("merge nipost: %w", err)
}
return nil
})
if err != nil {
return fmt.Errorf("start transaction: %w", err)
}

if err := dstDB.Close(); err != nil {
return fmt.Errorf("close target database: %w", err)
}
return nil
}

func openDB(dbLog *zap.Logger, path string) (*localsql.Database, error) {
dbPath := filepath.Join(path, localDbFile)
if _, err := os.Stat(dbPath); err != nil {
return nil, fmt.Errorf("open database %s: %w", dbPath, err)
}

migrations, err := sql.LocalMigrations()
if err != nil {
return nil, fmt.Errorf("get local migrations: %w", err)
}

db, err := localsql.Open("file:"+dbPath,
sql.WithLogger(dbLog),
sql.WithMigrations([]sql.Migration{}), // do not migrate database when opening
)
if err != nil {
return nil, fmt.Errorf("open source database %s: %w", dbPath, err)
}

// check if the source database has the right schema
var version int
_, err = db.Exec("PRAGMA user_version;", nil, func(stmt *sql.Statement) bool {
version = stmt.ColumnInt(0)
return true
})
if err != nil {
return nil, fmt.Errorf("get source database schema for %s: %w", dbPath, err)
}
if version != len(migrations) {
db.Close()
return nil, ErrInvalidSchema
}
return db, nil
}

func checkIdentities(dbLog *zap.Logger, path string) error {
dir := filepath.Join(path, keyDir)
if err := os.MkdirAll(dir, 0o700); err != nil {
return err
}

files, err := os.ReadDir(dir)
if err != nil {
return fmt.Errorf("read target key directory: %w", err)
}
if slices.ContainsFunc(files, func(e fs.DirEntry) bool { return e.Name() == supervisedIDKeyFileName }) {
return ErrSupervisedNode
}
return nil
}
Loading
Loading