Skip to content

Commit

Permalink
Add cancel to native DrillCleint
Browse files Browse the repository at this point in the history
Also change the coordination id type to match the input.
  • Loading branch information
vkorukanti committed Jul 18, 2016
1 parent 70aba77 commit e0ef634
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 12 deletions.
11 changes: 10 additions & 1 deletion contrib/native/client/example/querySubmitter.cpp
Expand Up @@ -415,7 +415,14 @@ int main(int argc, char* argv[]) {
client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, qryHandle);
client.registerSchemaChangeListener(qryHandle, SchemaListener);

client.waitForResults();
if(bTestCancel) {
// Send cancellation request after 5seconds
boost::this_thread::sleep(boost::posix_time::milliseconds(5000));
std::cout<< "\n Cancelling query: " << *queryInpIter << "\n" << std::endl;
client.cancelQuery(*qryHandle);
} else {
client.waitForResults();
}

client.freeQueryResources(qryHandle);
delete qryHandle;
Expand All @@ -440,5 +447,7 @@ int main(int argc, char* argv[]) {
std::cerr << e.what() << std::endl;
}

system("pause");

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 19, 2016

This works on all platforms?
Even if it does, can you remove this before checking in? It breaks scripting (which I use for testing every now and then)

This comment has been minimized.

Copy link
@vkorukanti

vkorukanti Jul 19, 2016

Owner

Sure. Will remove this.


return 0;
}
5 changes: 5 additions & 0 deletions contrib/native/client/src/clientlib/drillClient.cpp
Expand Up @@ -387,6 +387,11 @@ RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string&
return pIter;
}

void DrillClient::cancelQuery(QueryHandle_t handle) {
assert(handle!=NULL);

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 19, 2016

In release code this will get compiled out. So if the ODBC driver calls this incorrectly then we will crash. A log message would be very useful here.

This comment has been minimized.

Copy link
@vkorukanti

vkorukanti Jul 19, 2016

Owner

I see. I was following the existing examples here. Looks like time to cleanup all these APIs

((DrillClientQueryResult*)handle)->cancel();
}

void* DrillClient::getApplicationContext(QueryHandle_t handle){
assert(handle!=NULL);
return ((DrillClientQueryResult*)handle)->getListenerContext();
Expand Down
37 changes: 33 additions & 4 deletions contrib/native/client/src/clientlib/drillClientImpl.cpp
Expand Up @@ -521,6 +521,10 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
return pQuery;
}

void DrillClientImpl::CancelQuery(exec::shared::QueryId* pQueryId) {
sendCancel(pQueryId);
}

void DrillClientImpl::getNextResult(){

// This call is always made from within a function where the mutex has already been acquired
Expand Down Expand Up @@ -1327,8 +1331,30 @@ void DrillClientQueryResult::waitForData() {
}
}

void DrillClientQueryResult::setQueryId(exec::shared::QueryId* q){
boost::lock_guard<boost::mutex> queryIdLock(this->m_queryIdMutex);
this->m_pQueryId=q;

// If the query status is set to cancelled, send cancellation message to Drillbit
if (q != NULL && this->isCancelled()) {
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::setQueryId: Query is cancelled. Send cancel request to Drillbit.\n";)
this->m_pClient->CancelQuery(this->m_pQueryId);

This comment has been minimized.

Copy link
@parthchandra

parthchandra Jul 19, 2016

This comment applies to both the places where CancelQuery is being called.
The existing protocol between the C++ client lib and the applications was that cancel was available only after query id has been received and that the way to cancel was to return FAILURE from the listenerCallback. So the code at the end of DrillClientImpl::processQueryData calls sendCancel if pResultsListener(...) returns QRY_FAILURE.
There is some cleanup required when the cancel is processed otherwise you end up with memory leaks.
Also it is a good idea to call setIsQueryPending and set it to false. The synchronous version of the API blocks on this flag and might wait forever if it is not reset.
(Actually, the sync API is not used by anyone that I know of so no real harm done, but one never knows)

This comment has been minimized.

Copy link
@vkorukanti

vkorukanti Jul 19, 2016

Owner

Problem is first data (barring fast schema batch) to arrive could take several minutes, in case when the query involves a join or sort. In Join/sort cases, when the first batch arrives you have already done pretty much most of the work. Thats why I want to send to cancel as soon as possible.

Need to check if there are any leaks in cases like: cancel is sent, but some data packets are already in flight.

Will check the query pending flag.

}
}

