Skip to content

Commit

Permalink
TEIID-1092 adding session variables and a minor refactoring of dqpcore
Browse files Browse the repository at this point in the history
inner classes
  • Loading branch information
shawkins committed Mar 26, 2013
1 parent 83dbfd9 commit 9d15cb7
Show file tree
Hide file tree
Showing 20 changed files with 285 additions and 108 deletions.
17 changes: 15 additions & 2 deletions api/src/main/java/org/teiid/CommandContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ GeneratedKeys returnGeneratedKeys(String[] columnNames,
* Returns the last set of generated keys or null if no keys have been generated.
*/
GeneratedKeys getGeneratedKeys();



/**
* Set the session variable and return the old value if any
* @param key
* @param value
* @return
*/
Object setVariable(String key, Object value);

/**
* Get the session variable
* @param key
* @return
*/
Object getVariable(String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA.
*/

package org.teiid.dqp.internal.process;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import org.teiid.dqp.message.RequestID;
import org.teiid.query.tempdata.TempTableStore;

/**
*
*/
//TODO: merge with DQPWorkContext
class ClientState {
List<RequestID> requests;
TempTableStore sessionTables;

public ClientState(TempTableStore tableStoreImpl) {
this.sessionTables = tableStoreImpl;
}

public synchronized void addRequest(RequestID requestID) {
if (requests == null) {
requests = new LinkedList<RequestID>();
}
requests.add(requestID);
}

public synchronized List<RequestID> getRequests() {
if (requests == null) {
return Collections.emptyList();
}
return new ArrayList<RequestID>(requests);
}

public synchronized void removeRequest(RequestID requestID) {
if (requests != null) {
requests.remove(requestID);
}
}
}
90 changes: 0 additions & 90 deletions engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import javax.resource.spi.work.Work;
import javax.transaction.xa.Xid;

import org.teiid.adminapi.AdminException;
Expand Down Expand Up @@ -67,7 +65,6 @@
import org.teiid.core.util.ApplicationInfo;
import org.teiid.core.util.ExecutorUtils;
import org.teiid.core.util.PropertiesUtils;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.TransactionContext;
Expand Down Expand Up @@ -96,93 +93,6 @@ public interface CompletionListener<T> {
void onCompletion(FutureWork<T> future);
}

public final static class FutureWork<T> extends FutureTask<T> implements PrioritizedRunnable, Work {
private int priority;
private long creationTime = System.currentTimeMillis();
private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
private List<CompletionListener<T>> completionListeners = new LinkedList<CompletionListener<T>>();
private String parentName;

public FutureWork(final Callable<T> processor, int priority) {
super(processor);
this.parentName = Thread.currentThread().getName();
this.priority = priority;
}

public FutureWork(final Runnable processor, T result, int priority) {
super(processor, result);
this.priority = priority;
}

@Override
public void run() {
LogManager.logDetail(LogConstants.CTX_DQP, "Running task for parent thread", parentName); //$NON-NLS-1$
super.run();
}

@Override
public int getPriority() {
return priority;
}

@Override
public long getCreationTime() {
return creationTime;
}

@Override
public DQPWorkContext getDqpWorkContext() {
return workContext;
}

@Override
public void release() {

}

void addCompletionListener(CompletionListener<T> completionListener) {
this.completionListeners.add(completionListener);
}

@Override
protected void done() {
for (CompletionListener<T> listener : this.completionListeners) {
listener.onCompletion(this);
}
completionListeners.clear();
}

}

static class ClientState {
List<RequestID> requests;
TempTableStore sessionTables;

public ClientState(TempTableStore tableStoreImpl) {
this.sessionTables = tableStoreImpl;
}

public synchronized void addRequest(RequestID requestID) {
if (requests == null) {
requests = new LinkedList<RequestID>();
}
requests.add(requestID);
}

public synchronized List<RequestID> getRequests() {
if (requests == null) {
return Collections.emptyList();
}
return new ArrayList<RequestID>(requests);
}

public synchronized void removeRequest(RequestID requestID) {
if (requests != null) {
requests.remove(requestID);
}
}
}

private TeiidExecutor processWorkerPool;

// Resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public static void setWorkContext(DQPWorkContext context) {
private Version clientVersion = Version.SEVEN_4;
private boolean admin;
private MetadataFactory metadataFactory;
private Map<String, Object> sessionVariables = Collections.synchronizedMap(new HashMap<String, Object>(2));

public DQPWorkContext() {
}
Expand Down Expand Up @@ -337,9 +338,13 @@ public boolean isAdmin() {

public MetadataFactory getTempMetadataFactory() {
if (this.metadataFactory == null) {
this.metadataFactory = new MetadataFactory("temp", 1, "temp", SystemMetadata.getInstance().getRuntimeTypeMap(), null, null);
this.metadataFactory = new MetadataFactory("temp", 1, "temp", SystemMetadata.getInstance().getRuntimeTypeMap(), null, null); //$NON-NLS-1$ //$NON-NLS-2$
}
return this.metadataFactory;
}

public Map<String, Object> getSessionVariables() {
return sessionVariables;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.teiid.core.util.ReaderInputStream;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.message.AtomicRequestMessage;
import org.teiid.dqp.message.AtomicResultsMessage;
import org.teiid.events.EventDistributor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* JBoss, Home of Professional Open Source.
* See the COPYRIGHT.txt file distributed with this work for information
* regarding copyright ownership. Some portions may be licensed
* to Red Hat, Inc. under one or more contributor license agreements.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA.
*/

package org.teiid.dqp.internal.process;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

import javax.resource.spi.work.Work;

import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;

public final class FutureWork<T> extends FutureTask<T> implements PrioritizedRunnable, Work {
private int priority;
private long creationTime = System.currentTimeMillis();
private DQPWorkContext workContext = DQPWorkContext.getWorkContext();
private List<CompletionListener<T>> completionListeners = new LinkedList<CompletionListener<T>>();
private String parentName;

public FutureWork(final Callable<T> processor, int priority) {
super(processor);
this.parentName = Thread.currentThread().getName();
this.priority = priority;
}

public FutureWork(final Runnable processor, T result, int priority) {
super(processor, result);
this.priority = priority;
}

@Override
public void run() {
LogManager.logDetail(LogConstants.CTX_DQP, "Running task for parent thread", parentName); //$NON-NLS-1$
super.run();
}

@Override
public int getPriority() {
return priority;
}

@Override
public long getCreationTime() {
return creationTime;
}

@Override
public DQPWorkContext getDqpWorkContext() {
return workContext;
}

@Override
public void release() {

}

void addCompletionListener(CompletionListener<T> completionListener) {
this.completionListeners.add(completionListener);
}

@Override
protected void done() {
for (CompletionListener<T> listener : this.completionListeners) {
listener.onCompletion(this);
}
completionListeners.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.JDBCSQLTypeInfo;
import org.teiid.core.types.XMLType;
import org.teiid.dqp.internal.process.DQPCore.ClientState;
import org.teiid.dqp.internal.process.DQPWorkContext.Version;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.message.RequestID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.teiid.core.types.DataTypeManager;
import org.teiid.dqp.internal.process.AuthorizationValidator.CommandType;
import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.internal.process.SessionAwareCache.CacheID;
import org.teiid.dqp.internal.process.ThreadReuseExecutor.PrioritizedRunnable;
import org.teiid.dqp.message.AtomicRequestID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.teiid.client.xa.XATransactionException;
import org.teiid.client.xa.XidImpl;
import org.teiid.core.util.Assertion;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
import org.teiid.dqp.service.TransactionContext;
import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.dqp.service.TransactionService;
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/org/teiid/query/QueryPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ public static enum Event implements BundleUtil.Event{
TEIID31132,
TEIID31133,
TEIID31134,
TEIID31135,
TEIID31135,
TEIID31136,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.teiid.json.simple.JSONParser;
import org.teiid.json.simple.ParseException;
import org.teiid.query.QueryPlugin;
import org.teiid.query.function.metadata.FunctionCategoryConstants;
import org.teiid.query.function.source.XMLSystemFunctions;
import org.teiid.query.util.CommandContext;

Expand Down Expand Up @@ -200,7 +201,7 @@ public ClobType close() throws TeiidProcessingException {

}

@TeiidFunction(category="JSON")
@TeiidFunction(category=FunctionCategoryConstants.JSON)
public static ClobType jsonParse(ClobType val, boolean wellformed) throws SQLException, IOException, ParseException {
Reader r = null;
if (val.getType() == Type.JSON) {
Expand All @@ -224,7 +225,7 @@ public static ClobType jsonParse(ClobType val, boolean wellformed) throws SQLExc
}
}

@TeiidFunction(category="JSON")
@TeiidFunction(category=FunctionCategoryConstants.JSON)
public static ClobType jsonParse(BlobType val, boolean wellformed) throws SQLException, IOException, ParseException {
InputStreamReader r = XMLSystemFunctions.getJsonReader(val);
try {
Expand All @@ -243,7 +244,7 @@ public static ClobType jsonParse(BlobType val, boolean wellformed) throws SQLExc
}
}

@TeiidFunction(category="JSON")
@TeiidFunction(category=FunctionCategoryConstants.JSON)
public static ClobType jsonArray(CommandContext context, Object... vals) throws TeiidProcessingException {
if (vals == null) {
return null;
Expand Down
Loading

0 comments on commit 9d15cb7

Please sign in to comment.