Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hotness queries and materializer (#25) #127

Merged
merged 4 commits into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ENV=dev
BFF_INGESTER_ENABLED=1
BFF_API_ENABLED=1
BFF_SCORE_MATERIALIZER_ENABLED=1
# Set BFF_HOSTNAME to the host you will serve BFF on.
BFF_HOSTNAME=

Expand Down
18 changes: 18 additions & 0 deletions cmd/bffsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/strideynet/bsky-furry-feed/scoring"
"os"
"os/signal"
"time"
Expand Down Expand Up @@ -113,6 +114,7 @@ func runE(log *zap.Logger) error {

ingesterEnabled := os.Getenv("BFF_INGESTER_ENABLED") == "1"
apiEnabled := os.Getenv("BFF_API_ENABLED") == "1"
scoreMaterializerEnabled := os.Getenv("BFF_SCORE_MATERIALIZER_ENABLED") == "1"

log.Info("starting", zap.String("mode", string(mode)))

Expand Down Expand Up @@ -226,6 +228,22 @@ func runE(log *zap.Logger) error {
})
}

if scoreMaterializerEnabled {
hm := scoring.NewMaterializer(
log.Named("scoring"),
pgxStore,
scoring.Opts{
MaterializationInterval: 1 * time.Minute,
RetentionPeriod: 15 * time.Minute,
LookbackPeriod: 24 * time.Hour,
},
)
eg.Go(func() error {
log.Info("scoring materializer started")
return hm.Run(ctx)
})
}

