Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions include/spock_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,16 @@ typedef struct SpockGroupEntry
extern void spock_group_shmem_request(void);
extern void spock_group_shmem_startup(int napply_groups);

SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id);
void spock_group_detach(void);
extern SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id,
Oid remote_node_id);
extern void spock_group_detach(void);
extern bool spock_group_progress_update(const SpockApplyProgress *sap);
extern void spock_group_progress_update_ptr(SpockGroupEntry *entry,
const SpockApplyProgress *sap);
SpockApplyProgress *apply_worker_get_progress(void);
SpockGroupEntry *spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id);

/* Iterate all groups */
typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg);
void spock_group_foreach(SpockGroupIterCB cb, void *arg);
extern SpockApplyProgress *apply_worker_get_progress(void);

extern void spock_group_resource_dump(void);
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);
extern void spock_group_progress_update_list(List *lst);

#endif /* SPOCK_GROUP_H */
67 changes: 46 additions & 21 deletions src/spock_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -394,37 +394,34 @@ spock_group_progress_update_ptr(SpockGroupEntry *e,
/*
* apply_worker_get_progress
*
* Return a pointer to the current apply worker's progress payload, or NULL
* Return a pointer to the snapshot of the current apply worker's progress.
*/
SpockApplyProgress *
apply_worker_get_progress(void)
{
static SpockApplyProgress sap;

Assert(MyApplyWorker != NULL);
Assert(MyApplyWorker->apply_group != NULL);

if (MyApplyWorker && MyApplyWorker->apply_group)
return &MyApplyWorker->apply_group->progress;
{
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);
sap = MyApplyWorker->apply_group->progress;
LWLockRelease(SpockCtx->apply_group_master_lock);
}
else
/*
* Should never happen. In production just send the worker into
* exception behaviour without crash.
*/
elog(ERROR, "apply worker has not been fully initialised yet");

return NULL;
return &sap;
}

/*
* spock_group_lookup
*
* Snapshot-read the progress payload for the specified group. Uses HASH_FIND
* to locate the entry.
*
* Returns entry if found, NULL otherwise.
*/
SpockGroupEntry *
spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id)
{
SpockGroupKey key = make_key(dbid, node_id, remote_node_id);
SpockGroupEntry *e;

e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_FIND, NULL);
return e; /* may be NULL */
}
/* Iterate all groups */
typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg);

/*
* spock_group_foreach
Expand All @@ -433,7 +430,7 @@ spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id)
* Caller selects any gating needed for consistency (e.g., take the gate in
* SHARED before calling this if you want a coherent snapshot).
*/
void
static void
spock_group_foreach(SpockGroupIterCB cb, void *arg)
{
HASH_SEQ_STATUS it;
Expand Down Expand Up @@ -649,3 +646,31 @@ spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags)
/* Dump group progress to resource.dat */
spock_group_resource_dump();
}

void
spock_group_progress_update_list(List *lst)
{
ListCell *lc;

foreach (lc, lst)
{
SpockApplyProgress *sap = (SpockApplyProgress *) lfirst(lc);

spock_apply_progress_add_to_wal(sap);

spock_group_progress_update(sap);

elog(LOG, "SPOCK: adjust spock.progress %d->%d to "
"remote_commit_ts='%s' "
"remote_commit_lsn=%llX remote_insert_lsn=%llX",
sap->key.remote_node_id, MySubscription->target->id,
timestamptz_to_str(sap->remote_commit_ts),
sap->remote_commit_lsn, sap->remote_insert_lsn);
}

/*
* Free the list and each object. Be careful here because it is inside
* a memory context that is rarely reset.
*/
list_free_deep(lst);
}
60 changes: 38 additions & 22 deletions src/spock_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,15 @@ ensure_replication_origin(char *slot_name)
}


