Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions priv/migrations/0000000000000-create.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
-module('0000000000000-create').

-export([perform/2]).

%% Initial migration for a namespace: creates the shared enum types
%% (process_status, task_status, task_type) and the five per-namespace
%% tables (processes, tasks, schedule, running, events).
%% Every {ok, _, _} match is an assertive crash-on-failure check.
-spec perform(_, _) -> _.
perform(Connection, MigrationOpts) ->
    %% Table names are derived from the namespace id supplied by the migrator.
    NsId = proplists:get_value(namespace, MigrationOpts),
    #{
        processes := ProcessesTable,
        tasks := TaskTable,
        schedule := ScheduleTable,
        running := RunningTable,
        events := EventsTable
    } = prg_pg_utils:tables(NsId),
    %% Enum types are database-wide (not namespaced). PostgreSQL has no
    %% "CREATE TYPE IF NOT EXISTS", so existence is probed via pg_type first.
    {ok, _, [{IsProcessStatusExists}]} = epg_pool:query(
        Connection,
        "select exists (select 1 from pg_type where typname = 'process_status')"
    ),
    _ =
        case IsProcessStatusExists of
            true ->
                ok;
            false ->
                {ok, _, _} = epg_pool:query(
                    Connection,
                    "CREATE TYPE process_status AS ENUM ('running', 'error')"
                )
        end,
    %% create type task_status if not exists
    {ok, _, [{IsTaskStatusExists}]} = epg_pool:query(
        Connection,
        "select exists (select 1 from pg_type where typname = 'task_status')"
    ),
    _ =
        case IsTaskStatusExists of
            true ->
                ok;
            false ->
                {ok, _, _} = epg_pool:query(
                    Connection,
                    "CREATE TYPE task_status AS ENUM "
                    "('waiting', 'running', 'blocked', 'error', 'finished', 'cancelled')"
                )
        end,
    %% create type task_type if not exists
    {ok, _, [{IsTaskTypeExists}]} = epg_pool:query(
        Connection,
        "select exists (select 1 from pg_type where typname = 'task_type')"
    ),
    _ =
        case IsTaskTypeExists of
            true ->
                ok;
            false ->
                {ok, _, _} = epg_pool:query(
                    Connection,
                    "CREATE TYPE task_type AS ENUM ('init', 'timeout', 'call', 'notify', 'repair', 'remove')"
                )
        end,
    %% create processes table
    {ok, _, _} = epg_pool:query(
        Connection,
        "CREATE TABLE IF NOT EXISTS " ++ ProcessesTable ++
        " ("
        "process_id VARCHAR(80) PRIMARY KEY, "
        "status process_status NOT NULL, "
        "detail TEXT, "
        "aux_state BYTEA, "
        "created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "
        "metadata JSONB)"
    ),
    %% create tasks table
    %% (self-referencing FK: blocked_task points at another row of this table)
    {ok, _, _} = epg_pool:query(
        Connection,
        "CREATE TABLE IF NOT EXISTS " ++ TaskTable ++
        " ("
        "task_id BIGSERIAL PRIMARY KEY, "
        "process_id VARCHAR(80) NOT NULL, "
        "task_type task_type NOT NULL, "
        "status task_status NOT NULL, "
        "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, "
        "running_time TIMESTAMP WITH TIME ZONE, "
        "finished_time TIMESTAMP WITH TIME ZONE, "
        "args BYTEA, "
        "metadata JSONB, "
        "idempotency_key VARCHAR(80) UNIQUE, "
        "response BYTEA, "
        "blocked_task BIGINT REFERENCES " ++ TaskTable ++
        " (task_id), "
        "last_retry_interval INTEGER NOT NULL, "
        "attempts_count SMALLINT NOT NULL, "
        "context BYTEA, "
        "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++ " (process_id))"
    ),
    %% create constraint for process error cause
    %% (added after both tables exist because it references the tasks table)
    {ok, _, _} = epg_pool:query(
        Connection,
        "ALTER TABLE " ++ ProcessesTable ++
        " ADD COLUMN IF NOT EXISTS corrupted_by BIGINT REFERENCES " ++ TaskTable ++ "(task_id)"
    ),

    %% create schedule table
    {ok, _, _} = epg_pool:query(
        Connection,
        "CREATE TABLE IF NOT EXISTS " ++ ScheduleTable ++
        " ("
        "task_id BIGINT PRIMARY KEY, "
        "process_id VARCHAR(80) NOT NULL, "
        "task_type task_type NOT NULL, "
        "status task_status NOT NULL, "
        "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, "
        "args BYTEA, "
        "metadata JSONB, "
        "last_retry_interval INTEGER NOT NULL, "
        "attempts_count SMALLINT NOT NULL, "
        "context BYTEA, "
        "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++
        " (process_id), "
        "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))"
    ),

    %% create running table
    {ok, _, _} = epg_pool:query(
        Connection,
        "CREATE TABLE IF NOT EXISTS " ++ RunningTable ++
        " ("
        "process_id VARCHAR(80) PRIMARY KEY, "
        "task_id BIGINT NOT NULL, "
        "task_type task_type NOT NULL, "
        "status task_status NOT NULL, "
        "scheduled_time TIMESTAMP WITH TIME ZONE NOT NULL, "
        "running_time TIMESTAMP WITH TIME ZONE NOT NULL, "
        "args BYTEA, "
        "metadata JSONB, "
        "last_retry_interval INTEGER NOT NULL, "
        "attempts_count SMALLINT NOT NULL, "
        "context BYTEA, "
        "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++
        " (process_id), "
        "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))"
    ),

    %% create events table
    %% (one row per emitted event; keyed by process + event sequence number)
    {ok, _, _} = epg_pool:query(
        Connection,
        "CREATE TABLE IF NOT EXISTS " ++ EventsTable ++
        " ("
        "process_id VARCHAR(80) NOT NULL, "
        "task_id BIGINT NOT NULL, "
        "event_id SMALLINT NOT NULL, "
        "timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(), "
        "metadata JSONB, "
        "payload BYTEA NOT NULL, "
        "PRIMARY KEY (process_id, event_id), "
        "FOREIGN KEY (process_id) REFERENCES " ++ ProcessesTable ++
        " (process_id), "
        "FOREIGN KEY (task_id) REFERENCES " ++ TaskTable ++ " (task_id))"
    ),
    ok.
