Skip to content

Commit

Permalink
Send promise balance messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Waldz committed Oct 2, 2018
1 parent 214c53f commit ea03bce
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 44 deletions.
62 changes: 62 additions & 0 deletions core/promise/methods/noop/dialog_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2018 The "MysteriumNetwork/node" Authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package noop

import (
"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/identity"
)

type fakeDialog struct {
returnError error
returnReceiveMessage interface{}
lastSendMessage interface{}
}

func (fd *fakeDialog) PeerID() identity.Identity {
return identity.Identity{}
}

func (fd *fakeDialog) Close() error {
return nil
}

func (fd *fakeDialog) Receive(consumer communication.MessageConsumer) error {
if fd.returnError != nil {
return fd.returnError
}

consumer.Consume(fd.returnReceiveMessage)
return nil
}
func (fd *fakeDialog) Respond(consumer communication.RequestConsumer) error {
return nil
}

func (fd *fakeDialog) Send(producer communication.MessageProducer) error {
if fd.returnError != nil {
return fd.returnError
}

fd.lastSendMessage = producer.Produce()
return nil
}

func (fd *fakeDialog) Request(producer communication.RequestProducer) (responsePtr interface{}, err error) {
return nil, nil
}
46 changes: 5 additions & 41 deletions core/promise/methods/noop/promise_issuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
package noop

