Skip to content

Commit

Permalink
support performing AUTH in the middle of SimplePool's subMany*
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Dec 8, 2023
1 parent f8fa490 commit b2170ef
Showing 1 changed file with 52 additions and 5 deletions.
57 changes: 52 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"

Expand All @@ -18,25 +19,49 @@ type SimplePool struct {
Relays *xsync.MapOf[string, *Relay]
Context context.Context

cancel context.CancelFunc
authHandler func(*Event) error
cancel context.CancelFunc
}

type IncomingEvent struct {
*Event
Relay *Relay
}

func NewSimplePool(ctx context.Context) *SimplePool {
type PoolOption interface {
IsPoolOption()
Apply(*SimplePool)
}

func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
ctx, cancel := context.WithCancel(ctx)

return &SimplePool{
pool := &SimplePool{
Relays: xsync.NewMapOf[*Relay](),

Context: ctx,
cancel: cancel,
}

for _, opt := range opts {
opt.Apply(pool)
}

return pool
}

// WithAuthHandler must be a function that signs the auth event when called.
// it will be called whenever any relay in the pool returns a `CLOSED` message
// with the "auth-required:" prefix, only once for each relay
type WithAuthHandler func(authEvent *Event) error

func (_ WithAuthHandler) IsPoolOption() {}
func (h WithAuthHandler) Apply(pool *SimplePool) {
pool.authHandler = h
}

var _ PoolOption = (WithAuthHandler)(nil)

func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
nm := NormalizeURL(url)

Expand Down Expand Up @@ -91,6 +116,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
cancel()
}()

hasAuthed := false
interval := 3 * time.Second
for {
select {
Expand All @@ -105,7 +131,9 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
if err != nil {
goto reconnect
}
hasAuthed = false

subscribe:
sub, err = relay.Subscribe(ctx, filters)
if err != nil {
goto reconnect
Expand Down Expand Up @@ -149,7 +177,15 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
})
}
case reason := <-sub.ClosedReason:
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again
if err := relay.Auth(ctx, pool.authHandler); err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe
}
} else {
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
}
return
case <-ctx.Done():
return
Expand Down Expand Up @@ -202,6 +238,9 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
return
}

hasAuthed := false

subscribe:
sub, err := relay.Subscribe(ctx, filters)
if sub == nil {
debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
Expand All @@ -215,7 +254,15 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
case <-sub.EndOfStoredEvents:
return
case reason := <-sub.ClosedReason:
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again
if err := relay.Auth(ctx, pool.authHandler); err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe
}
} else {
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
}
return
case evt, more := <-sub.Events:
if !more {
Expand Down

0 comments on commit b2170ef

Please sign in to comment.