35 changes: 35 additions & 0 deletions priv/migrations/0000000000001-expand-id.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-module('0000000000001-expand-id').

-export([perform/2]).

%% Migration: widens process_id from VARCHAR(80) to VARCHAR(256) in every
%% table of the namespace. Idempotent: tables whose column is already wide
%% enough are skipped.
-spec perform(_, _) -> _.
perform(Connection, MigrationOpts) ->
    NsId = proplists:get_value(namespace, MigrationOpts),
    #{
        processes := ProcessesTable,
        tasks := TaskTable,
        schedule := ScheduleTable,
        running := RunningTable,
        events := EventsTable
    } = prg_pg_utils:tables(NsId),
    lists:foreach(
        fun(T) ->
            %% Table names come double-quoted; information_schema stores the
            %% bare name, so the double quotes are turned into SQL string quotes.
            TableStr = string:replace(T, "\"", "'", all),
            {ok, _, [{VarSize}]} = epg_pool:query(
                Connection,
                "SELECT character_maximum_length FROM information_schema.columns "
                "WHERE table_name = " ++ TableStr ++ " AND column_name = 'process_id'"
            ),
            case VarSize < 256 of
                true ->
                    %% BUGFIX: a leading space is required before "ALTER COLUMN";
                    %% without it the statement concatenates to
                    %% "ALTER TABLE <table>ALTER COLUMN ..." and fails to parse.
                    {ok, _, _} = epg_pool:query(
                        Connection,
                        "ALTER TABLE " ++ T ++ " ALTER COLUMN process_id TYPE VARCHAR(256)"
                    );
                false ->
                    skip
            end
        end,
        [ProcessesTable, TaskTable, ScheduleTable, RunningTable, EventsTable]
    ),
    ok.
43 changes: 43 additions & 0 deletions priv/migrations/0000000000002-add-previous-status.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-module('0000000000002-add-previous-status').

-export([perform/2]).

%% Migration: adds previous_status / status_changed_at columns to the
%% processes table. Idempotent via an information_schema existence probe on
%% the previous_status column.
-spec perform(_, _) -> _.
perform(Connection, MigrationOpts) ->
    NsId = proplists:get_value(namespace, MigrationOpts),
    #{
        processes := ProcessesTable
    } = prg_pg_utils:tables(NsId),
    %% The table name is double-quoted; information_schema stores the bare
    %% name, so double quotes are converted into SQL string quotes here.
    ProcessesTableStr = string:replace(ProcessesTable, "\"", "'", all),
    {ok, _, [{IsPrevStatusExists}]} = epg_pool:query(
        Connection,
        "SELECT exists (SELECT 1 FROM information_schema.columns WHERE table_schema = 'public' "
        " AND table_name = " ++ ProcessesTableStr ++ " AND column_name = 'previous_status')"
    ),
    _ =
        case IsPrevStatusExists of
            true ->
                ok;
            false ->
                %% create columns
                %% (added nullable first so existing rows do not violate NOT NULL)
                {ok, _, _} = epg_pool:query(
                    Connection,
                    "ALTER TABLE " ++ ProcessesTable ++
                    " ADD COLUMN previous_status process_status, "
                    " ADD COLUMN status_changed_at TIMESTAMP WITH TIME ZONE"
                ),
                %% set values
                %% (UPDATE yields a 2-tuple result, hence the {ok, _} match)
                {ok, _} = epg_pool:query(
                    Connection,
                    "UPDATE " ++ ProcessesTable ++
                    " SET previous_status = status, status_changed_at = created_at"
                ),
                %% set NOT NULL constraint
                {ok, _, _} = epg_pool:query(
                    Connection,
                    "ALTER TABLE " ++ ProcessesTable ++
                    " ALTER COLUMN previous_status SET NOT NULL,"
                    " ALTER COLUMN status_changed_at SET NOT NULL"
                )
        end,
    ok.
