Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another set of fixes #212

Merged
merged 3 commits into from
Mar 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 13 additions & 12 deletions src/XrdCeph/XrdCephOssFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ ssize_t XrdCephOssFile::Read(void *buff, off_t offset, size_t blen) {
return rc;
}

int XrdCephOssFile::Read(XrdSfsAio *aoip) {
return -ENOTSUP;
static void aioReadCallback(XrdSfsAio *aiop, size_t rc) {
aiop->Result = rc;
aiop->doneRead();
}

int XrdCephOssFile::Read(XrdSfsAio *aiop) {
return ceph_aio_read(m_fd, aiop, aioReadCallback);
}

ssize_t XrdCephOssFile::ReadRaw(void *buff, off_t offset, size_t blen) {
Expand All @@ -86,17 +91,13 @@ ssize_t XrdCephOssFile::Write(const void *buff, off_t offset, size_t blen) {
return rc;
}

static void aioWriteCallback(XrdSfsAio *aiop, size_t rc) {
aiop->Result = rc;
aiop->doneWrite();
}

int XrdCephOssFile::Write(XrdSfsAio *aiop) {
ssize_t rc = Write((void*)aiop->sfsAio.aio_buf,
aiop->sfsAio.aio_offset,
aiop->sfsAio.aio_nbytes);
if (aiop->sfsAio.aio_nbytes == (size_t)rc) {
aiop->Result = rc;
aiop->doneWrite();
return 0;
} else {
return rc;
}
return ceph_aio_write(m_fd, aiop, aioWriteCallback);
}

int XrdCephOssFile::Fsync() {
Expand Down
84 changes: 83 additions & 1 deletion src/XrdCeph/XrdCephPosix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <sys/xattr.h>
#include <time.h>
#include <limits>
#include "XrdSfs/XrdSfsAio.hh"

#include "XrdCeph/XrdCephPosix.hh"

Expand All @@ -66,6 +67,14 @@ struct DirIterator {
librados::IoCtx *m_ioctx;
};

/// small struct for aio API callbacks
struct AioArgs {
AioArgs(XrdSfsAio* a, AioCB *b, size_t n) : aiop(a), callback(b), nbBytes(n) {}
XrdSfsAio* aiop;
AioCB *callback;
size_t nbBytes;
};

/// global variables holding stripers and ioCtxs for each ceph pool plus the cluster object
std::map<std::string, libradosstriper::RadosStriper*> g_radosStripers;
std::map<std::string, librados::IoCtx*> g_ioCtx;
Expand Down Expand Up @@ -464,7 +473,8 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode
// in case of O_TRUNC, we should truncate the file
if (flags & O_TRUNC) {
int rc = ceph_posix_internal_truncate(fr, 0);
if (rc < 0) return rc;
// fail only if file exists and cannot be truncated
if (rc < 0 && rc != -ENOENT) return rc;
}
return g_nextCephFd-1;
}
Expand Down Expand Up @@ -542,6 +552,45 @@ ssize_t ceph_posix_write(int fd, const void *buf, size_t count) {
}
}

static void ceph_aio_complete(rados_completion_t c, void *arg) {
AioArgs *awa = reinterpret_cast<AioArgs*>(arg);
size_t rc = rados_aio_get_return_value(c);
awa->callback(awa->aiop, rc == 0 ? awa->nbBytes : rc);
delete(awa);
}

ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb) {
std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
if (it != g_fds.end()) {
// get the parameters from the Xroot aio object
size_t count = aiop->sfsAio.aio_nbytes;
const char *buf = (const char*)aiop->sfsAio.aio_buf;
size_t offset = aiop->sfsAio.aio_offset;
// get the striper object
CephFileRef &fr = it->second;
logwrapper((char*)"ceph_aio_write: for fd %d, count=%d", fd, count);
if ((fr.flags & (O_WRONLY|O_RDWR)) == 0) {
return -EBADF;
}
libradosstriper::RadosStriper *striper = getRadosStriper(fr);
if (0 == striper) {
return -EINVAL;
}
// prepare a bufferlist around the given buffer
ceph::bufferlist bl;
bl.append(buf, count);
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count);
librados::AioCompletion *completion =
g_cluster->aio_create_completion(args, ceph_aio_complete, NULL);
int rc = striper->aio_write(fr.name, completion, bl, count, offset);
completion->release();
return rc;
} else {
return -EBADF;
}
}

ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
if (it != g_fds.end()) {
Expand All @@ -565,6 +614,39 @@ ssize_t ceph_posix_read(int fd, void *buf, size_t count) {
}
}

ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb) {
std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
if (it != g_fds.end()) {
// get the parameters from the Xroot aio object
size_t count = aiop->sfsAio.aio_nbytes;
const char *buf = (const char*)aiop->sfsAio.aio_buf;
size_t offset = aiop->sfsAio.aio_offset;
// get the striper object
CephFileRef &fr = it->second;
logwrapper((char*)"ceph_read: for fd %d, count=%d", fd, count);
if ((fr.flags & (O_WRONLY|O_RDWR)) != 0) {
return -EBADF;
}
libradosstriper::RadosStriper *striper = getRadosStriper(fr);
if (0 == striper) {
return -EINVAL;
}
// prepare a bufferlist to receive data
ceph::bufferlist bl;
// prepare a ceph AioCompletion object and do async call
AioArgs *args = new AioArgs(aiop, cb, count);
librados::AioCompletion *completion =
g_cluster->aio_create_completion(args, ceph_aio_complete, NULL);
int rc = striper->aio_read(fr.name, completion, &bl, count, offset);
completion->release();
if (rc < 0) return rc;
bl.copy(0, rc, (char*)buf);
return rc;
} else {
return -EBADF;
}
}

int ceph_posix_fstat(int fd, struct stat *buf) {
std::map<unsigned int, CephFileRef>::iterator it = g_fds.find(fd);
if (it != g_fds.end()) {
Expand Down
5 changes: 5 additions & 0 deletions src/XrdCeph/XrdCephPosix.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
#include <XrdOuc/XrdOucEnv.hh>
#include <XrdSys/XrdSysXAttr.hh>

class XrdSfsAio;
typedef void(AioCB)(XrdSfsAio*, size_t);

void ceph_posix_set_defaults(const char* value);
void ceph_posix_disconnect_all();
void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp));
Expand All @@ -43,7 +46,9 @@ int ceph_posix_close(int fd);
off_t ceph_posix_lseek(int fd, off_t offset, int whence);
off64_t ceph_posix_lseek64(int fd, off64_t offset, int whence);
ssize_t ceph_posix_write(int fd, const void *buf, size_t count);
ssize_t ceph_aio_write(int fd, XrdSfsAio *aiop, AioCB *cb);
ssize_t ceph_posix_read(int fd, void *buf, size_t count);
ssize_t ceph_aio_read(int fd, XrdSfsAio *aiop, AioCB *cb);
int ceph_posix_fstat(int fd, struct stat *buf);
int ceph_posix_stat(XrdOucEnv* env, const char *pathname, struct stat *buf);
int ceph_posix_fsync(int fd);
Expand Down