Skip to content

Commit

Permalink
Fix for bug282.
Browse files Browse the repository at this point in the history
There was a problem with the new COPY protocol when
a data type or domain used in a replicated column
is not in slon's search_path. SPI does not provide
a mechanism to get the namespace name of a columns
data type.

Instead of adding explicit type casting to the apply
queries and handing the data in as TEXT Datums, we
now use the same technique that PL/pgSQL uses at
at least since 8.3 and convert the TEXT datums into
the requested data type ourselves.
  • Loading branch information
Jan Wieck authored and Jan Wieck committed Dec 14, 2012
1 parent d23fed0 commit 47c6f38
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 29 deletions.
116 changes: 89 additions & 27 deletions src/backend/slony1_funcs.c
Expand Up @@ -165,6 +165,10 @@ typedef struct apply_cache_entry
struct apply_cache_entry *prev;
struct apply_cache_entry *next;

FmgrInfo *finfo_input;
Oid *typioparam;
int32 *typmod;

#ifdef APPLY_CACHE_VERIFY
char *verifyKey;
int evicted;
Expand Down Expand Up @@ -1289,6 +1293,20 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)

/* elog(NOTICE, "cache entry for %s NOT found", cacheKey); */

/*
* Allocate memory for the function call info to cast
* all datums from TEXT to the required Datum type.
*/
oldContext = MemoryContextSwitchTo(applyCacheContext);
cacheEnt->finfo_input = (FmgrInfo *)palloc(sizeof(FmgrInfo) * (cmdargsn / 2));
cacheEnt->typioparam = (Oid *)palloc(sizeof(Oid) * (cmdargsn / 2));
cacheEnt->typmod = (int32 *)palloc(sizeof(int32) * (cmdargsn / 2));
MemoryContextSwitchTo(oldContext);

if (cacheEnt->finfo_input == NULL || cacheEnt->typioparam == NULL ||
cacheEnt->typmod == NULL)
elog(ERROR, "Slony-I: out of memory in logApply()");

#ifdef APPLY_CACHE_VERIFY

/*
Expand Down Expand Up @@ -1370,27 +1388,36 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
*/
for (i = 0; i < cmdargsn; i += 2)
{
char *coltype;
int colnum;
Oid coltype;
Oid typinput;

applyQueryIncrease();

/*
* Lookup the column data type in the target relation.
* Lookup the column data type in the target relation and
* remember everything we need to know later to
* cast TEXT to the correct column Datum.
*/
coltype = SPI_gettype(target_rel->rd_att,
SPI_fnumber(target_rel->rd_att, querycolnames[i / 2]));
if (coltype == NULL)
colnum = SPI_fnumber(target_rel->rd_att, querycolnames[i / 2]);
coltype = SPI_gettypeid(target_rel->rd_att, colnum);
if (coltype == InvalidOid)
elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()",
querycolnames[i / 2]);
getTypeInputInfo(coltype, &typinput,
&(cacheEnt->typioparam[i / 2]));
fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2]));
cacheEnt->typmod[i / 2] =
target_rel->rd_att->attrs[colnum - 1]->atttypmod;

/*
* Add the parameter to the query string
*/
sprintf(applyQueryPos, "%s$%d::%s", (i == 0) ? "" : ", ",
i / 2 + 1, coltype);
sprintf(applyQueryPos, "%s$%d", (i == 0) ? "" : ", ",
i / 2 + 1);
applyQueryPos += strlen(applyQueryPos);

querytypes[i / 2] = TEXTOID;
querytypes[i / 2] = coltype;
}

/*
Expand Down Expand Up @@ -1423,22 +1450,30 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
for (i = 0; i < cmdargsn; i += 2)
{
char *colname;
char *coltype;
int colnum;
Oid coltype;
Oid typinput;

applyQueryIncrease();

/*
* Get the column name and data type.
* Get the column name and data type as well as everything
* needed later to cast TEXT to the correct input Datum.
*/
if (cmdargsnulls[i])
elog(ERROR, "Slony-I: column name in log_cmdargs is NULL");
colname = DatumGetCString(DirectFunctionCall1(
textout, cmdargs[i]));
coltype = SPI_gettype(target_rel->rd_att,
SPI_fnumber(target_rel->rd_att, colname));
if (coltype == NULL)
colnum = SPI_fnumber(target_rel->rd_att, colname);
coltype = SPI_gettypeid(target_rel->rd_att, colnum);
if (coltype == InvalidOid)
elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()",
colname);
getTypeInputInfo(coltype, &typinput,
&(cacheEnt->typioparam[i / 2]));
fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2]));
cacheEnt->typmod[i / 2] =
target_rel->rd_att->attrs[colnum - 1]->atttypmod;