import (
"testing"

"errors"
"testing"

"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/core/connection"
"github.com/mysteriumnetwork/node/core/promise"
"github.com/mysteriumnetwork/node/identity"
Expand All @@ -43,7 +41,7 @@ var _ connection.PromiseIssuer = &PromiseIssuer{}

func TestPromiseIssuer_Start_SubscriptionFails(t *testing.T) {
dialog := &fakeDialog{
onReceiveReturnError: errors.New("reject subscriptions"),
returnError: errors.New("reject subscriptions"),
}
issuer := &PromiseIssuer{dialog: dialog}

Expand All @@ -57,7 +55,9 @@ func TestPromiseIssuer_Start_SubscriptionFails(t *testing.T) {
}

func TestPromiseIssuer_Start_SubscriptionOfBalances(t *testing.T) {
dialog := &fakeDialog{}
dialog := &fakeDialog{
returnReceiveMessage: &promise.BalanceMessage{1, true, testToken(10)},
}
issuer := &PromiseIssuer{dialog: dialog}

logs := make([]string, 0)
Expand All @@ -67,46 +67,10 @@ func TestPromiseIssuer_Start_SubscriptionOfBalances(t *testing.T) {
err := issuer.Start(proposal)
assert.NoError(t, err)

dialog.onReceiveLastConsumer.Consume(
&promise.BalanceMessage{1, true, testToken(10)},
)
assert.Len(t, logs, 1)
assert.Equal(t, "[promise-issuer] Promise balance notified: 1000000000TEST", logs[0])
}

func testToken(amount float64) money.Money {
return money.NewMoney(amount, money.Currency("TEST"))
}

type fakeDialog struct {
onReceiveLastConsumer communication.MessageConsumer
onReceiveReturnError error
}

func (fd *fakeDialog) PeerID() identity.Identity {
return providerID
}

func (fd *fakeDialog) Close() error {
return nil
}

func (fd *fakeDialog) Receive(consumer communication.MessageConsumer) error {
if fd.onReceiveReturnError != nil {
return fd.onReceiveReturnError
}

fd.onReceiveLastConsumer = consumer
return nil
}
func (fd *fakeDialog) Respond(consumer communication.RequestConsumer) error {
return nil
}

func (fd *fakeDialog) Send(producer communication.MessageProducer) error {
return nil
}

func (fd *fakeDialog) Request(producer communication.RequestProducer) (responsePtr interface{}, err error) {
return nil, nil
}
59 changes: 58 additions & 1 deletion core/promise/methods/noop/promise_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,85 @@
package noop

import (
"fmt"
"time"

log "github.com/cihub/seelog"
"github.com/mysteriumnetwork/node/communication"
"github.com/mysteriumnetwork/node/core/promise"
"github.com/mysteriumnetwork/node/money"
discovery_dto "github.com/mysteriumnetwork/node/service_discovery/dto"
)

const (
processorLogPrefix = "[promise-processor] "

balanceNotifying = balanceState("Notifying")
balanceStopped = balanceState("Stopped")
)

// NewPromiseProcessor creates instance of PromiseProcessor
func NewPromiseProcessor(dialog communication.Dialog) *PromiseProcessor {
return &PromiseProcessor{
dialog: dialog,
dialog: dialog,
balanceInterval: 5 * time.Second,
}
}

type balanceState string

// PromiseProcessor process promises in such way, what no actual money is deducted from promise
type PromiseProcessor struct {
dialog communication.Dialog

balanceInterval time.Duration
balanceShutdown chan bool
balanceState balanceState

// these are populated later at runtime
lastPromise promise.Promise
}

// Start processing promises for given service proposal
func (processor *PromiseProcessor) Start(proposal discovery_dto.ServiceProposal) error {
processor.lastPromise = promise.Promise{
Amount: money.NewMoney(10, money.CURRENCY_MYST),
}

processor.balanceShutdown = make(chan bool, 1)
go processor.balanceLoop()

return nil
}

// Stop stops processing promises
func (processor *PromiseProcessor) Stop() error {
processor.balanceShutdown <- true
return nil
}

func (processor *PromiseProcessor) balanceLoop() {
processor.balanceState = balanceNotifying

balanceLoop:
for {
select {
case <-processor.balanceShutdown:
break balanceLoop

case <-time.After(processor.balanceInterval):
processor.balanceSend(
promise.BalanceMessage{1, true, processor.lastPromise.Amount},
)
}
}

processor.balanceState = balanceStopped
}

func (processor *PromiseProcessor) balanceSend(message promise.BalanceMessage) error {
log.Info(processorLogPrefix, fmt.Sprintf("Notifying balance %s", message.Balance.String()))
return processor.dialog.Send(&promise.BalanceMessageProducer{
Message: message,
})
}
67 changes: 66 additions & 1 deletion core/promise/methods/noop/promise_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,71 @@

package noop

import "github.com/mysteriumnetwork/node/session"
import (
"testing"

"time"

"github.com/mysteriumnetwork/node/core/promise"
"github.com/mysteriumnetwork/node/money"
"github.com/mysteriumnetwork/node/session"
"github.com/stretchr/testify/assert"
)

var _ session.PromiseProcessor = &PromiseProcessor{}

func TestPromiseProcessor_Start_SendsBalanceMessages(t *testing.T) {
dialog := &fakeDialog{}
processor := &PromiseProcessor{
dialog: dialog,
balanceInterval: time.Millisecond,
}

err := processor.Start(proposal)
assert.NoError(t, err)
waitForCondition(t, func() bool {
return processor.balanceState == balanceNotifying
})

waitForCondition(t, func() bool {
return dialog.lastSendMessage != nil
})
assert.Exactly(
t,
promise.BalanceMessage{1, true, money.NewMoney(10, money.CURRENCY_MYST)},
dialog.lastSendMessage,
)
}

func TestPromiseProcessor_Stop_StopsBalanceMessages(t *testing.T) {
dialog := &fakeDialog{}
processor := &PromiseProcessor{
dialog: dialog,
balanceInterval: time.Millisecond,
}

dialog.lastSendMessage = make(chan interface{}, 1)
err := processor.Start(proposal)
assert.NoError(t, err)
waitForCondition(t, func() bool {
return processor.balanceState == balanceNotifying
})

err = processor.Stop()
assert.NoError(t, err)
waitForCondition(t, func() bool {
return processor.balanceState == balanceStopped
})
}

func waitForCondition(t *testing.T, condition func() bool) {
for i := 0; i < 10; i++ {
switch {
case condition():
return
default:
time.Sleep(time.Millisecond)
}
}
assert.Fail(t, "condition not meet")
}
2 changes: 1 addition & 1 deletion session/create_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package session

import (
"errors"
"testing"

"github.com/mysteriumnetwork/node/identity"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

Expand Down

0 comments on commit ea03bce

Please sign in to comment.