static void
adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)
static List *
adjust_progress_info(PGconn *origin_conn)
{
const char *originQuery =
"SELECT * FROM spock.progress "
"WHERE node_id = %u AND remote_node_id <> %u";
StringInfoData query;
PGresult *originRes;
List *resultList = NIL;

/*
* Select the current content of the origin's spock.progress table where
Expand All @@ -427,7 +428,10 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)

for (rno = 0; rno < PQntuples(originRes); rno++)
{
SpockApplyProgress sap;
SpockApplyProgress *sap =
MemoryContextAlloc(CacheMemoryContext,
sizeof(SpockApplyProgress));
MemoryContext oldctx;

/*
* Update the remote node's progress entry to what our sync
Expand All @@ -443,10 +447,10 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)
char *last_updated_ts = NULL;
char *updated_by_decode = PQgetvalue(originRes, rno, GP_UPDATED_BY_DECODE);

sap.key.dbid = MyDatabaseId;
sap.key.node_id = MySubscription->target->id;
sap.key.remote_node_id = atooid(remote_node_id);
Assert(OidIsValid(sap.key.remote_node_id));
sap->key.dbid = MyDatabaseId;
sap->key.node_id = MySubscription->target->id;
sap->key.remote_node_id = atooid(remote_node_id);
Assert(OidIsValid(sap->key.remote_node_id));

/* Check: we view only values related to a single database */
Assert(!PQgetisnull(originRes, rno, GP_DBOID));
Expand All @@ -460,31 +464,30 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)
if (!PQgetisnull(originRes, rno, GP_REMOTE_COMMIT_TS))
{
remote_commit_ts = PQgetvalue(originRes, rno, GP_REMOTE_COMMIT_TS);
sap.remote_commit_ts = str_to_timestamptz(remote_commit_ts);
Assert(IS_VALID_TIMESTAMP(sap.remote_commit_ts));
sap->remote_commit_ts = str_to_timestamptz(remote_commit_ts);
Assert(IS_VALID_TIMESTAMP(sap->remote_commit_ts));
}
sap.prev_remote_ts = sap.remote_commit_ts;
sap->prev_remote_ts = sap->remote_commit_ts;

sap.remote_commit_lsn = str_to_lsn(remote_commit_lsn);
sap.remote_insert_lsn = str_to_lsn(remote_insert_lsn);
sap->remote_commit_lsn = str_to_lsn(remote_commit_lsn);
sap->remote_insert_lsn = str_to_lsn(remote_insert_lsn);

/*
* We don't actually receive a single WAL record - just assume
* we've got the last commit only. Don't set it to the Invalid
* value in case someone uses tracking data in state monitoring
* scripts.
*/
sap.received_lsn = str_to_lsn(remote_commit_lsn);
sap->received_lsn = str_to_lsn(remote_commit_lsn);

if (!PQgetisnull(originRes, rno, GP_LAST_UPDATED_TS))
{
last_updated_ts = PQgetvalue(originRes, rno, GP_LAST_UPDATED_TS);
sap.last_updated_ts = str_to_timestamptz(last_updated_ts);
sap->last_updated_ts = str_to_timestamptz(last_updated_ts);

Assert(IS_VALID_TIMESTAMP(sap.last_updated_ts));

if (sap.last_updated_ts < sap.remote_commit_ts)
Assert(IS_VALID_TIMESTAMP(sap->last_updated_ts));

if (sap->last_updated_ts < sap->remote_commit_ts)
/*
* Complaining at the end of the sync we shouldn't flood
* the log
Expand All @@ -498,10 +501,11 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)
MySubscription->origin->id),
errhint("usually it means that the server's clocks are out of sync.")));
}
sap.updated_by_decode = updated_by_decode[0] == 't',
sap->updated_by_decode = updated_by_decode[0] == 't',

/* Update progress */
spock_group_progress_update(&sap);
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
resultList = lappend(resultList, sap);
MemoryContextSwitchTo(oldctx);

elog(LOG, "SPOCK: adjust spock.progress %s->%d to "
"remote_commit_ts='%s' "
Expand All @@ -522,6 +526,7 @@ adjust_progress_info(PGconn *origin_conn, PGconn *target_conn)
PQclear(originRes);

resetStringInfo(&query);
return resultList;
}


Expand Down Expand Up @@ -887,6 +892,7 @@ copy_tables_data(SpockSubscription *sub, const char *origin_dsn,
{
PGconn *origin_conn;
PGconn *target_conn;
List *progress_entries_list = NIL;
ListCell *lc;

/* Connect to origin node. */
Expand Down Expand Up @@ -917,11 +923,21 @@ copy_tables_data(SpockSubscription *sub, const char *origin_dsn,
CHECK_FOR_INTERRUPTS();
}

adjust_progress_info(origin_conn, target_conn);
progress_entries_list = adjust_progress_info(origin_conn);

/* Finish the transactions and disconnect. */
finish_copy_origin_tx(origin_conn);
finish_copy_target_tx(target_conn);

/*
* Update replication progress. We must do it after commit of the COPY.
*
* NOTE:
* It is not obvious we need to arrange progress in case of accidental
* single-table re-sync. But while this machinery serves information goals
* only we just follow the initial logic.
*/
spock_group_progress_update_list(progress_entries_list);
}

/*
Expand Down Expand Up @@ -1005,7 +1021,7 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn,
CHECK_FOR_INTERRUPTS();
}

adjust_progress_info(origin_conn, target_conn);
adjust_progress_info(origin_conn);

/* Finish the transactions and disconnect. */
finish_copy_origin_tx(origin_conn);
Expand Down