From 98465c877caf69fc8c29873f9f0611ec142d37ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C5=A1per=20Grom?= Date: Thu, 29 Jan 2026 16:08:12 +0100 Subject: [PATCH] fix: mentions snowflake sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Gašper Grom --- services/libs/tinybird/datasources/mentions.datasource | 4 +++- services/libs/tinybird/pipes/octolens_mentions_sink.pipe | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/services/libs/tinybird/datasources/mentions.datasource b/services/libs/tinybird/datasources/mentions.datasource index e60a92382b..9df37df27c 100644 --- a/services/libs/tinybird/datasources/mentions.datasource +++ b/services/libs/tinybird/datasources/mentions.datasource @@ -45,7 +45,9 @@ SCHEMA > `projectSlug` LowCardinality(String) `json:$.projectSlug` DEFAULT '', `createdAt` DateTime64(3) `json:$.createdAt` DEFAULT now64(3), `bookmarked` UInt8 `json:$.bookmarked`, - `keywords` Array(String) `json:$.keywords[:]` + `keywords` Array(String) `json:$.keywords[:]`, + `authorFollowerCount` Nullable(Int32) `json:$.authorFollowerCount`, + `tags` Array(Nullable(String)) `json:$.tags[:]` ENGINE ReplacingMergeTree ENGINE_PARTITION_KEY toYear(timestamp) diff --git a/services/libs/tinybird/pipes/octolens_mentions_sink.pipe b/services/libs/tinybird/pipes/octolens_mentions_sink.pipe index f3567800f4..9d497a8acf 100644 --- a/services/libs/tinybird/pipes/octolens_mentions_sink.pipe +++ b/services/libs/tinybird/pipes/octolens_mentions_sink.pipe @@ -1,7 +1,8 @@ DESCRIPTION > Sink pipe to export Octolens mentions data to Kafka for Snowflake ingestion. Reads from mentions datasource using FINAL to get deduplicated data. - Runs daily at 00:30 UTC, exporting all data (full snapshot). + Runs daily at 00:30 UTC, exporting only data created since the last run. + Uses createdAt filter to get rows added the previous day (incremental sync). NODE octolens_mentions_select_fields SQL > @@ -28,8 +29,11 @@ SQL > createdAt, bookmarked, arrayStringConcat(keywords, ',') as keywords, + authorFollowerCount, + arrayStringConcat(tags, ',') as tags, toStartOfDay(now()) as date FROM mentions FINAL + WHERE createdAt >= toStartOfDay(now()) - INTERVAL 1 DAY AND createdAt < toStartOfDay(now()) TYPE SINK EXPORT_SERVICE kafka