Skip to content

Commit

Permalink
Merge pull request #200 from osschar/mt_sync_master
Browse files Browse the repository at this point in the history
[pfc] Use fsync for cache consistency
  • Loading branch information
abh3 committed Feb 23, 2015
2 parents f931dd4 + d9b68b8 commit a9dea40
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 54 deletions.
27 changes: 15 additions & 12 deletions src/XrdFileCache/XrdFileCacheInfo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ using namespace XrdFileCache;
Info::Info() :
m_version(0),
m_bufferSize(0),
m_sizeInBits(0), m_buff(0),
m_sizeInBits(0), m_buff_fetched(0), m_buff_write_called(0),
m_accessCnt(0),
m_complete(false)
{
Expand All @@ -51,7 +51,8 @@ Info::Info() :

Info::~Info()
{
if (m_buff) free(m_buff);
if (m_buff_fetched) free(m_buff_fetched);
if (m_buff_write_called) free(m_buff_write_called);
}

//______________________________________________________________________________
Expand All @@ -60,8 +61,10 @@ Info::~Info()
void Info::ResizeBits(int s)
{
m_sizeInBits = s;
m_buff = (unsigned char*)malloc(GetSizeInBytes());
memset(m_buff, 0, GetSizeInBytes());
m_buff_fetched = (unsigned char*)malloc(GetSizeInBytes());
m_buff_write_called = (unsigned char*)malloc(GetSizeInBytes());
memset(m_buff_fetched, 0, GetSizeInBytes());
memset(m_buff_write_called, 0, GetSizeInBytes());
}

//______________________________________________________________________________
Expand All @@ -81,7 +84,8 @@ int Info::Read(XrdOssDF* fp)
off += fp->Read(&sb, off, sizeof(int));
ResizeBits(sb);

off += fp->Read(m_buff, off, GetSizeInBytes());
off += fp->Read(m_buff_fetched, off, GetSizeInBytes());
off += fp->Read(m_buff_write_called, off, GetSizeInBytes());
m_complete = IsAnythingEmptyInRng(0, sb-1) ? false : true;

assert (off = GetHeaderSize());
Expand Down Expand Up @@ -112,7 +116,7 @@ void Info::WriteHeader(XrdOssDF* fp)

int nb = GetSizeInBits();
off += fp->Write(&nb, off, sizeof(int));
off += fp->Write(m_buff, off, GetSizeInBytes());
off += fp->Write(m_buff_write_called, off, GetSizeInBytes());

flr = XrdOucSxeq::Release(fp->getFD());
if (flr) clLog()->Error(XrdCl::AppMsg, "WriteHeader() un-lock failed \n");
Expand All @@ -125,7 +129,6 @@ void Info::AppendIOStat(const Stats* caches, XrdOssDF* fp)
{
clLog()->Info(XrdCl::AppMsg, "Info:::AppendIOStat()");


int flr = XrdOucSxeq::Serialize(fp->getFD(), 0);
if (flr) clLog()->Error(XrdCl::AppMsg, "AppendIOStat() lock failed \n");

Expand All @@ -141,7 +144,7 @@ void Info::AppendIOStat(const Stats* caches, XrdOssDF* fp)
as.BytesMissed = caches->m_BytesMissed;

flr = XrdOucSxeq::Release(fp->getFD());
if (flr) clLog()->Error(XrdCl::AppMsg, "AppendStat() un-lock failed \n");
if (flr) clLog()->Error(XrdCl::AppMsg, "AppenIOStat() un-lock failed \n");

long long ws = fp->Write(&as, off, sizeof(AStat));
if ( ws != sizeof(AStat)) { assert(0); }
Expand All @@ -156,10 +159,10 @@ bool Info::GetLatestDetachTime(time_t& t, XrdOssDF* fp) const
if (flr) clLog()->Error(XrdCl::AppMsg, "Info::GetLatestAttachTime() lock failed \n");
if (m_accessCnt)
{
AStat stat;
long long off = GetHeaderSize() + sizeof(int) + (m_accessCnt-1)*sizeof(AStat);
res = fp->Read(&stat, off, sizeof(AStat));
if (res == sizeof(AStat))
AStat stat;
long long off = GetHeaderSize() + sizeof(int) + (m_accessCnt-1)*sizeof(AStat);
ssize_t read_res = fp->Read(&stat, off, sizeof(AStat));
if (read_res == sizeof(AStat))
{
t = stat.DetachTime;
res = true;
Expand Down
21 changes: 16 additions & 5 deletions src/XrdFileCache/XrdFileCacheInfo.hh
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ namespace XrdFileCache
//!
//! @param i block index
//---------------------------------------------------------------------
void SetBit(int i);
void SetBitWriteCalled(int i);
void SetBitFetched(int i);

//---------------------------------------------------------------------
//! \brief Reserve buffer for fileSize/bufferSize bytes
Expand Down Expand Up @@ -155,7 +156,8 @@ namespace XrdFileCache
int m_version; //!< info version
long long m_bufferSize; //!< prefetch buffer size
int m_sizeInBits; //!< number of file blocks
unsigned char *m_buff; //!< download state vector
unsigned char *m_buff_fetched; //!< download state vector
unsigned char *m_buff_write_called; //!< disk written state vector
int m_accessCnt; //!< number of written AStat structs
bool m_complete; //!< cached
};
Expand All @@ -166,7 +168,7 @@ namespace XrdFileCache
assert(cn < GetSizeInBytes());

int off = i - cn*8;
return (m_buff[cn] & cfiBIT(off)) == cfiBIT(off);
return (m_buff_fetched[cn] & cfiBIT(off)) == cfiBIT(off);
}

inline int Info::GetSizeInBytes() const
Expand Down Expand Up @@ -197,13 +199,22 @@ namespace XrdFileCache
m_complete = !IsAnythingEmptyInRng(0, m_sizeInBits-1);
}

inline void Info::SetBit(int i)
inline void Info::SetBitWriteCalled(int i)
{
int cn = i/8;
assert(cn < GetSizeInBytes());

int off = i - cn*8;
m_buff[cn] |= cfiBIT(off);
m_buff_write_called[cn] |= cfiBIT(off);
}

inline void Info::SetBitFetched(int i)
{
int cn = i/8;
assert(cn < GetSizeInBytes());

int off = i - cn*8;
m_buff_fetched[cn] |= cfiBIT(off);
}

inline long long Info::GetBufferSize() const
Expand Down

0 comments on commit a9dea40

Please sign in to comment.