Skip to content

Commit

Permalink
feat: add init command (#572)
Browse files Browse the repository at this point in the history
* feat: add init command

* add tests
  • Loading branch information
robdefeo committed Feb 18, 2020
1 parent 0b524b2 commit 7a20ac9
Show file tree
Hide file tree
Showing 20 changed files with 712 additions and 107 deletions.
65 changes: 65 additions & 0 deletions cmd/indexer/commands/connections.go
@@ -0,0 +1,65 @@
package commands

import (
"fmt"

"github.com/jmoiron/sqlx"
"github.com/mailchain/mailchain/cmd/internal/datastore/pq"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

func newMasterConnection(cmd *cobra.Command) (*sqlx.DB, error) {
host, _ := cmd.Flags().GetString("postgres-host")
port, _ := cmd.Flags().GetInt("postgres-port")
sslmode, _ := cmd.Flags().GetString("postgres-sslmode")

user, err := cmd.Flags().GetString("master-postgres-user")
if err != nil {
return nil, err
}

password, err := cmd.Flags().GetString("master-postgres-password")
if err != nil {
return nil, err
}

db, err := sqlx.Connect("postgres", fmt.Sprintf(
"user=%s password=%s host=%s port=%d sslmode=%s",
user, password, host, port, sslmode))
if err != nil {
return nil, errors.Wrapf(err, "could not open connection: %s", host)
}

return db, nil
}

// newPostgresConnection returns a connection to a postgres database.
// The arguments are parsed from cmd.
func newPostgresConnection(cmd *cobra.Command, kind string) (*sqlx.DB, error) {
host, _ := cmd.Flags().GetString("postgres-host")
port, _ := cmd.Flags().GetInt("postgres-port")
sslmode, _ := cmd.Flags().GetString("postgres-sslmode")

user, _ := cmd.Flags().GetString(kind + "-postgres-user")
if user == "" {
return nil, errors.Errorf("flag must not be empty: %s-postgres-user", kind)
}

password, _ := cmd.Flags().GetString(kind + "-postgres-password")
if password == "" {
return nil, errors.Errorf("flag must not be empty: %s-postgres-password", kind)
}

dbname, _ := cmd.Flags().GetString(kind + "-postgres-name")
if dbname == "" {
return nil, errors.Errorf("flag must not be empty: %s-postgres-name", kind)
}

// use default dbname, if not provided
if dbname == "" {
dbname = user
}

return pq.NewConnection(user, password, dbname, host, sslmode, port)
}
154 changes: 154 additions & 0 deletions cmd/indexer/commands/database.go
@@ -0,0 +1,154 @@
package commands

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
libpq "github.com/lib/pq"
"github.com/mailchain/mailchain/cmd/internal/datastore/pq"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

func databaseCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "database",
Short: "configure database",
}

cmd.AddCommand(upCmd())
cmd.AddCommand(downCmd())

cmd.PersistentFlags().String("master-postgres-user", "", "Postgres database user")
cmd.PersistentFlags().String("master-postgres-password", "", "Postgres database password")

return cmd
}

func upCmd() *cobra.Command {
return &cobra.Command{
Use: "up",
Short: "bring database up to latest version",
RunE: func(cmd *cobra.Command, args []string) error {
masterConn, err := newMasterConnection(cmd)
if err != nil {
return err
}

if err := upsertDatabase(cmd, masterConn, "indexer"); err != nil {
return errors.WithStack(err)
}

if err := migrateDatabase(cmd, "indexer", true, pq.MigrateIndexer); err != nil {
return errors.WithStack(err)
}

if err := upsertDatabase(cmd, masterConn, "pubkey"); err != nil {
return errors.WithStack(err)
}

if err := migrateDatabase(cmd, "pubkey", true, pq.MigratePublicKey); err != nil {
return errors.WithStack(err)
}

if err := upsertDatabase(cmd, masterConn, "envelope"); err != nil {
return errors.WithStack(err)
}

if err := migrateDatabase(cmd, "envelope", true, pq.MigrateEnvelope); err != nil {
return errors.WithStack(err)
}

fmt.Printf("competed!\n")

return nil
},
}
}

