Skip to content

Commit

Permalink
Download events p tagging our users
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Dec 7, 2023
1 parent 886e5e8 commit 0a04e99
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 29 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ moderation service.
## Design

The service connects to all known relays and replicates the following events:
- all events of kinds defined in [`globalEventTypesToDownload`][global-event-kinds-to-download-search]
- all events created by Nos users or users in their contacts lists
- events of kinds defined in [`globalEventTypesToDownload`][global-event-kinds-to-download-search] created by anyone
- events of any kind created by Nos users or users in their contact lists
- events of any kind which include Nos users in their `p` tags

The relays are discovered by using the code located in
[`RelaysExtractor`][relays-extractor-search] to scan nostr events. There is also
Expand Down
26 changes: 15 additions & 11 deletions service/app/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/downloader"
)

type DatabaseRelaySource struct {
Expand Down Expand Up @@ -74,35 +75,38 @@ func NewDatabasePublicKeySource(transactionProvider TransactionProvider, logger
}
}

func (d *DatabasePublicKeySource) GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error) {
func (d *DatabasePublicKeySource) GetPublicKeys(ctx context.Context) (downloader.PublicKeys, error) {
start := time.Now()
defer func() {
d.logger.Debug().WithField("duration", time.Since(start)).Message("got public keys")
}()

result := internal.NewEmptySet[domain.PublicKey]()
publicKeysToMonitor := *internal.NewEmptySet[domain.PublicKey]()
publicKeysToMonitorFollowees := *internal.NewEmptySet[domain.PublicKey]()

if err := d.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
publicKeysToMonitor, err := adapters.PublicKeysToMonitor.List(ctx)
values, err := adapters.PublicKeysToMonitor.List(ctx)
if err != nil {
return errors.Wrap(err, "error getting public keys to monitor")
}

for _, v := range publicKeysToMonitor {
result.Put(v.PublicKey())
for _, value := range values {
publicKeysToMonitor.Put(value.PublicKey())

followees, err := adapters.Contacts.GetFollowees(ctx, v.PublicKey())
followees, err := adapters.Contacts.GetFollowees(ctx, value.PublicKey())
if err != nil {
return errors.Wrapf(err, "error getting followees of '%s", v.PublicKey().Hex())
return errors.Wrapf(err, "error getting followees of '%s", value.PublicKey().Hex())
}

result.PutMany(followees)
publicKeysToMonitorFollowees.PutMany(followees)
}

return nil
}); err != nil {
return nil, errors.Wrap(err, "transaction error")
return downloader.PublicKeys{}, errors.Wrap(err, "transaction error")
}

return result.List(), nil
return downloader.NewPublicKeys(
publicKeysToMonitor.List(),
publicKeysToMonitorFollowees.List(),
), nil
}
77 changes: 65 additions & 12 deletions service/domain/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,44 @@ type RelaySource interface {
}

type PublicKeySource interface {
GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error)
GetPublicKeys(ctx context.Context) (PublicKeys, error)
}

type PublicKeys struct {
publicKeysToMonitor *internal.Set[domain.PublicKey]
publicKeysToMonitorFollowees *internal.Set[domain.PublicKey]
}

func NewPublicKeys(publicKeysToMonitor []domain.PublicKey, publicKeysToMonitorFollowees []domain.PublicKey) PublicKeys {
return PublicKeys{
publicKeysToMonitor: internal.NewSet(publicKeysToMonitor),
publicKeysToMonitorFollowees: internal.NewSet(publicKeysToMonitorFollowees),
}
}

func (p PublicKeys) PublicKeysToMonitor() []domain.PublicKey {
return p.publicKeysToMonitor.List()
}

func (p PublicKeys) PublicKeysToMonitorFollowees() []domain.PublicKey {
return p.publicKeysToMonitorFollowees.List()
}

func (p PublicKeys) All() []domain.PublicKey {
v := internal.NewEmptySet[domain.PublicKey]()
v.PutMany(p.publicKeysToMonitor.List())
v.PutMany(p.publicKeysToMonitorFollowees.List())
return v.List()
}

func (p PublicKeys) Equal(o PublicKeys) bool {
return p.publicKeysToMonitor.Equal(o.publicKeysToMonitor) &&
p.publicKeysToMonitorFollowees.Equal(o.publicKeysToMonitorFollowees)
}

type Downloader struct {
relayDownloaders map[domain.RelayAddress]runningRelayDownloader
previousPublicKeys *internal.Set[domain.PublicKey]
previousPublicKeys PublicKeys
relayDownloadersLock sync.Mutex // protects relayDownloaders and previousPublicKeys

bootstrapRelaySource BootstrapRelaySource
Expand Down Expand Up @@ -94,7 +126,7 @@ func NewDownloader(
metrics: metrics,
relayDownloaderFactory: relayDownloaderFactory,

previousPublicKeys: internal.NewEmptySet[domain.PublicKey](),
previousPublicKeys: NewPublicKeys(nil, nil),
}
}

