Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Refactor the reindex process to use the new async task framework.
Browse files Browse the repository at this point in the history
Some issues with the original implementation still remain. This revision does not take care of them.
  • Loading branch information
Carlos A. Munoz committed Aug 21, 2013
1 parent f323076 commit e61464e
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 147 deletions.
64 changes: 50 additions & 14 deletions zanata-war/src/main/java/org/zanata/action/ReindexActionBean.java
Expand Up @@ -2,6 +2,8 @@

import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

import org.jboss.seam.ScopeType;
import org.jboss.seam.annotations.In;
Expand All @@ -12,6 +14,7 @@
import org.joda.time.Period;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.zanata.async.AsyncHandle;

@Name("reindexAction")
@Scope(ScopeType.APPLICATION)
Expand Down Expand Up @@ -101,7 +104,7 @@ public void setOptimizeAll(boolean selected)

public boolean isInProgress()
{
return reindexAsync.getProcessHandle().isInProgress();
return reindexAsync.getProcessHandle() != null && !reindexAsync.getProcessHandle().isDone();
}

public String getCurrentClass()
Expand All @@ -111,40 +114,73 @@ public String getCurrentClass()

public boolean isError()
{
return reindexAsync.getProcessHandle().hasError();
AsyncHandle<Boolean> taskHandle = reindexAsync.getProcessHandle();
if( taskHandle == null )
{
return false;
}
else if( taskHandle.isDone() )
{
try
{
taskHandle.get();
}
catch (InterruptedException e)
{
return true;
}
catch (ExecutionException e)
{
return true;
}
catch (CancellationException e)
{
return false;
}
}
return false;
}

public int getReindexCount()
{
return reindexAsync.getProcessHandle().getMaxProgress();
if(reindexAsync.getProcessHandle() == null)
{
return 0;
}
else
{
return reindexAsync.getProcessHandle().getMaxProgress();
}
}

public int getReindexProgress()
{
return reindexAsync.getProcessHandle().getCurrentProgress();
if(reindexAsync.getProcessHandle() == null)
{
return 0;
}
else
{
return reindexAsync.getProcessHandle().getCurrentProgress();
}
}

public void reindexDatabase()
{
if (!reindexAsync.getProcessHandle().isInProgress())
if (reindexAsync.getProcessHandle() == null || reindexAsync.getProcessHandle().isDone())
{
reindexAsync.startProcess();
}
}

public void cancel()
{
reindexAsync.getProcessHandle().stop();
reindexAsync.getProcessHandle().cancel(false);
}

public boolean isCanceled()
{
return reindexAsync.getProcessHandle().shouldStop();
}

public boolean isStarted()
{
return reindexAsync.getProcessHandle().isStarted();
return reindexAsync.getProcessHandle() != null && reindexAsync.getProcessHandle().isCancelled();
}

// TODO move to common location with ViewAllStatusAction
Expand All @@ -171,13 +207,13 @@ private String formatTimePeriod( long durationInMillis )
}
}

public String getElapsedTime()
/*public String getElapsedTime()
{
return formatTimePeriod(reindexAsync.getProcessHandle().getElapsedTime());
}
public String getEstimatedTimeRemaining()
{
return formatTimePeriod(reindexAsync.getProcessHandle().getEstimatedTimeRemaining());
}
}*/
}
136 changes: 73 additions & 63 deletions zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java
Expand Up @@ -18,25 +18,28 @@
import org.jboss.seam.annotations.Scope;
import org.jboss.seam.annotations.Startup;
import org.jboss.seam.log.Log;
import org.zanata.async.AsyncHandle;
import org.zanata.async.AsyncTask;
import org.zanata.async.SimpleAsyncTask;
import org.zanata.model.HAccount;
import org.zanata.model.HGlossaryEntry;
import org.zanata.model.HGlossaryTerm;
import org.zanata.model.HProject;
import org.zanata.model.HProjectIteration;
import org.zanata.model.HTextFlowTarget;
import org.zanata.model.tm.TransMemoryUnit;
import org.zanata.process.RunnableProcess;
import org.zanata.search.AbstractIndexingStrategy;
import org.zanata.search.ClassIndexer;
import org.zanata.search.HTextFlowTargetIndexingStrategy;
import org.zanata.search.IndexerProcessHandle;
import org.zanata.search.SimpleClassIndexingStrategy;
import org.zanata.service.ProcessManagerService;
import org.zanata.service.impl.AsyncTaskManagerServiceImpl;

