Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/myst-572 node rejects unregistered consumers #311

Merged
merged 9 commits into from Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 22 additions & 9 deletions cmd/commands/server/command_server.go
Expand Up @@ -22,8 +22,10 @@ import (
"time"

log "github.com/cihub/seelog"
"github.com/mysterium/node/blockchain"
"github.com/mysterium/node/communication"
"github.com/mysterium/node/identity"
"github.com/mysterium/node/identity/registry"
"github.com/mysterium/node/ip"
"github.com/mysterium/node/location"
"github.com/mysterium/node/metadata"
Expand All @@ -39,14 +41,15 @@ import (

// Command represent entrypoint for Mysterium server with top level components
type Command struct {
identityLoader func() (identity.Identity, error)
createSigner identity.SignerFactory
ipResolver ip.Resolver
mysteriumClient server.Client
natService nat.NATService
locationResolver location.Resolver

dialogWaiterFactory func(identity identity.Identity) communication.DialogWaiter
networkDefinition metadata.NetworkDefinition
identityLoader func() (identity.Identity, error)
createSigner identity.SignerFactory
ipResolver ip.Resolver
mysteriumClient server.Client
natService nat.NATService
locationResolver location.Resolver

dialogWaiterFactory func(identity identity.Identity, identityRegistry registry.IdentityRegistry) communication.DialogWaiter
dialogWaiter communication.DialogWaiter

sessionManagerFactory func(primitives *tls.Primitives, serverIP string) session.Manager
Expand Down Expand Up @@ -80,7 +83,17 @@ func (cmd *Command) Start() (err error) {
return err
}

cmd.dialogWaiter = cmd.dialogWaiterFactory(providerID)
ethClient, err := blockchain.NewClient(cmd.networkDefinition.EtherClientRPC)
if err != nil {
return err
}

identityRegistry, err := registry.NewIdentityRegistry(ethClient, cmd.networkDefinition.PaymentsContractAddress)
if err != nil {
return err
}

cmd.dialogWaiter = cmd.dialogWaiterFactory(providerID, identityRegistry)
providerContact, err := cmd.dialogWaiter.Start()
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion cmd/commands/server/factory.go
Expand Up @@ -27,6 +27,7 @@ import (
nats_dialog "github.com/mysterium/node/communication/nats/dialog"
nats_discovery "github.com/mysterium/node/communication/nats/discovery"
"github.com/mysterium/node/identity"
"github.com/mysterium/node/identity/registry"
"github.com/mysterium/node/ip"
"github.com/mysterium/node/location"
"github.com/mysterium/node/nat"
Expand Down Expand Up @@ -66,6 +67,7 @@ func NewCommand(options CommandOptions) *Command {
locationResolver := locationResolver(options)

return &Command{
networkDefinition: networkDefinition,
identityLoader: func() (identity.Identity, error) {
return identity_handler.LoadIdentity(identityHandler, options.Identity, options.Passphrase)
},
Expand All @@ -74,10 +76,11 @@ func NewCommand(options CommandOptions) *Command {
ipResolver: ipResolver,
mysteriumClient: mysteriumClient,
natService: natService,
dialogWaiterFactory: func(myID identity.Identity) communication.DialogWaiter {
dialogWaiterFactory: func(myID identity.Identity, identityRegistry registry.IdentityRegistry) communication.DialogWaiter {
return nats_dialog.NewDialogWaiter(
nats_discovery.NewAddressGenerate(networkDefinition.BrokerAddress, myID),
identity.NewSigner(keystoreInstance, myID),
identityRegistry,
)
},

Expand Down
43 changes: 34 additions & 9 deletions communication/nats/dialog/dialog_waiter.go
Expand Up @@ -22,27 +22,31 @@ import (
"github.com/mysterium/node/communication"

log "github.com/cihub/seelog"
"github.com/ethereum/go-ethereum/common"
"github.com/mysterium/node/communication/nats"
"github.com/mysterium/node/communication/nats/discovery"
"github.com/mysterium/node/identity"
"github.com/mysterium/node/identity/registry"
dto_discovery "github.com/mysterium/node/service_discovery/dto"
)

// NewDialogWaiter constructs new DialogWaiter which works thru NATS connection.
func NewDialogWaiter(address *discovery.AddressNATS, signer identity.Signer) *dialogWaiter {
func NewDialogWaiter(address *discovery.AddressNATS, signer identity.Signer, identityRegistry registry.IdentityRegistry) *dialogWaiter {
return &dialogWaiter{
myAddress: address,
mySigner: signer,
dialogs: make([]communication.Dialog, 0),
myAddress: address,
mySigner: signer,
dialogs: make([]communication.Dialog, 0),
identityRegistry: identityRegistry,
}
}

const waiterLogPrefix = "[NATS.DialogWaiter] "

type dialogWaiter struct {
myAddress *discovery.AddressNATS
mySigner identity.Signer
dialogs []communication.Dialog
myAddress *discovery.AddressNATS
mySigner identity.Signer
dialogs []communication.Dialog
identityRegistry registry.IdentityRegistry
}

func (waiter *dialogWaiter) Start() (dto_discovery.Contact, error) {
Expand All @@ -67,13 +71,21 @@ func (waiter *dialogWaiter) Stop() error {

func (waiter *dialogWaiter) ServeDialogs(dialogHandler communication.DialogHandler) error {
createDialog := func(request *dialogCreateRequest) (*dialogCreateResponse, error) {
if request.PeerID == "" {

valid, err := waiter.validateDialogRequest(request)
if err != nil {
log.Error(waiterLogPrefix, "Validation check failed: ", err.Error())
return &responseInternalError, nil
}
if !valid {
log.Error(waiterLogPrefix, "Rejecting invalid peerID: ", request.PeerID)
return &responseInvalidIdentity, nil
}

peerID := identity.FromAddress(request.PeerID)

dialog := waiter.newDialogToPeer(peerID, waiter.newCodecForPeer(peerID))
err := dialogHandler.Handle(dialog)
err = dialogHandler.Handle(dialog)
if err != nil {
log.Error(waiterLogPrefix, fmt.Sprintf("Failed dialog from: '%s'. %s", request.PeerID, err))
return &responseInternalError, nil
Expand Down Expand Up @@ -110,3 +122,16 @@ func (waiter *dialogWaiter) newDialogToPeer(peerID identity.Identity, peerCodec
Receiver: nats.NewReceiver(waiter.myAddress.GetConnection(), peerCodec, subTopic),
}
}

func (waiter *dialogWaiter) validateDialogRequest(request *dialogCreateRequest) (bool, error) {
if request.PeerID == "" {
return false, nil
}
registered, err := waiter.identityRegistry.IsRegistered(common.HexToAddress(request.PeerID))
if err != nil {
log.Warn(waiterLogPrefix, "Unregistered peerID: ", request.PeerID)
return false, err
}

return registered, nil
}
56 changes: 55 additions & 1 deletion communication/nats/dialog/dialog_waiter_test.go
Expand Up @@ -19,10 +19,12 @@ package dialog

import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/mysterium/node/communication"
"github.com/mysterium/node/communication/nats"
"github.com/mysterium/node/communication/nats/discovery"
"github.com/mysterium/node/identity"
"github.com/mysterium/node/identity/registry"
"github.com/stretchr/testify/assert"
"testing"
"time"
Expand All @@ -36,7 +38,7 @@ func TestDialogWaiter_Factory(t *testing.T) {
address := discovery.NewAddress("custom", "nats://far-server:4222")
signer := &identity.SignerFake{}

waiter := NewDialogWaiter(address, signer)
waiter := NewDialogWaiter(address, signer, &mockedIdentityRegistry{})
assert.NotNil(t, waiter)
assert.Equal(t, address, waiter.myAddress)
assert.Equal(t, signer, waiter.mySigner)
Expand Down Expand Up @@ -94,11 +96,52 @@ func TestDialogWaiter_ServeDialogsRejectInvalidSignature(t *testing.T) {
assert.Nil(t, dialogInstance)
}

func TestDialogWaiter_ServeDialogsRejectUnregisteredConsumers(t *testing.T) {
connection := nats.StartConnectionFake()
defer connection.Close()

signer := &identity.SignerFake{}

mockedRegistry := &mockedIdentityRegistry{
anyIdentityRegistered: false,
}

mockeDialogHandler := &dialogHandler{
dialogReceived: make(chan communication.Dialog),
}

waiter := NewDialogWaiter(discovery.NewAddressWithConnection(connection, "test-topic"), signer, mockedRegistry)

err := waiter.ServeDialogs(mockeDialogHandler)
assert.NoError(t, err)

msg, err := connection.Request("test-topic.dialog-create", []byte(`{
"payload": {"peer_id":"0x28bf83df144ab7a566bc8509d1fff5d5470bd4ea"},
"signature": "tl+WbYkJdXD5foaIP3bqVGFHfr6kdd5FzmJAmu1GdpINEnNR3bTto6wgEoke/Fpy4zsWOjrulDVfrc32f5ArTgA="
}`), 100*time.Millisecond)
assert.NoError(t, err)

assert.JSONEq(
t,
`{
"payload": {
"reason":400,
"reasonMessage":"Invalid identity"
},
"signature":"c2lnbmVkeyJyZWFzb24iOjQwMCwicmVhc29uTWVzc2FnZSI6IkludmFsaWQgaWRlbnRpdHkifQ=="
}`,
string(msg.Data),
)
}

func dialogServe(connection nats.Connection, mySigner identity.Signer) (waiter *dialogWaiter, handler *dialogHandler) {
myTopic := "my-topic"
waiter = &dialogWaiter{
myAddress: discovery.NewAddressWithConnection(connection, myTopic),
mySigner: mySigner,
identityRegistry: &mockedIdentityRegistry{
anyIdentityRegistered: true,
},
}
handler = &dialogHandler{
dialogReceived: make(chan communication.Dialog),
Expand Down Expand Up @@ -137,3 +180,14 @@ func (handler *dialogHandler) Handle(dialog communication.Dialog) error {
handler.dialogReceived <- dialog
return nil
}

type mockedIdentityRegistry struct {
anyIdentityRegistered bool
}

func (mir *mockedIdentityRegistry) IsRegistered(address common.Address) (bool, error) {
return mir.anyIdentityRegistered, nil
}

//check that we implemented mocked registry correctly
var _ registry.IdentityRegistry = &mockedIdentityRegistry{}
2 changes: 1 addition & 1 deletion communication/nats/request_bytes_test.go
Expand Up @@ -91,7 +91,7 @@ func TestBytesRespond(t *testing.T) {
err := receiver.Respond(consumer)
assert.NoError(t, err)

response, err := connection.Request("bytes-response", []byte("REQUEST"), time.Millisecond)
response, err := connection.Request("bytes-response", []byte("REQUEST"), 100*time.Millisecond)
assert.NoError(t, err)
assert.Equal(t, []byte("REQUEST"), *consumer.requestReceived.(*[]byte))
assert.Equal(t, []byte("RESPONSE"), response.Data)
Expand Down
38 changes: 21 additions & 17 deletions e2e/connection_test.go
Expand Up @@ -19,10 +19,8 @@ package e2e

import (
"github.com/cihub/seelog"
"github.com/mysterium/node/tequilapi/client"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestClientConnectsToNode(t *testing.T) {
Expand All @@ -33,21 +31,25 @@ func TestClientConnectsToNode(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, "NotConnected", status.Status)

identities, err := tequilApi.GetIdentities()
identity, err := tequilApi.NewIdentity("")
assert.NoError(t, err)

var identity client.IdentityDTO
if len(identities) < 1 {
identity, err = tequilApi.NewIdentity("")
assert.NoError(t, err)
} else {
identity = identities[0]
}
seelog.Info("Client identity is: ", identity.Address)

err = tequilApi.Unlock(identity.Address, "")
assert.NoError(t, err)

registrationData, err := tequilApi.RegistrationStatus(identity.Address)
assert.NoError(t, err)

err = registerIdentity(registrationData)
assert.NoError(t, err)

err = waitForCondition(func() (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice +1

regStatus, err := tequilApi.RegistrationStatus(identity.Address)
return regStatus.Registered, err
})
assert.NoError(t, err)

nonVpnIp, err := tequilApi.GetIP()
assert.NoError(t, err)
seelog.Info("Direct client address is: ", nonVpnIp)
Expand All @@ -69,10 +71,11 @@ func TestClientConnectsToNode(t *testing.T) {
_, err = tequilApi.Connect(identity.Address, proposal.ProviderID)
assert.NoError(t, err)

time.Sleep(10 * time.Second)
status, err = tequilApi.Status()
err = waitForCondition(func() (bool, error) {
status, err := tequilApi.Status()
return status.Status == "Connected", err
})
assert.NoError(t, err)
assert.Equal(t, "Connected", status.Status)

vpnIp, err := tequilApi.GetIP()
assert.NoError(t, err)
Expand All @@ -81,9 +84,10 @@ func TestClientConnectsToNode(t *testing.T) {
err = tequilApi.Disconnect()
assert.NoError(t, err)

time.Sleep(10 * time.Second)
status, err = tequilApi.Status()
err = waitForCondition(func() (bool, error) {
status, err := tequilApi.Status()
return status.Status == "NotConnected", err
})
assert.NoError(t, err)
assert.Equal(t, "NotConnected", status.Status)

}
40 changes: 40 additions & 0 deletions e2e/contition_checker.go
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2017 The "MysteriumNetwork/node" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package e2e

import (
"errors"
"time"
)

type conditionChecker func() (bool, error)

func waitForCondition(checkFunc conditionChecker) error {
for i := 0; i < 10; i++ {
state, err := checkFunc()
switch {
case err != nil:
return err
case state:
return nil
case !state:
time.Sleep(1 * time.Second)
}
}
return errors.New("state was still false")
}
2 changes: 2 additions & 0 deletions e2e/docker-compose.yml
Expand Up @@ -20,6 +20,8 @@ services:
--location.country=e2e-land
--openvpn.port=3000
--agreed-terms-and-conditions
--localnet
--ether.client.rpc=http://local-node:8545

client:
build:
Expand Down