From c07e0ba2c43351aee6e6f67b40f74a27be56cab2 Mon Sep 17 00:00:00 2001 From: Teo Sarca Date: Fri, 3 Nov 2017 11:18:45 +0200 Subject: [PATCH] DocumentCacheInvalidationDispatcher: work async https://github.com/metasfresh/metasfresh-webui-api/issues/19 --- .../events/DocumentWebsocketPublisher.java | 42 +++++++++++++++++-- .../DocumentCacheInvalidationDispatcher.java | 27 ++++++++---- .../web/window/model/DocumentCollection.java | 5 +++ 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/metas/ui/web/window/events/DocumentWebsocketPublisher.java b/src/main/java/de/metas/ui/web/window/events/DocumentWebsocketPublisher.java index bcd14e48b..9522a08f9 100644 --- a/src/main/java/de/metas/ui/web/window/events/DocumentWebsocketPublisher.java +++ b/src/main/java/de/metas/ui/web/window/events/DocumentWebsocketPublisher.java @@ -7,7 +7,9 @@ import org.adempiere.ad.trx.api.ITrx; import org.adempiere.ad.trx.api.ITrxManager; import org.adempiere.ad.trx.api.OnTrxMissingPolicy; +import org.adempiere.exceptions.AdempiereException; import org.adempiere.util.Services; +import org.adempiere.util.lang.IAutoCloseable; import org.springframework.stereotype.Component; import de.metas.ui.web.websocket.WebsocketSender; @@ -49,6 +51,8 @@ @Component public class DocumentWebsocketPublisher { + private final ThreadLocal THREAD_LOCAL_COLLECTOR = new ThreadLocal<>(); + private final WebsocketSender websocketSender; public DocumentWebsocketPublisher(@NonNull final WebsocketSender websocketSender) @@ -65,7 +69,13 @@ private void forCollector(final Consumer createCollectorAndBind(trx, websocketSender)); autoflush = false; @@ -137,8 +147,8 @@ public void convertAndPublish(final List jsonDocumentEvents) final JSONDocumentChangedWebSocketEventCollector collectorToMerge = JSONDocumentChangedWebSocketEventCollector.newInstance(); jsonDocumentEvents.forEach(event -> collectFrom(collectorToMerge, event)); - - if(collectorToMerge.isEmpty()) + + if (collectorToMerge.isEmpty()) { return; } @@ -164,4 +174,30 @@ private static final void collectFrom(final JSONDocumentChangedWebSocketEventCol event.getIncludedTabsInfos().forEach(tabInfo -> collector.mergeFrom(windowId, documentId, tabInfo)); } + + public IAutoCloseable temporaryCollectOnThisThread() + { + if (THREAD_LOCAL_COLLECTOR.get() != null) + { + throw new AdempiereException("A thread level collector was already set"); + } + + final JSONDocumentChangedWebSocketEventCollector collector = JSONDocumentChangedWebSocketEventCollector.newInstance(); + THREAD_LOCAL_COLLECTOR.set(collector); + return new IAutoCloseable() + { + @Override + public String toString() + { + return "AutoCloseable[" + collector + "]"; + } + + @Override + public void close() + { + THREAD_LOCAL_COLLECTOR.set(null); + sendAllAndClear(collector, websocketSender); + } + }; + } } diff --git a/src/main/java/de/metas/ui/web/window/model/DocumentCacheInvalidationDispatcher.java b/src/main/java/de/metas/ui/web/window/model/DocumentCacheInvalidationDispatcher.java index d2cfb6290..0d7c0e70f 100644 --- a/src/main/java/de/metas/ui/web/window/model/DocumentCacheInvalidationDispatcher.java +++ b/src/main/java/de/metas/ui/web/window/model/DocumentCacheInvalidationDispatcher.java @@ -5,7 +5,9 @@ import javax.annotation.PostConstruct; +import org.adempiere.ad.dao.cache.CacheInvalidateMultiRequest; import org.adempiere.ad.dao.cache.CacheInvalidateRequest; +import org.adempiere.util.lang.IAutoCloseable; import org.compiere.util.CacheMgt; import org.compiere.util.ICacheResetListener; import org.slf4j.Logger; @@ -14,6 +16,7 @@ import org.springframework.stereotype.Component; import de.metas.logging.LogManager; +import de.metas.ui.web.window.events.DocumentWebsocketPublisher; import lombok.NonNull; /* @@ -53,13 +56,13 @@ public class DocumentCacheInvalidationDispatcher implements ICacheResetListener private DocumentCollection documents; private final Executor async; - + public DocumentCacheInvalidationDispatcher() { final CustomizableThreadFactory asyncThreadFactory = new CustomizableThreadFactory(DocumentCacheInvalidationDispatcher.class.getSimpleName()); asyncThreadFactory.setDaemon(true); - - async = Executors.newSingleThreadExecutor(asyncThreadFactory); + + async = Executors.newSingleThreadExecutor(asyncThreadFactory); } @PostConstruct @@ -69,16 +72,22 @@ private void postConstruct() } @Override - public int reset(@NonNull final CacheInvalidateRequest request) + public int reset(@NonNull final CacheInvalidateMultiRequest request) { - // FIXME: atm if we are reseting async, the events are no longer aggregated because there is no trx. -// async.execute(() -> resetNow(request)); - resetNow(request); - + async.execute(() -> resetNow(request)); return 1; // not relevant } - public void resetNow(final CacheInvalidateRequest request) + private void resetNow(final CacheInvalidateMultiRequest request) + { + final DocumentWebsocketPublisher websocketPublisher = documents.getWebsocketPublisher(); + try (IAutoCloseable c = websocketPublisher.temporaryCollectOnThisThread()) + { + request.getRequests().forEach(this::resetNow); + } + } + + private void resetNow(final CacheInvalidateRequest request) { logger.debug("Got {}", request); diff --git a/src/main/java/de/metas/ui/web/window/model/DocumentCollection.java b/src/main/java/de/metas/ui/web/window/model/DocumentCollection.java index dab1f1439..54cea8072 100644 --- a/src/main/java/de/metas/ui/web/window/model/DocumentCollection.java +++ b/src/main/java/de/metas/ui/web/window/model/DocumentCollection.java @@ -588,6 +588,11 @@ public DocumentPrint createDocumentPrint(final DocumentPath documentPath) .reportData(processExecutionResult.getReportData()) .build(); } + + public DocumentWebsocketPublisher getWebsocketPublisher() + { + return websocketPublisher; + } /** * Invalidates all root documents identified by tableName/recordId and notifies frontend (via websocket).