Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Refactor reindexing and strategy classes
Browse files Browse the repository at this point in the history
  • Loading branch information
seanf committed Aug 6, 2013
1 parent acfa6b7 commit f1085c4
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 71 deletions.
18 changes: 8 additions & 10 deletions zanata-war/src/main/java/org/zanata/action/ReindexAsyncBean.java
Expand Up @@ -30,6 +30,7 @@
import org.zanata.search.ClassIndexer;
import org.zanata.search.HTextFlowTargetIndexingStrategy;
import org.zanata.search.IndexerProcessHandle;
import org.zanata.search.SimpleClassIndexingStrategy;
import org.zanata.service.ProcessManagerService;

@Name("reindexAsync")
Expand Down Expand Up @@ -158,7 +159,7 @@ private void prepareReindex()

if (opts.isReindex())
{
totalOperations += getIndexer(clazz).getEntityCount(session, clazz);
totalOperations += getIndexer(clazz).getEntityCount();
}

if (opts.isOptimize())
Expand All @@ -182,20 +183,17 @@ public void startProcess()
@SuppressWarnings("rawtypes")
ClassIndexer getIndexer(Class<?> clazz)
{
AbstractIndexingStrategy strategy;
// TODO add a strategy which uses TransMemoryStreamingDAO
if( clazz.equals( HTextFlowTarget.class ) )
{
return new ClassIndexer<HTextFlowTarget>() {
@Override
public AbstractIndexingStrategy<HTextFlowTarget> createIndexingStrategy(FullTextSession session, IndexerProcessHandle handle, Class clazz)
{
return new HTextFlowTargetIndexingStrategy(session, handle, clazz);
}
};
strategy = new HTextFlowTargetIndexingStrategy(session);
}
else
{
return new ClassIndexer();
strategy = new SimpleClassIndexingStrategy(clazz, session);
}
return new ClassIndexer(session, handle, clazz, strategy);
}

/**
Expand Down Expand Up @@ -225,7 +223,7 @@ protected void run(IndexerProcessHandle handle) throws Exception
{
log.info("reindexing {0}", clazz);
currentClass = clazz;
getIndexer(clazz).index(session, handle, clazz);
getIndexer(clazz).index(clazz);
}
if (!handle.shouldStop() && indexingOptions.get(clazz).isOptimize())
{
Expand Down
Expand Up @@ -84,4 +84,32 @@ public CloseableIterator<TransMemoryUnit> findTransUnitsByTM(TransMemory transMe

}

/**
* Finds all TransMemoryUnits, join-fetching each unit's transUnitVariants.
* <p>
* NB: caller must close the iterator, or call next() until the iterator is exhausted,
* or else a database connection will be leaked.
* <p>
* If creating or initialising the query throws, the iterator is closed here
* and the failure is rethrown wrapped in a RuntimeException.
* @return the iterator obtained from createIterator(), initialised with the query
*/
public CloseableIterator<TransMemoryUnit> findAllTransUnits()
{
StreamingEntityIterator<TransMemoryUnit> iter = createIterator();
try
{
Query q = iter.getSession().createQuery(
"FROM TransMemoryUnit tu FETCH ALL PROPERTIES " +
"JOIN FETCH tu.transUnitVariants tuv FETCH ALL PROPERTIES " +
"");
// the comment labels the query with the DAO method that issued it
q.setComment("TransMemoryStreamingDAO.findAllTransUnits");
iter.initQuery(q);
return iter;
}
catch (Throwable e)
{
// setup failed, so the caller never receives the iterator: close it here
iter.close();
throw new RuntimeException(e);
}
}

}
@@ -1,6 +1,5 @@
package org.zanata.search;

import org.hibernate.Query;
import org.hibernate.ScrollableResults;
import org.hibernate.search.FullTextSession;

