Skip to content

Commit

Permalink
Allow blank address in NotifyUTXOsChanged to get all updates (#2027)
Browse files Browse the repository at this point in the history
* Allow blank address in NotifyUTXOsChanged to get all updates

* To see if address is possible to extract: Check for NonStandardTy rather than error

* Don't swallow errors

Co-authored-by: Ori Newman <orinewman1@gmail.com>
  • Loading branch information
svarogg and someone235 committed May 18, 2022
1 parent 8735da0 commit 5d24e2a
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
2 changes: 1 addition & 1 deletion app/rpc/rpccontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewContext(cfg *config.Config,
UTXOIndex: utxoIndex,
ShutDownChan: shutDownChan,
}
context.NotificationManager = NewNotificationManager()
context.NotificationManager = NewNotificationManager(cfg.ActiveNetParams)

return context
}
66 changes: 58 additions & 8 deletions app/rpc/rpccontext/notificationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package rpccontext
import (
"sync"

"github.com/kaspanet/kaspad/domain/dagconfig"

"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/domain/utxoindex"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
Expand All @@ -13,6 +17,7 @@ import (
type NotificationManager struct {
sync.RWMutex
listeners map[*routerpkg.Router]*NotificationListener
params *dagconfig.Params
}

// UTXOsChangedNotificationAddress represents a kaspad address.
Expand All @@ -24,6 +29,8 @@ type UTXOsChangedNotificationAddress struct {

// NotificationListener represents a registered RPC notification listener
type NotificationListener struct {
params *dagconfig.Params

propagateBlockAddedNotifications bool
propagateVirtualSelectedParentChainChangedNotifications bool
propagateFinalityConflictNotifications bool
Expand All @@ -39,8 +46,9 @@ type NotificationListener struct {
}

// NewNotificationManager creates a new NotificationManager
func NewNotificationManager() *NotificationManager {
func NewNotificationManager(params *dagconfig.Params) *NotificationManager {
return &NotificationManager{
params: params,
listeners: make(map[*routerpkg.Router]*NotificationListener),
}
}
Expand All @@ -50,7 +58,7 @@ func (nm *NotificationManager) AddListener(router *routerpkg.Router) {
nm.Lock()
defer nm.Unlock()

listener := newNotificationListener()
listener := newNotificationListener(nm.params)
nm.listeners[router] = listener
}

Expand Down Expand Up @@ -174,15 +182,18 @@ func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOCha
for router, listener := range nm.listeners {
if listener.propagateUTXOsChangedNotifications {
// Filter utxoChanges and create a notification
notification := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
notification, err := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
if err != nil {
return err
}

// Don't send the notification if it's empty
if len(notification.Added) == 0 && len(notification.Removed) == 0 {
continue
}

// Enqueue the notification
err := router.OutgoingRoute().Enqueue(notification)
err = router.OutgoingRoute().Enqueue(notification)
if err != nil {
return err
}
Expand Down Expand Up @@ -265,8 +276,10 @@ func (nm *NotificationManager) NotifyPruningPointUTXOSetOverride() error {
return nil
}

func newNotificationListener() *NotificationListener {
func newNotificationListener(params *dagconfig.Params) *NotificationListener {
return &NotificationListener{
params: params,

propagateBlockAddedNotifications: false,
propagateVirtualSelectedParentChainChangedNotifications: false,
propagateFinalityConflictNotifications: false,
Expand Down Expand Up @@ -339,7 +352,7 @@ func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(address
}

func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
utxoChanges *utxoindex.UTXOChanges) *appmessage.UTXOsChangedNotificationMessage {
utxoChanges *utxoindex.UTXOChanges) (*appmessage.UTXOsChangedNotificationMessage, error) {

// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
Expand All @@ -360,7 +373,7 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
} else if addressesSize > 0 {
for _, listenerAddress := range nl.propagateUTXOsChangedNotificationAddresses {
listenerScriptPublicKeyString := listenerAddress.ScriptPublicKeyString
if addedPairs, ok := utxoChanges.Added[listenerScriptPublicKeyString]; ok {
Expand All @@ -372,9 +385,46 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
for scriptPublicKeyString, addedPairs := range utxoChanges.Added {
addressString, err := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
if err != nil {
return nil, err
}

utxosByAddressesEntries := ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(addressString, addedPairs)
notification.Added = append(notification.Added, utxosByAddressesEntries...)
}
for scriptPublicKeyString, removedOutpoints := range utxoChanges.Removed {
addressString, err := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
if err != nil {
return nil, err
}

utxosByAddressesEntries := convertUTXOOutpointsToUTXOsByAddressesEntries(addressString, removedOutpoints)
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}

return notification
return notification, nil
}

func (nl *NotificationListener) scriptPubKeyStringToAddressString(scriptPublicKeyString utxoindex.ScriptPublicKeyString) (string, error) {
scriptPubKey := utxoindex.ConvertStringToScriptPublicKey(scriptPublicKeyString)

// ignore error because it is often returned when the script is of unknown type
scriptType, address, err := txscript.ExtractScriptPubKeyAddress(scriptPubKey, nl.params)
if err != nil {
return "", err
}

var addressString string
if scriptType == txscript.NonStandardTy {
addressString = ""
} else {
addressString = address.String()
}
return addressString, nil
}

// PropagateVirtualSelectedParentBlueScoreChangedNotifications instructs the listener to send
Expand Down
2 changes: 1 addition & 1 deletion domain/consensus/utils/txscript/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func PushedData(script []byte) ([][]byte, error) {
// as public keys which are invalid will return a nil address.
func ExtractScriptPubKeyAddress(scriptPubKey *externalapi.ScriptPublicKey, dagParams *dagconfig.Params) (ScriptClass, util.Address, error) {
if scriptPubKey.Version > constants.MaxScriptPublicKeyVersion {
return NonStandardTy, nil, errors.Errorf("Script version is unknown.")
return NonStandardTy, nil, nil
}
// No valid address if the script doesn't parse.
pops, err := parseScript(scriptPubKey.Script)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ message GetHeadersResponseMessage{
//
// See: UtxosChangedNotificationMessage
message NotifyUtxosChangedRequestMessage {
repeated string addresses = 1;
repeated string addresses = 1; // Leave empty to get all updates
}

message NotifyUtxosChangedResponseMessage {
Expand Down

0 comments on commit 5d24e2a

Please sign in to comment.