From e61464e06ac66233d979c678f2e27e7f75ac0d75 Mon Sep 17 00:00:00 2001 From: "Carlos A. Munoz" Date: Wed, 21 Aug 2013 13:53:11 +1000 Subject: [PATCH] Refactor the reindex process to use the new async task framework. Some issues with the original implementation still remain. This revision does not take care of them. --- .../org/zanata/action/ReindexActionBean.java | 64 +++++++-- .../org/zanata/action/ReindexAsyncBean.java | 136 ++++++++++-------- .../search/AbstractIndexingStrategy.java | 8 +- .../java/org/zanata/search/ClassIndexer.java | 21 +-- .../zanata/search/IndexerProcessHandle.java | 50 ------- zanata-war/src/main/webapp/admin/search.xhtml | 4 +- 6 files changed, 136 insertions(+), 147 deletions(-) delete mode 100644 zanata-war/src/main/java/org/zanata/search/IndexerProcessHandle.java diff --git a/zanata-war/src/main/java/org/zanata/action/ReindexActionBean.java b/zanata-war/src/main/java/org/zanata/action/ReindexActionBean.java index af4079fb36..f9fc940cd9 100644 --- a/zanata-war/src/main/java/org/zanata/action/ReindexActionBean.java +++ b/zanata-war/src/main/java/org/zanata/action/ReindexActionBean.java @@ -2,6 +2,8 @@ import java.io.Serializable; import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import org.jboss.seam.ScopeType; import org.jboss.seam.annotations.In; @@ -12,6 +14,7 @@ import org.joda.time.Period; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; +import org.zanata.async.AsyncHandle; @Name("reindexAction") @Scope(ScopeType.APPLICATION) @@ -101,7 +104,7 @@ public void setOptimizeAll(boolean selected) public boolean isInProgress() { - return reindexAsync.getProcessHandle().isInProgress(); + return reindexAsync.getProcessHandle() != null && !reindexAsync.getProcessHandle().isDone(); } public String getCurrentClass() @@ -111,22 +114,60 @@ public String getCurrentClass() public boolean isError() { - return reindexAsync.getProcessHandle().hasError(); + AsyncHandle taskHandle = reindexAsync.getProcessHandle(); + if( taskHandle == null ) + { + return false; + } + else if( taskHandle.isDone() ) + { + try + { + taskHandle.get(); + } + catch (InterruptedException e) + { + return true; + } + catch (ExecutionException e) + { + return true; + } + catch (CancellationException e) + { + return false; + } + } + return false; } public int getReindexCount() { - return reindexAsync.getProcessHandle().getMaxProgress(); + if(reindexAsync.getProcessHandle() == null) + { + return 0; + } + else + { + return reindexAsync.getProcessHandle().getMaxProgress(); + } } public int getReindexProgress() { - return reindexAsync.getProcessHandle().getCurrentProgress(); + if(reindexAsync.getProcessHandle() == null) + { + return 0; + } + else + { + return reindexAsync.getProcessHandle().getCurrentProgress(); + } } public void reindexDatabase() { - if (!reindexAsync.getProcessHandle().isInProgress()) + if (reindexAsync.getProcessHandle() == null || reindexAsync.getProcessHandle().isDone()) { reindexAsync.startProcess(); } @@ -134,17 +175,12 @@ public void reindexDatabase() public void cancel() { - reindexAsync.getProcessHandle().stop(); + reindexAsync.getProcessHandle().cancel(false); } public boolean isCanceled() { - return reindexAsync.getProcessHandle().shouldStop(); - } - - public boolean isStarted() - { - return reindexAsync.getProcessHandle().isStarted(); + return reindexAsync.getProcessHandle() != null && reindexAsync.getProcessHandle().isCancelled(); } // TODO move to common location with ViewAllStatusAction @@ -171,7 +207,7 @@ private String formatTimePeriod( long durationInMillis ) } } - public String getElapsedTime() + /*public String getElapsedTime() { return formatTimePeriod(reindexAsync.getProcessHandle().getElapsedTime()); } @@ -179,5 +215,5 @@ public String getElapsedTime() public String getEstimatedTimeRemaining() { return formatTimePeriod(reindexAsync.getProcessHandle().getEstimatedTimeRemaining()); - } + }*/ } diff --git a/zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java b/zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java index 52fe956ad1..b8cd2e90c9 100644 --- a/zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java +++ b/zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java @@ -18,6 +18,9 @@ import org.jboss.seam.annotations.Scope; import org.jboss.seam.annotations.Startup; import org.jboss.seam.log.Log; +import org.zanata.async.AsyncHandle; +import org.zanata.async.AsyncTask; +import org.zanata.async.SimpleAsyncTask; import org.zanata.model.HAccount; import org.zanata.model.HGlossaryEntry; import org.zanata.model.HGlossaryTerm; @@ -25,18 +28,18 @@ import org.zanata.model.HProjectIteration; import org.zanata.model.HTextFlowTarget; import org.zanata.model.tm.TransMemoryUnit; -import org.zanata.process.RunnableProcess; import org.zanata.search.AbstractIndexingStrategy; import org.zanata.search.ClassIndexer; import org.zanata.search.HTextFlowTargetIndexingStrategy; -import org.zanata.search.IndexerProcessHandle; import org.zanata.search.SimpleClassIndexingStrategy; -import org.zanata.service.ProcessManagerService; +import org.zanata.service.impl.AsyncTaskManagerServiceImpl; + +import lombok.AllArgsConstructor; @Name("reindexAsync") @Scope(ScopeType.APPLICATION) @Startup -public class ReindexAsyncBean extends RunnableProcess implements Serializable +public class ReindexAsyncBean implements Serializable { private static final long serialVersionUID = 1L; @@ -47,7 +50,7 @@ public class ReindexAsyncBean extends RunnableProcess impl EntityManagerFactory entityManagerFactory; @In - ProcessManagerService processManagerServiceImpl; + AsyncTaskManagerServiceImpl asyncTaskManagerServiceImpl; private FullTextSession session; @@ -56,13 +59,11 @@ public class ReindexAsyncBean extends RunnableProcess impl private LinkedHashMap, ReindexClassOptions> indexingOptions = new LinkedHashMap, ReindexClassOptions>(); private Class currentClass; - private IndexerProcessHandle handle; + private AsyncHandle handle; @Create public void create() { - handle = new IndexerProcessHandle(0); - indexables.add(HAccount.class); indexables.add(HGlossaryEntry.class); indexables.add(HGlossaryTerm.class); @@ -118,7 +119,7 @@ public List getReindexOptions() return result; } - public IndexerProcessHandle getProcessHandle() + public AsyncHandle getProcessHandle() { return handle; } @@ -133,18 +134,10 @@ public String getCurrentClassName() } /** - * Prepare to reindex lucene search index. This ensures that progress counts - * are properly initialised before the asynchronous run() method is - * called. + * Returns the number of total operations to perform */ - private void prepareReindex() + private int getTotalOperations() { - // TODO print message? throw exception? - if (handle.isInProgress()) - return; - - log.info("Re-indexing started"); - session = Search.getFullTextSession((Session) entityManagerFactory.createEntityManager().getDelegate()); // set up progress counter @@ -167,7 +160,7 @@ private void prepareReindex() totalOperations++; } } - handle = new IndexerProcessHandle(totalOperations); + return totalOperations; } /** @@ -175,9 +168,8 @@ private void prepareReindex() */ public void startProcess() { - prepareReindex(); - // Invoke a self proxy - processManagerServiceImpl.startProcess(this, this.handle); + String taskId = asyncTaskManagerServiceImpl.startTask(new ReindexTask()); + this.handle = asyncTaskManagerServiceImpl.getHandle(taskId); } @SuppressWarnings("rawtypes") @@ -197,58 +189,76 @@ ClassIndexer getIndexer(Class clazz) } /** - * Begin reindexing lucene search index. This method should only be called - * after a single call to prepareReindex() + * Private reindex Asynchronous task. + * NB: Separate from the main Bean class as it is not recommended to reuse async tasks. */ - @Override - protected void run(IndexerProcessHandle handle) throws Exception + private class ReindexTask implements AsyncTask> { - // TODO this is necessary because isInProgress checks number of operations, which may be 0 - // look at updating isInProgress not to care about count - if (handle.getMaxProgress() == 0) - { - log.info("Reindexing aborted because there are no actions to perform (may be indexing an empty table)"); - return; - } - for (Class clazz : indexables) + private AsyncHandle handle; + + @Override + public AsyncHandle getHandle() { - if (!handle.shouldStop() && indexingOptions.get(clazz).isPurge()) + if( handle == null ) { - log.info("purging index for {0}", clazz); - currentClass = clazz; - session.purgeAll(clazz); - handle.incrementProgress(1); + handle = new AsyncHandle(); + handle.setMaxProgress( getTotalOperations() ); } - if (!handle.shouldStop() && indexingOptions.get(clazz).isReindex()) + return handle; + } + + @Override + public Boolean call() throws Exception + { + // TODO this is necessary because isInProgress checks number of operations, which may be 0 + // look at updating isInProgress not to care about count + if (getHandle().getMaxProgress() == 0) { - log.info("reindexing {0}", clazz); - currentClass = clazz; - getIndexer(clazz).index(); + log.info("Reindexing aborted because there are no actions to perform (may be indexing an empty table)"); + return null; } - if (!handle.shouldStop() && indexingOptions.get(clazz).isOptimize()) + for (Class clazz : indexables) { - log.info("optimizing {0}", clazz); - currentClass = clazz; - session.getSearchFactory().optimize(clazz); - handle.incrementProgress(1); + if (!getHandle().isCancelled() && indexingOptions.get(clazz).isPurge()) + { + log.info("purging index for {0}", clazz); + currentClass = clazz; + session.purgeAll(clazz); + getHandle().increaseProgress(1); + } + if (!getHandle().isCancelled() && indexingOptions.get(clazz).isReindex()) + { + log.info("reindexing {0}", clazz); + currentClass = clazz; + getIndexer(clazz).index(); + } + if (!getHandle().isCancelled() && indexingOptions.get(clazz).isOptimize()) + { + log.info("optimizing {0}", clazz); + currentClass = clazz; + session.getSearchFactory().optimize(clazz); + getHandle().increaseProgress(1); + } } - } - if (handle.shouldStop()) { - log.info("index operation canceled by user"); - } - else - { - if (handle.getCurrentProgress() != handle.getMaxProgress()) - { - // @formatter: off - log.warn("Did not reindex the expected number of objects. Counted {0} but indexed {1}. " - + "The index may be out-of-sync. " - + "This may be caused by lack of sufficient memory, or by database activity during reindexing.", handle.getMaxProgress(), handle.getCurrentProgress()); - // @formatter: on + if (getHandle().isCancelled()) { + log.info("index operation canceled by user"); } + else + { + if (getHandle().getCurrentProgress() != getHandle().getMaxProgress()) + { + // @formatter: off + log.warn("Did not reindex the expected number of objects. Counted {0} but indexed {1}. " + + "The index may be out-of-sync. " + + "This may be caused by lack of sufficient memory, or by database activity during reindexing.", + getHandle().getMaxProgress(), getHandle().getCurrentProgress()); + // @formatter: on + } - log.info("Re-indexing finished" + (handle.hasError() ? " with errors" : "")); + log.info("Re-indexing finished"); + } + return true; } } } diff --git a/zanata-war/src/main/java/org/zanata/search/AbstractIndexingStrategy.java b/zanata-war/src/main/java/org/zanata/search/AbstractIndexingStrategy.java index fc18958cfa..3f49a7df0d 100644 --- a/zanata-war/src/main/java/org/zanata/search/AbstractIndexingStrategy.java +++ b/zanata-war/src/main/java/org/zanata/search/AbstractIndexingStrategy.java @@ -2,6 +2,7 @@ import org.hibernate.ScrollableResults; import org.hibernate.search.FullTextSession; +import org.zanata.async.AsyncHandle; import lombok.extern.slf4j.Slf4j; @@ -30,18 +31,18 @@ public AbstractIndexingStrategy(Class entityType, FullTextSession session) /** * Performs the indexing. */ - public void invoke(IndexerProcessHandle handle) + public void invoke(AsyncHandle handle) { int rowNum = 0; scrollableResults = queryResults(rowNum); try { - while (scrollableResults.next() && !handle.shouldStop()) + while (scrollableResults.next() && !handle.isCancelled()) { rowNum++; T entity = (T) scrollableResults.get(0); session.index(entity); - handle.incrementProgress(1); + handle.increaseProgress(1); if (rowNum % sessionClearBatchSize == 0) { log.info("periodic flush and clear for {} (n={})", entityType, rowNum); @@ -69,7 +70,6 @@ public void invoke(IndexerProcessHandle handle) /** * Returns the Scrollable results for instances of clazz * @param offset - * @param entityType The type of entity to be returned by the Scrollable results * @return */ protected abstract ScrollableResults queryResults(int offset); diff --git a/zanata-war/src/main/java/org/zanata/search/ClassIndexer.java b/zanata-war/src/main/java/org/zanata/search/ClassIndexer.java index 84749cf480..d72f1ed886 100644 --- a/zanata-war/src/main/java/org/zanata/search/ClassIndexer.java +++ b/zanata-war/src/main/java/org/zanata/search/ClassIndexer.java @@ -27,6 +27,7 @@ import org.hibernate.FlushMode; import org.hibernate.criterion.Projections; import org.hibernate.search.FullTextSession; +import org.zanata.async.AsyncHandle; /** * @author Sean Flanigan sflaniga@redhat.com @@ -38,10 +39,10 @@ public class ClassIndexer private final AbstractIndexingStrategy indexingStrategy; private FullTextSession session; - private IndexerProcessHandle handle; + private AsyncHandle handle; private Class entityType; - public ClassIndexer(FullTextSession session, IndexerProcessHandle handle, + public ClassIndexer(FullTextSession session, AsyncHandle handle, Class entityType, AbstractIndexingStrategy indexingStrategy) { this.session = session; @@ -61,22 +62,14 @@ public int getEntityCount() return result.intValue(); } - public void index() + public void index() throws Exception { log.info("Setting manual-flush and ignore-cache for {}", entityType); session.setFlushMode(FlushMode.MANUAL); session.setCacheMode(CacheMode.IGNORE); - try - { - indexingStrategy.invoke(handle); - session.flushToIndexes(); // apply changes to indexes - session.clear(); // clear since the queue is processed - } - catch (Exception e) - { - log.warn("Unable to index objects of type {}", e, entityType.getName()); - handle.setHasError(true); - } + indexingStrategy.invoke(handle); + session.flushToIndexes(); // apply changes to indexes + session.clear(); // clear since the queue is processed } } diff --git a/zanata-war/src/main/java/org/zanata/search/IndexerProcessHandle.java b/zanata-war/src/main/java/org/zanata/search/IndexerProcessHandle.java deleted file mode 100644 index 6e7a963027..0000000000 --- a/zanata-war/src/main/java/org/zanata/search/IndexerProcessHandle.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2013, Red Hat, Inc. and individual contributors - * as indicated by the @author tags. See the copyright.txt file in the - * distribution for a full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ - -package org.zanata.search; - -import java.io.Serializable; - -import org.zanata.process.ProcessHandle; - -public class IndexerProcessHandle extends ProcessHandle implements Serializable -{ - private static final long serialVersionUID = 1L; - public boolean hasError; - - /** - * @param maxProgress - */ - public IndexerProcessHandle(int maxProgress) - { - setMaxProgress(maxProgress); - } - - public boolean hasError() - { - return hasError; - } - - void setHasError(boolean hasError) - { - this.hasError = hasError; - } -} \ No newline at end of file diff --git a/zanata-war/src/main/webapp/admin/search.xhtml b/zanata-war/src/main/webapp/admin/search.xhtml index c66eeb6328..0adb9c8d8c 100644 --- a/zanata-war/src/main/webapp/admin/search.xhtml +++ b/zanata-war/src/main/webapp/admin/search.xhtml @@ -105,10 +105,10 @@

#{messages['jsf.ManageSearch.CurrentProgress']}

- +

#{messages['jsf.ManageSearch.NoOperationsRunning']}

- +

#{messages['jsf.ManageSearch.Completed']}