Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/spock_exception_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ typedef struct SpockExceptionLog
NameData slot_name;
XLogRecPtr commit_lsn;
HeapTuple local_tuple;
char initial_error_message[1024];
uint32 failed_action; /* xact_action_counter at time of error */
} SpockExceptionLog;

typedef enum SpockExceptionBehaviour
Expand Down
77 changes: 69 additions & 8 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,13 @@ handle_commit(StringInfo s)
SpockExceptionLog *exception_log;

exception_log = &exception_log_ptr[my_exception_log_index];
elog(LOG, "SPOCK %s: disabling subscription%s%s",
MySubscription->name,
exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "",
exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : "");
exception_log->commit_lsn = InvalidXLogRecPtr;
exception_log->initial_error_message[0] = '\0';
exception_log->failed_action = 0;
MySpockWorker->restart_delay = 0;

elog(ERROR, "SPOCK %s: exiting because subscription disabled",
Expand All @@ -1123,7 +1129,17 @@ handle_commit(StringInfo s)
SpockExceptionLog *exception_log;

exception_log = &exception_log_ptr[my_exception_log_index];
elog(LOG, "SPOCK %s: %s at LSN %X/%X%s%s",
MySubscription->name,
(exception_behaviour == TRANSDISCARD)
? "transaction discarded (TRANSDISCARD)"
: "exception handled (SUB_DISABLE)",
LSN_FORMAT_ARGS(end_lsn),
exception_log->initial_error_message[0] != '\0' ? ". Initial error: " : "",
exception_log->initial_error_message[0] != '\0' ? exception_log->initial_error_message : "");
exception_log->commit_lsn = InvalidXLogRecPtr;
exception_log->initial_error_message[0] = '\0';
exception_log->failed_action = 0;
MySpockWorker->restart_delay = 0;

elog(ERROR, "SPOCK %s: exception handling had no exception(s) "
Expand Down Expand Up @@ -1272,6 +1288,7 @@ handle_commit(StringInfo s)

xact_action_counter = 0;
remote_xid = InvalidTransactionId;
xact_had_exception = false;

/*
* This is the only place we can reset the use_try_block = false without
Expand Down Expand Up @@ -1546,8 +1563,15 @@ handle_insert(StringInfo s)
}

/* Let's create an exception log entry if true. */
log_insert_exception(failed, edata ? edata->message : NULL, rel,
NULL, &newtup, "INSERT");
{
char *err_msg = failed ? (edata ? edata->message : NULL) :
(xact_action_counter ==
exception_log_ptr[my_exception_log_index].failed_action &&
exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ?
exception_log_ptr[my_exception_log_index].initial_error_message : NULL;

log_insert_exception(failed, err_msg, rel, NULL, &newtup, "INSERT");
}
}
else
{
Expand Down Expand Up @@ -1707,8 +1731,16 @@ handle_update(StringInfo s)
}

/* Let's create an exception log entry if true. */
log_insert_exception(failed, edata ? edata->message : NULL, rel,
hasoldtup ? &oldtup : NULL, &newtup, "UPDATE");
{
char *err_msg = failed ? (edata ? edata->message : NULL) :
(xact_action_counter ==
exception_log_ptr[my_exception_log_index].failed_action &&
exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ?
exception_log_ptr[my_exception_log_index].initial_error_message : NULL;

log_insert_exception(failed, err_msg, rel,
hasoldtup ? &oldtup : NULL, &newtup, "UPDATE");
}
}
else
{
Expand Down Expand Up @@ -1804,8 +1836,15 @@ handle_delete(StringInfo s)
}

/* Let's create an exception log entry if true. */
log_insert_exception(failed, edata ? edata->message : NULL, rel,
&oldtup, NULL, "DELETE");
{
char *err_msg = failed ? (edata ? edata->message : NULL) :
(xact_action_counter ==
exception_log_ptr[my_exception_log_index].failed_action &&
exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ?
exception_log_ptr[my_exception_log_index].initial_error_message : NULL;

log_insert_exception(failed, err_msg, rel, &oldtup, NULL, "DELETE");
}
}
else
{
Expand Down Expand Up @@ -2396,7 +2435,7 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started)
{
bool failed = false;
char *sql = NULL;
ErrorData *edata;
ErrorData *edata = NULL;

/*
* Start transaction before making any changes to Spock's internal state.
Expand Down Expand Up @@ -2438,14 +2477,22 @@ handle_sql_or_exception(QueuedMessage *queued_message, bool tx_just_started)

/* Let's create an exception log entry if true. */
if (should_log_exception(failed))
{
char *err_msg = failed ? edata->message :
(xact_action_counter ==
exception_log_ptr[my_exception_log_index].failed_action &&
exception_log_ptr[my_exception_log_index].initial_error_message[0] != '\0') ?
exception_log_ptr[my_exception_log_index].initial_error_message : NULL;

add_entry_to_exception_log(remote_origin_id,
replorigin_session_origin_timestamp,
remote_xid,
0, 0,
NULL, NULL, NULL, NULL,
sql, queued_message->role,
"SQL",
(failed) ? edata->message : NULL);
err_msg);
}
}
else
{
Expand Down Expand Up @@ -3315,6 +3362,20 @@ apply_work(PGconn *streamConn)
MemoryContextSwitchTo(MessageContext);
elog(LOG, "SPOCK: caught initial exception - %s", edata->message);

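/*
* Save the initial error message together with the action counter
* (xact_action_counter) of the operation that raised it.
* On the retry pass, an operation that fails again logs its own error.
* Otherwise the operation whose counter equals failed_action logs this
* saved message in exception_log, and every other operation passes NULL
* (recorded as "unavailable").
*/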
if (exception_log_ptr != NULL)
{
snprintf(exception_log_ptr[my_exception_log_index].initial_error_message,
sizeof(exception_log_ptr[my_exception_log_index].initial_error_message),
"%s", edata->message);
exception_log_ptr[my_exception_log_index].failed_action =
xact_action_counter;
}

FlushErrorState();

MemoryContextReset(MessageContext);
Expand Down
6 changes: 1 addition & 5 deletions src/spock_exception_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,8 @@ add_entry_to_exception_log(Oid remote_origin, TimestampTz remote_commit_ts,
values[Anum_exception_log_ddl_user - 1] = CStringGetTextDatum(ddl_user);
}

/*
* The error_message column of the spock.exception_log table is marked as NOT NULL,
* but we don't always have a valid error message.
*/
if (error_message == NULL)
values[Anum_exception_log_error_message - 1] = CStringGetTextDatum("unknown");
values[Anum_exception_log_error_message - 1] = CStringGetTextDatum("unavailable");
else
values[Anum_exception_log_error_message - 1] = CStringGetTextDatum(error_message);
values[Anum_exception_log_retry_errored_at - 1] = TimestampTzGetDatum(GetCurrentTimestamp());
Expand Down
10 changes: 5 additions & 5 deletions tests/regress/expected/replication_set.out
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ ORDER BY command_counter;
-----------------+--------------+------------+-----------+----------------------------------------------------+---------------------------
1 | | | INSERT | | Spock can't find relation
2 | | | INSERT | | Spock can't find relation
3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown
3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable
4 | | | INSERT | | Spock can't find relation
5 | | | INSERT | | Spock can't find relation
6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown
6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable
(6 rows)

\c :provider_dsn
Expand Down Expand Up @@ -564,13 +564,13 @@ ORDER BY command_counter;
-----------------+--------------+-------------+-----------+----------------------------------------------------+--------------------------------------------------------------------------------------------------------
1 | | | INSERT | | Spock can't find relation
2 | | | INSERT | | Spock can't find relation
3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown
3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable
4 | | | INSERT | | Spock can't find relation
5 | | | INSERT | | Spock can't find relation
6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unknown
6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | unavailable
7 | | | UPDATE | | Spock can't find relation
8 | | | UPDATE | | Spock can't find relation
9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | unknown
9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | unavailable
10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u)
(10 rows)

Expand Down
1 change: 1 addition & 0 deletions tests/tap/schedule
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ test: 010_zodan_add_remove_python
# Test could timeout while waiting; 009 and 010 have coverage
#test: 012_zodan_basics

test: 013_exception_handling
test: 013_origin_change_restore

# Tests, consuming too much time to be launched on each check:
Expand Down
Loading
Loading