Skip to content

Commit

Permalink
Add support for CURRENT_TIMESTAMP and friends.
Browse files Browse the repository at this point in the history
Patch provided by Akio Ishida.
  • Loading branch information
t-ishii committed Nov 10, 2009
1 parent cbf2a19 commit e913eb4
Show file tree
Hide file tree
Showing 17 changed files with 1,491 additions and 24 deletions.
7 changes: 7 additions & 0 deletions Makefile.am
Expand Up @@ -10,6 +10,7 @@ pgpool_SOURCES = pool.h pool_type.h version.h pgpool.conf.sample \
pool_query_cache.c pool_hba.conf.sample sample/pgpool.pam\ pool_query_cache.c pool_hba.conf.sample sample/pgpool.pam\
pool_hba.c pool_path.h pool_path.c pool_ip.h pool_ip.c pool_type.h \ pool_hba.c pool_path.h pool_path.c pool_ip.h pool_ip.c pool_type.h \
ps_status.c strlcpy.c recovery.c pool_relcache.c \ ps_status.c strlcpy.c recovery.c pool_relcache.c \
pool_timestamp.c pool_timestamp.h \
pool_proto_modules.c pool_proto_modules.h pool_proto_modules.c pool_proto_modules.h


pg_md5_SOURCES = pg_md5.c md5.c md5.h pg_md5_SOURCES = pg_md5.c md5.c md5.h
Expand Down Expand Up @@ -67,6 +68,12 @@ EXTRA_DIST = pgpool.8.in sql/system_db.sql sample/pgpool.pam doc/pgpool-ja.html
test/jdbc/expected/autocommit test/jdbc/expected/batch \ test/jdbc/expected/autocommit test/jdbc/expected/batch \
test/jdbc/expected/column test/jdbc/expected/lock test/jdbc/expected/select \ test/jdbc/expected/column test/jdbc/expected/lock test/jdbc/expected/select \
test/jdbc/expected/update test/jdbc/expected/insert \ test/jdbc/expected/update test/jdbc/expected/insert \
test/timestamp/Makefile test/timestamp/input/insert.sql \
test/timestamp/input/update.sql test/timestamp/input/misc.sql \
test/timestamp/expected/insert.out test/timestamp/expected/update.out \
test/timestamp/expected/misc.out test/timestamp/main.c \
test/timestamp/parse_schedule test/timestamp/run-test \
redhat/pgpool.init redhat/pgpool.sysconfig redhat/pgpool.init redhat/pgpool.sysconfig



SUBDIRS = parser pcp SUBDIRS = parser pcp
11 changes: 9 additions & 2 deletions Makefile.in
Expand Up @@ -71,7 +71,7 @@ am_pgpool_OBJECTS = main.$(OBJEXT) child.$(OBJEXT) pool_auth.$(OBJEXT) \
pool_query_cache.$(OBJEXT) pool_hba.$(OBJEXT) \ pool_query_cache.$(OBJEXT) pool_hba.$(OBJEXT) \
pool_path.$(OBJEXT) pool_ip.$(OBJEXT) ps_status.$(OBJEXT) \ pool_path.$(OBJEXT) pool_ip.$(OBJEXT) ps_status.$(OBJEXT) \
strlcpy.$(OBJEXT) recovery.$(OBJEXT) pool_relcache.$(OBJEXT) \ strlcpy.$(OBJEXT) recovery.$(OBJEXT) pool_relcache.$(OBJEXT) \
pool_proto_modules.$(OBJEXT) pool_timestamp.$(OBJEXT) pool_proto_modules.$(OBJEXT)
pgpool_OBJECTS = $(am_pgpool_OBJECTS) pgpool_OBJECTS = $(am_pgpool_OBJECTS)
pgpool_DEPENDENCIES = parser/libsql-parser.a pcp/libpcp.la \ pgpool_DEPENDENCIES = parser/libsql-parser.a pcp/libpcp.la \
parser/nodes.o parser/nodes.o
Expand Down Expand Up @@ -239,6 +239,7 @@ pgpool_SOURCES = pool.h pool_type.h version.h pgpool.conf.sample \
pool_query_cache.c pool_hba.conf.sample sample/pgpool.pam\ pool_query_cache.c pool_hba.conf.sample sample/pgpool.pam\
pool_hba.c pool_path.h pool_path.c pool_ip.h pool_ip.c pool_type.h \ pool_hba.c pool_path.h pool_path.c pool_ip.h pool_ip.c pool_type.h \
ps_status.c strlcpy.c recovery.c pool_relcache.c \ ps_status.c strlcpy.c recovery.c pool_relcache.c \
pool_timestamp.c pool_timestamp.h \
pool_proto_modules.c pool_proto_modules.h pool_proto_modules.c pool_proto_modules.h