import lombok.AllArgsConstructor;

@Name("reindexAsync")
@Scope(ScopeType.APPLICATION)
@Startup
public class ReindexAsyncBean extends RunnableProcess<IndexerProcessHandle> implements Serializable
public class ReindexAsyncBean implements Serializable
{
private static final long serialVersionUID = 1L;

Expand All @@ -47,7 +50,7 @@ public class ReindexAsyncBean extends RunnableProcess<IndexerProcessHandle> impl
EntityManagerFactory entityManagerFactory;

@In
ProcessManagerService processManagerServiceImpl;
AsyncTaskManagerServiceImpl asyncTaskManagerServiceImpl;

private FullTextSession session;

Expand All @@ -56,13 +59,11 @@ public class ReindexAsyncBean extends RunnableProcess<IndexerProcessHandle> impl
private LinkedHashMap<Class<?>, ReindexClassOptions> indexingOptions = new LinkedHashMap<Class<?>, ReindexClassOptions>();
private Class<?> currentClass;

private IndexerProcessHandle handle;
private AsyncHandle<Boolean> handle;

@Create
public void create()
{
handle = new IndexerProcessHandle(0);

indexables.add(HAccount.class);
indexables.add(HGlossaryEntry.class);
indexables.add(HGlossaryTerm.class);
Expand Down Expand Up @@ -118,7 +119,7 @@ public List<ReindexClassOptions> getReindexOptions()
return result;
}

public IndexerProcessHandle getProcessHandle()
public AsyncHandle<Boolean> getProcessHandle()
{
return handle;
}
Expand All @@ -133,18 +134,10 @@ public String getCurrentClassName()
}

/**
* Prepare to reindex lucene search index. This ensures that progress counts
* are properly initialised before the asynchronous run() method is
* called.
* Returns the number of total operations to perform
*/
private void prepareReindex()
private int getTotalOperations()
{
// TODO print message? throw exception?
if (handle.isInProgress())
return;

log.info("Re-indexing started");

session = Search.getFullTextSession((Session) entityManagerFactory.createEntityManager().getDelegate());

// set up progress counter
Expand All @@ -167,17 +160,16 @@ private void prepareReindex()
totalOperations++;
}
}
handle = new IndexerProcessHandle(totalOperations);
return totalOperations;
}

/**
* Facility method to start the background process with this instance's own internal process handle.
*/
public void startProcess()
{
prepareReindex();
// Invoke a self proxy
processManagerServiceImpl.startProcess(this, this.handle);
String taskId = asyncTaskManagerServiceImpl.startTask(new ReindexTask());
this.handle = asyncTaskManagerServiceImpl.getHandle(taskId);
}

@SuppressWarnings("rawtypes")
Expand All @@ -197,58 +189,76 @@ ClassIndexer getIndexer(Class<?> clazz)
}

