Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of the module I/O filter API #6140

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/acl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1321,12 +1321,12 @@ int ACLSaveToFile(const char *filename) {
}

/* Write it. */
if (write(fd,acl,sdslen(acl)) != (ssize_t)sdslen(acl)) {
if (writeNoFilter(fd,acl,sdslen(acl)) != (ssize_t)sdslen(acl)) {
serverLog(LL_WARNING,"Writing ACL file for ACL SAVE: %s",
strerror(errno));
goto cleanup;
}
close(fd); fd = -1;
closeNoFilter(fd); fd = -1;

/* Let's replace the new file with the old one. */
if (rename(tmpfilename,filename) == -1) {
Expand All @@ -1338,7 +1338,7 @@ int ACLSaveToFile(const char *filename) {
retval = C_OK; /* If we reached this point, everything is fine. */

cleanup:
if (fd != -1) close(fd);
if (fd != -1) closeNoFilter(fd);
if (tmpfilename) unlink(tmpfilename);
sdsfree(tmpfilename);
sdsfree(acl);
Expand Down
40 changes: 20 additions & 20 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
if (block->used > 0) {
nwritten = write(server.aof_pipe_write_data_to_child,
nwritten = writeNoFilter(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
Expand Down Expand Up @@ -182,7 +182,7 @@ ssize_t aofRewriteBufferWrite(int fd) {
ssize_t nwritten;

if (block->used) {
nwritten = write(fd,block->buf,block->used);
nwritten = writeNoFilter(fd,block->buf,block->used);
if (nwritten != (ssize_t)block->used) {
if (nwritten == 0) errno = EIO;
return -1;
Expand Down Expand Up @@ -237,7 +237,7 @@ void stopAppendOnly(void) {
serverAssert(server.aof_state != AOF_OFF);
flushAppendOnlyFile(1);
redis_fsync(server.aof_fd);
close(server.aof_fd);
closeNoFilter(server.aof_fd);

server.aof_fd = -1;
server.aof_selected_db = -1;
Expand Down Expand Up @@ -276,7 +276,7 @@ int startAppendOnly(void) {
killAppendOnlyChild();
}
if (rewriteAppendOnlyFileBackground() == C_ERR) {
close(newfd);
closeNoFilter(newfd);
serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
return C_ERR;
}
Expand All @@ -300,7 +300,7 @@ ssize_t aofWrite(int fd, const char *buf, size_t len) {
ssize_t nwritten = 0, totwritten = 0;

while(len) {
nwritten = write(fd, buf, len);
nwritten = writeNoFilter(fd, buf, len);

if (nwritten < 0) {
if (errno == EINTR) {
Expand Down Expand Up @@ -1283,7 +1283,7 @@ ssize_t aofReadDiffFromParent(void) {
ssize_t nread, total = 0;

while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
readNoFilter(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
Expand Down Expand Up @@ -1428,7 +1428,7 @@ int rewriteAppendOnlyFile(char *filename) {
}

/* Ask the master to stop sending diffs. */
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (writeNoFilter(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
Expand Down Expand Up @@ -1483,10 +1483,10 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(privdata);
UNUSED(mask);

if (read(fd,&byte,1) == 1 && byte == '!') {
if (readNoFilter(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
if (writeNoFilter(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
/* If we can't send the ack, inform the user, but don't try again
* since in the other side the children will use a timeout if the
* kernel can't buffer our write, or, the children was
Expand Down Expand Up @@ -1529,19 +1529,19 @@ int aofCreatePipes(void) {
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
for (j = 0; j < 6; j++) if(fds[j] != -1) closeNoFilter(fds[j]);
return C_ERR;
}

void aofClosePipes(void) {
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
close(server.aof_pipe_write_data_to_child);
close(server.aof_pipe_read_data_from_parent);
close(server.aof_pipe_write_ack_to_parent);
close(server.aof_pipe_read_ack_from_child);
close(server.aof_pipe_write_ack_to_child);
close(server.aof_pipe_read_ack_from_parent);
closeNoFilter(server.aof_pipe_write_data_to_child);
closeNoFilter(server.aof_pipe_read_data_from_parent);
closeNoFilter(server.aof_pipe_write_ack_to_parent);
closeNoFilter(server.aof_pipe_read_ack_from_child);
closeNoFilter(server.aof_pipe_write_ack_to_child);
closeNoFilter(server.aof_pipe_read_ack_from_parent);
}

/* ----------------------------------------------------------------------------
Expand Down Expand Up @@ -1689,7 +1689,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
closeNoFilter(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
Expand Down Expand Up @@ -1746,8 +1746,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
closeNoFilter(newfd);
if (oldfd != -1) closeNoFilter(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
Expand All @@ -1756,7 +1756,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (server.aof_fd == -1) {
/* AOF disabled, we don't need to set the AOF file descriptor
* to this new file, so we can close it. */
close(newfd);
closeNoFilter(newfd);
} else {
/* AOF enabled, replace the old fd with the new one. */
oldfd = server.aof_fd;
Expand Down
2 changes: 1 addition & 1 deletion src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void *bioProcessBackgroundJobs(void *arg) {

/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
closeNoFilter((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
Expand Down
8 changes: 4 additions & 4 deletions src/childinfo.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ void closeChildInfoPipe(void) {
if (server.child_info_pipe[0] != -1 ||
server.child_info_pipe[1] != -1)
{
close(server.child_info_pipe[0]);
close(server.child_info_pipe[1]);
closeNoFilter(server.child_info_pipe[0]);
closeNoFilter(server.child_info_pipe[1]);
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
}
Expand All @@ -64,7 +64,7 @@ void sendChildInfo(int ptype) {
server.child_info_data.magic = CHILD_INFO_MAGIC;
server.child_info_data.process_type = ptype;
ssize_t wlen = sizeof(server.child_info_data);
if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) {
if (writeNoFilter(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) {
/* Nothing to do on error, this will be detected by the other side. */
}
}
Expand All @@ -73,7 +73,7 @@ void sendChildInfo(int ptype) {
void receiveChildInfo(void) {
if (server.child_info_pipe[0] == -1) return;
ssize_t wlen = sizeof(server.child_info_data);
if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen &&
if (readNoFilter(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen &&
server.child_info_data.magic == CHILD_INFO_MAGIC)
{
if (server.child_info_data.process_type == CHILD_INFO_TYPE_RDB) {
Expand Down
22 changes: 11 additions & 11 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ int clusterSaveConfig(int do_fsync) {
memset(ci+content_size,'\n',sb.st_size-content_size);
}
}
if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
if (writeNoFilter(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
if (do_fsync) {
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
fsync(fd);
Expand All @@ -346,12 +346,12 @@ int clusterSaveConfig(int do_fsync) {
if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {
/* ftruncate() failing is not a critical error. */
}
close(fd);
closeNoFilter(fd);
sdsfree(ci);
return 0;

err:
if (fd != -1) close(fd);
if (fd != -1) closeNoFilter(fd);
sdsfree(ci);
return -1;
}
Expand Down Expand Up @@ -400,7 +400,7 @@ int clusterLockConfig(char *filename) {
serverLog(LL_WARNING,
"Impossible to lock %s: %s", filename, strerror(errno));
}
close(fd);
closeNoFilter(fd);
return C_ERR;
}
/* Lock acquired: leak the 'fd' by not closing it, so that we'll retain the
Expand Down Expand Up @@ -607,7 +607,7 @@ void freeClusterLink(clusterLink *link) {
sdsfree(link->rcvbuf);
if (link->node)
link->node->link = NULL;
close(link->fd);
closeWithFilters(link->fd);
zfree(link);
}

Expand Down Expand Up @@ -2123,7 +2123,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);

nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
nwritten = writeWithFilters(fd, link->sndbuf, sdslen(link->sndbuf));
if (nwritten <= 0) {
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
(nwritten == -1) ? strerror(errno) : "short write");
Expand Down Expand Up @@ -2173,7 +2173,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}

nread = read(fd,buf,readlen);
nread = readWithFilters(fd,buf,readlen);
if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */

if (nread <= 0) {
Expand Down Expand Up @@ -4983,7 +4983,7 @@ migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long ti
/* Too many items, drop one at random. */
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
close(cs->fd);
closeWithFilters(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
Expand All @@ -5004,7 +5004,7 @@ migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long ti
sdsfree(name);
addReplySds(c,
sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd);
closeWithFilters(fd);
return NULL;
}

Expand All @@ -5031,7 +5031,7 @@ void migrateCloseSocket(robj *host, robj *port) {
return;
}

close(cs->fd);
closeWithFilters(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,name);
sdsfree(name);
Expand All @@ -5045,7 +5045,7 @@ void migrateCloseTimedoutSockets(void) {
migrateCachedSocket *cs = dictGetVal(de);

if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
close(cs->fd);
closeWithFilters(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
Expand Down
6 changes: 3 additions & 3 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2175,7 +2175,7 @@ int rewriteConfigOverwriteFile(char *configfile, sds content) {
* exist), get the size. */
if (fd == -1) return -1; /* errno set by open(). */
if (fstat(fd,&sb) == -1) {
close(fd);
closeNoFilter(fd);
return -1; /* errno set by fstat(). */
}

Expand All @@ -2191,7 +2191,7 @@ int rewriteConfigOverwriteFile(char *configfile, sds content) {
}

/* 3) Write the new content using a single write(2). */
if (write(fd,content_padded,strlen(content_padded)) == -1) {
if (writeNoFilter(fd,content_padded,strlen(content_padded)) == -1) {
retval = -1;
goto cleanup;
}
Expand All @@ -2205,7 +2205,7 @@ int rewriteConfigOverwriteFile(char *configfile, sds content) {

cleanup:
sdsfree(content_padded);
close(fd);
closeNoFilter(fd);
return retval;
}

Expand Down
14 changes: 7 additions & 7 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ int openDirectLogFiledes(void) {
/* Used to close what closeDirectLogFiledes() returns. */
void closeDirectLogFiledes(int fd) {
int log_to_stdout = server.logfile[0] == '\0';
if (!log_to_stdout) close(fd);
if (!log_to_stdout) closeNoFilter(fd);
}

/* Logs the stack trace using the backtrace() call. This function is designed
Expand All @@ -1150,10 +1150,10 @@ void logStackTrace(ucontext_t *uc) {
if (getMcontextEip(uc) != NULL) {
char *msg1 = "EIP:\n";
char *msg2 = "\nBacktrace:\n";
if (write(fd,msg1,strlen(msg1)) == -1) {/* Avoid warning. */};
if (writeNoFilter(fd,msg1,strlen(msg1)) == -1) {/* Avoid warning. */};
trace[0] = getMcontextEip(uc);
backtrace_symbols_fd(trace, 1, fd);
if (write(fd,msg2,strlen(msg2)) == -1) {/* Avoid warning. */};
if (writeNoFilter(fd,msg2,strlen(msg2)) == -1) {/* Avoid warning. */};
}

/* Write symbols to log file */
Expand Down Expand Up @@ -1248,17 +1248,17 @@ int memtest_test_linux_anonymous_maps(void) {
"*** Preparing to test memory region %lx (%lu bytes)\n",
(unsigned long) start_vect[regions],
(unsigned long) size_vect[regions]);
if (write(fd,logbuf,strlen(logbuf)) == -1) { /* Nothing to do. */ }
if (writeNoFilter(fd,logbuf,strlen(logbuf)) == -1) { /* Nothing to do. */ }
regions++;
}

int errors = 0;
for (j = 0; j < regions; j++) {
if (write(fd,".",1) == -1) { /* Nothing to do. */ }
if (writeNoFilter(fd,".",1) == -1) { /* Nothing to do. */ }
errors += memtest_preserving_test((void*)start_vect[j],size_vect[j],1);
if (write(fd, errors ? "E" : "O",1) == -1) { /* Nothing to do. */ }
if (writeNoFilter(fd, errors ? "E" : "O",1) == -1) { /* Nothing to do. */ }
}
if (write(fd,"\n",1) == -1) { /* Nothing to do. */ }
if (writeNoFilter(fd,"\n",1) == -1) { /* Nothing to do. */ }

/* NOTE: It is very important to close the file descriptor only now
* because closing it before may result into unmapping of some memory
Expand Down