Skip to content

Commit

Permalink
TEIID-2507 allowing for session scoped mat views and correcting the
Browse files Browse the repository at this point in the history
scope of session variables to specifically the session
  • Loading branch information
shawkins committed May 29, 2013
1 parent dcf7d54 commit f592590
Show file tree
Hide file tree
Showing 18 changed files with 227 additions and 56 deletions.
Expand Up @@ -21,7 +21,10 @@
*/
package org.teiid.adminapi.impl;

import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.security.auth.Subject;

Expand Down Expand Up @@ -57,6 +60,7 @@ public class SessionMetadata extends AdminObjectImpl implements Session {
private transient Subject subject;
private transient Object securityContext;
private transient boolean embedded;
private transient Map<String, Object> sessionVariables = Collections.synchronizedMap(new HashMap<String, Object>(2));

@Override
public String getApplicationName() {
Expand Down Expand Up @@ -214,4 +218,8 @@ public void setClientHardwareAddress(String clientHardwareAddress) {
this.clientHardwareAddress = clientHardwareAddress;
}

public Map<String, Object> getSessionVariables() {
return sessionVariables;
}

}
1 change: 1 addition & 0 deletions build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html
Expand Up @@ -37,6 +37,7 @@ <H2><A NAME="Highlights"></A>Highlights</H2>
<li>TEIID-2504 <b>Improved socket results processing</b> large results processing over a socket transport have been significantly improved for forward only results. Scrollable results will have only a marginal improvement.
<li>TEIID-2427 TEIID-2311 <b>Column Masking</b> added column masking and refined row-based security. Also allow for permissions to be granted via the MetadataFactory methods addPermission and addColumnPermisison to enable
adding roles via a MetadataRepository.
<li>TEIID-2507 <b>Session scoped mat views</b> - mat views can now be scoped to sessions via the cache hint, rather than to just the global scope.
</ul>

<h2><a name="Compatibility">Compatibility Issues</a></h2>
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;

import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.dqp.message.RequestID;
import org.teiid.query.tempdata.TempTableStore;

Expand All @@ -37,6 +38,7 @@
class ClientState {
List<RequestID> requests;
TempTableStore sessionTables;
volatile SessionMetadata session;

public ClientState(TempTableStore tableStoreImpl) {
this.sessionTables = tableStoreImpl;
Expand Down
17 changes: 17 additions & 0 deletions engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
Expand Up @@ -44,6 +44,7 @@
import org.teiid.adminapi.Request.ThreadState;
import org.teiid.adminapi.VDB.Status;
import org.teiid.adminapi.impl.RequestMetadata;
import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.adminapi.impl.TransactionMetadata;
import org.teiid.adminapi.impl.WorkerPoolStatisticsMetadata;
import org.teiid.client.DQP;
Expand Down Expand Up @@ -79,9 +80,11 @@
import org.teiid.logging.MessageLevel;
import org.teiid.query.QueryPlugin;
import org.teiid.query.processor.QueryProcessor;
import org.teiid.query.tempdata.GlobalTableStoreImpl;
import org.teiid.query.tempdata.TempTableDataManager;
import org.teiid.query.tempdata.TempTableStore;
import org.teiid.query.tempdata.TempTableStore.TransactionMode;
import org.teiid.query.util.CommandContext;
import org.teiid.query.util.Options;

/**
Expand Down Expand Up @@ -243,6 +246,9 @@ public ResultsFuture<ResultsMessage> executeRequest(long reqID,RequestMessage re
request = new Request();
}
ClientState state = this.getClientState(workContext.getSessionId(), true);
if (state.session == null) {
state.session = workContext.getSession();
}
request.initialize(requestMsg, bufferManager,
dataTierMgr, transactionService, state.sessionTables,
workContext, this.prepPlanCache);
Expand Down Expand Up @@ -442,6 +448,17 @@ public void terminateSession(String sessionId) {
LogManager.logWarning(LogConstants.CTX_DQP, err, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30026,reqId));
}
}
SessionMetadata session = state.session;
if (session != null) {
GlobalTableStoreImpl store = CommandContext.removeSessionScopedStore(session);
if (store != null) {
try {
store.getTempTableStore().removeTempTables();
} catch (TeiidComponentException e) {
LogManager.logDetail(LogConstants.CTX_DQP, e, "Error removing session scoped temp tables"); //$NON-NLS-1$
}
}
}
}

try {
Expand Down
Expand Up @@ -137,7 +137,6 @@ public static void setWorkContext(DQPWorkContext context) {
private Version clientVersion = Version.SEVEN_4;
private boolean admin;
private MetadataFactory metadataFactory;
private Map<String, Object> sessionVariables = Collections.synchronizedMap(new HashMap<String, Object>(2));

public DQPWorkContext() {
}
Expand Down Expand Up @@ -310,12 +309,11 @@ private boolean matchesPrincipal(Set<String> userRoles, DataPolicy policy) {
}

private Set<String> getUserRoles() {
Set<String> roles = new HashSet<String>();

if (getSubject() == null) {
return Collections.emptySet();
}

Set<String> roles = new HashSet<String>();
Set<Principal> principals = getSubject().getPrincipals();
for(Principal p: principals) {
// this JBoss specific, but no code level dependencies
Expand Down Expand Up @@ -353,8 +351,4 @@ public MetadataFactory getTempMetadataFactory() {
return this.metadataFactory;
}

public Map<String, Object> getSessionVariables() {
return sessionVariables;
}

}
Expand Up @@ -242,11 +242,18 @@ public void fillRow(List<Object> row, Table table,
if (table.getMaterializedTable() == null) {
GlobalTableStore globalStore = cc.getGlobalTableStore();
matTableName = RelationalPlanner.MAT_PREFIX+table.getFullName().toUpperCase();
TempMetadataID id = globalStore.getGlobalTempTableMetadataId(matTableName);
if (id != null && id.getCacheHint() != null && id.getCacheHint().getScope() != null && Scope.VDB.compareTo(id.getCacheHint().getScope()) > 0) {
//consult the session store instead
globalStore = cc.getSessionScopedStore(false);
if (globalStore == null) {
globalStore = cc.getGlobalTableStore();
}
}
MatTableInfo info = globalStore.getMatTableInfo(matTableName);
valid = info.isValid();
state = info.getState().name();
updated = info.getUpdateTime()==-1?null:new Timestamp(info.getUpdateTime());
TempMetadataID id = globalStore.getGlobalTempTableMetadataId(matTableName);
if (id != null) {
cardinaltity = id.getCardinality();
}
Expand Down
Expand Up @@ -29,7 +29,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.id.IDGenerator;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.analysis.AnalysisRecord;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.QueryOptimizer;
Expand Down Expand Up @@ -70,6 +69,8 @@ public QueryProcessorFactoryImpl(BufferManager bufferMgr,
@Override
public QueryProcessor createQueryProcessor(String query, String recursionGroup, CommandContext commandContext, Object... params) throws TeiidProcessingException, TeiidComponentException {
CommandContext copy = commandContext.clone();
copy.resetDeterminismLevel(true);
copy.setDataObjects(null);
QueryMetadataInterface metadata = commandContext.getMetadata();
if (metadata == null) {
metadata = defaultMetadata;
Expand Down Expand Up @@ -97,20 +98,15 @@ public PreparedPlan getPreparedPlan(String query, String recursionGroup,

AbstractValidationVisitor visitor = new ValidationVisitor();
Request.validateWithVisitor(visitor, metadata, newCommand);
Determinism determinismLevel = commandContext.resetDeterminismLevel();
try {
newCommand = QueryRewriter.rewrite(newCommand, metadata, commandContext);
AnalysisRecord record = new AnalysisRecord(false, false);
ProcessorPlan plan = QueryOptimizer.optimizePlan(newCommand, metadata, idGenerator, finder, record, commandContext);
pp = new PreparedPlan();
pp.setPlan(plan, commandContext);
pp.setReferences(references);
pp.setAnalysisRecord(record);
pp.setCommand(newCommand);
commandContext.putPlan(query, pp, commandContext.getDeterminismLevel());
} finally {
commandContext.setDeterminismLevel(determinismLevel);
}
newCommand = QueryRewriter.rewrite(newCommand, metadata, commandContext);
AnalysisRecord record = new AnalysisRecord(false, false);
ProcessorPlan plan = QueryOptimizer.optimizePlan(newCommand, metadata, idGenerator, finder, record, commandContext);
pp = new PreparedPlan();
pp.setPlan(plan, commandContext);
pp.setReferences(references);
pp.setAnalysisRecord(record);
pp.setCommand(newCommand);
commandContext.putPlan(query, pp, commandContext.getDeterminismLevel());
}
return pp;
}
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/org/teiid/query/QueryPlugin.java
Expand Up @@ -554,6 +554,7 @@ public static enum Event implements BundleUtil.Event{
TEIID31139,
TEIID31140,
TEIID31141,
TEIID31142,
TEIID31142,
TEIID31143,
}
}
Expand Up @@ -24,8 +24,8 @@

import java.util.Map;

import org.teiid.adminapi.impl.SessionMetadata;
import org.teiid.api.exception.query.FunctionExecutionException;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.QueryPlugin;
import org.teiid.query.function.metadata.FunctionCategoryConstants;
Expand All @@ -42,8 +42,8 @@ public static Object teiid_session_get(CommandContext context, String key) {

@TeiidFunction(category=FunctionCategoryConstants.SYSTEM, determinism=Determinism.COMMAND_DETERMINISTIC)
public static Object teiid_session_set(CommandContext context, String key, Object value) throws FunctionExecutionException {
DQPWorkContext workContext = context.getDQPWorkContext();
Map<String, Object> variables = workContext.getSessionVariables();
SessionMetadata session = context.getSession();
Map<String, Object> variables = session.getSessionVariables();
if (variables.size() > MAX_VARIABLES && !variables.containsKey(key)) {
throw new FunctionExecutionException(QueryPlugin.Event.TEIID31136, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID31136, MAX_VARIABLES));
}
Expand Down
Expand Up @@ -82,6 +82,7 @@
import org.teiid.query.sql.symbol.Reference;
import org.teiid.query.tempdata.GlobalTableStoreImpl.MatTableInfo;
import org.teiid.query.util.CommandContext;
import org.teiid.translator.CacheDirective.Scope;

/**
* This proxy ProcessorDataManager is used to handle temporary tables.
Expand Down Expand Up @@ -308,11 +309,9 @@ private TupleSource handleCachedProcedure(final CommandContext context,
param.setExpression(new Reference(i++));
}
final QueryProcessor qp = context.getQueryProcessorFactory().createQueryProcessor(cloneProc.toString(), fullName.toUpperCase(), context, vals.toArray());
qp.getContext().setDataObjects(null);
final BatchCollector bc = qp.createBatchCollector();

return new ProxyTupleSource() {
Determinism determinismLevelOrig = context.resetDeterminismLevel();
boolean success = false;

@Override
Expand All @@ -321,21 +320,19 @@ protected TupleSource createTupleSource() throws TeiidComponentException,
TupleBuffer tb = bc.collectTuples();
CachedResults cr = new CachedResults();
cr.setResults(tb, qp.getProcessorPlan());
//there is a chance that this level is not correct as other stuff may have been processed along with this plan
Determinism determinismLevel = context.getDeterminismLevel();
Determinism determinismLevel = qp.getContext().getDeterminismLevel();
if (hint != null && hint.getDeterminism() != null) {
LogManager.logTrace(LogConstants.CTX_DQP, new Object[] { "Cache hint modified the query determinism from ",determinismLevel, " to ", hint.getDeterminism() }); //$NON-NLS-1$ //$NON-NLS-2$
determinismLevel = hint.getDeterminism();
}
cache.put(cid, determinismLevel, cr, hint != null?hint.getTtl():null);
context.setDeterminismLevel(determinismLevelOrig);
context.setDeterminismLevel(determinismLevel);
success = true;
return tb.createIndexedTupleSource();
}

@Override
public void closeSource() {
context.setDeterminismLevel(determinismLevelOrig);
super.closeSource();
qp.closeProcessing();
if (!success && bc.getTupleBuffer() != null) {
Expand All @@ -351,10 +348,10 @@ private TupleSource handleSystemProcedures(final CommandContext context, StoredP
QueryValidatorException, TeiidProcessingException,
ExpressionEvaluationException {
final QueryMetadataInterface metadata = context.getMetadata();
final GlobalTableStore globalStore = context.getGlobalTableStore();
if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEW)) {
Object groupID = validateMatView(metadata, (String)((Constant)proc.getParameter(2).getExpression()).getValue());
Object matTableId = globalStore.getGlobalTempTableMetadataId(groupID);
TempMetadataID matTableId = context.getGlobalTableStore().getGlobalTempTableMetadataId(groupID);
final GlobalTableStore globalStore = getGlobalStore(context, matTableId);
String matViewName = metadata.getFullName(groupID);
String matTableName = metadata.getFullName(matTableId);
LogManager.logDetail(LogConstants.CTX_MATVIEWS, "processing refreshmatview for", matViewName); //$NON-NLS-1$
Expand All @@ -368,6 +365,8 @@ private TupleSource handleSystemProcedures(final CommandContext context, StoredP
return loadGlobalTable(context, matTable, matTableName, globalStore);
} else if (StringUtil.endsWithIgnoreCase(proc.getProcedureCallableName(), REFRESHMATVIEWROW)) {
final Object groupID = validateMatView(metadata, (String)((Constant)proc.getParameter(2).getExpression()).getValue());
TempMetadataID matTableId = context.getGlobalTableStore().getGlobalTempTableMetadataId(groupID);
final GlobalTableStore globalStore = getGlobalStore(context, matTableId);
Object pk = metadata.getPrimaryKey(groupID);
String matViewName = metadata.getFullName(groupID);
if (pk == null) {
Expand All @@ -394,7 +393,6 @@ private TupleSource handleSystemProcedures(final CommandContext context, StoredP
String queryString = Reserved.SELECT + " * " + Reserved.FROM + ' ' + matViewName + ' ' + Reserved.WHERE + ' ' + //$NON-NLS-1$
metadata.getFullName(id) + " = ?" + ' ' + Reserved.OPTION + ' ' + Reserved.NOCACHE; //$NON-NLS-1$
final QueryProcessor qp = context.getQueryProcessorFactory().createQueryProcessor(queryString, matViewName.toUpperCase(), context, value);
qp.getContext().setDataObjects(null);
final TupleSource ts = new BatchCollector.BatchProducerTupleSource(qp);
return new ProxyTupleSource() {

Expand Down Expand Up @@ -449,7 +447,8 @@ private TupleSource registerQuery(final CommandContext context,
}
final String tableName = group.getNonCorrelationName();
if (group.isGlobalTable()) {
final GlobalTableStore globalStore = context.getGlobalTableStore();
TempMetadataID matTableId = (TempMetadataID)group.getMetadataID();
final GlobalTableStore globalStore = getGlobalStore(context, matTableId);
final MatTableInfo info = globalStore.getMatTableInfo(tableName);
return new ProxyTupleSource() {
Future<Void> moreWork = null;
Expand Down Expand Up @@ -563,6 +562,14 @@ protected TupleSource createTupleSource()
};
}

private GlobalTableStore getGlobalStore(final CommandContext context, TempMetadataID matTableId) {
GlobalTableStore globalStore = context.getGlobalTableStore();
if (matTableId.getCacheHint() == null || matTableId.getCacheHint().getScope() == null || Scope.VDB.compareTo(matTableId.getCacheHint().getScope()) <= 0) {
return globalStore;
}
return context.getSessionScopedStore(true);
}

private void loadViaRefresh(final CommandContext context, final String tableName, VDBMetaData vdb, MatTableInfo info) throws TeiidProcessingException, TeiidComponentException {
info.setAsynchLoad();
DQPWorkContext workContext = createWorkContext(context, vdb);
Expand Down Expand Up @@ -613,12 +620,13 @@ protected TupleSource createTupleSource() throws TeiidComponentException,
String fullName = metadata.getFullName(group.getMetadataID());
String transformation = metadata.getVirtualPlan(group.getMetadataID()).getQuery();
qp = context.getQueryProcessorFactory().createQueryProcessor(transformation, fullName, context);
qp.getContext().setDataObjects(null);
insertTupleSource = new BatchCollector.BatchProducerTupleSource(qp);
}
table.insert(insertTupleSource, allColumns, false, null);
table.getTree().compact();
rowCount = table.getRowCount();
Determinism determinism = qp.getContext().getDeterminismLevel();
context.setDeterminismLevel(determinism);
//TODO: could pre-process indexes to remove overlap
for (Object index : metadata.getIndexesInGroup(group.getMetadataID())) {
List<ElementSymbol> columns = GlobalTableStoreImpl.resolveIndex(metadata, allColumns, index);
Expand All @@ -632,6 +640,9 @@ protected TupleSource createTupleSource() throws TeiidComponentException,
if (hint != null && table.getPkLength() > 0) {
table.setUpdatable(hint.isUpdatable(false));
}
if (determinism.compareTo(Determinism.VDB_DETERMINISTIC) < 0 && (hint == null || hint.getScope() == null || Scope.VDB.compareTo(hint.getScope()) <= 0)) {
LogManager.logInfo(LogConstants.CTX_DQP, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID31143, determinism, tableName)); //$NON-NLS-1$
}
globalStore.loaded(tableName, table);
success = true;
LogManager.logInfo(LogConstants.CTX_MATVIEWS, QueryPlugin.Util.gs(QueryPlugin.Event.TEIID30014, tableName, rowCount));
Expand Down

0 comments on commit f592590

Please sign in to comment.