Skip to content

Commit

Permalink
TEIID-2533: moving to stateless model for all CLI operations
Browse files Browse the repository at this point in the history
  • Loading branch information
rareddy committed Jun 7, 2013
1 parent cce4008 commit 89b174f
Showing 1 changed file with 59 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,62 +101,61 @@
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.vdb.runtime.VDBKey;

/**
* Keep this the class and all the extended classes stateless as there is single instance.
*/
abstract class TeiidOperationHandler extends BaseOperationHandler<DQPCore> {
private List<TransportService> transports = new ArrayList<TransportService>();
protected VDBRepository vdbRepo;
protected DQPCore engine;
protected BufferManagerService bufferMgrSvc;

protected TeiidOperationHandler(String operationName){
super(operationName);
}

@Override
protected synchronized DQPCore getService(OperationContext context, PathAddress pathAddress, ModelNode operation) throws OperationFailedException {

this.vdbRepo = null;
this.engine = null;
this.bufferMgrSvc = null;
this.transports.clear();

List<ServiceName> services = context.getServiceRegistry(false).getServiceNames();
for (ServiceName name:services) {
if (TeiidServiceNames.TRANSPORT_BASE.isParentOf(name)) {
ServiceController<?> transport = context.getServiceRegistry(false).getService(name);
if (transport != null) {
this.transports.add(TransportService.class.cast(transport.getValue()));
}
}
}

ServiceController<?> repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.VDB_REPO);
ServiceController<?> repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.ENGINE);
if (repo != null) {
this.vdbRepo = VDBRepository.class.cast(repo.getValue());
return DQPCore.class.cast(repo.getValue());
}
return null;
}

repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.BUFFER_MGR);
protected BufferManagerService getBufferManager(OperationContext context) {
ServiceController<?> repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.BUFFER_MGR);
if (repo != null) {
this.bufferMgrSvc = BufferManagerService.class.cast(repo.getService());
return BufferManagerService.class.cast(repo.getService());
}
return null;
}

repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.ENGINE);
protected VDBRepository getVDBrepository(OperationContext context) {
ServiceController<?> repo = context.getServiceRegistry(false).getRequiredService(TeiidServiceNames.VDB_REPO);
if (repo != null) {
this.engine = DQPCore.class.cast(repo.getValue());
return VDBRepository.class.cast(repo.getValue());
}
return this.engine;
return null;
}

