Skip to content

Commit

Permalink
Poll changes
Browse files Browse the repository at this point in the history
  • Loading branch information
yazun committed Oct 16, 2015
1 parent 11bc261 commit 15d8125
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 169 deletions.
20 changes: 10 additions & 10 deletions src/backend/pgxc/pool/execRemote.c
Expand Up @@ -141,7 +141,7 @@ typedef struct
* Buffer size does not affect performance significantly, just do not allow
* connection buffer grows infinitely
*/
#define COPY_BUFFER_SIZE 8192
#define COPY_BUFFER_SIZE 8192*8
#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024

#ifndef XCP
Expand Down Expand Up @@ -562,7 +562,7 @@ HandleCommandComplete(RemoteQueryState *combiner, char *msg_body, size_t len, PG
/* There is a consistency issue in the database with the replicated table */
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("Write to replicated table returned different results from the Datanodes")));
errmsg("Write to replicated table returned different results from the Datanodes: rowcount %u vs %u",rowcount,estate->es_processed)));
}
else
/* first result */
Expand Down Expand Up @@ -7584,7 +7584,7 @@ ExecRemoteQuery(RemoteQueryState *node)
step->read_only, PGXC_NODE_DATANODE))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not begin transaction on data node.")));
errmsg("Could not begin transaction on primary data node.")));

/* If explicit transaction is needed gxid is already sent */
if (!pgxc_start_command_on_connection(primaryconnection, node, snapshot))
Expand Down Expand Up @@ -7624,7 +7624,7 @@ ExecRemoteQuery(RemoteQueryState *node)
step->read_only, PGXC_NODE_DATANODE))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not begin transaction on data node.")));
errmsg("Could not begin transaction on data node: %u",connections[i]->nodeoid)));

/* If explicit transaction is needed gxid is already sent */
if (!pgxc_start_command_on_connection(connections[i], node, snapshot))
Expand All @@ -7633,7 +7633,7 @@ ExecRemoteQuery(RemoteQueryState *node)
pfree_pgxc_all_handles(pgxc_connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
errmsg("Failed to send command to data node: %u",connections[i]->nodeoid)));
}
connections[i]->combiner = combiner;
}
Expand Down Expand Up @@ -8581,31 +8581,31 @@ ExecFinishInitRemoteSubplan(RemoteSubplanState *node)
is_read_only, PGXC_NODE_DATANODE))
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not begin transaction on data node.")));
errmsg("Could not begin transaction on data node: %u",connection->nodeoid)));

if (pgxc_node_send_timestamp(connection, timestamp))
{
combiner->conn_count = 0;
pfree(combiner->connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command to data nodes")));
errmsg("Failed to send command to data nodes: %u",connection->nodeoid)));
}
if (snapshot && pgxc_node_send_snapshot(connection, snapshot))
{
combiner->conn_count = 0;
pfree(combiner->connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send snapshot to data nodes")));
errmsg("Failed to send snapshot to data nodes: %u",connection->nodeoid)));
}
if (pgxc_node_send_cmd_id(connection, estate->es_snapshot->curcid) < 0 )
{
combiner->conn_count = 0;
pfree(combiner->connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send command ID to data nodes")));
errmsg("Failed to send command ID to data nodes: %u",connection->nodeoid)));
}
pgxc_node_send_plan(connection, cursor, "Remote Subplan",
node->subplanstr, node->nParamRemote, paramtypes);
Expand All @@ -8615,7 +8615,7 @@ ExecFinishInitRemoteSubplan(RemoteSubplanState *node)
pfree(combiner->connections);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Failed to send subplan to data nodes")));
errmsg("Failed to send subplan to data node: %u",connection->nodeoid)));
}
}
}
Expand Down
122 changes: 85 additions & 37 deletions src/backend/pgxc/pool/pgxcnode.c
Expand Up @@ -22,7 +22,9 @@
*/

#include "postgres.h"
#include <sys/select.h>
//#include <sys/select.h>
#include <poll.h>

