Skip to content

Commit

Permalink
. fuseif.h, fuseif.cpp: probably found the cause of 'EchoMany lost re…
Browse files Browse the repository at this point in the history
…sp' issue.

Actually the response of EchoMany did not lose, and instead, github does not log
the hanging program, which is seemingly like the program hangs at the first
request EchoMany waiting for its response. But actually the last request
EchoStream request was also completed. And most likely this time, the program
hangs endlessly at the call to 'select' on the stmfile because of a race
condition between fs_poll and OnStmRecv. That is, There is a slim chance when
the fs_poll checked to found no data, and then preempted right before it leave
the notify handle, the OnStmRecv arrived and found there is no notify handle,
and left, and when the fs_poll got back the control and set the pollhandle, the
pollhandle will never be notified, because the OnStmRecv has gone. Now I have
added to stmfile a FillAndNotify to address this issue.
  • Loading branch information
zhiming99 committed May 31, 2023
1 parent e0ced4f commit 26744ac
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 110 deletions.
121 changes: 23 additions & 98 deletions fuse/fuseif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1882,74 +1882,6 @@ gint32 CFuseStmFile::fs_unlink(
return ret;
}

gint32 CFuseStmFile::StartNextRead( BufPtr& pBuf )
{
gint32 ret = 0;
do{
InterfPtr pIf = GetIf();
CRpcServices* pSvc = pIf;
CStreamServerSync* pStmSvr = pIf;
CStreamProxySync* pStmProxy = pIf;
guint32 dwPReq = 0;
HANDLE hStream = GetStream();
if( pStmSvr != nullptr )
{
ret = pStmSvr->GetPendingReqs(
hStream, dwPReq );
}
else
{
ret = pStmProxy->GetPendingReqs(
hStream, dwPReq );
}
if( ERROR( ret ) )
break;

if( dwPReq > 0 )
{
ret = STATUS_PENDING;
break;
}

if( pStmSvr != nullptr )
{
ret = pStmSvr->ReadStreamAsync(
hStream, pBuf,
( IConfigDb* )nullptr );
}
else
{
ret = pStmProxy->ReadStreamAsync(
hStream, pBuf,
( IConfigDb* )nullptr );
}

}while( 0 );

return ret;
}

gint32 CFuseStmFile::OnCompleteReadReq()
{
gint32 ret = 0;

do{
CFuseMutex oLock( GetLock() );

size_t dwAvail = GetBytesAvail();
if( dwAvail > 0 )
{
guint32 dwFlags = POLLIN;
SetRevents( GetRevents() | dwFlags );
NotifyPoll();
break;
}

}while( 0 );

return ret;
}

gint32 CFuseStmFile::OnReadStreamComplete(
HANDLE hStream,
gint32 iRet,
Expand Down Expand Up @@ -2038,6 +1970,25 @@ gint32 CFuseStmFile::FillIncomingQue(
return ret;
}

gint32 CFuseStmFile::FillAndNotify()
{
std::vector< INBUF > vecIncoming;
FillIncomingQue( vecIncoming );

CFuseMutex oLock( GetLock() );

auto endPos = m_queIncoming.end();
if( vecIncoming.size() )
m_queIncoming.insert( endPos,
vecIncoming.begin(),
vecIncoming.end());

if( m_queIncoming.size() )
NotifyPoll();

return 0;
}

gint32 CFuseStmFile::fs_read(
const char* path,
fuse_file_info *fi,
Expand All @@ -2052,22 +2003,15 @@ gint32 CFuseStmFile::fs_read(
if( size == 0 )
break;

std::vector< INBUF > vecIncoming;
FillIncomingQue( vecIncoming );

//Non-blocking only
CFuseMutex oLock( GetLock() );

auto endPos = m_queIncoming.end();
if( vecIncoming.size() )
m_queIncoming.insert( endPos,
vecIncoming.begin(),
vecIncoming.end());

// no-blocking read
size_t dwAvail = GetBytesAvail();
size_t dwBytesRead =
std::min( size, dwAvail );

if( dwBytesRead == 0 )
break;

FillBufVec( dwBytesRead,
m_queIncoming, vecBackup,
bufvec );
Expand Down Expand Up @@ -2345,32 +2289,13 @@ gint32 CFuseStmFile::fs_poll(
gint32 ret = 0;
do{
gint32 ret = 0;
guint32 dwSize = 0;

HANDLE hstm = GetStream();
ObjPtr pObj = GetIf();
CStreamProxyFuse* pProxy = pObj;
CStreamServerFuse* pSvr = pObj;
if( pSvr != nullptr )
{
ret = pSvr->GetPendingBytes(
hstm, dwSize );
}
else
{
ret = pProxy->GetPendingBytes(
hstm, dwSize );
}

CFuseMutex oLock( GetLock() );
bool bCanSend = !GetFlowCtrl();
SetPollHandle( ph );
if( m_queIncoming.size() > 0 )
*reventsp |= POLLIN;

if( SUCCEEDED( ret ) && dwSize > 0 )
*reventsp |= POLLIN;

if( bCanSend )
*reventsp |= POLLOUT;

Expand Down
15 changes: 3 additions & 12 deletions fuse/fuseif.h
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,6 @@ class CFuseStmFile : public CFuseFileEntry
std::vector<INBUF>& vecIncoming );
sem_t m_semFlowCtrl;

gint32 fs_read_blocking(
const char* path,
fuse_file_info *fi,
fuse_req_t req, fuse_bufvec*& bufvec,
off_t off, size_t size,
std::vector< BufPtr >& vecBackup,
fuseif_intr_data* d );

public:

typedef CFuseFileEntry super;
Expand Down Expand Up @@ -1001,6 +993,8 @@ class CFuseStmFile : public CFuseFileEntry
void SetStream( HANDLE hStream )
{ m_hStream = hStream; }

gint32 FillAndNotify();

gint32 fs_open(
const char *path,
fuse_file_info *fi ) override;
Expand All @@ -1011,9 +1005,6 @@ class CFuseStmFile : public CFuseFileEntry
BufPtr& pBuf,
IConfigDb* pCtx );

gint32 StartNextRead( BufPtr& pRetBuf );
gint32 OnCompleteReadReq();

gint32 OnWriteStreamComplete(
HANDLE hStream,
gint32 iRet,
Expand Down Expand Up @@ -1900,7 +1891,7 @@ class CFuseServicePoint :

auto pStmFile = static_cast
< CFuseStmFile* >( pObj );
pStmFile->NotifyPoll();
pStmFile->FillAndNotify();

}while( 0 );

Expand Down

0 comments on commit 26744ac

Please sign in to comment.