Skip to content

Commit

Permalink
Cleanup jobs refactor (centrifuge#994)
Browse files Browse the repository at this point in the history
  • Loading branch information
vedhavyas authored and mikiquantum committed Jun 25, 2019
1 parent 7619341 commit 097feb7
Show file tree
Hide file tree
Showing 44 changed files with 354 additions and 394 deletions.
6 changes: 0 additions & 6 deletions anchors/anchor_repository_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestPreCommitAnchor_Integration(t *testing.T) {
t.Parallel()
anchorID := utils.RandomSlice(32)
signingRoot := utils.RandomSlice(32)

anchorIDTyped, err := anchors.ToAnchorID(anchorID)
assert.NoError(t, err)
preCommitAnchor(t, anchorID, signingRoot)
Expand Down Expand Up @@ -110,11 +109,8 @@ func commitAnchor(t *testing.T, anchorID, documentRoot []byte, documentProof [32

ctx := testingconfig.CreateAccountContext(t, cfg)
done, err := anchorRepo.CommitAnchor(ctx, anchorIDTyped, docRootTyped, documentProof)

isDone := <-done

assert.True(t, isDone, "isDone should be true")

assert.Nil(t, err)
}

Expand All @@ -125,9 +121,7 @@ func preCommitAnchor(t *testing.T, anchorID, documentRoot []byte) {

ctx := testingconfig.CreateAccountContext(t, cfg)
done, err := anchorRepo.PreCommitAnchor(ctx, anchorIDTyped, docRootTyped)

isDone := <-done

assert.True(t, isDone, "isDone should be true")
assert.Nil(t, err)
}
Expand Down
2 changes: 0 additions & 2 deletions anchors/anchor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ import (
)

var ctx = map[string]interface{}{}
var cfg Config

func TestMain(m *testing.M) {
ibootstappers := []bootstrap.TestBootstrapper{
&config.Bootstrapper{},
}
bootstrap.RunTestBootstrappers(ibootstappers, ctx)
cfg = ctx[bootstrap.BootstrappedConfig].(Config)
result := m.Run()
bootstrap.RunTestTeardown(ibootstappers)
os.Exit(result)
Expand Down
6 changes: 3 additions & 3 deletions anchors/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ func (Bootstrapper) Bootstrap(ctx map[string]interface{}) error {
return err
}

txManager, ok := ctx[jobs.BootstrappedService].(jobs.Manager)
jobsMan, ok := ctx[jobs.BootstrappedService].(jobs.Manager)
if !ok {
return errors.New("transactions repository not initialised")
return errors.New("jobs repository not initialised")
}

queueSrv, ok := ctx[bootstrap.BootstrappedQueueServer].(*queue.Server)
if !ok {
return errors.New("queue hasn't been initialized")
}

repo := newService(cfg, repositoryContract, queueSrv, client, txManager)
repo := newService(cfg, repositoryContract, queueSrv, client, jobsMan)
ctx[BootstrappedAnchorRepo] = repo

return nil
Expand Down
16 changes: 8 additions & 8 deletions anchors/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ type service struct {
anchorRepositoryContract anchorRepositoryContract
client ethereum.Client
queue *queue.Server
txManager jobs.Manager
jobsMan jobs.Manager
}

func newService(config Config, anchorContract anchorRepositoryContract, queue *queue.Server, client ethereum.Client, txManager jobs.Manager) AnchorRepository {
return &service{config: config, anchorRepositoryContract: anchorContract, client: client, queue: queue, txManager: txManager}
func newService(config Config, anchorContract anchorRepositoryContract, queue *queue.Server, client ethereum.Client, jobsMan jobs.Manager) AnchorRepository {
return &service{config: config, anchorRepositoryContract: anchorContract, client: client, queue: queue, jobsMan: jobsMan}
}

// HasValidPreCommit checks if the given anchorID has a valid pre-commit
Expand Down Expand Up @@ -94,7 +94,7 @@ func (s *service) PreCommitAnchor(ctx context.Context, anchorID AnchorID, signin
}

log.Infof("Add Anchor to Pre-commit %s from did:%s", anchorID.String(), did.ToAddress().String())
_, done, err := s.txManager.ExecuteWithinJob(ctx, did, jobID, "Check Job for anchor commit",
_, done, err := s.jobsMan.ExecuteWithinJob(ctx, did, jobID, "Check Job for anchor commit",
s.ethereumTX(opts, s.anchorRepositoryContract.PreCommit, pc.AnchorID.BigInt(), pc.SigningRoot))
if err != nil {
return nil, err
Expand All @@ -104,8 +104,8 @@ func (s *service) PreCommitAnchor(ctx context.Context, anchorID AnchorID, signin
}

// ethereumTX is submitting an Ethereum transaction and starts a task to wait for the transaction result
func (s service) ethereumTX(opts *bind.TransactOpts, contractMethod interface{}, params ...interface{}) func(accountID identity.DID, jobID jobs.JobID, txMan jobs.Manager, errOut chan<- error) {
return func(accountID identity.DID, jobID jobs.JobID, txMan jobs.Manager, errOut chan<- error) {
func (s service) ethereumTX(opts *bind.TransactOpts, contractMethod interface{}, params ...interface{}) func(accountID identity.DID, jobID jobs.JobID, jobsMan jobs.Manager, errOut chan<- error) {
return func(accountID identity.DID, jobID jobs.JobID, jobMan jobs.Manager, errOut chan<- error) {
ethTX, err := s.client.SubmitTransactionWithRetries(contractMethod, opts, params...)
if err != nil {
errOut <- err
Expand All @@ -118,7 +118,7 @@ func (s service) ethereumTX(opts *bind.TransactOpts, contractMethod interface{},
return
}

_, err = res.Get(txMan.GetDefaultTaskTimeout())
_, err = res.Get(jobMan.GetDefaultTaskTimeout())
if err != nil {
errOut <- err
return
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *service) CommitAnchor(ctx context.Context, anchorID AnchorID, documentR
cd := NewCommitData(h.Number.Uint64(), anchorID, documentRoot, proof)

log.Infof("Add Anchor to Commit %s from did:%s", anchorID.String(), did.ToAddress().String())
_, done, err := s.txManager.ExecuteWithinJob(ctx, did, jobID, "Check Job for anchor commit",
_, done, err := s.jobsMan.ExecuteWithinJob(ctx, did, jobID, "Check Job for anchor commit",
s.ethereumTX(opts, s.anchorRepositoryContract.Commit, cd.AnchorID.BigInt(), cd.DocumentRoot, cd.DocumentProof))
if err != nil {
return nil, err
Expand Down
61 changes: 20 additions & 41 deletions anchors/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,36 @@
package anchors

import (
"math/big"
"testing"

"github.com/centrifuge/go-centrifuge/crypto/secp256k1"
"github.com/centrifuge/go-centrifuge/identity"
"github.com/centrifuge/go-centrifuge/utils"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type mockAnchorRepo struct {
mock.Mock
anchorRepositoryContract
}

func (m *mockAnchorRepo) GetAnchorById(opts *bind.CallOpts, anchorID *big.Int) (struct {
AnchorId *big.Int
DocumentRoot [32]byte
BlockNumber uint32
}, error) {
args := m.Called(opts, anchorID)
type Response struct {
AnchorId *big.Int
DocumentRoot [32]byte
BlockNumber uint32
}
r := Response{}
dr := args.Get(0).([32]byte)
r.DocumentRoot = dr

return r, args.Error(1)
}

func TestCorrectCommitSignatureGen(t *testing.T) {
anchorID, _ := hexutil.Decode("0x154cc26833dec2f4ad7ead9d65f9ec968a1aa5efbf6fe762f8f2a67d18a2d9b1")
documentRoot, _ := hexutil.Decode("0x65a35574f70281ae4d1f6c9f3adccd5378743f858c67a802a49a08ce185bc975")
address, _ := hexutil.Decode("0x89b0a86583c4444acfd71b463e0d3c55ae1412a5")
anchorID, err := hexutil.Decode("0x154cc26833dec2f4ad7ead9d65f9ec968a1aa5efbf6fe762f8f2a67d18a2d9b1")
assert.NoError(t, err)
documentRoot, err := hexutil.Decode("0x65a35574f70281ae4d1f6c9f3adccd5378743f858c67a802a49a08ce185bc975")
assert.NoError(t, err)
address, err := hexutil.Decode("0x89b0a86583c4444acfd71b463e0d3c55ae1412a5")
assert.NoError(t, err)
correctCommitToSign := "0x004a050342f1edda2462288b9e0123a2e1bcc4f978efdc08c07bbf0c3ccc8ddd"
correctCommitSignature := "0x4a73286521114f528967674bae4ecdc6cc94789255495429a7f58ca3ef0158ae257dd02a0ccb71d817e480d06f60f640ec021ade2ff90fe601bb7a5f4ddc569700"
testPrivateKey, _ := hexutil.Decode("0x17e063fa17dd8274b09c14b253697d9a20afff74ace3c04fdb1b9c814ce0ada5")
anchorIDTyped, _ := ToAnchorID(anchorID)
centIdTyped, err := identity.NewDIDFromBytes(address)
testPrivateKey, err := hexutil.Decode("0x17e063fa17dd8274b09c14b253697d9a20afff74ace3c04fdb1b9c814ce0ada5")
assert.NoError(t, err)
anchorIDTyped, err := ToAnchorID(anchorID)
assert.NoError(t, err)
docRootTyped, _ := ToDocumentRoot(documentRoot)
messageToSign := GenerateCommitHash(anchorIDTyped, centIdTyped, docRootTyped)
didTyped, err := identity.NewDIDFromBytes(address)
assert.NoError(t, err)
docRootTyped, err := ToDocumentRoot(documentRoot)
assert.NoError(t, err)
messageToSign := GenerateCommitHash(anchorIDTyped, didTyped, docRootTyped)
assert.Equal(t, correctCommitToSign, hexutil.Encode(messageToSign), "messageToSign not calculated correctly")
signature, _ := secp256k1.SignEthereum(messageToSign, testPrivateKey)
signature, err := secp256k1.SignEthereum(messageToSign, testPrivateKey)
assert.NoError(t, err)
assert.Equal(t, correctCommitSignature, hexutil.Encode(signature), "signature not correct")
}

Expand All @@ -64,13 +45,11 @@ func TestGenerateAnchor(t *testing.T) {
copy(documentRoot32Bytes[:], currentDocumentRoot[:32])

commitData := NewCommitData(0, currentAnchorID, documentRoot32Bytes, documentProof)

anchorID, _ := ToAnchorID(currentAnchorID[:])
docRoot, _ := ToDocumentRoot(documentRoot32Bytes[:])

anchorID, err := ToAnchorID(currentAnchorID[:])
assert.NoError(t, err)
docRoot, err := ToDocumentRoot(documentRoot32Bytes[:])
assert.NoError(t, err)
assert.Equal(t, commitData.AnchorID, anchorID, "Anchor should have the passed ID")
assert.Equal(t, commitData.DocumentRoot, docRoot, "Anchor should have the passed document root")

assert.Equal(t, commitData.DocumentProof, documentProof, "Anchor should have the document proofs")

}
2 changes: 0 additions & 2 deletions anchors/test_bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/centrifuge/go-centrifuge/bootstrap"
)

const BootstrappedAnchorRepository string = "BootstrappedAnchorRepository"

func (b Bootstrapper) TestBootstrap(context map[string]interface{}) error {
if _, ok := context[bootstrap.BootstrappedConfig]; !ok {
return errors.New("config hasn't been initialized")
Expand Down
8 changes: 4 additions & 4 deletions api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func registerServices(ctx context.Context, cfg Config, grpcServer *grpc.Server,
return errors.New("failed to get %s", config.BootstrappedConfigStorage)
}

InvoiceUnpaidService, ok := nodeObjReg[bootstrap.BootstrappedInvoiceUnpaid].(nft.InvoiceUnpaid)
invoiceUnpaidService, ok := nodeObjReg[bootstrap.BootstrappedInvoiceUnpaid].(nft.InvoiceUnpaid)
if !ok {
return errors.New("failed to get %s", bootstrap.BootstrappedInvoiceUnpaid)
}
Expand All @@ -64,7 +64,7 @@ func registerServices(ctx context.Context, cfg Config, grpcServer *grpc.Server,
}

// register other api endpoints
err = registerAPIs(ctx, cfg, InvoiceUnpaidService, configService, nodeObjReg, grpcServer, gwmux, addr, dopts)
err = registerAPIs(ctx, cfg, invoiceUnpaidService, configService, nodeObjReg, grpcServer, gwmux, addr, dopts)
if err != nil {
return err
}
Expand Down Expand Up @@ -97,8 +97,8 @@ func registerAPIs(ctx context.Context, cfg Config, InvoiceUnpaidService nft.Invo
}

// transactions
txSrv := nodeObjReg[jobs.BootstrappedService].(jobs.Manager)
h := jobsv1.GRPCHandler(txSrv, configService)
jobsMan := nodeObjReg[jobs.BootstrappedService].(jobs.Manager)
h := jobsv1.GRPCHandler(jobsMan, configService)
jobspb.RegisterJobServiceServer(grpcServer, h)
return jobspb.RegisterJobServiceHandlerFromEndpoint(ctx, gwmux, addr, dopts)
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/centrifuge/generate_signing_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ func init() {
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
if createSigningKeyParam {
crypto.GenerateSigningKeyPair(publicKeyFileParam, privateKeyFileParam, curveTypeParam)
err := crypto.GenerateSigningKeyPair(publicKeyFileParam, privateKeyFileParam, curveTypeParam)
if err != nil {
log.Fatal(err)
}
}

if createEncryptionKeyParam {
panic("Not implemented")
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/centrifuge/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func Execute() {
gologging.SetBackend(backend)

if err := rootCmd.Execute(); err != nil {
log.Error(err)
os.Exit(1)
log.Fatal(err)
}
}

Expand Down
2 changes: 0 additions & 2 deletions cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stretchr/testify/assert"
)

var cfg config.Configuration
var ctx = map[string]interface{}{}

func TestMain(m *testing.M) {
Expand All @@ -44,7 +43,6 @@ func TestMain(m *testing.M) {
}

bootstrap.RunTestBootstrappers(bootstrappers, ctx)
cfg = ctx[bootstrap.BootstrappedConfig].(config.Configuration)
result := m.Run()
bootstrap.RunTestTeardown(bootstrappers)
os.Exit(result)
Expand Down
8 changes: 5 additions & 3 deletions config/configstore/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,21 @@ func TestMain(m *testing.M) {
}

func TestNewLevelDBRepository(t *testing.T) {
repo, _, _ := getRandomStorage()
repo, _, err := getRandomStorage()
assert.NoError(t, err)
assert.NotNil(t, repo)
}

func TestUnregisteredModel(t *testing.T) {
repo, _, _ := getRandomStorage()
repo, _, err := getRandomStorage()
assert.NoError(t, err)
assert.NotNil(t, repo)
id := utils.RandomSlice(32)
newaccount := &Account{
IdentityID: id,
EthereumDefaultAccountName: "main",
}
err := repo.CreateAccount(id, newaccount)
err = repo.CreateAccount(id, newaccount)
assert.Nil(t, err)

// Error on non registered model
Expand Down
3 changes: 2 additions & 1 deletion config/configstore/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestMain(m *testing.M) {
func TestService_GenerateAccountHappy(t *testing.T) {
tct, err := cfgSvc.GenerateAccount()
assert.NoError(t, err)
i, _ := tct.GetIdentityID()
i, err := tct.GetIdentityID()
assert.NoError(t, err)
tc, err := cfgSvc.GetAccount(i)
assert.NoError(t, err)
assert.NotNil(t, tc)
Expand Down
8 changes: 4 additions & 4 deletions contextutil/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (

self = contextKey("self")

tx = contextKey("tx")
job = contextKey("job")
)

// New creates new instance of the request headers.
Expand All @@ -31,16 +31,16 @@ func New(ctx context.Context, cfg config.Account) (context.Context, error) {

// WithJob returns a context with Job ID
func WithJob(ctx context.Context, jobID jobs.JobID) context.Context {
return context.WithValue(ctx, tx, jobID)
return context.WithValue(ctx, job, jobID)
}

// Job returns current jobID
func Job(ctx context.Context) jobs.JobID {
tid, ok := ctx.Value(tx).(jobs.JobID)
jobID, ok := ctx.Value(job).(jobs.JobID)
if !ok {
return jobs.NilJobID()
}
return tid
return jobID
}

// AccountDID extracts the AccountConfig DID from the given context value
Expand Down
16 changes: 8 additions & 8 deletions documents/anchor_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ func (d *documentAnchorTask) RunTask() (res interface{}, err error) {
}

// InitDocumentAnchorTask enqueues a new document anchor task for a given combination of accountID/modelID/txID.
func InitDocumentAnchorTask(jobMan jobs.Manager, tq queue.TaskQueuer, accountID identity.DID, modelID []byte, txID jobs.JobID) (queue.TaskResult, error) {
func InitDocumentAnchorTask(jobMan jobs.Manager, tq queue.TaskQueuer, accountID identity.DID, modelID []byte, jobID jobs.JobID) (queue.TaskResult, error) {
params := map[string]interface{}{
jobs.JobIDParam: txID.String(),
jobs.JobIDParam: jobID.String(),
DocumentIDParam: hexutil.Encode(modelID),
AccountIDParam: accountID.String(),
}

err := jobMan.UpdateTaskStatus(accountID, txID, jobs.Pending, documentAnchorTaskName, "init")
err := jobMan.UpdateTaskStatus(accountID, jobID, jobs.Pending, documentAnchorTaskName, "init")
if err != nil {
return nil, err
}
Expand All @@ -141,15 +141,15 @@ func InitDocumentAnchorTask(jobMan jobs.Manager, tq queue.TaskQueuer, accountID
return tr, nil
}

// CreateAnchorTransaction creates a transaction for anchoring a document using transaction manager
func CreateAnchorTransaction(jobMan jobs.Manager, tq queue.TaskQueuer, self identity.DID, jobID jobs.JobID, documentID []byte) (jobs.JobID, chan bool, error) {
jobID, done, err := jobMan.ExecuteWithinJob(context.Background(), self, jobID, "anchor document", func(accountID identity.DID, TID jobs.JobID, txMan jobs.Manager, errChan chan<- error) {
tr, err := InitDocumentAnchorTask(txMan, tq, accountID, documentID, TID)
// CreateAnchorJob creates a job for anchoring a document using jobs manager
func CreateAnchorJob(jobsMan jobs.Manager, tq queue.TaskQueuer, self identity.DID, jobID jobs.JobID, documentID []byte) (jobs.JobID, chan bool, error) {
jobID, done, err := jobsMan.ExecuteWithinJob(context.Background(), self, jobID, "anchor document", func(accountID identity.DID, jobID jobs.JobID, jobsMan jobs.Manager, errChan chan<- error) {
tr, err := InitDocumentAnchorTask(jobsMan, tq, accountID, documentID, jobID)
if err != nil {
errChan <- err
return
}
_, err = tr.Get(txMan.GetDefaultTaskTimeout())
_, err = tr.Get(jobsMan.GetDefaultTaskTimeout())
if err != nil {
errChan <- err
return
Expand Down

0 comments on commit 097feb7

Please sign in to comment.