diff --git a/fuse/fuseif.cpp b/fuse/fuseif.cpp index 9968cac32..2f4295e03 100644 --- a/fuse/fuseif.cpp +++ b/fuse/fuseif.cpp @@ -1882,74 +1882,6 @@ gint32 CFuseStmFile::fs_unlink( return ret; } -gint32 CFuseStmFile::StartNextRead( BufPtr& pBuf ) -{ - gint32 ret = 0; - do{ - InterfPtr pIf = GetIf(); - CRpcServices* pSvc = pIf; - CStreamServerSync* pStmSvr = pIf; - CStreamProxySync* pStmProxy = pIf; - guint32 dwPReq = 0; - HANDLE hStream = GetStream(); - if( pStmSvr != nullptr ) - { - ret = pStmSvr->GetPendingReqs( - hStream, dwPReq ); - } - else - { - ret = pStmProxy->GetPendingReqs( - hStream, dwPReq ); - } - if( ERROR( ret ) ) - break; - - if( dwPReq > 0 ) - { - ret = STATUS_PENDING; - break; - } - - if( pStmSvr != nullptr ) - { - ret = pStmSvr->ReadStreamAsync( - hStream, pBuf, - ( IConfigDb* )nullptr ); - } - else - { - ret = pStmProxy->ReadStreamAsync( - hStream, pBuf, - ( IConfigDb* )nullptr ); - } - - }while( 0 ); - - return ret; -} - -gint32 CFuseStmFile::OnCompleteReadReq() -{ - gint32 ret = 0; - - do{ - CFuseMutex oLock( GetLock() ); - - size_t dwAvail = GetBytesAvail(); - if( dwAvail > 0 ) - { - guint32 dwFlags = POLLIN; - SetRevents( GetRevents() | dwFlags ); - NotifyPoll(); - break; - } - - }while( 0 ); - - return ret; -} - gint32 CFuseStmFile::OnReadStreamComplete( HANDLE hStream, gint32 iRet, @@ -2038,6 +1970,25 @@ gint32 CFuseStmFile::FillIncomingQue( return ret; } +gint32 CFuseStmFile::FillAndNotify() +{ + std::vector< INBUF > vecIncoming; + FillIncomingQue( vecIncoming ); + + CFuseMutex oLock( GetLock() ); + + auto endPos = m_queIncoming.end(); + if( vecIncoming.size() ) + m_queIncoming.insert( endPos, + vecIncoming.begin(), + vecIncoming.end()); + + if( m_queIncoming.size() ) + NotifyPoll(); + + return 0; +} + gint32 CFuseStmFile::fs_read( const char* path, fuse_file_info *fi, @@ -2052,22 +2003,15 @@ gint32 CFuseStmFile::fs_read( if( size == 0 ) break; - std::vector< INBUF > vecIncoming; - FillIncomingQue( vecIncoming ); - + //Non-blocking only CFuseMutex oLock( GetLock() ); - - auto endPos = m_queIncoming.end(); - if( vecIncoming.size() ) - m_queIncoming.insert( endPos, - vecIncoming.begin(), - vecIncoming.end()); - - // no-blocking read size_t dwAvail = GetBytesAvail(); size_t dwBytesRead = std::min( size, dwAvail ); + if( dwBytesRead == 0 ) + break; + FillBufVec( dwBytesRead, m_queIncoming, vecBackup, bufvec ); @@ -2345,22 +2289,6 @@ gint32 CFuseStmFile::fs_poll( gint32 ret = 0; do{ gint32 ret = 0; - guint32 dwSize = 0; - - HANDLE hstm = GetStream(); - ObjPtr pObj = GetIf(); - CStreamProxyFuse* pProxy = pObj; - CStreamServerFuse* pSvr = pObj; - if( pSvr != nullptr ) - { - ret = pSvr->GetPendingBytes( - hstm, dwSize ); - } - else - { - ret = pProxy->GetPendingBytes( - hstm, dwSize ); - } CFuseMutex oLock( GetLock() ); bool bCanSend = !GetFlowCtrl(); @@ -2368,9 +2296,6 @@ gint32 CFuseStmFile::fs_poll( if( m_queIncoming.size() > 0 ) *reventsp |= POLLIN; - if( SUCCEEDED( ret ) && dwSize > 0 ) - *reventsp |= POLLIN; - if( bCanSend ) *reventsp |= POLLOUT; diff --git a/fuse/fuseif.h b/fuse/fuseif.h index 30fc48aa6..b7d64c32d 100644 --- a/fuse/fuseif.h +++ b/fuse/fuseif.h @@ -962,14 +962,6 @@ class CFuseStmFile : public CFuseFileEntry std::vector& vecIncoming ); sem_t m_semFlowCtrl; - gint32 fs_read_blocking( - const char* path, - fuse_file_info *fi, - fuse_req_t req, fuse_bufvec*& bufvec, - off_t off, size_t size, - std::vector< BufPtr >& vecBackup, - fuseif_intr_data* d ); - public: typedef CFuseFileEntry super; @@ -1001,6 +993,8 @@ class CFuseStmFile : public CFuseFileEntry void SetStream( HANDLE hStream ) { m_hStream = hStream; } + gint32 FillAndNotify(); + gint32 fs_open( const char *path, fuse_file_info *fi ) override; @@ -1011,9 +1005,6 @@ class CFuseStmFile : public CFuseFileEntry BufPtr& pBuf, IConfigDb* pCtx ); - gint32 StartNextRead( BufPtr& pRetBuf ); - gint32 OnCompleteReadReq(); - gint32 OnWriteStreamComplete( HANDLE hStream, gint32 iRet, @@ -1900,7 +1891,7 @@ class CFuseServicePoint : auto pStmFile = static_cast < CFuseStmFile* >( pObj ); - pStmFile->NotifyPoll(); + pStmFile->FillAndNotify(); }while( 0 );