Skip to content

Commit

Permalink
Merge branch 'master' of github.com:teiid/teiid
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Feb 21, 2017
2 parents 054d5b2 + 69b1312 commit 318a88a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 10 deletions.
Expand Up @@ -392,11 +392,12 @@ private TupleSource processProcedure()
inst.process(this);
this.evaluator.close();
}
} catch (RuntimeException e) {
throw e;
} catch (TeiidComponentException e) {
throw e;
} catch (Exception e) {
if (e instanceof RuntimeException) {
LogManager.logWarning(org.teiid.logging.LogConstants.CTX_DQP, e, "Unexpected Error"); //$NON-NLS-1$
}
//processing or teiidsqlexception
boolean atomic = program.isAtomic();
while (program.getExceptionGroup() == null) {
Expand Down
3 changes: 3 additions & 0 deletions engine/src/main/resources/org/teiid/metadata/SYSADMIN.sql
Expand Up @@ -198,6 +198,9 @@ BEGIN
rowsUpdated = (EXECUTE SYSADMIN.refreshMatView(VARIABLES.fullViewName, loadMatView.invalidate));
EXECUTE logMsg(context=>'org.teiid.MATVIEWS', level=>'INFO', msg=>'Materialization of view ' || VARIABLES.fullViewName || ' completed. Rows updated = ' || VARIABLES.rowsUpdated);
RETURN rowsUpdated;
EXCEPTION e
rowsUpdated = -2;
RETURN rowsUpdated;
END

DECLARE string beforeLoadScript = (SELECT "value" from SYS.Properties WHERE UID = VARIABLES.uid AND Name = '{http://www.teiid.org/ext/relational/2012}MATVIEW_BEFORE_LOAD_SCRIPT');
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.teiid.adminapi.VDB.Status;
import org.teiid.adminapi.impl.ModelMetaData;
Expand Down Expand Up @@ -65,6 +66,7 @@ private interface MaterializationAction {
void process(Table table);
}

private static final int WAITTIME = 60000;
public abstract Executor getExecutor();
public abstract ScheduledExecutorService getScheduledExecutorService();
public abstract DQPCore getDQP();
Expand Down Expand Up @@ -239,24 +241,34 @@ public void scheduleJob(CompositeVDB vdb, Table table, long ttl, long delay, boo
private void runJob(final CompositeVDB vdb, final Table table, final long ttl, final boolean onetimeJob) {
String command = "execute SYSADMIN.loadMatView('"+StringUtil.replaceAll(table.getParent().getName(), "'", "''")+"','"+StringUtil.replaceAll(table.getName(), "'", "''")+"')"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$
try {
executeAsynchQuery(vdb.getVDB(), command).addCompletionListener(new CompletionListener() {
final AtomicInteger procReturn = new AtomicInteger();
executeAsynchQuery(vdb.getVDB(), command, new DQPCore.ResultsListener() {
@Override
public void onResults(List<String> columns, List<? extends List<?>> results) throws Exception {
procReturn.set((Integer)results.get(0).get(0));
}}).addCompletionListener(new CompletionListener() {
@Override
public void onCompletion(ResultsFuture future) {
try {
future.get();
if (!onetimeJob) {
scheduleJob(vdb, table, ttl, ttl, onetimeJob);
if (procReturn.get() >= 0) {
scheduleJob(vdb, table, ttl, ttl, onetimeJob);
} else {
// when in error re-schedule in 1 min or less
scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob);
}
}
} catch (InterruptedException e) {
} catch (ExecutionException e) {
LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage());
scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), onetimeJob); // re-schedule the same job in one minute
scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob); // re-schedule the same job in one minute
}
}
});
} catch (SQLException e) {
LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage());
scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), onetimeJob); // re-schedule the same job in one minute
scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob); // re-schedule the same job in one minute
}
}

Expand Down Expand Up @@ -293,12 +305,19 @@ public void run() {
return;
}

// when source or target data source(s) are down, there is no reason
// to run the materialization jobs. schedule for later time
if (!vdb.getVDB().isValid()) {
scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), oneTimeJob); // re-schedule the same job in one minute
return;
}

List<Map<String, String>> result = null;
try {
result = executeQuery(vdb.getVDB(), query);
} catch (SQLException e) {
LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage());
scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), oneTimeJob); // re-schedule the same job in one minute
scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), oneTimeJob); // re-schedule the same job in one minute
return;
}

Expand All @@ -325,6 +344,9 @@ public void run() {
else if (loadstate.equalsIgnoreCase("loading")) { //$NON-NLS-1$
// if the process is already loading do nothing
next = ttl - elapsed;
if (elapsed >= ttl) {
next = Math.min(ttl / 4, 60000);
}
}
else if (loadstate.equalsIgnoreCase("loaded")) { //$NON-NLS-1$
if (elapsed >= ttl) {
Expand All @@ -334,7 +356,7 @@ else if (loadstate.equalsIgnoreCase("loaded")) { //$NON-NLS-1$
next = ttl - elapsed;
}
else if (loadstate.equalsIgnoreCase("failed_load")) { //$NON-NLS-1$
if (elapsed > ttl/4 || elapsed > 60000) { // exceeds 1/4 of cached time or 5 mins
if (elapsed > ttl/4 || elapsed > 60000) { // exceeds 1/4 of cached time or 1 mins
runJob(vdb, table, ttl, oneTimeJob);
return;
}
Expand All @@ -350,14 +372,20 @@ public ResultsFuture<?> executeAsynchQuery(VDBMetaData vdb, String command) thro
@Override
public void onResults(List<String> columns,
List<? extends List<?>> results) throws Exception {

}
});
} catch (Throwable e) {
throw new SQLException(e);
}
}

public ResultsFuture<?> executeAsynchQuery(VDBMetaData vdb, String command, DQPCore.ResultsListener listener) throws SQLException {
try {
return DQPCore.executeQuery(command, vdb, "embedded-async", "internal", -1, getDQP(), listener); //$NON-NLS-1$ //$NON-NLS-2$
} catch (Throwable e) {
throw new SQLException(e);
}
}
public List<Map<String, String>> executeQuery(VDBMetaData vdb, String command) throws SQLException {
final List<Map<String, String>> rows = new ArrayList<Map<String,String>>();
try {
Expand Down
@@ -1,6 +1,6 @@
string clob
Name Body
loadMatView Clob[11411]
loadMatView Clob[11461]
matViewStatus Clob[3119]
updateMatView Clob[8839]
Row Count : 3
Expand Down

0 comments on commit 318a88a

Please sign in to comment.