// Setup private diagnostics/metrics server
debugSrv := debugServer()
eg.Go(func() error {
Expand Down
122 changes: 87 additions & 35 deletions feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package feed

import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
Expand All @@ -11,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/strideynet/bsky-furry-feed/bluesky"
"github.com/strideynet/bsky-furry-feed/store"
"github.com/strideynet/bsky-furry-feed/store/gen"
"github.com/strideynet/bsky-furry-feed/tristate"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -102,24 +102,13 @@ func (s *Service) GetFeedPosts(ctx context.Context, feedKey string, cursor strin
return f.generate(ctx, s.store, cursor, limit)
}

func PostsFromStorePosts(storePosts []gen.CandidatePost) []Post {
posts := make([]Post, 0, len(storePosts))
for _, p := range storePosts {
posts = append(posts, Post{
URI: p.URI,
Cursor: bluesky.FormatTime(p.IndexedAt.Time),
})
}
return posts
}

type chronologicalGeneratorOpts struct {
type generatorOpts struct {
Hashtags []string
IsNSFW tristate.Tristate
HasMedia tristate.Tristate
}

func chronologicalGenerator(opts chronologicalGeneratorOpts) GenerateFunc {
func chronologicalGenerator(opts generatorOpts) GenerateFunc {
return func(ctx context.Context, pgxStore *store.PGXStore, cursor string, limit int) ([]Post, error) {
cursorTime := time.Now().UTC()
if cursor != "" {
Expand All @@ -137,11 +126,80 @@ func chronologicalGenerator(opts chronologicalGeneratorOpts) GenerateFunc {
CursorTime: cursorTime,
}

posts, err := pgxStore.ListPostsForNewFeed(ctx, params)
storePosts, err := pgxStore.ListPostsForNewFeed(ctx, params)
if err != nil {
return nil, fmt.Errorf("executing ListPostsForNewFeed: %w", err)
}
return PostsFromStorePosts(posts), nil

posts := make([]Post, 0, len(storePosts))
for _, p := range storePosts {
posts = append(posts, Post{
URI: p.URI,
Cursor: bluesky.FormatTime(p.IndexedAt.Time),
})
}

return posts, nil
}
}

func preScoredGenerator(alg string, opts generatorOpts) GenerateFunc {
return func(ctx context.Context, pgxStore *store.PGXStore, cursor string, limit int) ([]Post, error) {
type cursorValues struct {
GenerationSeq int64 `json:"generation_seq"`
AfterScore float32 `json:"after_score"`
AfterURI string `json:"after_uri"`
}
params := store.ListPostsForHotFeedOpts{
Limit: limit,
Hashtags: opts.Hashtags,
IsNSFW: opts.IsNSFW,
HasMedia: opts.HasMedia,
Alg: alg,
}
if cursor == "" {
seq, err := pgxStore.GetLatestScoreGeneration(ctx, alg)
if err != nil {
return nil, fmt.Errorf("executing GetLatestScoreGeneration: %w", err)
}
params.Cursor = store.ListPostsForHotFeedCursor{
GenerationSeq: seq,
AfterScore: float32(math.Inf(1)),
AfterURI: "",
}
} else {
var p cursorValues
if err := json.Unmarshal([]byte(cursor), &p); err != nil {
return nil, fmt.Errorf("unmarshaling cursor: %w", err)
}
params.Cursor = store.ListPostsForHotFeedCursor{
GenerationSeq: p.GenerationSeq,
AfterScore: p.AfterScore,
AfterURI: p.AfterURI,
strideynet marked this conversation as resolved.
Show resolved Hide resolved
}
}
storePosts, err := pgxStore.ListScoredPosts(ctx, params)
if err != nil {
return nil, fmt.Errorf("executing ListPostsForHotFeed: %w", err)
}

posts := make([]Post, 0, len(storePosts))
for _, p := range storePosts {
postCursor, err := json.Marshal(cursorValues{
GenerationSeq: params.Cursor.GenerationSeq,
AfterScore: p.Score,
AfterURI: p.URI,
})
if err != nil {
return nil, fmt.Errorf("marshaling cursor: %w", err)
}
posts = append(posts, Post{
URI: p.URI,
Cursor: string(postCursor),
})
}

return posts, nil
}
}

Expand Down Expand Up @@ -261,20 +319,20 @@ func ServiceWithDefaultFeeds(pgxStore *store.PGXStore) *Service {
DisplayName: "🐾 New",
Description: "Posts by furries across Bluesky. Contains a mix of SFW and NSFW content.\n\nJoin the furry feeds by following @furryli.st",
Priority: 101,
}, chronologicalGenerator(chronologicalGeneratorOpts{}))
}, chronologicalGenerator(generatorOpts{}))
r.Register(Meta{
ID: "furry-fursuit",
DisplayName: "🐾 Fursuits",
Description: "Posts by furries with #fursuit.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"fursuit"},
HasMedia: tristate.True,
}))
r.Register(Meta{
ID: "fursuit-nsfw",
DisplayName: "🐾 Murrsuits 🌙",
Description: "Posts by furries that have an image and #murrsuit or #fursuit.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"fursuit", "murrsuit", "mursuit"},
HasMedia: tristate.True,
IsNSFW: tristate.True,
Expand All @@ -283,7 +341,7 @@ func ServiceWithDefaultFeeds(pgxStore *store.PGXStore) *Service {
ID: "fursuit-clean",
DisplayName: "🐾 Fursuits 🧼",
Description: "Posts by furries with #fursuit and without #nsfw.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"fursuit"},
HasMedia: tristate.True,
IsNSFW: tristate.False,
Expand All @@ -292,15 +350,15 @@ func ServiceWithDefaultFeeds(pgxStore *store.PGXStore) *Service {
ID: "furry-art",
DisplayName: "🐾 Art",
Description: "Posts by furries with #art or #furryart. Contains a mix of SFW and NSFW content.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"art", "furryart"},
HasMedia: tristate.True,
}))
r.Register(Meta{
ID: "art-clean",
DisplayName: "🐾 Art 🧼",
Description: "Posts by furries with #art or #furryart and without #nsfw.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"art", "furryart"},
HasMedia: tristate.True,
IsNSFW: tristate.False,
Expand All @@ -309,7 +367,7 @@ func ServiceWithDefaultFeeds(pgxStore *store.PGXStore) *Service {
ID: "art-nsfw",
DisplayName: "🐾 Art 🌙",
Description: "Posts by furries with #art or #furryart and #nsfw.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"art", "furryart"},
HasMedia: tristate.True,
IsNSFW: tristate.True,
Expand All @@ -318,49 +376,43 @@ func ServiceWithDefaultFeeds(pgxStore *store.PGXStore) *Service {
ID: "furry-nsfw",
DisplayName: "🐾 New 🌙",
Description: "Posts by furries that have #nsfw.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
IsNSFW: tristate.True,
}))
r.Register(Meta{
ID: "furry-comms",
DisplayName: "🐾 #CommsOpen",
Description: "Posts by furries that have #commsopen.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"commsopen"},
}))
r.Register(Meta{
ID: "con-denfur",
DisplayName: "🐾 DenFur 2023",
Description: "A feed for all things DenFur! Use #denfur or #denfur2023 to include a post in the feed.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"denfur", "denfur2023"},
}))
r.Register(Meta{
ID: "merch",
DisplayName: "🐾 #FurSale",
Description: "Buy and sell furry merch on the FurSale feed. Use #fursale or #merch to include a post in the feed.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"fursale", "merch"},
}))
r.Register(Meta{
ID: "streamers",
DisplayName: "🐾 Streamers",
Description: "Find furs going live on streaming platforms. Use #goinglive or #furrylive to include a post in the feed.\n\nJoin the furry feeds by following @furryli.st",
}, chronologicalGenerator(chronologicalGeneratorOpts{
}, chronologicalGenerator(generatorOpts{
Hashtags: []string{"goinglive", "furrylive"},
}))
r.Register(Meta{
ID: "furry-test",
DisplayName: "🐾 Test 🚨🛠️",
Description: "Experimental version of the '🐾 Hot' feed.\ntest\ntest\n\ndouble break",
Priority: -1,
}, func(_ context.Context, _ *store.PGXStore, _ string, limit int) ([]Post, error) {
return []Post{
{
URI: "at://did:plc:dllwm3fafh66ktjofzxhylwk/app.bsky.feed.post/3jznh32lq6s2c",
},
}, nil
})
}, preScoredGenerator("classic", generatorOpts{}))