/**
* Begin reindexing lucene search index. This method should only be called
* after a single call to prepareReindex()
* Private reindex Asynchronous task.
* NB: Separate from the main Bean class as it is not recommended to reuse async tasks.
*/
@Override
protected void run(IndexerProcessHandle handle) throws Exception
private class ReindexTask implements AsyncTask<Boolean, AsyncHandle<Boolean>>
{
// TODO this is necessary because isInProgress checks number of operations, which may be 0
// look at updating isInProgress not to care about count
if (handle.getMaxProgress() == 0)
{
log.info("Reindexing aborted because there are no actions to perform (may be indexing an empty table)");
return;
}
for (Class<?> clazz : indexables)
private AsyncHandle<Boolean> handle;

@Override
public AsyncHandle<Boolean> getHandle()
{
if (!handle.shouldStop() && indexingOptions.get(clazz).isPurge())
if( handle == null )
{
log.info("purging index for {0}", clazz);
currentClass = clazz;
session.purgeAll(clazz);
handle.incrementProgress(1);
handle = new AsyncHandle<Boolean>();
handle.setMaxProgress( getTotalOperations() );
}
if (!handle.shouldStop() && indexingOptions.get(clazz).isReindex())
return handle;
}

@Override
public Boolean call() throws Exception
{
// TODO this is necessary because isInProgress checks number of operations, which may be 0
// look at updating isInProgress not to care about count
if (getHandle().getMaxProgress() == 0)
{
log.info("reindexing {0}", clazz);
currentClass = clazz;
getIndexer(clazz).index();
log.info("Reindexing aborted because there are no actions to perform (may be indexing an empty table)");
return null;
}
if (!handle.shouldStop() && indexingOptions.get(clazz).isOptimize())
for (Class<?> clazz : indexables)
{
log.info("optimizing {0}", clazz);
currentClass = clazz;
session.getSearchFactory().optimize(clazz);
handle.incrementProgress(1);
if (!getHandle().isCancelled() && indexingOptions.get(clazz).isPurge())
{
log.info("purging index for {0}", clazz);
currentClass = clazz;
session.purgeAll(clazz);
getHandle().increaseProgress(1);
}
if (!getHandle().isCancelled() && indexingOptions.get(clazz).isReindex())
{
log.info("reindexing {0}", clazz);
currentClass = clazz;
getIndexer(clazz).index();
}
if (!getHandle().isCancelled() && indexingOptions.get(clazz).isOptimize())
{
log.info("optimizing {0}", clazz);
currentClass = clazz;
session.getSearchFactory().optimize(clazz);
getHandle().increaseProgress(1);
}
}
}

if (handle.shouldStop()) {
log.info("index operation canceled by user");
}
else
{
if (handle.getCurrentProgress() != handle.getMaxProgress())
{
// @formatter: off
log.warn("Did not reindex the expected number of objects. Counted {0} but indexed {1}. "
+ "The index may be out-of-sync. "
+ "This may be caused by lack of sufficient memory, or by database activity during reindexing.", handle.getMaxProgress(), handle.getCurrentProgress());
// @formatter: on
if (getHandle().isCancelled()) {
log.info("index operation canceled by user");
}
else
{
if (getHandle().getCurrentProgress() != getHandle().getMaxProgress())
{
// @formatter: off
log.warn("Did not reindex the expected number of objects. Counted {0} but indexed {1}. "
+ "The index may be out-of-sync. "
+ "This may be caused by lack of sufficient memory, or by database activity during reindexing.",
getHandle().getMaxProgress(), getHandle().getCurrentProgress());
// @formatter: on
}

log.info("Re-indexing finished" + (handle.hasError() ? " with errors" : ""));
log.info("Re-indexing finished");
}
return true;
}
}
}
Expand Up @@ -2,6 +2,7 @@

import org.hibernate.ScrollableResults;
import org.hibernate.search.FullTextSession;
import org.zanata.async.AsyncHandle;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -30,18 +31,18 @@ public AbstractIndexingStrategy(Class<T> entityType, FullTextSession session)
/**
* Performs the indexing.
*/
public void invoke(IndexerProcessHandle handle)
public void invoke(AsyncHandle handle)
{
int rowNum = 0;
scrollableResults = queryResults(rowNum);
try
{
while (scrollableResults.next() && !handle.shouldStop())
while (scrollableResults.next() && !handle.isCancelled())
{
rowNum++;
T entity = (T) scrollableResults.get(0);
session.index(entity);
handle.incrementProgress(1);
handle.increaseProgress(1);
if (rowNum % sessionClearBatchSize == 0)
{
log.info("periodic flush and clear for {} (n={})", entityType, rowNum);
Expand Down Expand Up @@ -69,7 +70,6 @@ public void invoke(IndexerProcessHandle handle)
/**
* Returns the Scrollable results for instances of clazz
* @param offset
* @param entityType The type of entity to be returned by the Scrollable results
* @return
*/
protected abstract ScrollableResults queryResults(int offset);
Expand Down

0 comments on commit e61464e

Please sign in to comment.