func downCmd() *cobra.Command {
return &cobra.Command{
Use: "down",
Short: "migrate down the database",
RunE: func(cmd *cobra.Command, args []string) error {
if err := migrateDatabase(cmd, "indexer", false, pq.MigrateIndexer); err != nil {
return errors.WithStack(err)
}

if err := migrateDatabase(cmd, "pubkey", false, pq.MigratePublicKey); err != nil {
return errors.WithStack(err)
}

if err := migrateDatabase(cmd, "envelope", false, pq.MigrateEnvelope); err != nil {
return errors.WithStack(err)
}

fmt.Printf("competed!\n")

return nil
},
}
}

func upsertDatabase(cmd *cobra.Command, connMaster *sqlx.DB, kind string) error {
var err error

password, _ := cmd.Flags().GetString(kind + "-postgres-password")
if password == "" {
return errors.Errorf("must not be empty: %s-postgres-password", kind)
}

user, _ := cmd.Flags().GetString(kind + "-postgres-user")
name, _ := cmd.Flags().GetString(kind + "-postgres-name")

_, err = connMaster.Exec(fmt.Sprintf("CREATE USER %s WITH PASSWORD '%s';", user, password))
if err != nil {
if pqErr, ok := err.(*libpq.Error); !ok || pqErr.Code != "42710" {
return err
}

fmt.Fprintf(cmd.OutOrStdout(), "skipping create user, already exists user: %s\n", user)
} else {
fmt.Fprintf(cmd.OutOrStdout(), "user created: %s\n", user)
}

_, err = connMaster.Exec(fmt.Sprintf("CREATE DATABASE %s;", name))
if err != nil {
if pqErr, ok := err.(*libpq.Error); !ok || pqErr.Code != "42P04" {
return err
}

fmt.Fprintf(cmd.OutOrStdout(), "skipping create database, already exists database: %s\n", name)
} else {
fmt.Fprintf(cmd.OutOrStdout(), "database created: %s\n", name)
}

_, err = connMaster.Exec(fmt.Sprintf("GRANT ALL PRIVILEGES ON DATABASE %s TO %s;", name, user))
if err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "skipping grant privileges, already exists on database: %s\n", name)
} else {
fmt.Fprintf(cmd.OutOrStdout(), "privileges granted on database: %s\n", name)
}

return nil
}