22 changes: 22 additions & 0 deletions priv/migrations/0000000000003-add-init-status.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-module('0000000000003-add-init-status').

-export([perform/2]).

%% Migration: adds the 'init' label to the process_status enum type.
%% Idempotent: pg_enum is probed first, since ADD VALUE has no
%% IF NOT EXISTS form on older PostgreSQL versions.
-spec perform(_, _) -> _.
perform(Connection, _MigrationOpts) ->
    _ = ensure_init_value(Connection),
    ok.

%% Alters the enum only when the 'init' label is absent.
ensure_init_value(Connection) ->
    {ok, _, [{HasInitLabel}]} = epg_pool:query(
        Connection,
        "select exists (SELECT 1 FROM pg_enum WHERE "
        " enumtypid = 'process_status'::regtype and enumlabel = 'init')"
    ),
    case HasInitLabel of
        false ->
            {ok, _, _} = epg_pool:query(
                Connection,
                "ALTER TYPE process_status ADD VALUE 'init'"
            );
        true ->
            ok
    end.
57 changes: 57 additions & 0 deletions priv/migrations/0000000000004-fix-indexes.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
-module('0000000000004-fix-indexes').

-export([perform/2]).

%% Migration: replaces the old globally-named indexes with per-table
%% (namespaced) HASH indexes on process_id / task_id.
-spec perform(_, _) -> _.
perform(Connection, MigrationOpts) ->
    #{
        tasks := TaskTable,
        schedule := ScheduleTable,
        running := RunningTable,
        events := EventsTable
    } = prg_pg_utils:tables(proplists:get_value(namespace, MigrationOpts)),
    %% Drop the legacy indexes first (their names were not namespaced).
    {ok, _, _} = drop_index(Connection, "process_idx"),
    {ok, _, _} = drop_index(Connection, "task_idx"),
    %% Recreate each index with a table-specific name; order matches the
    %% original migration.
    lists:foreach(
        fun({Table, Index, Fields}) ->
            {ok, _, _} = create_index(Connection, Table, Index, Fields)
        end,
        [
            {EventsTable, "process_idx", "(process_id)"},
            {TaskTable, "process_idx", "(process_id)"},
            {ScheduleTable, "process_idx", "(process_id)"},
            {RunningTable, "task_idx", "(task_id)"}
        ]
    ),
    ok.

%% Drops IndexName when it exists in pg_indexes; returns a query-shaped
%% {ok, _, _} triple either way so callers can assert success uniformly.
drop_index(Connection, IndexName) ->
    ExistsSql = "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = $1)",
    {ok, _, [{Exists}]} = epg_pool:query(Connection, ExistsSql, [IndexName]),
    case Exists of
        false ->
            {ok, [], []};
        true ->
            epg_pool:query(Connection, "DROP INDEX " ++ IndexName)
    end.

%% Convenience wrapper: create an index using the default HASH access method.
create_index(Connection, Table, Index, Fields) ->
    create_index(Connection, Table, Index, " HASH ", Fields).

%% Creates an index named "<bare table name>_<Index>" on Table unless it
%% already exists. IndexType is the access method (e.g. " HASH "); Fields is
%% the parenthesized column list, e.g. "(process_id)". Prefixing the table
%% name keeps index names unique across namespaces.
create_index(Connection, Table, Index, IndexType, Fields) ->
    %% unwrap table name and wrap index name
    IndexName = "\"" ++ string:replace(Table, "\"", "", all) ++ "_" ++ Index ++ "\"",
    %% re-wrap for using in WHERE section
    %% (double quotes become SQL string quotes so pg_indexes can match the name)
    IndexNameStr = string:replace(IndexName, "\"", "'", all),
    {ok, _, [{IsIndexExists}]} = epg_pool:query(
        Connection,
        "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = " ++ IndexNameStr ++ " )"
    ),
    case IsIndexExists of
        true ->
            %% Already present: mimic an empty successful query result.
            {ok, [], []};
        false ->
            epg_pool:query(
                Connection,
                "CREATE INDEX " ++ IndexName ++
                " on " ++ Table ++
                " USING " ++ IndexType ++ " " ++ Fields
            )
    end.
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{thrift, {git, "https://github.com/valitydev/thrift_erlang.git", {tag, "v1.0.0"}}},
{mg_proto, {git, "https://github.com/valitydev/machinegun-proto.git", {branch, "master"}}},
{epg_connector, {git, "https://github.com/valitydev/epg_connector.git", {tag, "v0.0.5"}}},
{epg_migrator, {git, "https://github.com/valitydev/epg_migrator.git", {branch, "main"}}},
{opentelemetry_api, "1.4.0"}
]}.

