From 69b1312e48231c68eef552c4a1ee024088191b21 Mon Sep 17 00:00:00 2001 From: Ramesh Reddy Date: Mon, 20 Feb 2017 11:33:40 -0500 Subject: [PATCH] TEIID-4778, TEIID-4775: external materialization scheduling issues (#903) --- .../query/processor/proc/ProcedurePlan.java | 5 ++- .../resources/org/teiid/metadata/SYSADMIN.sql | 3 ++ .../teiid/runtime/MaterializationManager.java | 42 +++++++++++++++---- .../testStoredProcedures.expected | 2 +- 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java b/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java index 89d819475d..0b5256d8b0 100644 --- a/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java +++ b/engine/src/main/java/org/teiid/query/processor/proc/ProcedurePlan.java @@ -392,11 +392,12 @@ private TupleSource processProcedure() inst.process(this); this.evaluator.close(); } - } catch (RuntimeException e) { - throw e; } catch (TeiidComponentException e) { throw e; } catch (Exception e) { + if (e instanceof RuntimeException) { + LogManager.logWarning(org.teiid.logging.LogConstants.CTX_DQP, e, "Unexpected Error"); //$NON-NLS-1$ + } //processing or teiidsqlexception boolean atomic = program.isAtomic(); while (program.getExceptionGroup() == null) { diff --git a/engine/src/main/resources/org/teiid/metadata/SYSADMIN.sql b/engine/src/main/resources/org/teiid/metadata/SYSADMIN.sql index c988caeaff..2b33fe62f9 100644 --- a/engine/src/main/resources/org/teiid/metadata/SYSADMIN.sql +++ b/engine/src/main/resources/org/teiid/metadata/SYSADMIN.sql @@ -198,6 +198,9 @@ BEGIN rowsUpdated = (EXECUTE SYSADMIN.refreshMatView(VARIABLES.fullViewName, loadMatView.invalidate)); EXECUTE logMsg(context=>'org.teiid.MATVIEWS', level=>'INFO', msg=>'Materialization of view ' || VARIABLES.fullViewName || ' completed. Rows updated = ' || VARIABLES.rowsUpdated); RETURN rowsUpdated; + EXCEPTION e + rowsUpdated = -2; + RETURN rowsUpdated; END DECLARE string beforeLoadScript = (SELECT "value" from SYS.Properties WHERE UID = VARIABLES.uid AND Name = '{http://www.teiid.org/ext/relational/2012}MATVIEW_BEFORE_LOAD_SCRIPT'); diff --git a/runtime/src/main/java/org/teiid/runtime/MaterializationManager.java b/runtime/src/main/java/org/teiid/runtime/MaterializationManager.java index 0cc24fff49..4f6038df08 100644 --- a/runtime/src/main/java/org/teiid/runtime/MaterializationManager.java +++ b/runtime/src/main/java/org/teiid/runtime/MaterializationManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.teiid.adminapi.VDB.Status; import org.teiid.adminapi.impl.ModelMetaData; @@ -65,6 +66,7 @@ private interface MaterializationAction { void process(Table table); } + private static final int WAITTIME = 60000; public abstract Executor getExecutor(); public abstract ScheduledExecutorService getScheduledExecutorService(); public abstract DQPCore getDQP(); @@ -239,24 +241,34 @@ public void scheduleJob(CompositeVDB vdb, Table table, long ttl, long delay, boo private void runJob(final CompositeVDB vdb, final Table table, final long ttl, final boolean onetimeJob) { String command = "execute SYSADMIN.loadMatView('"+StringUtil.replaceAll(table.getParent().getName(), "'", "''")+"','"+StringUtil.replaceAll(table.getName(), "'", "''")+"')"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$ //$NON-NLS-6$ //$NON-NLS-7$ try { - executeAsynchQuery(vdb.getVDB(), command).addCompletionListener(new CompletionListener() { + final AtomicInteger procReturn = new AtomicInteger(); + executeAsynchQuery(vdb.getVDB(), command, new DQPCore.ResultsListener() { + @Override + public void onResults(List columns, List> results) throws Exception { + procReturn.set((Integer)results.get(0).get(0)); + }}).addCompletionListener(new CompletionListener() { @Override public void onCompletion(ResultsFuture future) { try { future.get(); if (!onetimeJob) { - scheduleJob(vdb, table, ttl, ttl, onetimeJob); + if (procReturn.get() >= 0) { + scheduleJob(vdb, table, ttl, ttl, onetimeJob); + } else { + // when in error re-schedule in 1 min or less + scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob); + } } } catch (InterruptedException e) { } catch (ExecutionException e) { LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage()); - scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), onetimeJob); // re-schedule the same job in one minute + scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob); // re-schedule the same job in one minute } } }); } catch (SQLException e) { LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage()); - scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), onetimeJob); // re-schedule the same job in one minute + scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), onetimeJob); // re-schedule the same job in one minute } } @@ -293,12 +305,19 @@ public void run() { return; } + // when source or target data source(s) are down, there is no reason + // to run the materialization jobs. schedule for later time + if (!vdb.getVDB().isValid()) { + scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), oneTimeJob); // re-schedule the same job in one minute + return; + } + List> result = null; try { result = executeQuery(vdb.getVDB(), query); } catch (SQLException e) { LogManager.logWarning(LogConstants.CTX_MATVIEWS, e, e.getMessage()); - scheduleJob(vdb, table, ttl, Math.min(ttl/4, 60000), oneTimeJob); // re-schedule the same job in one minute + scheduleJob(vdb, table, ttl, Math.min(ttl/4, WAITTIME), oneTimeJob); // re-schedule the same job in one minute return; } @@ -325,6 +344,9 @@ public void run() { else if (loadstate.equalsIgnoreCase("loading")) { //$NON-NLS-1$ // if the process is already loading do nothing next = ttl - elapsed; + if (elapsed >= ttl) { + next = Math.min(ttl / 4, 60000); + } } else if (loadstate.equalsIgnoreCase("loaded")) { //$NON-NLS-1$ if (elapsed >= ttl) { @@ -334,7 +356,7 @@ else if (loadstate.equalsIgnoreCase("loaded")) { //$NON-NLS-1$ next = ttl - elapsed; } else if (loadstate.equalsIgnoreCase("failed_load")) { //$NON-NLS-1$ - if (elapsed > ttl/4 || elapsed > 60000) { // exceeds 1/4 of cached time or 5 mins + if (elapsed > ttl/4 || elapsed > 60000) { // exceeds 1/4 of cached time or 1 mins runJob(vdb, table, ttl, oneTimeJob); return; } @@ -350,7 +372,6 @@ public ResultsFuture executeAsynchQuery(VDBMetaData vdb, String command) thro @Override public void onResults(List columns, List> results) throws Exception { - } }); } catch (Throwable e) { @@ -358,6 +379,13 @@ public void onResults(List columns, } } + public ResultsFuture executeAsynchQuery(VDBMetaData vdb, String command, DQPCore.ResultsListener listener) throws SQLException { + try { + return DQPCore.executeQuery(command, vdb, "embedded-async", "internal", -1, getDQP(), listener); //$NON-NLS-1$ //$NON-NLS-2$ + } catch (Throwable e) { + throw new SQLException(e); + } + } public List> executeQuery(VDBMetaData vdb, String command) throws SQLException { final List> rows = new ArrayList>(); try { diff --git a/test-integration/common/src/test/resources/TestSystemVirtualModel/testStoredProcedures.expected b/test-integration/common/src/test/resources/TestSystemVirtualModel/testStoredProcedures.expected index 101f26ef04..1145372137 100644 --- a/test-integration/common/src/test/resources/TestSystemVirtualModel/testStoredProcedures.expected +++ b/test-integration/common/src/test/resources/TestSystemVirtualModel/testStoredProcedures.expected @@ -1,6 +1,6 @@ string clob Name Body -loadMatView Clob[11411] +loadMatView Clob[11461] matViewStatus Clob[3119] updateMatView Clob[8839] Row Count : 3