#include <sys/time.h>
#include <sys/types.h>
#include <sys/ioctl.h>
Expand Down Expand Up @@ -492,14 +494,23 @@ pgxc_node_receive(const int conn_count,
#define ERROR_OCCURED true
#define NO_ERROR_OCCURED false
int i,
res_select,
nfds = 0;
fd_set readfds;
bool is_msg_buffered;
sockets_to_poll,
poll_val;
bool is_msg_buffered;
long timeout_ms;

PGXCNodeHandle *conn;
struct pollfd *pool_fd;

pool_fd = (struct pollfd *) palloc(conn_count * sizeof(struct pollfd));

/* sockets to be polled index */
sockets_to_poll = 0;

FD_ZERO(&readfds);
//FD_ZERO(&readfds);

is_msg_buffered = false;

for (i = 0; i < conn_count; i++)
{
/* If connection has a buffered message */
Expand All @@ -514,83 +525,120 @@ pgxc_node_receive(const int conn_count,
{
/* If connection finished sending do not wait input from it */
if (connections[i]->state == DN_CONNECTION_STATE_IDLE || HAS_MESSAGE_BUFFERED(connections[i]))
{
pool_fd[i].fd = -1;
pool_fd[i].events = 0;
continue;
}

/* prepare select params */
if (connections[i]->sock > 0)
{
FD_SET(connections[i]->sock, &readfds);
nfds = connections[i]->sock;
}
else
{
/* flag as bad, it will be removed from the list */
connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
pool_fd[i].fd = connections[i]->sock;
pool_fd[i].events = POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND;
sockets_to_poll++;

//FD_SET(connections[i]->sock, &readfds);
//nfds = connections[i]->sock;
}
}

/*
* Return if we do not have connections to receive input
*/
if (nfds == 0)
if (sockets_to_poll == 0)
{
pfree(pool_fd);
if (is_msg_buffered)
{
return NO_ERROR_OCCURED;
}
return ERROR_OCCURED;
}

/* do conversion from the select behaviour */
if ( timeout == NULL )
{
timeout_ms = -1;
}
else
{
timeout_ms = (timeout->tv_sec * (uint64_t) 1000) + (timeout->tv_usec / 1000);
}
retry:
#ifdef XCP
CHECK_FOR_INTERRUPTS();
#endif
res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
if (res_select < 0)
// poll_val = select(nfds + 1, &readfds, NULL, NULL, timeout);
poll_val = poll(pool_fd, sockets_to_poll, timeout_ms);
if (poll_val < 0)
{
/* error - retry if EINTR or EAGAIN */
if (errno == EINTR || errno == EAGAIN)
/* error - retry if EINTR */
if (errno == EINTR )
{ errno = 0;
goto retry;

if (errno == EBADF)
{
elog(WARNING, "select() bad file descriptor set");
}
elog(WARNING, "select() error: %d", errno);

elog(WARNING, "poll() error: %d", errno);
pfree(pool_fd);
if (errno)
return ERROR_OCCURED;
return NO_ERROR_OCCURED;
}

if (res_select == 0)
if (poll_val == 0)
{
/* Handle timeout */
elog(DEBUG1, "timeout while waiting for response");
elog(DEBUG1, "timeout %d while waiting for any response from %d connections", timeout_ms,conn_count);
#ifdef XCP
for (i = 0; i < conn_count; i++)
connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
#endif
pfree(pool_fd);
return NO_ERROR_OCCURED;
}

/* read data */
for (i = 0; i < conn_count; i++)
{
PGXCNodeHandle *conn = connections[i];
if( pool_fd[i].fd == -1 ) continue;

conn = connections[i];

if (FD_ISSET(conn->sock, &readfds))
if ( pool_fd[i].fd == conn->sock )
{
int read_status = pgxc_node_read_data(conn, true);
if( pool_fd[i].revents & POLLIN )
{
int read_status = pgxc_node_read_data(conn, true);
if ( read_status == EOF || read_status < 0 )
{
/* Can not read - no more actions, just discard connection */
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
add_error_message(conn, "unexpected EOF on datanode connection.");
elog(WARNING, "unexpected EOF on datanode oid connection: %d", conn->nodeoid);
/* Should we read from the other connections before returning? */
pfree(pool_fd);
return ERROR_OCCURED;
}

if (read_status == EOF || read_status < 0)
}
else if (
(pool_fd[i].revents & POLLERR) ||
(pool_fd[i].revents & POLLHUP) ||
(pool_fd[i].revents & POLLNVAL)
)
{
/* Can not read - no more actions, just discard connection */
conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
add_error_message(conn, "unexpected EOF on datanode connection");
elog(WARNING, "unexpected EOF on datanode connection");
/* Should we read from the other connections before returning? */
return ERROR_OCCURED;
connections[i]->state = DN_CONNECTION_STATE_ERROR_FATAL;
add_error_message(conn, "unexpected network error on datanode connection");
elog(WARNING, "unexpected EOF on datanode oid connection: %d with event %d", conn->nodeoid,pool_fd[i].revents);
/* Should we check/read from the other connections before returning? */
pfree(pool_fd);
return ERROR_OCCURED;
}
}

}
pfree(pool_fd);
return NO_ERROR_OCCURED;
}

Expand Down Expand Up @@ -2725,8 +2773,8 @@ PGXCNodeSetParam(bool local, const char *name, const char *value)
}

/*
* Special case for
* RESET SESSION AUTHORIZATION
* Special case for
* RESET SESSION AUTHORIZATION
* SET SESSION AUTHORIZATION TO DEFAULT
*
* We must also forget any SET ROLE commands since RESET SESSION
Expand Down
7 changes: 6 additions & 1 deletion src/backend/pgxc/pool/poolcomm.c
Expand Up @@ -57,6 +57,7 @@ pool_listen(unsigned short port, const char *unixSocketName)
struct sockaddr_un unix_addr;
int maxconn;


#ifdef HAVE_UNIX_SOCKETS
if (Lock_AF_UNIX(port, unixSocketName) < 0)
return -1;
Expand All @@ -72,6 +73,8 @@ pool_listen(unsigned short port, const char *unixSocketName)
len = sizeof(unix_addr.sun_family) +
strlen(unix_addr.sun_path) + 1;



/* bind the name to the descriptor */
if (bind(fd, (struct sockaddr *) & unix_addr, len) < 0)
return -1;
Expand All @@ -89,6 +92,8 @@ pool_listen(unsigned short port, const char *unixSocketName)
if (listen(fd, maxconn) < 0)
return -1;



/* Arrange to unlink the socket file at exit */
on_proc_exit(StreamDoUnlink, 0);

Expand Down Expand Up @@ -595,7 +600,7 @@ pool_recvfds(PoolPort *port, int *fds, int count)
{
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("incomplete message from client")));
errmsg("incomplete message from client [size: %u errno %u]",r,errno)));
goto failure;
}

Expand Down

0 comments on commit 15d8125

Please sign in to comment.