|
@@ -123,218 +123,219 @@ class QgsConnectionPoolGroup |
|
|
{ |
|
|
qgsConnectionPool_ConnectionDestroy( i.c ); |
|
|
qgsConnectionPool_ConnectionCreate( connInfo, i.c ); |
|
|
} |
|
|
|
|
|
|
|
|
// no need to run if nothing can expire |
|
|
if ( conns.isEmpty() ) |
|
|
{ |
|
|
// will call the slot directly or queue the call (if the object lives in a different thread) |
|
|
QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" ); |
|
|
} |
|
|
|
|
|
acquiredConns.append( i.c ); |
|
|
|
|
|
return i.c; |
|
|
// no need to run if nothing can expire |
|
|
if ( conns.isEmpty() ) |
|
|
{ |
|
|
// will call the slot directly or queue the call (if the object lives in a different thread) |
|
|
QMetaObject::invokeMethod( expirationTimer->parent(), "stopExpirationTimer" ); |
|
|
} |
|
|
} |
|
|
|
|
|
T c; |
|
|
qgsConnectionPool_ConnectionCreate( connInfo, c ); |
|
|
if ( !c ) |
|
|
{ |
|
|
// we didn't get connection for some reason, so release the lock |
|
|
sem.release(); |
|
|
return nullptr; |
|
|
} |
|
|
acquiredConns.append( i.c ); |
|
|
|
|
|
connMutex.lock(); |
|
|
acquiredConns.append( c ); |
|
|
connMutex.unlock(); |
|
|
return c; |
|
|
return i.c; |
|
|
} |
|
|
} |
|
|
|
|
|
void release( T conn ) |
|
|
T c; |
|
|
qgsConnectionPool_ConnectionCreate( connInfo, c ); |
|
|
if ( !c ) |
|
|
{ |
|
|
connMutex.lock(); |
|
|
acquiredConns.removeAll( conn ); |
|
|
if ( !qgsConnectionPool_ConnectionIsValid( conn ) ) |
|
|
{ |
|
|
qgsConnectionPool_ConnectionDestroy( conn ); |
|
|
} |
|
|
else |
|
|
{ |
|
|
Item i; |
|
|
i.c = conn; |
|
|
i.lastUsedTime = QTime::currentTime(); |
|
|
conns.push( i ); |
|
|
|
|
|
if ( !expirationTimer->isActive() ) |
|
|
{ |
|
|
// will call the slot directly or queue the call (if the object lives in a different thread) |
|
|
QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" ); |
|
|
} |
|
|
} |
|
|
// we didn't get connection for some reason, so release the lock |
|
|
sem.release(); |
|
|
return nullptr; |
|
|
} |
|
|
|
|
|
connMutex.unlock(); |
|
|
connMutex.lock(); |
|
|
acquiredConns.append( c ); |
|
|
connMutex.unlock(); |
|
|
return c; |
|
|
} |
|
|
|
|
|
sem.release(); // this can unlock a thread waiting in acquire() |
|
|
void release( T conn ) |
|
|
{ |
|
|
connMutex.lock(); |
|
|
acquiredConns.removeAll( conn ); |
|
|
if ( !qgsConnectionPool_ConnectionIsValid( conn ) ) |
|
|
{ |
|
|
qgsConnectionPool_ConnectionDestroy( conn ); |
|
|
} |
|
|
|
|
|
void invalidateConnections() |
|
|
else |
|
|
{ |
|
|
connMutex.lock(); |
|
|
for ( const Item &i : qgis::as_const( conns ) ) |
|
|
Item i; |
|
|
i.c = conn; |
|
|
i.lastUsedTime = QTime::currentTime(); |
|
|
conns.push( i ); |
|
|
|
|
|
if ( !expirationTimer->isActive() ) |
|
|
{ |
|
|
qgsConnectionPool_ConnectionDestroy( i.c ); |
|
|
// will call the slot directly or queue the call (if the object lives in a different thread) |
|
|
QMetaObject::invokeMethod( expirationTimer->parent(), "startExpirationTimer" ); |
|
|
} |
|
|
conns.clear(); |
|
|
for ( T c : qgis::as_const( acquiredConns ) ) |
|
|
qgsConnectionPool_InvalidateConnection( c ); |
|
|
connMutex.unlock(); |
|
|
} |
|
|
|
|
|
protected: |
|
|
connMutex.unlock(); |
|
|
|
|
|
void initTimer( QObject * parent ) |
|
|
sem.release(); // this can unlock a thread waiting in acquire() |
|
|
} |
|
|
|
|
|
void invalidateConnections() |
|
|
{ |
|
|
connMutex.lock(); |
|
|
for ( const Item &i : qgis::as_const( conns ) ) |
|
|
{ |
|
|
expirationTimer = new QTimer( parent ); |
|
|
expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 ); |
|
|
QObject::connect( expirationTimer, SIGNAL( timeout() ), parent, SLOT( handleConnectionExpired() ) ); |
|
|
qgsConnectionPool_ConnectionDestroy( i.c ); |
|
|
} |
|
|
conns.clear(); |
|
|
for ( T c : qgis::as_const( acquiredConns ) ) |
|
|
qgsConnectionPool_InvalidateConnection( c ); |
|
|
connMutex.unlock(); |
|
|
} |
|
|
|
|
|
protected: |
|
|
|
|
|
void initTimer( QObject *parent ) |
|
|
{ |
|
|
expirationTimer = new QTimer( parent ); |
|
|
expirationTimer->setInterval( CONN_POOL_EXPIRATION_TIME * 1000 ); |
|
|
QObject::connect( expirationTimer, SIGNAL( timeout() ), parent, SLOT( handleConnectionExpired() ) ); |
|
|
|
|
|
// just to make sure the object belongs to main thread and thus will get events |
|
|
if ( qApp ) |
|
|
parent->moveToThread( qApp->thread() ); |
|
|
// just to make sure the object belongs to main thread and thus will get events |
|
|
if ( qApp ) |
|
|
parent->moveToThread( qApp->thread() ); |
|
|
} |
|
|
|
|
|
void onConnectionExpired() |
|
|
{ |
|
|
connMutex.lock(); |
|
|
|
|
|
QTime now = QTime::currentTime(); |
|
|
|
|
|
// what connections have expired? |
|
|
QList<int> toDelete; |
|
|
for ( int i = 0; i < conns.count(); ++i ) |
|
|
{ |
|
|
if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME ) |
|
|
toDelete.append( i ); |
|
|
} |
|
|
|
|
|
void onConnectionExpired() |
|
|
// delete expired connections |
|
|
for ( int j = toDelete.count() - 1; j >= 0; --j ) |
|
|
{ |
|
|
connMutex.lock(); |
|
|
int index = toDelete[j]; |
|
|
qgsConnectionPool_ConnectionDestroy( conns[index].c ); |
|
|
conns.remove( index ); |
|
|
} |
|
|
|
|
|
QTime now = QTime::currentTime(); |
|
|
if ( conns.isEmpty() ) |
|
|
expirationTimer->stop(); |
|
|
|
|
|
// what connections have expired? |
|
|
QList<int> toDelete; |
|
|
for ( int i = 0; i < conns.count(); ++i ) |
|
|
{ |
|
|
if ( conns.at( i ).lastUsedTime.secsTo( now ) >= CONN_POOL_EXPIRATION_TIME ) |
|
|
toDelete.append( i ); |
|
|
} |
|
|
connMutex.unlock(); |
|
|
} |
|
|
|
|
|
// delete expired connections |
|
|
for ( int j = toDelete.count() - 1; j >= 0; --j ) |
|
|
{ |
|
|
int index = toDelete[j]; |
|
|
qgsConnectionPool_ConnectionDestroy( conns[index].c ); |
|
|
conns.remove( index ); |
|
|
} |
|
|
protected: |
|
|
|
|
|
if ( conns.isEmpty() ) |
|
|
expirationTimer->stop(); |
|
|
QString connInfo; |
|
|
QStack<Item> conns; |
|
|
QList<T> acquiredConns; |
|
|
QMutex connMutex; |
|
|
QSemaphore sem; |
|
|
QTimer *expirationTimer = nullptr; |
|
|
|
|
|
connMutex.unlock(); |
|
|
} |
|
|
}; |
|
|
|
|
|
protected: |
|
|
|
|
|
QString connInfo; |
|
|
QStack<Item> conns; |
|
|
QList<T> acquiredConns; |
|
|
QMutex connMutex; |
|
|
QSemaphore sem; |
|
|
QTimer *expirationTimer = nullptr; |
|
|
/** |
|
|
* \ingroup core |
|
|
* Template class responsible for keeping a pool of open connections. |
|
|
* This is desired to avoid the overhead of creation of new connection every time. |
|
|
* |
|
|
* The methods are thread safe. |
|
|
* |
|
|
* The connection pool has a limit on maximum number of concurrent connections |
|
|
* (per server), once the limit is reached, the acquireConnection() function |
|
|
* will block. All connections that have been acquired must be then released |
|
|
* with releaseConnection() function. |
|
|
* |
|
|
* When the connections are not used for some time, they will get closed automatically |
|
|
* to save resources. |
|
|
* \note not available in Python bindings |
|
|
*/ |
|
|
template <typename T, typename T_Group> |
|
|
class QgsConnectionPool |
|
|
{ |
|
|
public: |
|
|
|
|
|
}; |
|
|
typedef QMap<QString, T_Group *> T_Groups; |
|
|
|
|
|
virtual ~QgsConnectionPool() |
|
|
{ |
|
|
mMutex.lock(); |
|
|
for ( T_Group *group : qgis::as_const( mGroups ) ) |
|
|
{ |
|
|
delete group; |
|
|
} |
|
|
mGroups.clear(); |
|
|
mMutex.unlock(); |
|
|
} |
|
|
|
|
|
/** |
|
|
* \ingroup core |
|
|
* Template class responsible for keeping a pool of open connections. |
|
|
* This is desired to avoid the overhead of creation of new connection every time. |
|
|
* Try to acquire a connection for a maximum of \a timeout milliseconds. |
|
|
* If \a timeout is a negative value the calling thread will be blocked |
|
|
* until a connection becomes available. This is the default behavior. |
|
|
* |
|
|
* The methods are thread safe. |
|
|
* |
|
|
* The connection pool has a limit on maximum number of concurrent connections |
|
|
* (per server), once the limit is reached, the acquireConnection() function |
|
|
* will block. All connections that have been acquired must be then released |
|
|
* with releaseConnection() function. |
|
|
* |
|
|
* When the connections are not used for some time, they will get closed automatically |
|
|
* to save resources. |
|
|
* \note not available in Python bindings |
|
|
* \returns initialized connection or nullptr if unsuccessful |
|
|
*/ |
|
|
template <typename T, typename T_Group> |
|
|
class QgsConnectionPool |
|
|
T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false ) |
|
|
{ |
|
|
public: |
|
|
|
|
|
typedef QMap<QString, T_Group *> T_Groups; |
|
|
|
|
|
virtual ~QgsConnectionPool() |
|
|
{ |
|
|
mMutex.lock(); |
|
|
for ( T_Group *group : qgis::as_const( mGroups ) ) |
|
|
{ |
|
|
delete group; |
|
|
} |
|
|
mGroups.clear(); |
|
|
mMutex.unlock(); |
|
|
} |
|
|
|
|
|
/** |
|
|
* Try to acquire a connection for a maximum of \a timeout milliseconds. |
|
|
* If \a timeout is a negative value the calling thread will be blocked |
|
|
* until a connection becomes available. This is the default behavior. |
|
|
* |
|
|
* |
|
|
* |
|
|
* \returns initialized connection or nullptr if unsuccessful |
|
|
*/ |
|
|
T acquireConnection( const QString &connInfo, int timeout = -1, bool requestMayBeNested = false ) |
|
|
{ |
|
|
mMutex.lock(); |
|
|
typename T_Groups::iterator it = mGroups.find( connInfo ); |
|
|
if ( it == mGroups.end() ) |
|
|
{ |
|
|
it = mGroups.insert( connInfo, new T_Group( connInfo ) ); |
|
|
} |
|
|
T_Group *group = *it; |
|
|
mMutex.unlock(); |
|
|
mMutex.lock(); |
|
|
typename T_Groups::iterator it = mGroups.find( connInfo ); |
|
|
if ( it == mGroups.end() ) |
|
|
{ |
|
|
it = mGroups.insert( connInfo, new T_Group( connInfo ) ); |
|
|
} |
|
|
T_Group *group = *it; |
|
|
mMutex.unlock(); |
|
|
|
|
|
return group->acquire( timeout, requestMayBeNested ); |
|
|
} |
|
|
return group->acquire( timeout, requestMayBeNested ); |
|
|
} |
|
|
|
|
|
//! Release an existing connection so it will get back into the pool and can be reused |
|
|
void releaseConnection( T conn ) |
|
|
{ |
|
|
mMutex.lock(); |
|
|
typename T_Groups::iterator it = mGroups.find( qgsConnectionPool_ConnectionToName( conn ) ); |
|
|
Q_ASSERT( it != mGroups.end() ); |
|
|
T_Group *group = *it; |
|
|
mMutex.unlock(); |
|
|
//! Release an existing connection so it will get back into the pool and can be reused |
|
|
void releaseConnection( T conn ) |
|
|
{ |
|
|
mMutex.lock(); |
|
|
typename T_Groups::iterator it = mGroups.find( qgsConnectionPool_ConnectionToName( conn ) ); |
|
|
Q_ASSERT( it != mGroups.end() ); |
|
|
T_Group *group = *it; |
|
|
mMutex.unlock(); |
|
|
|
|
|
group->release( conn ); |
|
|
} |
|
|
group->release( conn ); |
|
|
} |
|
|
|
|
|
/** |
|
|
* Invalidates all connections to the specified resource. |
|
|
* The internal state of certain handles (for instance OGR) are altered |
|
|
* when a dataset is modified. Consquently, all open handles need to be |
|
|
* invalidated when such datasets are changed to ensure the handles are |
|
|
* refreshed. See the OGR provider for an example where this is needed. |
|
|
*/ |
|
|
void invalidateConnections( const QString &connInfo ) |
|
|
{ |
|
|
mMutex.lock(); |
|
|
if ( mGroups.contains( connInfo ) ) |
|
|
mGroups[connInfo]->invalidateConnections(); |
|
|
mMutex.unlock(); |
|
|
} |
|
|
/** |
|
|
* Invalidates all connections to the specified resource. |
|
|
* The internal state of certain handles (for instance OGR) are altered |
|
|
* when a dataset is modified. Consquently, all open handles need to be |
|
|
* invalidated when such datasets are changed to ensure the handles are |
|
|
* refreshed. See the OGR provider for an example where this is needed. |
|
|
*/ |
|
|
void invalidateConnections( const QString &connInfo ) |
|
|
{ |
|
|
mMutex.lock(); |
|
|
if ( mGroups.contains( connInfo ) ) |
|
|
mGroups[connInfo]->invalidateConnections(); |
|
|
mMutex.unlock(); |
|
|
} |
|
|
|
|
|
|
|
|
protected: |
|
|
T_Groups mGroups; |
|
|
QMutex mMutex; |
|
|
}; |
|
|
protected: |
|
|
T_Groups mGroups; |
|
|
QMutex mMutex; |
|
|
}; |
|
|
|
|
|
|
|
|
#endif // QGSCONNECTIONPOOL_H |