Skip to content

Commit

Permalink
#73 change ref assignment for EntityManager inside the job
Browse files Browse the repository at this point in the history
  • Loading branch information
mincong-h committed Jul 2, 2016
1 parent bd0fe3a commit 75bcf3e
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 40 deletions.
Expand Up @@ -2,6 +2,8 @@

import java.util.Set;

import javax.persistence.EntityManager;

public interface MassIndexer {

public long start();
Expand All @@ -17,6 +19,8 @@ public interface MassIndexer {
public MassIndexer purgeAtStart(boolean purgeAtStart);
public MassIndexer rootEntities(Set<Class<?>> rootEntities);
public MassIndexer threads(int threads);
// TODO: should be reviewed
public MassIndexer entityManager(EntityManager entityManager);

public int getArrayCapacity();
public int getFetchSize();
Expand All @@ -28,4 +32,5 @@ public interface MassIndexer {
public boolean isPurgeAtStart();
public Set<Class<?>> getRootEntities();
public int getThreads();
public EntityManager getEntityManager();
}
Expand Up @@ -8,6 +8,7 @@
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.CDI;
import javax.persistence.EntityManager;

import org.hibernate.search.jsr352.internal.IndexingContext;

Expand All @@ -23,6 +24,7 @@ public class MassIndexerImpl implements MassIndexer {
private int partitions = 4;
private int threads = 2;
private Set<Class<?>> rootEntities;
private EntityManager entityManager;

private final String JOB_NAME = "mass-index";

Expand Down Expand Up @@ -58,6 +60,7 @@ public MassIndexerImpl() {
public long start() {

registrerRootEntities(rootEntities);
registrerEntityManager(entityManager);

Properties jobParams = new Properties();
jobParams.setProperty("fetchSize", String.valueOf(fetchSize));
Expand Down Expand Up @@ -164,6 +167,12 @@ public MassIndexer rootEntities(Set<Class<?>> rootEntities) {
return this;
}

@Override
public MassIndexer entityManager(EntityManager entityManager) {
this.entityManager = entityManager;
return this;
}

@Override
public boolean isOptimizeAfterPurge() {
return optimizeAfterPurge;
Expand Down Expand Up @@ -226,4 +235,19 @@ public void registrerRootEntities(Set<Class<?>> rootEntities) {
Class<?>[] r = rootEntities.toArray(new Class<?>[s]);
indexingContext.setRootEntities(r);
}

private void registrerEntityManager(EntityManager entityManager) {
BeanManager bm = CDI.current().getBeanManager();
Bean<IndexingContext> bean = (Bean<IndexingContext>) bm
.resolve(bm.getBeans(IndexingContext.class));
IndexingContext indexingContext = bm
.getContext(bean.getScope())
.get(bean, bm.createCreationalContext(bean));
indexingContext.setEntityManager(entityManager);
}

@Override
public EntityManager getEntityManager() {
return entityManager;
}
}
Expand Up @@ -10,7 +10,6 @@
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
import javax.persistence.criteria.CriteriaBuilder.In;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Path;
Expand Down Expand Up @@ -66,15 +65,10 @@
@Named
public class BatchItemProcessor implements ItemProcessor {

@PersistenceContext(unitName = "jsr352")
private EntityManager em;
private Session session;
private ExtendedSearchIntegrator searchIntegrator;
private DocumentBuilderIndexedEntity docBuilder;
private EntityIndexBinding entityIndexBinding;
private InstanceInitializer sessionInitializer;
private ConversionContext conversionContext;
private IndexShardingStrategy shardingStrategy;

@Inject private IndexingContext indexingContext;
@Inject private StepContext stepContext;
Expand All @@ -97,16 +91,14 @@ public class BatchItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object item) throws Exception {

if (em == null) {
em = indexingContext.getEntityManager();
}

logger.debugf("processItem(Object) called. entityType=%s", entityType);
Class<?> entityClazz = findClass(entityType);

// TODO: change the id to generic type
// TODO: accept all entity type. For instance, only Address.class works
if (entityType.equals("org.hibernate.search.jsr352.test.entity.Stock")) {
updateWorksCount(0);
return null;
}

// TODO: should keep item as "Serializable[]" and not cast to "int[]"
int[] ids = toIntArray((Serializable[]) item);
List<?> entities = null;
List<AddLuceneWork> addWorks = null;
Expand Down Expand Up @@ -160,18 +152,25 @@ private List<AddLuceneWork> buildAddLuceneWorks(List<?> entities,
Class<?> entityClazz) {

List<AddLuceneWork> addWorks = new LinkedList<>();
// TODO: tenant ID should not be null
// Or may it be fine to be null? Gunnar's integration test in Hibernate
// Search: MassIndexingTimeoutIT does not mention the tenant ID neither
// (The tenant ID is not included mass indexer setup in the ConcertManager)
String tenantId = null;

session = em.unwrap(Session.class);
searchIntegrator = ContextHelper.getSearchintegrator(session);
entityIndexBinding = searchIntegrator
.getIndexBindings()
.get(entityClazz);
shardingStrategy = entityIndexBinding.getSelectionStrategy();

DocumentBuilderIndexedEntity docBuilder = entityIndexBinding.getDocumentBuilder();
// NotSharedStrategy
IndexShardingStrategy shardingStrategy = entityIndexBinding.getSelectionStrategy();
logger.infof("indexShardingStrategy=%s", shardingStrategy.toString());
indexingContext.setIndexShardingStrategy(shardingStrategy);
docBuilder = entityIndexBinding.getDocumentBuilder();
conversionContext = new ContextualExceptionBridgeHelper();
sessionInitializer = new HibernateSessionLoadingInitializer(
ConversionContext conversionContext = new ContextualExceptionBridgeHelper();
final InstanceInitializer sessionInitializer = new HibernateSessionLoadingInitializer(
(SessionImplementor) session
);

Expand All @@ -185,6 +184,7 @@ private List<AddLuceneWork> buildAddLuceneWorks(List<?> entities,
.setClass(entityClazz)
.twoWayConversionContext(idBridge)
.objectToString(id);
logger.infof("idInString=%s", idInString);
} finally {
conversionContext.popProperty();
}
Expand Down
Expand Up @@ -9,7 +9,6 @@
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
Expand All @@ -35,7 +34,6 @@ public class IdProducerBatchlet implements Batchlet {
@Inject @BatchProperty private int maxResults;
@Inject @BatchProperty private String entityType;

@PersistenceContext(unitName = "jsr352")
private EntityManager em;
private Session session;

Expand All @@ -51,7 +49,10 @@ public String process() throws Exception {

// get entity class type
Class<?> entityClazz = Class.forName(entityType);


if (em == null) {
em = indexingContext.getEntityManager();
}
// unwrap session from entity manager
session = em.unwrap(Session.class);

Expand Down
Expand Up @@ -6,6 +6,7 @@

import javax.inject.Named;
import javax.inject.Singleton;
import javax.persistence.EntityManager;

import org.hibernate.search.store.IndexShardingStrategy;
import org.jboss.logging.Logger;
Expand All @@ -30,6 +31,8 @@ public class IndexingContext {
private Class<?>[] rootEntities;
private IndexShardingStrategy indexShardingStrategy;
private long entityCount = 0;
private EntityManager entityManager;

private static final Logger logger = Logger.getLogger(IndexingContext.class);

public void add(Serializable[] clazzIDs, Class<?> clazz) {
Expand Down Expand Up @@ -88,4 +91,12 @@ public Class<?>[] getRootEntities() {
public void setRootEntities(Class<?>[] rootEntities) {
this.rootEntities = rootEntities;
}

public void setEntityManager(EntityManager entityManager) {
this.entityManager = entityManager;
}

public EntityManager getEntityManager() {
return entityManager;
}
}
Expand Up @@ -137,8 +137,27 @@ public void testJob() throws InterruptedException {
logger.info("Mass indexing finished");

//
// Test again after index, target entities
// should be found this time
// Target entities should be found after index
// ---
// TODO: but it doesn't work. We need to launch the integration test
// again to make it work. issue #78
//
// TODO:
// Q: Problem may come from the utility class, used in CompanyManager.
// org.hibernate.search.jpa.Search creates 2 instances of full text
// entity manager, once per search (the first one is the search
// before indexing and the second one is the search after indexing)
// A: But my code for method #findCompanyByName(String) is exactly the
// copy of Gunnar's.
//
// TODO:
// Q: Problem may come from EntityManager. The Hibernate Search mass
// indexer uses an existing EntityManger, provided in input param.
// But my implementation uses the CDI through @PersistenContext
// during the mass indexing. This entity manager might be another
// instance. So the indexed information are not shared in the same
// session. issue #73
// A: This should be changed now. But still having the same failure.
//
companies = companyManager.findCompanyByName(keyword);
assertEquals(1, companies.size());
Expand Down Expand Up @@ -216,6 +235,7 @@ private MassIndexer createAndInitJob() {
.partitions(PARTITIONS)
.purgeAtStart(PURGE_AT_START)
.threads(THREADS)
.entityManager(companyManager.getEntityManager())
.rootEntities(getRootEntities());
return massIndexer;
}
Expand Down
Expand Up @@ -16,7 +16,7 @@
@Stateless
public class CompanyManager {

@PersistenceContext(name="jsr352")
@PersistenceContext(name="h2")
private EntityManager em;

@TransactionTimeout(value=5, unit=TimeUnit.MINUTES)
Expand Down Expand Up @@ -44,17 +44,21 @@ public void indexCompany() {
// MassIndexer massIndexer = new MassIndexerImpl().rootEntities(rootEntities);
// long executionId = massIndexer.start();
// logger.infof("job execution id = %d", executionId);
try {
Search.getFullTextEntityManager( em )
.createIndexer()
.batchSizeToLoadObjects( 1 )
.threadsToLoadObjects( 1 )
.transactionTimeout( 10 )
.cacheMode( CacheMode.IGNORE )
.startAndWait();
}
catch (InterruptedException e) {
throw new RuntimeException( e );
}
}
try {
Search.getFullTextEntityManager( em )
.createIndexer()
.batchSizeToLoadObjects( 1 )
.threadsToLoadObjects( 1 )
.transactionTimeout( 10 )
.cacheMode( CacheMode.IGNORE )
.startAndWait();
}
catch (InterruptedException e) {
throw new RuntimeException( e );
}
}

public EntityManager getEntityManager() {
return em;
}
}
@@ -1,18 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.1" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
<persistence-unit name="jsr352" transaction-type="JTA">
<persistence-unit name="h2" transaction-type="JTA">
<provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<!-- <class>org.hibernate.search.jsr352.test.entity.Address</class>
<class>org.hibernate.search.jsr352.test.entity.Stock</class> -->
<jta-data-source>java:jboss/datasources/ExampleDS</jta-data-source>
<class>org.hibernate.search.jsr352.test.entity.Company</class>
<properties>
<property name="hibernate.dialect" value="org.hibernate.dialect.H2Dialect" />
<property name="hibernate.show_sql" value="false" />
<property name="hibernate.format_sql" value="false" />
<property name="hibernate.hbm2ddl.auto" value="create-drop" />
<property name="hibernate.search.default.directory_provider" value="ram" />
<!-- <property name="hibernate.search.default.directory_provider" value="ram" /> -->
<property name="hibernate.search.indexing_strategy" value="manual" />
<property name="hibernate.search.lucene_version" value="LUCENE_CURRENT" />
</properties>
</persistence-unit>
</persistence>

0 comments on commit 75bcf3e

Please sign in to comment.