return r
}
10 changes: 5 additions & 5 deletions feed/feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func TestChronologicalGenerator(t *testing.T) {

for _, test := range []struct {
name string
opts chronologicalGeneratorOpts
opts generatorOpts
expectedPosts []string
}{
{
name: "all",
opts: chronologicalGeneratorOpts{
opts: generatorOpts{
Hashtags: []string{},
IsNSFW: tristate.Maybe,
HasMedia: tristate.Maybe,
Expand All @@ -117,7 +117,7 @@ func TestChronologicalGenerator(t *testing.T) {
},
{
name: "all fursuits",
opts: chronologicalGeneratorOpts{
opts: generatorOpts{
Hashtags: []string{"fursuit"},
IsNSFW: tristate.Maybe,
HasMedia: tristate.True,
Expand All @@ -126,7 +126,7 @@ func TestChronologicalGenerator(t *testing.T) {
},
{
name: "sfw only fursuits",
opts: chronologicalGeneratorOpts{
opts: generatorOpts{
Hashtags: []string{"fursuit"},
IsNSFW: tristate.False,
HasMedia: tristate.True,
Expand All @@ -135,7 +135,7 @@ func TestChronologicalGenerator(t *testing.T) {
},
{
name: "nsfw only art",
opts: chronologicalGeneratorOpts{
opts: generatorOpts{
Hashtags: []string{"art", "furryart"},
IsNSFW: tristate.True,
HasMedia: tristate.True,
Expand Down
2 changes: 2 additions & 0 deletions infra/k8s/ingester.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ spec:
value: production
- name: BFF_INGESTER_ENABLED
value: "1"
- name: BFF_SCORE_MATERIALIZER_ENABLED
value: "1"
envFrom:
- secretRef:
name: shared-env
Expand Down
Loading