Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
558 changes: 558 additions & 0 deletions services/libs/tinybird/LAMBDA_ARCHITECTURE.md

Large diffs are not rendered by default.

64 changes: 6 additions & 58 deletions services/libs/tinybird/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,7 @@
[This image](https://uploads.linear.app/aebec7ad-5649-4758-9bed-061f7228a879/b72d9f55-8f27-4c57-81fe-729807c12ffb/36c116c2-0f88-4735-a932-0c3e6bf8ea45) shows how data flows from CM to Insights.

## Activity Preprocessing Pipeline

1. **New activities land** on `activities` and `activityRelations` datasources
2. **Deduplication** of activities via copy pipe:
- `activities_deduplicated_copy_pipe (every hour at minute 0)`
2.1. `activities` → `activities_deduplicated_ds`
3. **Preprocessing pipeline for activityRelations - Deduplicates, filters and sorts data for performant queries**:
- `activityRelations (every hour at minute 0)` → `activityRelations_deduplicated_cleaned_ds`

## Other Copy Pipes

1. **pull_request_analysis_copy_pipe (every hour at minute 15)**: Compacts activities from same PR into one, keeping state change times in the same row. Helps with serving PR related metrics
2. **issue_analysis_copy_pipe (every hour at minute 15)**: Similar to pr analysis, this time we compact issue related information into one row.
See LAMBDA_ARCHITECTURE.md for details

---

Expand Down Expand Up @@ -63,18 +52,6 @@ Since `activities` **don’t exist in Postgres**, schema iteration must be done

### Iterating on Datasources Replicated by Sequin

These sources exist in Postgres (i.e., all Tinybird datasources **except `activities`**):

- `activityRelations`
- `collections`
- `insightsProjects`
- `collectionsInsightsProjects`
- `members`
- `organizations`
- `segments`
- `securityInsightsEvaluationSuiteControlEvaluations`
- `securityInsightsEvaluationSuiteControlEvaluationAssessments`

**Steps:**
1. **Pause** the related Sequin sink
2. **Run Postgres migration** to add/update/remove fields
Expand All @@ -83,7 +60,6 @@ These sources exist in Postgres (i.e., all Tinybird datasources **except `activi
4. **Backfill** the resource from Sequin
5. **Restart** the paused sink


---

### Add new tables to sequin and tinybird
Expand All @@ -95,7 +71,7 @@ These sources exist in Postgres (i.e., all Tinybird datasources **except `activi
ALTER PUBLICATION sequin_pub ADD TABLE "tableName";
ALTER TABLE public."tableName" REPLICA IDENTITY FULL;
```
3. (only for PROD) u need to create the topic in oracle kafka, it doesn't get created automaticly
3. (only for PROD) You need to create the topic in oracle kafka, it doesn't get created automaticly
4. Update tinybird kafka connect plugin env ( it's under crowd-kube/lf-prod-oracle(lf-staging-oracle)/kafka-connect/tinybird-sink.properties.enc ), there are list of tracked files in the decrypted file.
5. Restart kafka-connect
6. Create tinybird datasource schema and push it to tinybird
Expand All @@ -111,11 +87,11 @@ GRANT SELECT ON "tableName" to sequin;

### Downtime Consideration

Switching between old and new datasources can lead to **temporary downtime**, but only for **endpoint pipes that consume raw datasources directly**.
Switching between old and new datasources can lead to **temporary downtime**, but only for **endpoint pipes that consume raw datasources directly**.

**No Downtime** if the endpoint pipe uses a **deduplication copy pipe**:
- You can safely remove the raw datasource
- The deduplicated datasource will continue to serve data
**No Downtime** if the endpoint pipe uses a **copy pipe result**:
- You can safely remove the raw datasource after stopping the copy job
- The copy pipe result datasource will continue to serve data
- New fields will be included in the **next copy run**

**Only consider the following tips if your pipe is consuming raw datasources directly**:
Expand All @@ -127,34 +103,6 @@ Switching between old and new datasources can lead to **temporary downtime**, bu

---

### Alternative Way to Handle Datasource Iterations

You can avoid downtime entirely by **not deleting the old datasource**.

Instead of renaming the new datasource to the old one,
**Update each endpoint pipe to use the new datasource directly**

This allows your pipelines to stay active without interruption.

#### Pros:
- No downtime at all
- Safer testing of the new datasource before retiring the old one

#### Cons:
- Every pipe using the old datasource must be updated manually
- Easy to miss a reference if not done carefully

---

### Choosing the Right Approach

Until we move fully to **Tinybird Forward** (which will support migration scripts), the best practice is to **find a balance** between these two approaches:

1. **Quick rename strategy** is best when the raw datasource is only consumed by deduplication copy pipes, but no endpoints
2. **Pipe-by-pipe updates** for zero downtime where #1 is not enough

Pick the method that best fits your workflow and datasource complexity.

# Testing Tinybird Pipes Locally

This guide explains how to test a Tinybird data pipeline ("pipe") on your local Tinybird environment. We will fetch sample data (fixtures) from a staging Tinybird workspace and use it to run and verify a pipe locally. The steps below are written for a developer who may not be familiar with Tinybird, and they are organized in a clear, numbered format for easy follow-up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ SCHEMA >
`gitChangedLines` UInt64,
`gitChangedLinesBucket` String,
`organizationCountryCode` LowCardinality(String),
`organizationName` String
`organizationName` String,
`snapshotId` DateTime

ENGINE MergeTree
ENGINE_PARTITION_KEY toYear(timestamp)
ENGINE_PARTITION_KEY snapshotId
ENGINE_SORTING_KEY segmentId, timestamp, type, platform, memberId, organizationId
ENGINE_TTL toDateTime(snapshotId) + toIntervalHour(6)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
SCHEMA >
`activityId` String,
`conversationId` String,
`createdAt` DateTime64(3),
`updatedAt` DateTime64(3),
`memberId` String,
`objectMemberId` String,
`objectMemberUsername` String,
`organizationId` String,
`parentId` String,
`platform` LowCardinality(String),
`segmentId` String,
`username` String,
`sourceId` String,
`type` LowCardinality(String),
`timestamp` DateTime64(3),
`sourceParentId` String,
`channel` String,
`sentimentScore` Int8,
`gitInsertions` UInt32,
`gitDeletions` UInt32,
`score` Int8,
`isContribution` UInt8,
`pullRequestReviewState` LowCardinality(String),
`gitChangedLines` UInt64,
`gitChangedLinesBucket` String,
`organizationCountryCode` LowCardinality(String),
`organizationName` String,
`snapshotId` DateTime

ENGINE ReplacingMergeTree
ENGINE_PARTITION_KEY toYYYYMM(snapshotId)
ENGINE_SORTING_KEY snapshotId, segmentId, timestamp, type, platform, channel, sourceId
ENGINE_TTL toDateTime(snapshotId) + toIntervalDay(1)
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ DESCRIPTION >
- `segmentId` links to the subproject-level segment for filtering and organization.
- `username` contains the username of the member who performed the activity.

TAGS "Activity preprocessing pipeline", "Query optimization"

SCHEMA >
`activityId` String,
`conversationId` String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ DESCRIPTION >
- `weekday` represents the day of the week (0-6, UInt8) in local time.
- `two_hours_block` represents 2-hour time blocks (0-11, UInt16) for temporal pattern analysis.

TAGS "Contribution patterns", "Temporal analysis"

SCHEMA >
`id` String,
`timestamp` DateTime64(3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ DESCRIPTION >
- `memberId` links to the member who has this maintainer role.
- `insightsProjectId` links to the insights project this repository belongs to.

TAGS "Maintainer roles", "Project governance"
TAGS "Project governance"

SCHEMA >
`id` String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ SCHEMA >
`reviewedInSeconds` Nullable(Int64),
`closedInSeconds` Nullable(Int64),
`mergedInSeconds` Nullable(Int64),
`resolvedInSeconds` Nullable(Int64)
`resolvedInSeconds` Nullable(Int64),
`snapshotId` DateTime

ENGINE MergeTree
ENGINE_PARTITION_KEY toYear(openedAt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ DESCRIPTION >
- `contributorCount` is an aggregate function storing distinct contributor counts for the segment.
- `organizationCount` is an aggregate function storing distinct organization counts for the segment.

TAGS "Segment aggregates", "Materialized view"

SCHEMA >
`segmentId` String,
`contributorCount` AggregateFunction(countDistinct, String),
Expand Down
5 changes: 3 additions & 2 deletions services/libs/tinybird/pipes/activities_filtered.pipe
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ DESCRIPTION >
- This pipe is consumed by many of downstream pipes and widgets across the platform for consistent activity filtering.
- Performance is optimized through proper sorting keys on `segmentId`, `timestamp`, `type`, `platform`, and `memberId` in the source datasource.

NODE activities_filtered_v2_1
NODE activities_filtered_LAMBDA
SQL >
%
SELECT activityId as id, timestamp, type, platform, memberId, organizationId, segmentId
FROM activityRelations_deduplicated_cleaned_ds a
where
segmentId = (SELECT segmentId FROM segments_filtered)
snapshotId = (select max(snapshotId) from activityRelations_deduplicated_cleaned_ds)
AND segmentId = (SELECT segmentId FROM segments_filtered)
{% if defined(startDate) %}
AND a.timestamp
> {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ DESCRIPTION >
- `includeCollaborations`: Optional boolean to include or exclude collaboration activities. Inherited from activityTypes_filtered.
- Response: `id` (activityId), `timestamp`, `type`, `platform`, `memberId`, `organizationId`, `segmentId`.

NODE activities_filtered_by_timestamp_and_channel
NODE activities_filtered_historical_cutoff_LAMBDA
SQL >
%
SELECT activityId as id, timestamp, type, platform, memberId, organizationId, segmentId
FROM activityRelations_deduplicated_cleaned_ds a
where
segmentId = (SELECT segmentId FROM segments_filtered)
snapshotId = (select max(snapshotId) from activityRelations_deduplicated_cleaned_ds)
AND segmentId = (SELECT segmentId FROM segments_filtered)
{% if defined(startDate) %}
AND a.timestamp
<= {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ DESCRIPTION >
- `granularity`: Required string for time aggregation and period extension ('daily', 'weekly', 'monthly', 'quarterly', 'yearly')
- Response: `id` (activityId), `timestamp`, `type`, `platform`, `memberId`, `organizationId`, `segmentId`.

NODE activities_filtered_by_timestamp_and_channel
NODE activities_filtered_retention_LAMBDA
SQL >
%
SELECT activityId as id, timestamp, type, platform, memberId, organizationId, segmentId
FROM activityRelations_deduplicated_cleaned_ds a
where
segmentId = (SELECT segmentId FROM segments_filtered)
snapshotId = (select max(snapshotId) from activityRelations_deduplicated_cleaned_ds)
AND segmentId = (SELECT segmentId FROM segments_filtered)
{% if defined(startDate) %}
AND a.timestamp
> {% if defined(granularity) and granularity == "daily" %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ SQL >
organizationCountryCode,
organizationName
FROM activityRelations_deduplicated_cleaned_ds
where snapshotId = (select max(snapshotId) from activityRelations_deduplicated_cleaned_ds)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
NODE country_mapping_array
SQL >
SELECT groupArray((country, country_code, timezone_offset)) AS country_data FROM country_mapping_ds

NODE activityRelations_deduplicated_cleaned_denormalized
SQL >
WITH
upperUTF8(o.location) AS search_u,
(SELECT arrayMap(x -> upperUTF8(x .1), country_data) FROM country_mapping_array) AS names_u,
(SELECT country_data FROM country_mapping_array) AS mapping,
multiSearchFirstIndexUTF8(search_u, names_u) AS idx,
if(idx = 0, ('Unknown', 'XX', 0), mapping[idx]) AS country_data
SELECT
activityRelations.*,
(gitInsertions + gitDeletions) as gitChangedLines,
case
when gitChangedLines > 0 and gitChangedLines < 10
then '1-9'
when gitChangedLines > 9 and gitChangedLines < 60
then '10-59'
when gitChangedLines > 59 and gitChangedLines < 100
then '60-99'
when gitChangedLines > 99 and gitChangedLines < 500
then '100-499'
when gitChangedLines > 499
then '500+'
else ''
end as "gitChangedLinesBucket",
CAST(country_data .2 AS LowCardinality(String)) AS organizationCountryCode,
o.displayName as "organizationName",
toStartOfInterval(now(), INTERVAL 1 hour) as snapshotId
from activityRelations final
left join organizations o final on o.id = activityRelations.organizationId
where
memberId IN (SELECT id FROM members_sorted)
and (
(
platform IN ('git', 'gerrit', 'github', 'gitlab')
AND channel
IN (SELECT arrayJoin(i.repositories) FROM insightsProjects i where isNull (i.deletedAt))
AND activityRelations.segmentId IN (
SELECT segmentId
FROM segmentRepositories sr FINAL
WHERE (sr.excluded IS NULL OR sr.excluded = false)
)
)
OR platform NOT IN ('git', 'gerrit', 'github', 'gitlab')
)

TYPE COPY
TARGET_DATASOURCE activityRelations_deduplicated_cleaned_ds
COPY_MODE replace
COPY_SCHEDULE @on-demand
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
NODE country_mapping_array
SQL >
SELECT groupArray((country, country_code, timezone_offset)) AS country_data FROM country_mapping_ds

NODE activityRelations_deduplicated_cleaned_denormalized
SQL >
WITH
upperUTF8(o.location) AS search_u,
(SELECT arrayMap(x -> upperUTF8(x .1), country_data) FROM country_mapping_array) AS names_u,
(SELECT country_data FROM country_mapping_array) AS mapping,
multiSearchFirstIndexUTF8(search_u, names_u) AS idx,
if(idx = 0, ('Unknown', 'XX', 0), mapping[idx]) AS country_data
SELECT
activityRelations.*,
(gitInsertions + gitDeletions) as gitChangedLines,
case
when gitChangedLines > 0 and gitChangedLines < 10
then '1-9'
when gitChangedLines > 9 and gitChangedLines < 60
then '10-59'
when gitChangedLines > 59 and gitChangedLines < 100
then '60-99'
when gitChangedLines > 99 and gitChangedLines < 500
then '100-499'
when gitChangedLines > 499
then '500+'
else ''
end as "gitChangedLinesBucket",
CAST(country_data .2 AS LowCardinality(String)) AS organizationCountryCode,
o.displayName as "organizationName",
toStartOfInterval("updatedAt", INTERVAL 1 hour) + INTERVAL 1 hour as snapshotId
from activityRelations
left join organizations o final on o.id = activityRelations.organizationId
where
memberId IN (SELECT id FROM members_sorted)
and (
(
platform IN ('git', 'gerrit', 'github', 'gitlab')
AND channel
IN (SELECT arrayJoin(i.repositories) FROM insightsProjects i where isNull (i.deletedAt))
AND activityRelations.segmentId IN (
SELECT segmentId
FROM segmentRepositories sr FINAL
WHERE (sr.excluded IS NULL OR sr.excluded = false)
)
)
OR platform NOT IN ('git', 'gerrit', 'github', 'gitlab')
)

TYPE MATERIALIZED
DATASOURCE activityRelations_enrich_clean_snapshot_MV_ds
Loading
Loading