/*
* Special case if there were no columns updated. We tell
Expand Down Expand Up @@ -1468,25 +1503,25 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
* This is inside the SET clause. Add the <colname> =
* $n::<coltype> separated by comma.
*/
sprintf(applyQueryPos, "%s%s = $%d::%s",
sprintf(applyQueryPos, "%s%s = $%d",
(i > 0) ? ", " : "",
slon_quote_identifier(colname),
i / 2 + 1, coltype);
i / 2 + 1);
}
else
{
/*
* This is in the WHERE clause. Same as above but
* separated by AND.
*/
sprintf(applyQueryPos, "%s%s = $%d::%s",
sprintf(applyQueryPos, "%s%s = $%d",
(i > cmdupdncols * 2) ? " AND " : "",
slon_quote_identifier(colname),
i / 2 + 1, coltype);
i / 2 + 1);
}
applyQueryPos += strlen(applyQueryPos);

querytypes[i / 2] = TEXTOID;
querytypes[i / 2] = coltype;
}

strcpy(applyQueryPos, ";");
Expand All @@ -1510,31 +1545,39 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)

for (i = 0; i < cmdargsn; i += 2)
{
int colnum;
char *colname;
char *coltype;
Oid coltype;
Oid typinput;

applyQueryIncrease();

/*
* Add <colname> = $n::<coltype> separated by comma.
* Add <colname> = $n separated by comma.
*/
if (cmdargsnulls[i])
elog(ERROR, "Slony-I: column name in log_cmdargs is NULL");
colname = DatumGetCString(DirectFunctionCall1(
textout, cmdargs[i]));
coltype = SPI_gettype(target_rel->rd_att,
SPI_fnumber(target_rel->rd_att, colname));
if (coltype == NULL)
colnum = SPI_fnumber(target_rel->rd_att, colname);
coltype = SPI_gettypeid(target_rel->rd_att, colnum);
if (coltype == InvalidOid)
elog(ERROR, "Slony-I: type lookup for column %s failed in logApply()",
colname);
sprintf(applyQueryPos, "%s%s = $%d::%s",
getTypeInputInfo(coltype, &typinput,
&(cacheEnt->typioparam[i / 2]));
fmgr_info(typinput, &(cacheEnt->finfo_input[i / 2]));
cacheEnt->typmod[i / 2] =
target_rel->rd_att->attrs[colnum - 1]->atttypmod;

sprintf(applyQueryPos, "%s%s = $%d",
(i > 0) ? " AND " : "",
slon_quote_identifier(colname),
i / 2 + 1, coltype);
i / 2 + 1);

applyQueryPos += strlen(applyQueryPos);

querytypes[i / 2] = TEXTOID;
querytypes[i / 2] = coltype;
}

strcpy(applyQueryPos, ";");
Expand Down Expand Up @@ -1610,6 +1653,11 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)
apply_num_evict++;

SPI_freeplan(evict->plan);
oldContext = MemoryContextSwitchTo(applyCacheContext);
pfree(cacheEnt->finfo_input);
pfree(cacheEnt->typioparam);
pfree(cacheEnt->typmod);
MemoryContextSwitchTo(oldContext);
evict->plan = NULL;
#ifdef APPLY_CACHE_VERIFY
evict->evicted = 1;
Expand Down Expand Up @@ -1670,11 +1718,25 @@ _Slony_I_logApply(PG_FUNCTION_ARGS)

for (i = 0; i < cmdargsn; i += 2)
{
queryvals[i / 2] = cmdargs[i + 1];
char *tmpval;

if (cmdargsnulls[i + 1])
{
queryvals[i / 2] = (Datum)0;
querynulls[i / 2] = 'n';
}
else
{
tmpval = DatumGetCString(DirectFunctionCall1(textout,
cmdargs[i + 1]));
queryvals[i / 2] = InputFunctionCall(
&(cacheEnt->finfo_input[i / 2]),
tmpval,
cacheEnt->typioparam[i / 2],
cacheEnt->typmod[i / 2]);
pfree(tmpval);
querynulls[i / 2] = ' ';
}
}
querynulls[cmdargsn / 2] = '\0';

Expand Down
4 changes: 2 additions & 2 deletions src/slon/remote_worker.c
Expand Up @@ -4766,7 +4766,7 @@ sync_helper(void *cdata, PGconn *local_conn)
}

res = PQgetResult(dbconn);
if (PQresultStatus(res) < 0)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY OUT: %s",
node->no_id, provider->no_id,
Expand All @@ -4776,7 +4776,7 @@ sync_helper(void *cdata, PGconn *local_conn)
PQclear(res);

res = PQgetResult(local_conn);
if (PQresultStatus(res) < 0)
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
slon_log(SLON_ERROR, "remoteWorkerThread_%d_%d: error at end of COPY IN: %s",
node->no_id, provider->no_id,
Expand Down

0 comments on commit 47c6f38

Please sign in to comment.