Expand All @@ -14,33 +13,33 @@
@Slf4j
public abstract class AbstractIndexingStrategy<T>
{
private IndexerProcessHandle handle;
private int sessionClearBatchSize = 1000;
FullTextSession session;
Class<T> clazz;
ScrollableResults scrollableResults;
private ScrollableResults newScrollableResults;
private final Class<T> clazz;
private final FullTextSession session;


public AbstractIndexingStrategy(FullTextSession session, IndexerProcessHandle handle, Class<T> clazz)
/**
* @param clazz The type of entity to be returned by the Scrollable results
*/
public AbstractIndexingStrategy(Class<T> clazz, FullTextSession session)
{
this.session = session;
this.handle = handle;
this.clazz = clazz;
this.session = session;
}

/**
* Performs the indexing.
*/
public void invoke()
public void invoke(IndexerProcessHandle handle)
{
int n = 0;
newScrollableResults = queryResults(n);
try
{
scrollableResults = getScrollableResults(session, clazz, n);
while (scrollableResults.next() && !handle.shouldStop())
while (newScrollableResults.next() && !handle.shouldStop())
{
n++;
T entity = (T) scrollableResults.get(0); // index each element
T entity = (T) newScrollableResults.get(0); // index each element
session.index(entity);
handle.incrementProgress(1);
if (n % sessionClearBatchSize == 0)
Expand All @@ -54,9 +53,9 @@ public void invoke()
}
finally
{
if( scrollableResults != null )
if( newScrollableResults != null )
{
scrollableResults.close();
newScrollableResults.close();
}
}
}
Expand All @@ -68,18 +67,31 @@ public void invoke()
protected abstract void onEntityIndexed(int n);

/**
* Returns the Scrollable results
* @param session Session used to query and index the entities
* Returns the Scrollable results for instances of clazz
* @param offset
* @param clazz The type of entity to be returned by the Scrollable results
* @param firstResult
* @return
*/
protected abstract ScrollableResults getScrollableResults(FullTextSession session, Class<T> clazz, int firstResult);
protected abstract ScrollableResults queryResults(int offset);

/**
* Create a query which returns instances of clazz
* @param clazz The type of objects being returned by this query.
* @return
*/
protected abstract Query getQuery(FullTextSession session, Class<T> clazz);
}
/**
* @return the entity class this strategy was constructed with
*/
Class<T> getClazz()
{
return clazz;
}

/**
* @return the results currently held by this strategy, i.e. the ones
* invoke() iterates over and closes when it finishes; null until
* invoke() or setScrollableResults() has assigned them
*/
ScrollableResults getScrollableResults()
{
return newScrollableResults;
}

/**
* Replaces the results that invoke() reads from and eventually closes.
* The previously held results are not closed by this method; the caller
* is responsible for closing them before replacing them.
* @param scrollableResults the results to use from now on
*/
void setScrollableResults(ScrollableResults scrollableResults)
{
this.newScrollableResults = scrollableResults;
}

/**
* @return the FullTextSession passed to the constructor, which invoke()
* uses to index each entity
*/
FullTextSession getSession()
{
return session;
}

}
23 changes: 18 additions & 5 deletions zanata-war/src/main/java/org/zanata/search/ClassIndexer.java
Expand Up @@ -36,25 +36,38 @@
public class ClassIndexer<T>
{

public AbstractIndexingStrategy<T> createIndexingStrategy(FullTextSession session, IndexerProcessHandle handle, Class<T> clazz)
private final AbstractIndexingStrategy<T> indexingStrategy;
private FullTextSession session;
private IndexerProcessHandle handle;
private Class<?> clazz;

public ClassIndexer(FullTextSession session, IndexerProcessHandle handle, Class<?> clazz, AbstractIndexingStrategy<T> indexingStrategy)
{
this.session = session;
this.handle = handle;
this.clazz = clazz;
this.indexingStrategy = indexingStrategy;
}

public AbstractIndexingStrategy<T> getIndexingStrategy()
{
return new SimpleClassIndexingStrategy(session, handle, clazz);
return indexingStrategy;
}

public int getEntityCount(FullTextSession session, Class<T> clazz)
public int getEntityCount()
{
Long result = (Long) session.createCriteria(clazz).setProjection(Projections.rowCount()).list().get(0);
return result.intValue();
}

public void index(FullTextSession session, IndexerProcessHandle handle, Class<T> clazz)
public void index(Class<T> clazz)
{
log.info("Setting manual-flush and ignore-cache for {}", clazz);
session.setFlushMode(FlushMode.MANUAL);
session.setCacheMode(CacheMode.IGNORE);
try
{
createIndexingStrategy(session, handle, clazz).invoke();
indexingStrategy.invoke(handle);
session.flushToIndexes(); // apply changes to indexes
session.clear(); // clear since the queue is processed
}
Expand Down
Expand Up @@ -35,9 +35,9 @@
*/
public class HTextFlowTargetIndexingStrategy extends AbstractIndexingStrategy<HTextFlowTarget>
{
public HTextFlowTargetIndexingStrategy(FullTextSession session, IndexerProcessHandle handle, Class clazz)
public HTextFlowTargetIndexingStrategy(FullTextSession session)
{
super(session, handle, clazz);
super(HTextFlowTarget.class, session);
}

@Override
Expand All @@ -47,22 +47,17 @@ protected void onEntityIndexed(int n)
}

@Override
protected ScrollableResults getScrollableResults(FullTextSession session, Class clazz, int firstResult)
protected ScrollableResults queryResults(int ignoredOffset)
{
Query query = getQuery(session, clazz);
// TODO move this query into something like HTextFlowTargetStreamingDAO
Query query = getSession().createQuery("from HTextFlowTarget tft " +
"join fetch tft.locale " +
"join fetch tft.textFlow " +
"join fetch tft.textFlow.document " +
"join fetch tft.textFlow.document.locale " +
"join fetch tft.textFlow.document.projectIteration " +
"join fetch tft.textFlow.document.projectIteration.project");
query.setFetchSize(Integer.MIN_VALUE);
return query.scroll(ScrollMode.FORWARD_ONLY);
}

@Override
protected Query getQuery(FullTextSession session, Class clazz)
{
return session.createQuery("from HTextFlowTarget tft " +
"join fetch tft.locale " +
"join fetch tft.textFlow " +
"join fetch tft.textFlow.document " +
"join fetch tft.textFlow.document.locale " +
"join fetch tft.textFlow.document.projectIteration " +
"join fetch tft.textFlow.document.projectIteration.project");
}
}
Expand Up @@ -37,37 +37,31 @@
@Slf4j
public class SimpleClassIndexingStrategy<T> extends AbstractIndexingStrategy<T>
{

public static final int MAX_QUERY_ROWS = 5000;

public SimpleClassIndexingStrategy(FullTextSession session, IndexerProcessHandle handle, Class<T> clazz)
public SimpleClassIndexingStrategy(Class<T> clazz, FullTextSession session)
{
super(session, handle, clazz);
super(clazz, session);
}

@Override
protected void onEntityIndexed(int n)
{
if (n % MAX_QUERY_ROWS == 0)
{
SimpleClassIndexingStrategy.log.info("restarting query for {} (n={})", clazz, n);
scrollableResults.close();
scrollableResults = getScrollableResults(session, clazz, n);
SimpleClassIndexingStrategy.log.info("restarting query for {} (n={})", getClazz(), n);
getScrollableResults().close();
setScrollableResults(queryResults(n));
}
}

@Override
protected ScrollableResults getScrollableResults(FullTextSession session, Class<T> clazz, int firstResult)
protected ScrollableResults queryResults(int offset)
{
Query query = getQuery(session, clazz);
query.setFirstResult(firstResult);
Query query = getSession().createQuery("from "+getClazz().getName());
query.setFirstResult(offset);
query.setMaxResults(MAX_QUERY_ROWS);
return query.scroll(ScrollMode.FORWARD_ONLY);
}

@Override
protected Query getQuery(FullTextSession session, Class<T> clazz)
{
return session.createQuery("from "+clazz.getName());
}
}

0 comments on commit f1085c4

Please sign in to comment.