diff --git a/.env.example b/.env.example index 98a85fe8..20ca95d2 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,7 @@ ENV=dev BFF_INGESTER_ENABLED=1 BFF_API_ENABLED=1 +BFF_HOTNESS_MATERIALIZER_ENABLED=1 # Your handle on bsky.app and an app password generated # at: https://bsky.app/settings/app-passwords diff --git a/cmd/bffsrv/main.go b/cmd/bffsrv/main.go index b25ec70b..8ac9238e 100644 --- a/cmd/bffsrv/main.go +++ b/cmd/bffsrv/main.go @@ -12,6 +12,7 @@ import ( "github.com/strideynet/bsky-furry-feed/api" "github.com/strideynet/bsky-furry-feed/bluesky" "github.com/strideynet/bsky-furry-feed/feed" + "github.com/strideynet/bsky-furry-feed/hotness" "github.com/strideynet/bsky-furry-feed/ingester" "github.com/strideynet/bsky-furry-feed/store" "go.opentelemetry.io/contrib/detectors/gcp" @@ -113,6 +114,7 @@ func runE(log *zap.Logger) error { ingesterEnabled := os.Getenv("BFF_INGESTER_ENABLED") == "1" apiEnabled := os.Getenv("BFF_API_ENABLED") == "1" + hotnessMaterializerEnabled := os.Getenv("BFF_HOTNESS_MATERIALIZER_ENABLED") == "1" log.Info("starting", zap.String("mode", string(mode))) @@ -227,6 +229,17 @@ func runE(log *zap.Logger) error { }) } + if hotnessMaterializerEnabled { + hm := hotness.NewMaterializer(log.Named("hotness"), pgxStore, hotness.Opts{ + MaterializationPeriod: 30 * time.Second, + RetentionPeriod: 1 * time.Hour, + LookbackPeriod: 24 * time.Hour, + }) + eg.Go(func() error { + return hm.Run(ctx) + }) + } + // Setup private diagnostics/metrics server debugSrv := debugServer() eg.Go(func() error { diff --git a/hotness/materializer.go b/hotness/materializer.go new file mode 100644 index 00000000..cf0996ab --- /dev/null +++ b/hotness/materializer.go @@ -0,0 +1,67 @@ +package hotness + +import ( + "context" + "fmt" + "time" + + "github.com/strideynet/bsky-furry-feed/store" + "go.uber.org/zap" +) + +type Materializer struct { + log *zap.Logger + store *store.PGXStore + opts Opts +} + +type Opts struct { + MaterializationPeriod time.Duration + RetentionPeriod time.Duration + LookbackPeriod time.Duration +} + +func NewMaterializer( + log *zap.Logger, store *store.PGXStore, opts Opts, +) *Materializer { + return &Materializer{ + log: log, + store: store, + opts: opts, + } +} + +func (m *Materializer) materialize(ctx context.Context) error { + return m.store.MaterializeClassicPostHotness(ctx, m.opts.LookbackPeriod) +} + +func (m *Materializer) cleanup(ctx context.Context) error { + return m.store.DeleteOldPostHotness(ctx, m.opts.RetentionPeriod) +} + +func (m *Materializer) step(ctx context.Context) error { + // NOTE: materalize and cleanup don't run a transaction together (they don't need to, since it's okay if we keep old materialized results around for too long). + // However, we should do the cleanup _after_ the materialization, in case the materialization fails and we converge on purging the entire table. + if err := m.materialize(ctx); err != nil { + return fmt.Errorf("materialize: %w", err) + } + if err := m.cleanup(ctx); err != nil { + return fmt.Errorf("cleanup: %w", err) + } + return nil +} + +func (m *Materializer) Run(ctx context.Context) error { + t := time.NewTicker(m.opts.MaterializationPeriod) + for { + select { + case <-t.C: + case <-ctx.Done(): + return ctx.Err() + } + + if err := m.step(ctx); err != nil { + m.log.Error("step", zap.Error(err)) + } + } +} diff --git a/sqlc.yaml b/sqlc.yaml index 2ef10807..e507b9f7 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -7,6 +7,7 @@ overrides: actor_did: ActorDID subject_uri: SubjectURI is_nsfw: IsNSFW + after_uri: AfterURI overrides: - column: candidate_posts.raw go_type: diff --git a/store/gen/candidate_posts.sql.go b/store/gen/candidate_posts.sql.go index 05cf3606..52d38bcb 100644 --- a/store/gen/candidate_posts.sql.go +++ b/store/gen/candidate_posts.sql.go @@ -14,9 +14,11 @@ import ( const createCandidatePost = `-- name: CreateCandidatePost :exec INSERT INTO - candidate_posts (uri, actor_did, created_at, indexed_at, hashtags, has_media, raw) +candidate_posts ( + uri, actor_did, created_at, indexed_at, hashtags, has_media, raw +) VALUES - ($1, $2, $3, $4, $5, $6, $7) +($1, $2, $3, $4, $5, $6, $7) ` type CreateCandidatePostParams struct { @@ -43,19 +45,28 @@ func (q *Queries) CreateCandidatePost(ctx context.Context, db DBTX, arg CreateCa } const getFurryNewFeed = `-- name: GetFurryNewFeed :many -SELECT - cp.uri, cp.actor_did, cp.created_at, cp.indexed_at, cp.is_hidden, cp.deleted_at, cp.raw, cp.hashtags, cp.has_media +SELECT cp.uri, cp.actor_did, cp.created_at, cp.indexed_at, cp.is_hidden, cp.deleted_at, cp.raw, cp.hashtags, cp.has_media FROM - candidate_posts cp - INNER JOIN candidate_actors ca ON cp.actor_did = ca.did + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did WHERE - cp.is_hidden = false - AND ca.status = 'approved' - AND (COALESCE($1::TEXT[], '{}') = '{}' OR $1::TEXT[] && cp.hashtags) - AND ($2::BOOLEAN IS NULL OR COALESCE(cp.has_media, false) = $2) - AND ($3::BOOLEAN IS NULL OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) = $3) - AND (cp.indexed_at < $4) - AND cp.deleted_at IS NULL + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + COALESCE($1::TEXT [], '{}') = '{}' + OR $1::TEXT [] && cp.hashtags + ) + AND ( + $2::BOOLEAN IS NULL + OR COALESCE(cp.has_media, false) = $2 + ) + AND ( + $3::BOOLEAN IS NULL + OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) + = $3 + ) + AND (cp.indexed_at < $4) + AND cp.deleted_at IS NULL ORDER BY cp.indexed_at DESC LIMIT $5 @@ -105,10 +116,107 @@ func (q *Queries) GetFurryNewFeed(ctx context.Context, db DBTX, arg GetFurryNewF return items, nil } +const getHotPosts = `-- name: GetHotPosts :many +SELECT cp.uri, cp.actor_did, cp.created_at, cp.indexed_at, cp.is_hidden, cp.deleted_at, cp.raw, cp.hashtags, cp.has_media +FROM + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did +INNER JOIN post_hotness AS ph + ON + ph.post_uri = cp.uri AND ph.alg = $1 + AND ph.generation_seq = $2 +WHERE + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + COALESCE($3::TEXT [], '{}') = '{}' + OR $3::TEXT [] && cp.hashtags + ) + AND ( + $4::BOOLEAN IS NULL + OR COALESCE(cp.has_media, false) = $4 + ) + AND ( + $5::BOOLEAN IS NULL + OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) + = $5 + ) + AND cp.deleted_at IS NULL + AND (ph.score, ph.uri) < ($6::REAL, $7::TEXT) +ORDER BY + ph.score DESC +LIMIT $8 +` + +type GetHotPostsParams struct { + Alg string + GenerationSeq int64 + Hashtags []string + HasMedia pgtype.Bool + IsNSFW pgtype.Bool + AfterScore float32 + AfterURI string + Limit int32 +} + +func (q *Queries) GetHotPosts(ctx context.Context, db DBTX, arg GetHotPostsParams) ([]CandidatePost, error) { + rows, err := db.Query(ctx, getHotPosts, + arg.Alg, + arg.GenerationSeq, + arg.Hashtags, + arg.HasMedia, + arg.IsNSFW, + arg.AfterScore, + arg.AfterURI, + arg.Limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []CandidatePost + for rows.Next() { + var i CandidatePost + if err := rows.Scan( + &i.URI, + &i.ActorDID, + &i.CreatedAt, + &i.IndexedAt, + &i.IsHidden, + &i.DeletedAt, + &i.Raw, + &i.Hashtags, + &i.HasMedia, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getLatestHotPostGeneration = `-- name: GetLatestHotPostGeneration :one +SELECT ph.generation_seq +FROM post_hotness AS ph +WHERE ph.alg = $1 +ORDER BY ph.generation_seq DESC +LIMIT 1 +` + +func (q *Queries) GetLatestHotPostGeneration(ctx context.Context, db DBTX, alg string) (int64, error) { + row := db.QueryRow(ctx, getLatestHotPostGeneration, alg) + var generation_seq int64 + err := row.Scan(&generation_seq) + return generation_seq, err +} + const getPostByURI = `-- name: GetPostByURI :one SELECT uri, actor_did, created_at, indexed_at, is_hidden, deleted_at, raw, hashtags, has_media FROM - candidate_posts cp + candidate_posts AS cp WHERE cp.uri = $1 LIMIT 1 @@ -134,23 +242,26 @@ func (q *Queries) GetPostByURI(ctx context.Context, db DBTX, uri string) (Candid const getPostsWithLikes = `-- name: GetPostsWithLikes :many SELECT cp.uri, cp.actor_did, cp.created_at, cp.indexed_at, cp.is_hidden, cp.deleted_at, cp.raw, cp.hashtags, cp.has_media, - (SELECT - COUNT(*) - FROM - candidate_likes cl - WHERE - cl.subject_uri = cp.uri - AND (cl.indexed_at < $1) - AND cl.deleted_at IS NULL) AS likes + ( + SELECT COUNT(*) + FROM + candidate_likes AS cl + WHERE + cl.subject_uri = cp.uri + AND (cl.indexed_at < $1) + AND cl.deleted_at IS NULL + ) AS likes FROM - candidate_posts cp - INNER JOIN candidate_actors ca ON cp.actor_did = ca.did + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did WHERE - cp.is_hidden = false - AND ca.status = 'approved' - AND ($1::TIMESTAMPTZ IS NULL OR - cp.indexed_at < $1) - AND cp.deleted_at IS NULL + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + $1::TIMESTAMPTZ IS NULL + OR cp.indexed_at < $1 + ) + AND cp.deleted_at IS NULL ORDER BY cp.indexed_at DESC LIMIT $2 @@ -207,7 +318,7 @@ func (q *Queries) GetPostsWithLikes(ctx context.Context, db DBTX, arg GetPostsWi const softDeleteCandidatePost = `-- name: SoftDeleteCandidatePost :exec UPDATE - candidate_posts +candidate_posts SET deleted_at = NOW() WHERE diff --git a/store/gen/models.go b/store/gen/models.go index 09867481..02b3c790 100644 --- a/store/gen/models.go +++ b/store/gen/models.go @@ -113,3 +113,11 @@ type CandidatePost struct { Hashtags []string HasMedia pgtype.Bool } + +type PostHotness struct { + URI string + Alg string + Score float32 + GenerationSeq int64 + GeneratedAt pgtype.Timestamptz +} diff --git a/store/gen/post_hotness.sql.go b/store/gen/post_hotness.sql.go new file mode 100644 index 00000000..3bb5db8d --- /dev/null +++ b/store/gen/post_hotness.sql.go @@ -0,0 +1,46 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.19.0 +// source: post_hotness.sql + +package gen + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const deleteOldPostHotness = `-- name: DeleteOldPostHotness :exec +DELETE FROM post_hotness +WHERE generated_at < NOW() - $1::INTERVAL +` + +func (q *Queries) DeleteOldPostHotness(ctx context.Context, db DBTX, retentionPeriod pgtype.Interval) error { + _, err := db.Exec(ctx, deleteOldPostHotness, retentionPeriod) + return err +} + +const materializeClassicPostHotness = `-- name: MaterializeClassicPostHotness :exec +INSERT INTO post_hotness (uri, alg, score, generation_seq) +SELECT + cp.uri AS uri, + 'classic' AS alg, + ( + SELECT COUNT(*) + FROM candidate_likes AS cl + WHERE cl.subject_uri = cp.uri AND cl.deleted_at IS NULL + ) + / (EXTRACT(EPOCH FROM NOW() - cp.created_at) / (60 * 60) + 2) + ^ 1.85 AS score, + NEXTVAL('post_hotness_generation_seq') AS generation_seq +FROM candidate_posts AS cp +WHERE + cp.deleted_at IS NULL + AND cp.created_at >= NOW() - $1::INTERVAL +` + +func (q *Queries) MaterializeClassicPostHotness(ctx context.Context, db DBTX, lookbackPeriod pgtype.Interval) error { + _, err := db.Exec(ctx, materializeClassicPostHotness, lookbackPeriod) + return err +} diff --git a/store/migrations/000017_create_post_hotness.down.sql b/store/migrations/000017_create_post_hotness.down.sql new file mode 100644 index 00000000..e35e8a8d --- /dev/null +++ b/store/migrations/000017_create_post_hotness.down.sql @@ -0,0 +1,2 @@ +DROP SEQUENCE post_hotness_generation_seq; +DROP TABLE post_hotness; diff --git a/store/migrations/000017_create_post_hotness.up.sql b/store/migrations/000017_create_post_hotness.up.sql new file mode 100644 index 00000000..3feccafa --- /dev/null +++ b/store/migrations/000017_create_post_hotness.up.sql @@ -0,0 +1,12 @@ +CREATE TABLE post_hotness ( + uri TEXT PRIMARY KEY, + alg TEXT NOT NULL, + score REAL NOT NULL, + generation_seq BIGINT NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX post_hotness_alg_score_uri_idx ON post_hotness (alg, score, uri); +CREATE INDEX post_hotness_alg_generation_seq_at_idx ON post_hotness ( + alg, generation_seq +); +CREATE SEQUENCE post_hotness_generation_seq; diff --git a/store/postgres.go b/store/postgres.go index f0a43313..47348924 100644 --- a/store/postgres.go +++ b/store/postgres.go @@ -559,6 +559,61 @@ func (s *PGXStore) ListPostsForNewFeed(ctx context.Context, opts ListPostsForNew return posts, nil } +func (s *PGXStore) GetLatestHotPostGeneration(ctx context.Context, alg string) (out int64, err error) { + ctx, span := tracer.Start(ctx, "pgx_store.get_latest_hot_post_generation") + defer func() { + endSpan(span, err) + }() + seq, err := s.queries.GetLatestHotPostGeneration(ctx, s.pool, alg) + if err != nil { + return 0, err + } + return seq, nil +} + +type ListPostsForHotFeedCursor struct { + Alg string + GenerationSeq int64 + AfterScore float32 + AfterURI string +} + +type ListPostsForHotFeedOpts struct { + Cursor ListPostsForHotFeedCursor + Hashtags []string + IsNSFW tristate.Tristate + HasMedia tristate.Tristate + Limit int +} + +func (s *PGXStore) ListPostsForHotFeed(ctx context.Context, opts ListPostsForHotFeedOpts) (out []gen.CandidatePost, err error) { + // TODO: Don't leak gen.CandidatePost implementation + ctx, span := tracer.Start(ctx, "pgx_store.list_posts_for_hot_feed") + defer func() { + endSpan(span, err) + }() + + queryParams := gen.GetHotPostsParams{ + Hashtags: opts.Hashtags, + HasMedia: tristateToPgtypeBool(opts.HasMedia), + IsNSFW: tristateToPgtypeBool(opts.IsNSFW), + Alg: opts.Cursor.Alg, + GenerationSeq: opts.Cursor.GenerationSeq, + AfterScore: opts.Cursor.AfterScore, + AfterURI: opts.Cursor.AfterURI, + } + if opts.Limit != 0 { + queryParams.Limit = int32(opts.Limit) + } + + posts, err := s.queries.GetHotPosts(ctx, s.pool, queryParams) + if err != nil { + return nil, fmt.Errorf("executing GetHotPosts query: %w", err) + } + + return posts, nil +} + type ListPostsWithLikesOpts struct { CursorTime time.Time Limit int @@ -703,3 +758,11 @@ func (s *PGXStore) GetPostByURI(ctx context.Context, uri string) (out gen.Candid // TODO: Return a proto type rather than exposing gen.CandidatePost return s.queries.GetPostByURI(ctx, s.pool, uri) } + +func (s *PGXStore) MaterializeClassicPostHotness(ctx context.Context, lookbackPeriod time.Duration) error { + return s.queries.MaterializeClassicPostHotness(ctx, s.pool, pgtype.Interval{Valid: true, Microseconds: lookbackPeriod.Microseconds()}) +} + +func (s *PGXStore) DeleteOldPostHotness(ctx context.Context, retentionPeriod time.Duration) error { + return s.queries.DeleteOldPostHotness(ctx, s.pool, pgtype.Interval{Valid: true, Microseconds: retentionPeriod.Microseconds()}) +} diff --git a/store/queries/candidate_posts.sql b/store/queries/candidate_posts.sql index 2769119f..babf1262 100644 --- a/store/queries/candidate_posts.sql +++ b/store/queries/candidate_posts.sql @@ -1,63 +1,115 @@ -- name: CreateCandidatePost :exec INSERT INTO - candidate_posts (uri, actor_did, created_at, indexed_at, hashtags, has_media, raw) +candidate_posts ( + uri, actor_did, created_at, indexed_at, hashtags, has_media, raw +) VALUES - ($1, $2, $3, $4, $5, $6, $7); +($1, $2, $3, $4, $5, $6, $7); -- name: SoftDeleteCandidatePost :exec UPDATE - candidate_posts +candidate_posts SET deleted_at = NOW() WHERE uri = $1; -- name: GetFurryNewFeed :many -SELECT - cp.* +SELECT cp.* FROM - candidate_posts cp - INNER JOIN candidate_actors ca ON cp.actor_did = ca.did + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did WHERE - cp.is_hidden = false - AND ca.status = 'approved' - AND (COALESCE(@hashtags::TEXT[], '{}') = '{}' OR @hashtags::TEXT[] && cp.hashtags) - AND (sqlc.narg(has_media)::BOOLEAN IS NULL OR COALESCE(cp.has_media, false) = @has_media) - AND (sqlc.narg(is_nsfw)::BOOLEAN IS NULL OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) = @is_nsfw) - AND (cp.indexed_at < @cursor_timestamp) - AND cp.deleted_at IS NULL + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + COALESCE(sqlc.arg(hashtags)::TEXT [], '{}') = '{}' + OR sqlc.arg(hashtags)::TEXT [] && cp.hashtags + ) + AND ( + sqlc.narg(has_media)::BOOLEAN IS NULL + OR COALESCE(cp.has_media, false) = sqlc.arg(has_media) + ) + AND ( + sqlc.narg(is_nsfw)::BOOLEAN IS NULL + OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) + = sqlc.arg(is_nsfw) + ) + AND (cp.indexed_at < sqlc.arg(cursor_timestamp)) + AND cp.deleted_at IS NULL ORDER BY cp.indexed_at DESC -LIMIT @_limit; +LIMIT sqlc.arg(_limit); -- name: GetPostsWithLikes :many SELECT cp.*, - (SELECT - COUNT(*) - FROM - candidate_likes cl - WHERE - cl.subject_uri = cp.uri - AND (cl.indexed_at < @cursor_timestamp) - AND cl.deleted_at IS NULL) AS likes + ( + SELECT COUNT(*) + FROM + candidate_likes AS cl + WHERE + cl.subject_uri = cp.uri + AND (cl.indexed_at < sqlc.arg(cursor_timestamp)) + AND cl.deleted_at IS NULL + ) AS likes FROM - candidate_posts cp - INNER JOIN candidate_actors ca ON cp.actor_did = ca.did + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did WHERE - cp.is_hidden = false - AND ca.status = 'approved' - AND (@cursor_timestamp::TIMESTAMPTZ IS NULL OR - cp.indexed_at < @cursor_timestamp) - AND cp.deleted_at IS NULL + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + sqlc.arg(cursor_timestamp)::TIMESTAMPTZ IS NULL + OR cp.indexed_at < sqlc.arg(cursor_timestamp) + ) + AND cp.deleted_at IS NULL ORDER BY cp.indexed_at DESC -LIMIT @_limit; +LIMIT sqlc.arg(_limit); -- name: GetPostByURI :one SELECT * FROM - candidate_posts cp + candidate_posts AS cp WHERE - cp.uri = @uri + cp.uri = sqlc.arg(uri) +LIMIT 1; + +-- name: GetLatestHotPostGeneration :one +SELECT ph.generation_seq +FROM post_hotness AS ph +WHERE ph.alg = sqlc.arg(alg) +ORDER BY ph.generation_seq DESC LIMIT 1; + +-- name: GetHotPosts :many +SELECT cp.* +FROM + candidate_posts AS cp +INNER JOIN candidate_actors AS ca ON cp.actor_did = ca.did +INNER JOIN post_hotness AS ph + ON + ph.post_uri = cp.uri AND ph.alg = sqlc.arg(alg) + AND ph.generation_seq = sqlc.arg(generation_seq) +WHERE + cp.is_hidden = false + AND ca.status = 'approved' + AND ( + COALESCE(sqlc.arg(hashtags)::TEXT [], '{}') = '{}' + OR sqlc.arg(hashtags)::TEXT [] && cp.hashtags + ) + AND ( + sqlc.narg(has_media)::BOOLEAN IS NULL + OR COALESCE(cp.has_media, false) = sqlc.narg(has_media) + ) + AND ( + sqlc.narg(is_nsfw)::BOOLEAN IS NULL + OR (ARRAY['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) + = sqlc.narg(is_nsfw) + ) + AND cp.deleted_at IS NULL + AND (ph.score, ph.uri) < (sqlc.arg(after_score)::REAL, sqlc.arg(after_uri)::TEXT) +ORDER BY + ph.score DESC +LIMIT sqlc.arg(_limit); diff --git a/store/queries/post_hotness.sql b/store/queries/post_hotness.sql new file mode 100644 index 00000000..a88f893a --- /dev/null +++ b/store/queries/post_hotness.sql @@ -0,0 +1,21 @@ +-- name: DeleteOldPostHotness :exec +DELETE FROM post_hotness +WHERE generated_at < NOW() - sqlc.arg(retention_period)::INTERVAL; + +-- name: MaterializeClassicPostHotness :exec +INSERT INTO post_hotness (uri, alg, score, generation_seq) +SELECT + cp.uri AS uri, + 'classic' AS alg, + ( + SELECT COUNT(*) + FROM candidate_likes AS cl + WHERE cl.subject_uri = cp.uri AND cl.deleted_at IS NULL + ) + / (EXTRACT(EPOCH FROM NOW() - cp.created_at) / (60 * 60) + 2) + ^ 1.85 AS score, + NEXTVAL('post_hotness_generation_seq') AS generation_seq +FROM candidate_posts AS cp +WHERE + cp.deleted_at IS NULL + AND cp.created_at >= NOW() - sqlc.arg(lookback_period)::INTERVAL;