void DrillClientQueryResult::cancel() {
boost::lock_guard<boost::mutex> cancelLock(this->m_cancelStatusMutex);
this->m_bCancel=true;

DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::cancel: Cancel requested.\n";)

// If the query id is received, send cancellation message to Drillbit
if (this->getQueryId() != NULL) {
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::cancel: Sending cancel request to Drillbit\n";)
this->m_pClient->CancelQuery(this->m_pQueryId);
} else {
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::cancel: Cancellation is not sent as the QueryId has not arrived.\n";)
}
}

void DrillClientQueryResult::signalError(DrillClientError* pErr){
Expand Down Expand Up @@ -1382,15 +1408,18 @@ void DrillClientQueryResult::clearAndDestroy(){
}
m_columnDefs->clear();
}
if(this->m_pQueryId!=NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)

exec::shared::QueryId* queryId = this->getQueryId();
if(queryId != NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*queryId) << std::endl;)
}
//Tell the parent to remove this from its lists
m_pClient->clearMapEntries(this);

//clear query id map entries.
if(this->m_pQueryId!=NULL){
delete this->m_pQueryId; this->m_pQueryId=NULL;
if(queryId!=NULL){
delete queryId;
setQueryId(NULL);
}
if(!m_recordBatches.empty()){
// When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
Expand Down
32 changes: 25 additions & 7 deletions contrib/native/client/src/clientlib/drillClientImpl.hpp
Expand Up @@ -156,14 +156,20 @@ class DrillClientQueryResult{
}

void cancel();
bool isCancelled(){return this->m_bCancel;};
bool isCancelled() {
boost::lock_guard<boost::mutex> cancelLock(this->m_cancelStatusMutex);
return this->m_bCancel;
};
bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
int32_t getCoordinationId(){ return this->m_coordinationId;}
uint64_t getCoordinationId(){ return this->m_coordinationId;}
const std::string& getQuery(){ return this->m_query;}

void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
void setQueryId(exec::shared::QueryId* q);
void* getListenerContext() {return this->m_pListenerCtx;}
exec::shared::QueryId& getQueryId(){ return *(this->m_pQueryId); }
exec::shared::QueryId* getQueryId() {
boost::lock_guard<boost::mutex> queryIdLock(this->m_queryIdMutex);
return this->m_pQueryId;
}
bool hasError(){ return m_bHasError;}
status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;}
const DrillClientError* getError(){ return m_pError;}
Expand All @@ -190,7 +196,7 @@ class DrillClientQueryResult{

DrillClientImpl* m_pClient;

int32_t m_coordinationId;
uint64_t m_coordinationId;
const std::string& m_query;

size_t m_numBatches; // number of record batches received so far
Expand All @@ -209,6 +215,12 @@ class DrillClientQueryResult{
// if the recordBatches queue is not empty
boost::condition_variable m_cv;

// Mutex to protect QueryId modification
boost::mutex m_queryIdMutex;

// Mutex to protect cancellation modification status
boost::mutex m_cancelStatusMutex;

// state
// if m_bIsQueryPending is true, we continue to wait for results
bool m_bIsQueryPending;
Expand Down Expand Up @@ -300,6 +312,12 @@ class DrillClientImpl : public DrillClientImplBase{
void Close() ;
DrillClientError* getError(){ return m_pError;}
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);

/*
* Cancel the query with given QueryId. Query must have been submitted using this client instance.
*/
void CancelQuery(exec::shared::QueryId* pQueryId);

void waitForResults();
connectionStatus_t validateHandshake(DrillUserProperties* props);
void freeQueryResources(DrillClientQueryResult* pQryResult){
Expand Down Expand Up @@ -501,13 +519,13 @@ class PooledDrillClientImpl : public DrillClientImplBase{
// When picking a drillClientImpl to use, we see how many queries each drillClientImpl
// is currently executing. If none,
std::vector<DrillClientImpl*> m_clientConnections;
boost::mutex m_poolMutex; // protect access to the vector
boost::mutex m_poolMutex; // protect access to the vector

//ZookeeperImpl zook;

// Use this to decide which drillbit to select next from the list of drillbits.
size_t m_lastConnection;
boost::mutex m_cMutex;
boost::mutex m_cMutex;

// Number of queries executed so far. Can be used to select a new Drillbit from the pool.
size_t m_queriesExecuted;
Expand Down
5 changes: 5 additions & 0 deletions contrib/native/client/src/include/drill/drillClient.hpp
Expand Up @@ -329,6 +329,11 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
*/
RecordIterator* submitQuery(Drill::QueryType t, const std::string& plan, DrillClientError* err);

/*
* Cancel the query with given handle.
*/
void cancelQuery(QueryHandle_t handle);

/*
* The client application should call this function to wait for results if it has registered a
* listener.
Expand Down

0 comments on commit e0ef634

Please sign in to comment.