Expand Down
7 changes: 7 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
{git,"https://github.com/valitydev/epg_connector.git",
{ref,"939a0d4ab3f7561a79b45381bbe13029d9263006"}},
0},
{<<"epg_migrator">>,
{git,"https://github.com/valitydev/epg_migrator.git",
{ref,"8633c43fb4d022355c1498c0effa9ecf4098a67f"}},
0},
{<<"epgsql">>,
{git,"https://github.com/epgsql/epgsql.git",
{ref,"28e9f84c95065a51e92baeb37d2cf1687fc4b9ce"}},
1},
{<<"erlydtl">>,{pkg,<<"erlydtl">>,<<"0.14.0">>},1},
{<<"jsone">>,{pkg,<<"jsone">>,<<"1.8.0">>},2},
{<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1},
{<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.1.10">>},1},
Expand All @@ -32,6 +37,7 @@
{pkg_hash,[
{<<"brod">>, <<"51F4DFF17ED43A806558EBD62CC88E7B35AED336D1BA1F3DE2D010F463D49736">>},
{<<"crc32cer">>, <<"B550DA6D615FEB72A882D15D020F8F7DEE72DFB2CB1BCDF3B1EE8DC2AFD68CFC">>},
{<<"erlydtl">>, <<"964B2DC84F8C17ACFAA69C59BA129EF26AC45D2BA898C3C6AD9B5BDC8BA13CED">>},
{<<"jsone">>, <<"347FF1FA700E182E1F9C5012FA6D737B12C854313B9AE6954CA75D3987D6C06D">>},
{<<"jsx">>, <<"D12516BAA0BB23A59BB35DCCAF02A1BD08243FCBB9EFE24F2D9D056CCFF71268">>},
{<<"kafka_protocol">>, <<"F917B6C90C8DF0DE2B40A87D6B9AE1CFCE7788E91A65818E90E40CF76111097A">>},
Expand All @@ -42,6 +48,7 @@
{pkg_hash_ext,[
{<<"brod">>, <<"88584FDEBA746AA6729E2A1826416C10899954F68AF93659B3C2F38A2DCAA27C">>},
{<<"crc32cer">>, <<"A39B8F0B1990AC1BF06C3A247FC6A178B740CDFC33C3B53688DC7DD6B1855942">>},
{<<"erlydtl">>, <<"D80EC044CD8F58809C19D29AC5605BE09E955040911B644505E31E9DD8143431">>},
{<<"jsone">>, <<"08560B78624A12E0B5E7EC0271EC8CA38EF51F63D84D84843473E14D9B12618C">>},
{<<"jsx">>, <<"0C5CC8FDC11B53CC25CF65AC6705AD39E54ECC56D1C22E4ADB8F5A53FB9427F3">>},
{<<"kafka_protocol">>, <<"DF680A3706EAD8695F8B306897C0A33E8063C690DA9308DB87B462CFD7029D04">>},
Expand Down
7 changes: 5 additions & 2 deletions src/prg_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-export([complete_and_error/4]).
-export([remove_process/3]).
-export([capture_task/3]).
-export([reschedule_task/3]).

%% shared functions
-export([get_task/3]).
Expand All @@ -36,9 +37,7 @@
%% Init operations
-export([db_init/2]).

%-ifdef(TEST).
-export([cleanup/2]).
%-endif.

%%%%%%%%%%%%%%%%%%%%%%%%
%% API handler functions
Expand Down Expand Up @@ -146,6 +145,10 @@ remove_process(#{client := Handler, options := HandlerOpts}, NsId, ProcessId) ->
capture_task(#{client := Handler, options := HandlerOpts}, NsId, TaskId) ->
Handler:capture_task(HandlerOpts, NsId, TaskId).

%% Delegates task rescheduling to the configured storage backend module
%% (crashes with function_clause if the opts map lacks client/options).
-spec reschedule_task(storage_opts(), namespace_id(), task()) -> ok | no_return().
reschedule_task(#{client := Handler, options := HandlerOpts}, NsId, Task) ->
    Handler:reschedule_task(HandlerOpts, NsId, Task).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Shared functions (recipient required)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down
Loading
Loading