Skip to content

Commit

Permalink
DocumentCacheInvalidationDispatcher: work async
Browse files Browse the repository at this point in the history
  • Loading branch information
teosarca committed Nov 3, 2017
1 parent f426c3b commit c07e0ba
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import org.adempiere.ad.trx.api.ITrx;
import org.adempiere.ad.trx.api.ITrxManager;
import org.adempiere.ad.trx.api.OnTrxMissingPolicy;
import org.adempiere.exceptions.AdempiereException;
import org.adempiere.util.Services;
import org.adempiere.util.lang.IAutoCloseable;
import org.springframework.stereotype.Component;

import de.metas.ui.web.websocket.WebsocketSender;
Expand Down Expand Up @@ -49,6 +51,8 @@
@Component
public class DocumentWebsocketPublisher
{
private final ThreadLocal<JSONDocumentChangedWebSocketEventCollector> THREAD_LOCAL_COLLECTOR = new ThreadLocal<>();

private final WebsocketSender websocketSender;

public DocumentWebsocketPublisher(@NonNull final WebsocketSender websocketSender)
Expand All @@ -65,7 +69,13 @@ private void forCollector(final Consumer<JSONDocumentChangedWebSocketEventCollec

final ITrxManager trxManager = Services.get(ITrxManager.class);
final ITrx trx = trxManager.getThreadInheritedTrx(OnTrxMissingPolicy.ReturnTrxNone);
if (!trxManager.isNull(trx))
final JSONDocumentChangedWebSocketEventCollector threadLocalCollector = THREAD_LOCAL_COLLECTOR.get();
if (threadLocalCollector != null)
{
collector = threadLocalCollector;
autoflush = false;
}
else if (!trxManager.isNull(trx))
{
collector = trx.getProperty(JSONDocumentChangedWebSocketEventCollector.class.getName(), () -> createCollectorAndBind(trx, websocketSender));
autoflush = false;
Expand Down Expand Up @@ -137,8 +147,8 @@ public void convertAndPublish(final List<JSONDocument> jsonDocumentEvents)

final JSONDocumentChangedWebSocketEventCollector collectorToMerge = JSONDocumentChangedWebSocketEventCollector.newInstance();
jsonDocumentEvents.forEach(event -> collectFrom(collectorToMerge, event));
if(collectorToMerge.isEmpty())

if (collectorToMerge.isEmpty())
{
return;
}
Expand All @@ -164,4 +174,30 @@ private static final void collectFrom(final JSONDocumentChangedWebSocketEventCol

event.getIncludedTabsInfos().forEach(tabInfo -> collector.mergeFrom(windowId, documentId, tabInfo));
}

public IAutoCloseable temporaryCollectOnThisThread()
{
if (THREAD_LOCAL_COLLECTOR.get() != null)
{
throw new AdempiereException("A thread level collector was already set");
}

final JSONDocumentChangedWebSocketEventCollector collector = JSONDocumentChangedWebSocketEventCollector.newInstance();
THREAD_LOCAL_COLLECTOR.set(collector);
return new IAutoCloseable()
{
@Override
public String toString()
{
return "AutoCloseable[" + collector + "]";
}

@Override
public void close()
{
THREAD_LOCAL_COLLECTOR.set(null);
sendAllAndClear(collector, websocketSender);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import javax.annotation.PostConstruct;

import org.adempiere.ad.dao.cache.CacheInvalidateMultiRequest;
import org.adempiere.ad.dao.cache.CacheInvalidateRequest;
import org.adempiere.util.lang.IAutoCloseable;
import org.compiere.util.CacheMgt;
import org.compiere.util.ICacheResetListener;
import org.slf4j.Logger;
Expand All @@ -14,6 +16,7 @@
import org.springframework.stereotype.Component;

import de.metas.logging.LogManager;
import de.metas.ui.web.window.events.DocumentWebsocketPublisher;
import lombok.NonNull;

/*
Expand Down Expand Up @@ -53,13 +56,13 @@ public class DocumentCacheInvalidationDispatcher implements ICacheResetListener
private DocumentCollection documents;

private final Executor async;

public DocumentCacheInvalidationDispatcher()
{
final CustomizableThreadFactory asyncThreadFactory = new CustomizableThreadFactory(DocumentCacheInvalidationDispatcher.class.getSimpleName());
asyncThreadFactory.setDaemon(true);
async = Executors.newSingleThreadExecutor(asyncThreadFactory);

async = Executors.newSingleThreadExecutor(asyncThreadFactory);
}

@PostConstruct
Expand All @@ -69,16 +72,22 @@ private void postConstruct()
}

@Override
public int reset(@NonNull final CacheInvalidateRequest request)
public int reset(@NonNull final CacheInvalidateMultiRequest request)
{
// FIXME: atm if we are reseting async, the events are no longer aggregated because there is no trx.
// async.execute(() -> resetNow(request));
resetNow(request);

async.execute(() -> resetNow(request));
return 1; // not relevant
}

public void resetNow(final CacheInvalidateRequest request)
private void resetNow(final CacheInvalidateMultiRequest request)
{
final DocumentWebsocketPublisher websocketPublisher = documents.getWebsocketPublisher();
try (IAutoCloseable c = websocketPublisher.temporaryCollectOnThisThread())
{
request.getRequests().forEach(this::resetNow);
}
}

private void resetNow(final CacheInvalidateRequest request)
{
logger.debug("Got {}", request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ public DocumentPrint createDocumentPrint(final DocumentPath documentPath)
.reportData(processExecutionResult.getReportData())
.build();
}

public DocumentWebsocketPublisher getWebsocketPublisher()
{
return websocketPublisher;
}

/**
* Invalidates all root documents identified by tableName/recordId and notifies frontend (via websocket).
Expand Down

0 comments on commit c07e0ba

Please sign in to comment.