Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow blank address in NotifyUTXOsChanged to get all updates #2027

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/rpc/rpccontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewContext(cfg *config.Config,
UTXOIndex: utxoIndex,
ShutDownChan: shutDownChan,
}
context.NotificationManager = NewNotificationManager()
context.NotificationManager = NewNotificationManager(cfg.ActiveNetParams)

return context
}
66 changes: 58 additions & 8 deletions app/rpc/rpccontext/notificationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package rpccontext
import (
"sync"

"github.com/kaspanet/kaspad/domain/dagconfig"

"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/domain/utxoindex"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
Expand All @@ -13,6 +17,7 @@ import (
type NotificationManager struct {
sync.RWMutex
listeners map[*routerpkg.Router]*NotificationListener
params *dagconfig.Params
}

// UTXOsChangedNotificationAddress represents a kaspad address.
Expand All @@ -24,6 +29,8 @@ type UTXOsChangedNotificationAddress struct {

// NotificationListener represents a registered RPC notification listener
type NotificationListener struct {
params *dagconfig.Params

propagateBlockAddedNotifications bool
propagateVirtualSelectedParentChainChangedNotifications bool
propagateFinalityConflictNotifications bool
Expand All @@ -39,8 +46,9 @@ type NotificationListener struct {
}

// NewNotificationManager creates a new NotificationManager
func NewNotificationManager() *NotificationManager {
func NewNotificationManager(params *dagconfig.Params) *NotificationManager {
return &NotificationManager{
params: params,
listeners: make(map[*routerpkg.Router]*NotificationListener),
}
}
Expand All @@ -50,7 +58,7 @@ func (nm *NotificationManager) AddListener(router *routerpkg.Router) {
nm.Lock()
defer nm.Unlock()

listener := newNotificationListener()
listener := newNotificationListener(nm.params)
nm.listeners[router] = listener
}

Expand Down Expand Up @@ -174,15 +182,18 @@ func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOCha
for router, listener := range nm.listeners {
if listener.propagateUTXOsChangedNotifications {
// Filter utxoChanges and create a notification
notification := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
notification, err := listener.convertUTXOChangesToUTXOsChangedNotification(utxoChanges)
if err != nil {
return err
}

// Don't send the notification if it's empty
if len(notification.Added) == 0 && len(notification.Removed) == 0 {
continue
}

// Enqueue the notification
err := router.OutgoingRoute().Enqueue(notification)
err = router.OutgoingRoute().Enqueue(notification)
if err != nil {
return err
}
Expand Down Expand Up @@ -265,8 +276,10 @@ func (nm *NotificationManager) NotifyPruningPointUTXOSetOverride() error {
return nil
}

func newNotificationListener() *NotificationListener {
func newNotificationListener(params *dagconfig.Params) *NotificationListener {
return &NotificationListener{
params: params,

propagateBlockAddedNotifications: false,
propagateVirtualSelectedParentChainChangedNotifications: false,
propagateFinalityConflictNotifications: false,
Expand Down Expand Up @@ -339,7 +352,7 @@ func (nl *NotificationListener) StopPropagatingUTXOsChangedNotifications(address
}

func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
utxoChanges *utxoindex.UTXOChanges) *appmessage.UTXOsChangedNotificationMessage {
utxoChanges *utxoindex.UTXOChanges) (*appmessage.UTXOsChangedNotificationMessage, error) {

// As an optimization, we iterate over the smaller set (O(n)) among the two below
// and check existence over the larger set (O(1))
Expand All @@ -360,7 +373,7 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
} else if addressesSize > 0 {
for _, listenerAddress := range nl.propagateUTXOsChangedNotificationAddresses {
listenerScriptPublicKeyString := listenerAddress.ScriptPublicKeyString
if addedPairs, ok := utxoChanges.Added[listenerScriptPublicKeyString]; ok {
Expand All @@ -372,9 +385,46 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
for scriptPublicKeyString, addedPairs := range utxoChanges.Added {
addressString, err := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
if err != nil {
return nil, err
}

utxosByAddressesEntries := ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(addressString, addedPairs)
notification.Added = append(notification.Added, utxosByAddressesEntries...)
}
for scriptPublicKeyString, removedOutpoints := range utxoChanges.Removed {
addressString, err := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
if err != nil {
return nil, err
}

utxosByAddressesEntries := convertUTXOOutpointsToUTXOsByAddressesEntries(addressString, removedOutpoints)
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}

return notification
return notification, nil
}

func (nl *NotificationListener) scriptPubKeyStringToAddressString(scriptPublicKeyString utxoindex.ScriptPublicKeyString) (string, error) {
scriptPubKey := utxoindex.ConvertStringToScriptPublicKey(scriptPublicKeyString)

// ignore error because it is often returned when the script is of unknown type
scriptType, address, err := txscript.ExtractScriptPubKeyAddress(scriptPubKey, nl.params)
if err != nil {
return "", err
}

var addressString string
if scriptType == txscript.NonStandardTy {
addressString = ""
} else {
addressString = address.String()
}
return addressString, nil
}

// PropagateVirtualSelectedParentBlueScoreChangedNotifications instructs the listener to send
Expand Down
2 changes: 1 addition & 1 deletion domain/consensus/utils/txscript/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func PushedData(script []byte) ([][]byte, error) {
// as public keys which are invalid will return a nil address.
func ExtractScriptPubKeyAddress(scriptPubKey *externalapi.ScriptPublicKey, dagParams *dagconfig.Params) (ScriptClass, util.Address, error) {
if scriptPubKey.Version > constants.MaxScriptPublicKeyVersion {
return NonStandardTy, nil, errors.Errorf("Script version is unknown.")
return NonStandardTy, nil, nil
}
// No valid address if the script doesn't parse.
pops, err := parseScript(scriptPubKey.Script)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ message GetHeadersResponseMessage{
//
// See: UtxosChangedNotificationMessage
message NotifyUtxosChangedRequestMessage {
repeated string addresses = 1;
repeated string addresses = 1; // Leave empty to get all updates
}

message NotifyUtxosChangedResponseMessage {
Expand Down