pg_md5_SOURCES = pg_md5.c md5.c md5.h pg_md5_SOURCES = pg_md5.c md5.c md5.h
Expand Down Expand Up @@ -281,6 +282,11 @@ EXTRA_DIST = pgpool.8.in sql/system_db.sql sample/pgpool.pam doc/pgpool-ja.html
test/jdbc/expected/autocommit test/jdbc/expected/batch \ test/jdbc/expected/autocommit test/jdbc/expected/batch \
test/jdbc/expected/column test/jdbc/expected/lock test/jdbc/expected/select \ test/jdbc/expected/column test/jdbc/expected/lock test/jdbc/expected/select \
test/jdbc/expected/update test/jdbc/expected/insert \ test/jdbc/expected/update test/jdbc/expected/insert \
test/timestamp/Makefile test/timestamp/input/insert.sql \
test/timestamp/input/update.sql test/timestamp/input/misc.sql \
test/timestamp/expected/insert.out test/timestamp/expected/update.out \
test/timestamp/expected/misc.out test/timestamp/main.c \
test/timestamp/parse_schedule test/timestamp/run-test \
redhat/pgpool.init redhat/pgpool.sysconfig redhat/pgpool.init redhat/pgpool.sysconfig


SUBDIRS = parser pcp SUBDIRS = parser pcp
Expand Down Expand Up @@ -404,6 +410,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_signal.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_signal.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_stream.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_stream.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_system.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_system.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/pool_timestamp.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ps_status.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ps_status.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/recovery.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/recovery.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/strlcpy.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/strlcpy.Po@am__quote@
Expand Down Expand Up @@ -658,7 +665,7 @@ distclean-tags:
distdir: $(DISTFILES) distdir: $(DISTFILES)
$(am__remove_distdir) $(am__remove_distdir)
mkdir $(distdir) mkdir $(distdir)
$(mkdir_p) $(distdir)/doc $(distdir)/redhat $(distdir)/sample $(distdir)/sql $(distdir)/sql/pgpool-recovery $(distdir)/test/jdbc $(distdir)/test/jdbc/expected $(distdir)/test/parser $(distdir)/test/parser/expected $(distdir)/test/parser/input $(mkdir_p) $(distdir)/doc $(distdir)/redhat $(distdir)/sample $(distdir)/sql $(distdir)/sql/pgpool-recovery $(distdir)/test/jdbc $(distdir)/test/jdbc/expected $(distdir)/test/parser $(distdir)/test/parser/expected $(distdir)/test/parser/input $(distdir)/test/timestamp $(distdir)/test/timestamp/expected $(distdir)/test/timestamp/input
@srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \ @srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; \
topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's|.|.|g'`; \
list='$(DISTFILES)'; for file in $$list; do \ list='$(DISTFILES)'; for file in $$list; do \
Expand Down
69 changes: 52 additions & 17 deletions pool_process_query.c
@@ -1,6 +1,6 @@
/* -*-pgsql-c-*- */ /* -*-pgsql-c-*- */
/* /*
* $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.170 2009/11/04 14:01:28 t-ishii Exp $ * $Header: /cvsroot/pgpool/pgpool-II/pool_process_query.c,v 1.171 2009/11/10 10:03:10 t-ishii Exp $
* *
* pgpool: a language independent connection pool server for PostgreSQL * pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii * written by Tatsuo Ishii
Expand Down Expand Up @@ -43,6 +43,7 @@


#include "pool.h" #include "pool.h"
#include "pool_signal.h" #include "pool_signal.h"
#include "pool_timestamp.h"
#include "pool_proto_modules.h" #include "pool_proto_modules.h"


#ifndef FD_SETSIZE #ifndef FD_SETSIZE
Expand Down Expand Up @@ -1895,35 +1896,30 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
char *p; char *p;
int i; int i;
char *name; char *name;
char *rewrite_msg = NULL;
POOL_STATUS ret; POOL_STATUS ret;


for (i=0;i<NUM_BACKENDS;i++)
{
if (VALID_BACKEND(i))
{
if (pool_write(CONNECTION(backend, i), &kind, 1))
return POOL_END;
}
}

if (pool_read(frontend, &sendlen, sizeof(sendlen))) if (pool_read(frontend, &sendlen, sizeof(sendlen)))
{ {
return POOL_END; return POOL_END;
} }


len = ntohl(sendlen) - 4; len = ntohl(sendlen) - 4;


for (i=0;i<NUM_BACKENDS;i++) if (len == 0)
{ {
if (VALID_BACKEND(i)) for (i=0;i<NUM_BACKENDS;i++)
{ {
if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen))) if (VALID_BACKEND(i))
return POOL_END; {
if (pool_write(CONNECTION(backend, i), &kind, 1))
return POOL_END;
if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen)))
return POOL_END;
}
} }
}

if (len == 0)
return POOL_CONTINUE; return POOL_CONTINUE;
}
else if (len < 0) else if (len < 0)
{ {
pool_error("SimpleForwardToBackend: invalid message length"); pool_error("SimpleForwardToBackend: invalid message length");
Expand All @@ -1934,14 +1930,53 @@ POOL_STATUS SimpleForwardToBackend(char kind, POOL_CONNECTION *frontend, POOL_CO
if (p == NULL) if (p == NULL)
return POOL_END; return POOL_END;


if (kind == 'B')
{
Portal *portal = NULL;
char *stmt_name, *portal_name;

portal_name = p;
stmt_name = p + strlen(portal_name) + 1;

if (*stmt_name == '\0')
portal = unnamed_statement;
else
{
portal = lookup_prepared_statement_by_statement(&prepared_list, stmt_name);
}

/* rewrite bind message */
if (REPLICATION && portal && portal->num_tsparams > 0)
{
p = rewrite_msg = bind_rewrite_timestamp(backend, portal, p, &len);
sendlen = htonl(len + 4);
}
}

for (i=0;i<NUM_BACKENDS;i++) for (i=0;i<NUM_BACKENDS;i++)
{ {
if (VALID_BACKEND(i)) if (VALID_BACKEND(i))
{ {
if (pool_write(CONNECTION(backend, i), &kind, 1))
{
free(rewrite_msg);
return POOL_END;
}

if (pool_write(CONNECTION(backend,i), &sendlen, sizeof(sendlen)))
{
free(rewrite_msg);
return POOL_END;
}

if (pool_write_and_flush(CONNECTION(backend, i), p, len)) if (pool_write_and_flush(CONNECTION(backend, i), p, len))
{
free(rewrite_msg);
return POOL_END; return POOL_END;
}
} }
} }
free(rewrite_msg);


if (kind == 'B') /* Bind message */ if (kind == 'B') /* Bind message */
{ {
Expand Down
91 changes: 87 additions & 4 deletions pool_proto_modules.c
@@ -1,6 +1,6 @@
/* -*-pgsql-c-*- */ /* -*-pgsql-c-*- */
/* /*
* $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.22 2009/10/30 05:11:20 t-ishii Exp $ * $Header: /cvsroot/pgpool/pgpool-II/pool_proto_modules.c,v 1.23 2009/11/10 10:03:10 t-ishii Exp $
* *
* pgpool: a language independent connection pool server for PostgreSQL * pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii * written by Tatsuo Ishii
Expand Down Expand Up @@ -45,6 +45,7 @@


#include "pool.h" #include "pool.h"
#include "pool_signal.h" #include "pool_signal.h"
#include "pool_timestamp.h"
#include "pool_proto_modules.h" #include "pool_proto_modules.h"
#include "parser/pool_string.h" #include "parser/pool_string.h"


Expand Down Expand Up @@ -525,17 +526,44 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
{ {
/* check if query is "COMMIT" or "ROLLBACK" */ /* check if query is "COMMIT" or "ROLLBACK" */
commit = is_commit_query(node); commit = is_commit_query(node);
free_parser();


/* /*
* Query is not commit/rollback * Query is not commit/rollback
*/ */
if (!commit) if (!commit)
{ {
char *rewrite_query;

if (node)
{
Portal *portal = NULL;

if (IsA(node, PrepareStmt))
{
portal = pending_prepared_portal;
portal->num_tsparams = 0;
}
else if (IsA(node, ExecuteStmt))
portal = lookup_prepared_statement_by_statement(
&prepared_list, ((ExecuteStmt *) node)->name);

/* rewrite `now()' to timestamp literal */
rewrite_query = rewrite_timestamp(backend, node, false, portal);
if (rewrite_query != NULL)
{
string = rewrite_query;
len = strlen(string) + 1;
}

}