func migrateDatabase(cmd *cobra.Command, name string, up bool, migrateFunc func(db *sql.DB, up bool) (int, error)) error {
conn, err := newPostgresConnection(cmd, name)
if err != nil {
return errors.WithStack(err)
}

defer conn.Close()

n, err := migrateFunc(conn.DB, up)
if err != nil {
return errors.WithStack(err)
}

fmt.Fprintf(cmd.OutOrStdout(), "migrated database: %s\n", name)
fmt.Fprintf(cmd.OutOrStdout(), "applied migration files: %d\n", n)

return nil
}
64 changes: 19 additions & 45 deletions cmd/indexer/commands/ethereum.go
Expand Up @@ -33,14 +33,26 @@ func ethereumCmd() *cobra.Command {

rawStorePath, _ := cmd.Flags().GetString("raw-store-path")

conn, err := newPostgresConnection(cmd)
connIndexer, err := newPostgresConnection(cmd, "indexer")
if err != nil {
return err
}

defer conn.Close()
connPublicKey, err := newPostgresConnection(cmd, "pubkey")
if err != nil {
return err
}

connEnvelope, err := newPostgresConnection(cmd, "envelope")
if err != nil {
return err
}

defer connIndexer.Close()
defer connPublicKey.Close()
defer connEnvelope.Close()

seqProcessor, err := createEthereumProcessor(conn, blockNumber, protocol, network, rawStorePath, addressRPC)
seqProcessor, err := createEthereumProcessor(connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network, rawStorePath, addressRPC)
if err != nil {
return err
}
Expand All @@ -63,7 +75,7 @@ func ethereumCmd() *cobra.Command {
return cmd
}

func createEthereumProcessor(conn *sqlx.DB, blockNumber uint64, protocol, network, rawStorePath, addressRPC string) (*processor.Sequential, error) {
func createEthereumProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber uint64, protocol, network, rawStorePath, addressRPC string) (*processor.Sequential, error) {
ctx := context.Background()

ethClient, err := eth.NewRPC(addressRPC)
Expand All @@ -85,17 +97,17 @@ func createEthereumProcessor(conn *sqlx.DB, blockNumber uint64, protocol, networ
return nil, errors.Errorf("networkID from RPC does not match chain config network ID")
}

syncStore, err := pq.NewSyncStore(conn)
syncStore, err := pq.NewSyncStore(connIndexer)
if err != nil {
return nil, err
}

pubKeyStore, err := pq.NewPublicKeyStore(conn)
pubKeyStore, err := pq.NewPublicKeyStore(connPublicKey)
if err != nil {
return nil, err
}

transactionStore, err := pq.NewTransactionStore(conn)
transactionStore, err := pq.NewTransactionStore(connEnvelope)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -137,41 +149,3 @@ func chainConfig(network string) (*params.ChainConfig, error) {
return nil, errors.Errorf("can not determine chain config from network: %s", network)
}
}

func sslMode(useSSL bool) string {
if useSSL {
return "enable"
}

return "disable"
}

// newPostgresConnection returns a connection to a postgres database.
// The arguments are parsed from cmd.
func newPostgresConnection(cmd *cobra.Command) (*sqlx.DB, error) {
host, _ := cmd.Flags().GetString("postgres-host")
port, _ := cmd.Flags().GetInt("postgres-port")
useSSL, _ := cmd.Flags().GetBool("postgres-ssl")

user, err := cmd.Flags().GetString("postgres-user")
if err != nil {
return nil, err
}

psswd, err := cmd.Flags().GetString("postgres-password")
if err != nil {
return nil, err
}

dbname, err := cmd.Flags().GetString("postgres-name")
if err != nil {
return nil, err
}

// use default dbname, if not provided
if dbname == "" {
dbname = user
}

return pq.NewConnection(user, psswd, dbname, host, sslMode(useSSL), port)
}
18 changes: 14 additions & 4 deletions cmd/indexer/commands/root.go
Expand Up @@ -11,14 +11,24 @@ func rootCmd() *cobra.Command {
}

cmd.AddCommand(ethereumCmd())
cmd.AddCommand(databaseCmd())

cmd.PersistentFlags().String("postgres-host", "localhost", "Postgres server host")
cmd.PersistentFlags().String("postgres-user", "", "Postgres database user")
cmd.PersistentFlags().String("postgres-password", "", "Postgres database password")
cmd.PersistentFlags().String("postgres-name", "", "Postgres database name")
cmd.PersistentFlags().Bool("postgres-ssl", false, "Use SSL when connecting to Postgres")
cmd.PersistentFlags().String("postgres-sslmode", "disable", "Use SSL when connecting to Postgres")
cmd.PersistentFlags().Int("postgres-port", 5432, "Postgres server port")

cmd.PersistentFlags().String("indexer-postgres-user", "indexer", "Indexer postgres database user")
cmd.PersistentFlags().String("indexer-postgres-password", "", "Indexer postgres database password")
cmd.PersistentFlags().String("indexer-postgres-name", "indexer", "Indexer postgres database name")

cmd.PersistentFlags().String("pubkey-postgres-user", "pubkey", "Public key postgres database user")
cmd.PersistentFlags().String("pubkey-postgres-password", "", "Public key postgres database password")
cmd.PersistentFlags().String("pubkey-postgres-name", "pubkey", "Public key postgres database name")

cmd.PersistentFlags().String("envelope-postgres-user", "envelope", "Envelopes postgres database user")
cmd.PersistentFlags().String("envelope-postgres-password", "", "Envelopes postgres database password")
cmd.PersistentFlags().String("envelope-postgres-name", "envelope", "Envelopes postgres database name")

cmd.PersistentFlags().String("raw-store-path", "", "Path where raw transactions are stored")

return cmd
Expand Down
1 change: 0 additions & 1 deletion cmd/indexer/db/migrations/000001_public-key-index.down.sql

This file was deleted.

14 changes: 0 additions & 14 deletions cmd/indexer/db/migrations/000001_public-key-index.up.sql

This file was deleted.

1 change: 0 additions & 1 deletion cmd/indexer/db/migrations/000002_sync.down.sql

This file was deleted.

11 changes: 0 additions & 11 deletions cmd/indexer/db/migrations/000002_sync.up.sql

This file was deleted.

1 change: 0 additions & 1 deletion cmd/indexer/db/migrations/000003_transactions.down.sql

This file was deleted.

0 comments on commit 7a20ac9

Please sign in to comment.