Skip to content

Commit

Permalink
Fixed race condition in multistream access to files for CEPH
Browse files Browse the repository at this point in the history
This was leading to file corruption and was mainly due to the absence of pread/pwrite interfaces in the layer in front of ceph. A non-atomic combination of seek + read/write was used, leading to the problem.
My thanks to RAL for identifying the problem and doing most of the debugging.
  • Loading branch information
Sebastien Ponce committed Nov 30, 2015
1 parent 2091cdd commit 96dde7b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
12 changes: 2 additions & 10 deletions src/XrdCeph/XrdCephOssFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ ssize_t XrdCephOssFile::Read(off_t offset, size_t blen) {
}

/**
 * Reads up to blen bytes from the file into buff, starting at the absolute
 * position offset.
 *
 * The offset is handed to ceph_posix_pread together with the request instead
 * of being installed first with a separate seek. A seek followed by a read is
 * two independent steps on the same descriptor, so concurrent streams on one
 * file could interleave between them and read from the wrong position.
 *
 * @param buff   destination buffer, at least blen bytes large
 * @param offset absolute position in the file to read from
 * @param blen   maximum number of bytes to read
 * @return the value returned by ceph_posix_pread
 */
ssize_t XrdCephOssFile::Read(void *buff, off_t offset, size_t blen) {
  return ceph_posix_pread(m_fd, buff, blen, offset);
}

static void aioReadCallback(XrdSfsAio *aiop, size_t rc) {
Expand All @@ -84,11 +80,7 @@ int XrdCephOssFile::Fstat(struct stat *buff) {
}

/**
 * Writes blen bytes from buff into the file, starting at the absolute
 * position offset.
 *
 * The offset is handed to ceph_posix_pwrite together with the request instead
 * of being installed first with a separate seek. A seek followed by a write
 * is two independent steps on the same descriptor, so concurrent streams on
 * one file could interleave between them and write to the wrong position.
 *
 * @param buff   source buffer holding the data to write
 * @param offset absolute position in the file to write to
 * @param blen   number of bytes to write
 * @return the value returned by ceph_posix_pwrite
 */
ssize_t XrdCephOssFile::Write(const void *buff, off_t offset, size_t blen) {
  return ceph_posix_pwrite(m_fd, buff, blen, offset);
}

static void aioWriteCallback(XrdSfsAio *aiop, size_t rc) {
Expand Down
44 changes: 44 additions & 0 deletions src/XrdCeph/XrdCephPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,28 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
}
}

/**
 * Positional write: stores count bytes from buf into the object backing fd,
 * starting at the absolute position offset.
 *
 * The target position comes exclusively from the offset argument; nothing in
 * this function reads or updates a per-descriptor position, so the call does
 * not depend on a preceding seek.
 *
 * @param fd     descriptor to look up in g_fds
 * @param buf    source buffer, at least count bytes large
 * @param count  number of bytes to write
 * @param offset absolute position in the file to write to
 * @return count on success;
 *         -EBADF if fd is not registered or was opened with neither
 *         O_WRONLY nor O_RDWR;
 *         -EINVAL if no striper could be obtained for the file;
 *         otherwise the non-zero status reported by the striper write
 */
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset) {
  std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
  if (it == g_fds.end()) {
    return -EBADF;
  }
  CephFileRef &fr = it->second;
  // count is a size_t: print it through a matching conversion rather than %d.
  logwrapper((char*)"ceph_write: for fd %d, count=%lu", fd,
             static_cast<unsigned long>(count));
  if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) {
    // Descriptor was not opened for writing.
    return -EBADF;
  }
  libradosstriper::RadosStriper *striper = getRadosStriper(fr);
  if (0 == striper) {
    return -EINVAL;
  }
  ceph::bufferlist bl;
  bl.append(static_cast<const char*>(buf), count);
  const int rc = striper->write(fr.name, bl, count, offset);
  if (rc != 0) {
    // Any non-zero status from the striper is handed back unchanged.
    return rc;
  }
  return count;
}

static void ceph_aio_complete(rados_completion_t c, void *arg) {
AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
size_t rc = rados_aio_get_return_value(c);
Expand Down Expand Up @@ -618,6 +640,28 @@ ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
}
}

/**
 * Positional read: fetches up to count bytes of the object backing fd,
 * starting at the absolute position offset, and copies them into buf.
 *
 * The source position comes exclusively from the offset argument; nothing in
 * this function reads or updates a per-descriptor position, so the call does
 * not depend on a preceding seek.
 *
 * @param fd     descriptor to look up in g_fds
 * @param buf    destination buffer, at least count bytes large
 * @param count  maximum number of bytes to read
 * @param offset absolute position in the file to read from
 * @return the non-negative value reported by the striper read, which is also
 *         the number of bytes copied into buf;
 *         -EBADF if fd is not registered or was opened write-only;
 *         -EINVAL if no striper could be obtained for the file;
 *         otherwise the negative status reported by the striper read
 */
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset) {
  std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
  if (it == g_fds.end()) {
    return -EBADF;
  }
  CephFileRef &fr = it->second;
  // count is a size_t: print it through a matching conversion rather than %d.
  logwrapper((char*)"ceph_read: for fd %d, count=%lu", fd,
             static_cast<unsigned long>(count));
  // Only a write-only descriptor is unreadable. A descriptor opened O_RDWR
  // permits reading and must not be rejected here.
  if ((fr.flags & O_ACCMODE) == O_WRONLY) {
    return -EBADF;
  }
  libradosstriper::RadosStriper *striper = getRadosStriper(fr);
  if (0 == striper) {
    return -EINVAL;
  }
  ceph::bufferlist bl;
  const int rc = striper->read(fr.name, &bl, count, offset);
  if (rc < 0) {
    return rc;
  }
  // rc is used as the number of bytes available in bl.
  bl.copy(0, rc, static_cast<char*>(buf));
  return rc;
}

ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
if (it != g_fds.end()) {
Expand Down
2 changes: 2 additions & 0 deletions src/XrdCeph/XrdCephPosix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ int ceph_posix_close(int fd);
off_t ceph_posix_lseek(int fd, off_t offset, int whence);
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence);
ssize_t ceph_posix_write(int fd, const void *buf, size_t count);
/// Positional write: writes count bytes from buf at the absolute position
/// offset, which is passed with the call rather than set by a prior seek.
/// Returns count on success; -EBADF if fd is unknown or its open flags do not
/// permit writing; -EINVAL if no striper is available; otherwise the non-zero
/// status of the underlying write.
ssize_t ceph_posix_pwrite(int fd, const void *buf, size_t count, off64_t offset);
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb);
ssize_t ceph_posix_read(int fd, void *buf, size_t count);
/// Positional read: reads up to count bytes into buf from the absolute
/// position offset, which is passed with the call rather than set by a prior
/// seek. Returns the number of bytes copied into buf on success; -EBADF if fd
/// is unknown or its open flags do not permit reading; -EINVAL if no striper
/// is available; otherwise the negative status of the underlying read.
ssize_t ceph_posix_pread(int fd, void *buf, size_t count, off64_t offset);
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb);
int ceph_posix_fstat(int fd, struct stat *buf);
int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf);
Expand Down

0 comments on commit 96dde7b

Please sign in to comment.