int getSessionCount() throws AdminException {
protected int getSessionCount(OperationContext context) throws AdminException {
int count = 0;
List<TransportService> transportServices = getTransportServices();
List<TransportService> transportServices = getTransportServices(context);
for (TransportService t: transportServices) {
count += t.getActiveSessionsCount();
}
return count;
}

protected synchronized List<TransportService> getTransportServices(){
return new ArrayList<TransportService>(this.transports);
protected List<TransportService> getTransportServices(OperationContext context){
List<TransportService> transports = new ArrayList<TransportService>();
List<ServiceName> services = context.getServiceRegistry(false).getServiceNames();
for (ServiceName name:services) {
if (TeiidServiceNames.TRANSPORT_BASE.isParentOf(name)) {
ServiceController<?> transport = context.getServiceRegistry(false).getService(name);
if (transport != null) {
transports.add(TransportService.class.cast(transport.getValue()));
}
}
}
return transports;
}
}

Expand Down Expand Up @@ -218,7 +217,7 @@ protected GetActiveSessionsCount(String operationName) {
@Override
protected void executeOperation(OperationContext context, DQPCore engine, ModelNode operation) throws OperationFailedException{
try {
context.getResult().set(getSessionCount());
context.getResult().set(getSessionCount(context));
} catch (AdminException e) {
throw new OperationFailedException(new ModelNode().set(e.getMessage()));
}
Expand Down Expand Up @@ -250,7 +249,7 @@ protected void executeOperation(OperationContext context, DQPCore engine, ModelN
}

ModelNode result = context.getResult();
for (TransportService t: getTransportServices()) {
for (TransportService t: getTransportServices(context)) {
Collection<SessionMetadata> sessions = t.getActiveSessions();
for (SessionMetadata session:sessions) {
if (filter) {
Expand Down Expand Up @@ -366,7 +365,7 @@ protected void executeOperation(OperationContext context, DQPCore engine, ModelN
if (!isValidVDB(context, vdbName, vdbVersion)) {
throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50096, vdbName, vdbVersion)));
}
for (TransportService t: getTransportServices()) {
for (TransportService t: getTransportServices(context)) {
List<RequestMetadata> requests = t.getRequestsUsingVDB(vdbName,vdbVersion);
for (RequestMetadata request:requests) {
if (request.sourceRequest()) {
Expand Down Expand Up @@ -420,7 +419,7 @@ protected void executeOperation(OperationContext context, DQPCore engine, ModelN
if (!operation.hasDefined(OperationsConstants.SESSION.getName())) {
throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.getString(OperationsConstants.SESSION.getName()+MISSING)));
}
for (TransportService t: getTransportServices()) {
for (TransportService t: getTransportServices(context)) {
t.terminateSession(operation.get(OperationsConstants.SESSION.getName()).asString());
}
}
Expand Down Expand Up @@ -735,7 +734,7 @@ protected void executeOperation(OperationContext context, DQPCore engine, ModelN
throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.gs(IntegrationPlugin.Event.TEIID50096, vdbName, vdbVersion)));
}

result.set(executeQuery(vdbName, vdbVersion, sql, timeout, new ModelNode()));
result.set(executeQuery(context, engine, vdbName, vdbVersion, sql, timeout, new ModelNode()));
}

@Override
Expand All @@ -749,32 +748,32 @@ protected void describeParameters(SimpleOperationDefinitionBuilder builder) {
builder.setReplyValueType(ModelType.STRING);
}

public ModelNode executeQuery(final String vdbName, final int version, final String command, final long timoutInMilli, final ModelNode resultsNode) throws OperationFailedException {
public ModelNode executeQuery(final OperationContext context, final DQPCore engine, final String vdbName, final int version, final String command, final long timoutInMilli, final ModelNode resultsNode) throws OperationFailedException {
String user = "CLI ADMIN"; //$NON-NLS-1$
LogManager.logDetail(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("admin_executing", user, command)); //$NON-NLS-1$

VDBMetaData vdb = this.vdbRepo.getLiveVDB(vdbName, version);
VDBMetaData vdb = getVDBrepository(context).getLiveVDB(vdbName, version);
if (vdb == null) {
throw new OperationFailedException(new ModelNode().set(IntegrationPlugin.Util.getString("wrong_vdb")));//$NON-NLS-1$
}
final SessionMetadata session = TempTableDataManager.createTemporarySession(user, "admin-console", vdb); //$NON-NLS-1$

final long requestID = 0L;

DQPWorkContext context = new DQPWorkContext();
context.setUseCallingThread(true);
context.setSession(session);
DQPWorkContext workContext = new DQPWorkContext();
workContext.setUseCallingThread(true);
workContext.setSession(session);

try {
return context.runInContext(new Callable<ModelNode>() {
return workContext.runInContext(new Callable<ModelNode>() {
@Override
public ModelNode call() throws Exception {

long start = System.currentTimeMillis();
RequestMessage request = new RequestMessage(command);
request.setExecutionId(requestID);
request.setRowLimit(ExecuteQuery.this.engine.getMaxRowsFetchSize()); // this would limit the number of rows that are returned.
Future<ResultsMessage> message = ExecuteQuery.this.engine.executeRequest(requestID, request);
request.setRowLimit(engine.getMaxRowsFetchSize()); // this would limit the number of rows that are returned.
Future<ResultsMessage> message = engine.executeRequest(requestID, request);
ResultsMessage rm = null;
if (timoutInMilli < 0) {
rm = message.get();
Expand All @@ -793,14 +792,14 @@ public ModelNode call() throws Exception {

while (rm.getFinalRow() == -1 || rm.getLastRow() < rm.getFinalRow()) {
long elapsed = System.currentTimeMillis() - start;
message = ExecuteQuery.this.engine.processCursorRequest(requestID, rm.getLastRow()+1, 1024);
message = engine.processCursorRequest(requestID, rm.getLastRow()+1, 1024);
rm = message.get(timoutInMilli-elapsed, TimeUnit.MILLISECONDS);
writeResults(resultsNode, Arrays.asList(rm.getColumnNames()), rm.getResultsList());
}
}

long elapsed = System.currentTimeMillis() - start;
ResultsFuture<?> response = ExecuteQuery.this.engine.closeRequest(requestID);
ResultsFuture<?> response = engine.closeRequest(requestID);
response.get(timoutInMilli-elapsed, TimeUnit.MILLISECONDS);
return resultsNode;
}
Expand All @@ -809,10 +808,10 @@ public ModelNode call() throws Exception {
throw new OperationFailedException(new ModelNode().set(t.getMessage()));
} finally {
try {
context.runInContext(new Callable<Void>() {
workContext.runInContext(new Callable<Void>() {
@Override
public Void call() throws Exception {
ExecuteQuery.this.engine.terminateSession(session.getSessionId());
engine.terminateSession(session.getSessionId());
return null;
}
});
Expand Down Expand Up @@ -1494,17 +1493,18 @@ protected EngineStatistics() {
protected void executeOperation(OperationContext context, DQPCore engine, ModelNode operation) throws OperationFailedException{
EngineStatisticsMetadata stats = new EngineStatisticsMetadata();
try {
stats.setSessionCount(getSessionCount());
stats.setTotalMemoryUsedInKB(this.bufferMgrSvc.getHeapCacheMemoryInUseKB());
stats.setMemoryUsedByActivePlansInKB(this.bufferMgrSvc.getHeapMemoryInUseByActivePlansKB());
stats.setDiskWriteCount(this.bufferMgrSvc.getDiskWriteCount());
stats.setDiskReadCount(this.bufferMgrSvc.getDiskReadCount());
stats.setCacheReadCount(this.bufferMgrSvc.getCacheReadCount());
stats.setCacheWriteCount(this.bufferMgrSvc.getCacheWriteCount());
stats.setDiskSpaceUsedInMB(this.bufferMgrSvc.getUsedDiskBufferSpaceMB());
stats.setActivePlanCount(this.engine.getActivePlanCount());
stats.setWaitPlanCount(this.engine.getWaitingPlanCount());
stats.setMaxWaitPlanWaterMark(this.engine.getMaxWaitingPlanWatermark());
BufferManagerService bufferMgrSvc = getBufferManager(context);
stats.setSessionCount(getSessionCount(context));
stats.setTotalMemoryUsedInKB(bufferMgrSvc.getHeapCacheMemoryInUseKB());
stats.setMemoryUsedByActivePlansInKB(bufferMgrSvc.getHeapMemoryInUseByActivePlansKB());
stats.setDiskWriteCount(bufferMgrSvc.getDiskWriteCount());
stats.setDiskReadCount(bufferMgrSvc.getDiskReadCount());
stats.setCacheReadCount(bufferMgrSvc.getCacheReadCount());
stats.setCacheWriteCount(bufferMgrSvc.getCacheWriteCount());
stats.setDiskSpaceUsedInMB(bufferMgrSvc.getUsedDiskBufferSpaceMB());
stats.setActivePlanCount(engine.getActivePlanCount());
stats.setWaitPlanCount(engine.getWaitingPlanCount());
stats.setMaxWaitPlanWaterMark(engine.getMaxWaitingPlanWatermark());
VDBMetadataMapper.EngineStatisticsMetadataMapper.INSTANCE.wrap(stats, context.getResult());
} catch (AdminException e) {
throw new OperationFailedException(new ModelNode().set(e.getMessage()));
Expand Down

0 comments on commit 89b174f

Please sign in to comment.