Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions etl-api/migrations/20250827000000_base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
create schema if not exists app;

-- Tenants
create table app.tenants (
create table if not exists app.tenants (
id text primary key,
name text not null,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);

-- Images
create table app.images (
create table if not exists app.images (
id bigint generated always as identity primary key,
name text not null,
is_default boolean not null,
Expand All @@ -22,12 +22,12 @@ create table app.images (
);

-- Ensure at most one default image exists
create unique index images_one_default_idx
create unique index if not exists images_one_default_idx
on app.images (is_default)
where is_default = true;

-- Destinations
create table app.destinations (
create table if not exists app.destinations (
id bigint generated always as identity primary key,
tenant_id text not null references app.tenants (id) on delete cascade,
name text not null,
Expand All @@ -36,10 +36,10 @@ create table app.destinations (
updated_at timestamptz not null default now()
);

create index idx_destinations_tenant_id_id on app.destinations (tenant_id, id);
create index if not exists idx_destinations_tenant_id_id on app.destinations (tenant_id, id);

-- Sources
create table app.sources (
create table if not exists app.sources (
id bigint generated always as identity primary key,
tenant_id text not null references app.tenants (id) on delete cascade,
name text not null,
Expand All @@ -48,21 +48,21 @@ create table app.sources (
updated_at timestamptz not null default now()
);

create index idx_sources_tenant_id_id on app.sources (tenant_id, id);
create index if not exists idx_sources_tenant_id_id on app.sources (tenant_id, id);

-- Replicators
create table app.replicators (
create table if not exists app.replicators (
id bigint generated always as identity primary key,
tenant_id text not null references app.tenants (id) on delete cascade,
image_id bigint not null references app.images (id),
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);

create index idx_replicators_tenant_id_id on app.replicators (tenant_id, id);
create index if not exists idx_replicators_tenant_id_id on app.replicators (tenant_id, id);

-- Pipelines
create table app.pipelines (
create table if not exists app.pipelines (
id bigint generated always as identity primary key,
tenant_id text not null references app.tenants (id) on delete cascade,
source_id bigint not null references app.sources (id),
Expand All @@ -74,4 +74,4 @@ create table app.pipelines (
unique (tenant_id, source_id, destination_id)
);

create index idx_pipelines_tenant_id_id on app.pipelines (tenant_id, id);
create index if not exists idx_pipelines_tenant_id_id on app.pipelines (tenant_id, id);
42 changes: 26 additions & 16 deletions etl-replicator/migrations/20250827000000_base.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
-- Base schema for etl-replicator store (applied on the source DB)

-- Ensure etl schema exists (also set by runtime, but safe here)
create schema if not exists etl;

-- Enum for table replication state
create type etl.table_state as enum (
'init',
'data_sync',
'finished_copy',
'sync_done',
'ready',
'errored'
);
do $$
begin
if not exists (
select 1
from pg_type t
join pg_namespace n on n.oid = t.typnamespace
where t.typname = 'table_state' and n.nspname = 'etl'
) then
create type etl.table_state as enum (
'init',
'data_sync',
'finished_copy',
'sync_done',
'ready',
'errored'
);
end if;
end
$$;

-- Replication state
create table etl.replication_state (
create table if not exists etl.replication_state (
id bigint generated always as identity primary key,
pipeline_id bigint not null,
table_id oid not null,
Expand All @@ -27,12 +37,12 @@ create table etl.replication_state (
);

-- Ensures that there is only one current state per pipeline/table
create unique index uq_replication_state_current_true
create unique index if not exists uq_replication_state_current_true
on etl.replication_state (pipeline_id, table_id)
where is_current = true;

-- Table schemas (per pipeline, per table)
create table etl.table_schemas (
create table if not exists etl.table_schemas (
id bigint generated always as identity primary key,
pipeline_id bigint not null,
table_id oid not null,
Expand All @@ -43,11 +53,11 @@ create table etl.table_schemas (
unique (pipeline_id, table_id)
);

create index idx_table_schemas_pipeline_table
create index if not exists idx_table_schemas_pipeline_table
on etl.table_schemas (pipeline_id, table_id);

-- Columns for stored schemas
create table etl.table_columns (
create table if not exists etl.table_columns (
id bigint generated always as identity primary key,
table_schema_id bigint not null references etl.table_schemas(id) on delete cascade,
column_name text not null,
Expand All @@ -61,11 +71,11 @@ create table etl.table_columns (
unique (table_schema_id, column_order)
);

create index idx_table_columns_order
create index if not exists idx_table_columns_order
on etl.table_columns (table_schema_id);

-- Source-to-destination table id mappings
create table etl.table_mappings (
create table if not exists etl.table_mappings (
id bigint generated always as identity primary key,
pipeline_id bigint not null,
source_table_id oid not null,
Expand Down