/* Send the query to master node */ /* Send the query to master node */


if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE) if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
{
free_parser();
return POOL_END; return POOL_END;
}


if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE) if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
{ {
Expand All @@ -547,6 +575,7 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
cancel_packet.key= MASTER_CONNECTION(backend)->key; cancel_packet.key= MASTER_CONNECTION(backend)->key;
cancel_request(&cancel_packet); cancel_request(&cancel_packet);


free_parser();
return POOL_END; return POOL_END;
} }


Expand All @@ -570,7 +599,10 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
continue; continue;


if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE) if (send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend)) != POOL_CONTINUE)
{
free_parser();
return POOL_END; return POOL_END;
}
} }


/* Wait for nodes othan than the master node */ /* Wait for nodes othan than the master node */
Expand All @@ -589,6 +621,7 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
cancel_packet.key= MASTER_CONNECTION(backend)->key; cancel_packet.key= MASTER_CONNECTION(backend)->key;
cancel_request(&cancel_packet); cancel_request(&cancel_packet);


free_parser();
return POOL_END; return POOL_END;
} }
} }
Expand All @@ -597,7 +630,10 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
if (commit) if (commit)
{ {
if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE) if (send_simplequery_message(MASTER(backend), len, string, MAJOR(backend)) != POOL_CONTINUE)
{
free_parser();
return POOL_END; return POOL_END;
}


if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE) if (wait_for_query_response(frontend, MASTER(backend), string, MAJOR(backend)) != POOL_CONTINUE)
{ {
Expand All @@ -609,12 +645,14 @@ POOL_STATUS NotificationResponse(POOL_CONNECTION *frontend,
cancel_packet.key= MASTER_CONNECTION(backend)->key; cancel_packet.key= MASTER_CONNECTION(backend)->key;
cancel_request(&cancel_packet); cancel_request(&cancel_packet);


free_parser();
return POOL_END; return POOL_END;
} }




TSTATE(backend) = 'I'; TSTATE(backend) = 'I';
} }
free_parser();
} }
else else
{ {
Expand Down Expand Up @@ -919,7 +957,8 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
parse_tree_list = raw_parser(stmt); parse_tree_list = raw_parser(stmt);
if (parse_tree_list == NIL) if (parse_tree_list == NIL)
{ {
free_parser(); /* free_parser(); */
;
} }
else else
{ {
Expand Down Expand Up @@ -953,6 +992,7 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
if (portal == NULL) if (portal == NULL)
{ {
pool_error("Parse: create_portal() failed"); pool_error("Parse: create_portal() failed");
free_parser();
return POOL_END; return POOL_END;
} }


Expand Down Expand Up @@ -985,6 +1025,37 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
/* switch old memory context */ /* switch old memory context */
pool_memory = old_context; pool_memory = old_context;


if (REPLICATION)
{
char *rewrite_query;
bool rewrite_to_params = true;

/*
* rewrite `now()'.
* if stmt is unnamed, we rewrite `now()' to timestamp constant.
* else we rewrite `now()' to params and expand that at Bind
* message.
*/
if (*name == '\0')
rewrite_to_params = false;
portal->num_tsparams = 0;
rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, portal);
if (rewrite_query != NULL)
{
string = palloc(strlen(name) + strlen(rewrite_query) + 2);
strcpy(string, name);
strcpy(string + strlen(name) + 1, rewrite_query);
memcpy(string + strlen(name) + strlen(rewrite_query) + 2,
stmt + strlen(stmt) + 1,
len - (strlen(name) + strlen(stmt) + 2));

len = len - strlen(stmt) + strlen(rewrite_query);
name = string;
stmt = string + strlen(name) + 1;
pool_debug("rewrite query %s %s len=%d", name, stmt, len);
}
}

if (REPLICATION) if (REPLICATION)
{ {
char kind; char kind;
Expand All @@ -1003,9 +1074,16 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,


kind = pool_read_kind(backend); kind = pool_read_kind(backend);
if (kind != 'Z') if (kind != 'Z')
{
free_parser();
return POOL_END; return POOL_END;
}

if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE) if (ReadyForQuery(frontend, backend, 0) != POOL_CONTINUE)
{
free_parser();
return POOL_END; return POOL_END;
}
} }


if (is_strict_query(node)) if (is_strict_query(node))
Expand All @@ -1017,17 +1095,22 @@ POOL_STATUS Parse(POOL_CONNECTION *frontend,
status = insert_lock(frontend, backend, stmt, (InsertStmt *)node); status = insert_lock(frontend, backend, stmt, (InsertStmt *)node);
if (status != POOL_CONTINUE) if (status != POOL_CONTINUE)
{ {
free_parser();
return status; return status;
} }
} }
} }
free_parser();
} }


/* send to master node */ /* send to master node */
if (send_extended_protocol_message(backend, MASTER_NODE_ID, if (send_extended_protocol_message(backend, MASTER_NODE_ID,
"P", len, string)) "P", len, string))
{
free_parser();
return POOL_END; return POOL_END;
}

free_parser();


if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE) if (REPLICATION || PARALLEL_MODE || MASTER_SLAVE)
{ {
Expand Down

0 comments on commit e913eb4

Please sign in to comment.