Expand Down Expand Up @@ -213,19 +245,18 @@ func (d *Downloader) updatePublicKeys(ctx context.Context) error {
return errors.Wrap(err, "error getting public keys")
}

publicKeysSet := internal.NewSet(publicKeys)

d.relayDownloadersLock.Lock()
defer d.relayDownloadersLock.Unlock()

isDifferentThanPrevious := publicKeysSet.Equal(d.previousPublicKeys)
isDifferentThanPrevious := publicKeys.Equal(d.previousPublicKeys)

for _, v := range d.relayDownloaders {
v.RelayDownloader.UpdateSubscription(v.Context, isDifferentThanPrevious, publicKeysSet)
if err := v.RelayDownloader.UpdateSubscription(v.Context, isDifferentThanPrevious, publicKeys); err != nil {
return errors.Wrap(err, "error updating subscription")
}
}

d.previousPublicKeys = publicKeysSet

d.previousPublicKeys = publicKeys
return nil
}

Expand Down Expand Up @@ -270,6 +301,7 @@ func (d *RelayDownloader) Start(ctx context.Context) {
nil,
globalEventKindsToDownload,
nil,
nil,
d.downloadSince(),
))
}
Expand Down Expand Up @@ -301,26 +333,47 @@ func (d *RelayDownloader) downloadMessagesWithErr(ctx context.Context, filter do
return nil
}

func (d *RelayDownloader) UpdateSubscription(ctx context.Context, isDifferentThanPrevious bool, publicKeys *internal.Set[domain.PublicKey]) {
func (d *RelayDownloader) UpdateSubscription(ctx context.Context, isDifferentThanPrevious bool, publicKeys PublicKeys) error {
d.publicKeySubscriptionLock.Lock()
defer d.publicKeySubscriptionLock.Unlock()

if d.publicKeySubscriptionCancelFunc != nil && !isDifferentThanPrevious {
return
return nil
}

if d.publicKeySubscriptionCancelFunc != nil {
d.publicKeySubscriptionCancelFunc()
}

var pTags []domain.FilterTag
for _, publicKey := range publicKeys.PublicKeysToMonitor() {
tag, err := domain.NewFilterTag(domain.TagProfile, publicKey.Hex())
if err != nil {
return errors.Wrap(err, "error creating a filter tag")
}
pTags = append(pTags, tag)
}

ctx, cancel := context.WithCancel(ctx)

go d.downloadMessages(ctx, domain.NewFilter(
nil,
nil,
publicKeys.List(),
nil,
publicKeys.All(),
d.downloadSince(),
))

go d.downloadMessages(ctx, domain.NewFilter(
nil,
nil,
pTags,
nil,
d.downloadSince(),
))

d.publicKeySubscriptionCancelFunc = cancel
return nil
}

type RelayDownloaderFactory struct {
Expand Down
31 changes: 31 additions & 0 deletions service/domain/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package domain
import (
"time"

"github.com/boreq/errors"
"github.com/nbd-wtf/go-nostr"
"github.com/planetary-social/nos-event-service/internal"
)
Expand All @@ -14,6 +15,7 @@ type Filter struct {
func NewFilter(
eventIDs []EventId,
eventKinds []EventKind,
eventTags []FilterTag,
authors []PublicKey,
since *time.Time,
) Filter {
Expand All @@ -36,6 +38,14 @@ func NewFilter(
filter.Kinds = append(filter.Kinds, eventKind.Int())
}

if len(eventTags) > 0 {
filter.Tags = make(nostr.TagMap)

for _, eventTag := range eventTags {
filter.Tags[eventTag.Name().String()] = []string{eventTag.Value()}
}
}

for _, author := range authors {
filter.Authors = append(filter.Authors, author.Hex())
}
Expand All @@ -56,3 +66,24 @@ func (e Filter) Libfilter() nostr.Filter {
func (e Filter) MarshalJSON() ([]byte, error) {
return e.filter.MarshalJSON()
}

type FilterTag struct {
name EventTagName
value string
}

func NewFilterTag(name EventTagName, value string) (FilterTag, error) {
if value == "" {
return FilterTag{}, errors.New("value can't be empty")
}

return FilterTag{name: name, value: value}, nil
}

func (f FilterTag) Name() EventTagName {
return f.name
}

func (f FilterTag) Value() string {
return f.value
}
8 changes: 4 additions & 4 deletions service/domain/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

var (
tagProfile = MustNewEventTagName("p")
tagRelay = MustNewEventTagName("r")
TagProfile = MustNewEventTagName("p")
TagRelay = MustNewEventTagName("r")
)

type EventTag struct {
Expand Down Expand Up @@ -50,11 +50,11 @@ func (e EventTag) FirstValueIsAnEmptyString() bool {
}

func (e EventTag) IsProfile() bool {
return e.name == tagProfile
return e.name == TagProfile
}

func (e EventTag) IsRelay() bool {
return e.name == tagRelay
return e.name == TagRelay
}

func (e EventTag) Profile() (PublicKey, error) {
Expand Down

0 comments on commit 0a04e99

Please sign in to comment.