Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gh19webui (2) #659

Merged
merged 2 commits into from
Nov 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import org.adempiere.ad.trx.api.ITrx;
import org.adempiere.ad.trx.api.ITrxManager;
import org.adempiere.ad.trx.api.OnTrxMissingPolicy;
import org.adempiere.exceptions.AdempiereException;
import org.adempiere.util.Services;
import org.adempiere.util.lang.IAutoCloseable;
import org.springframework.stereotype.Component;

import de.metas.ui.web.websocket.WebsocketSender;
Expand Down Expand Up @@ -49,6 +51,8 @@
@Component
public class DocumentWebsocketPublisher
{
private final ThreadLocal<JSONDocumentChangedWebSocketEventCollector> THREAD_LOCAL_COLLECTOR = new ThreadLocal<>();

private final WebsocketSender websocketSender;

public DocumentWebsocketPublisher(@NonNull final WebsocketSender websocketSender)
Expand All @@ -65,7 +69,13 @@ private void forCollector(final Consumer<JSONDocumentChangedWebSocketEventCollec

final ITrxManager trxManager = Services.get(ITrxManager.class);
final ITrx trx = trxManager.getThreadInheritedTrx(OnTrxMissingPolicy.ReturnTrxNone);
if (!trxManager.isNull(trx))
final JSONDocumentChangedWebSocketEventCollector threadLocalCollector = THREAD_LOCAL_COLLECTOR.get();
if (threadLocalCollector != null)
{
collector = threadLocalCollector;
autoflush = false;
}
else if (!trxManager.isNull(trx))
{
collector = trx.getProperty(JSONDocumentChangedWebSocketEventCollector.class.getName(), () -> createCollectorAndBind(trx, websocketSender));
autoflush = false;
Expand Down Expand Up @@ -137,8 +147,8 @@ public void convertAndPublish(final List<JSONDocument> jsonDocumentEvents)

final JSONDocumentChangedWebSocketEventCollector collectorToMerge = JSONDocumentChangedWebSocketEventCollector.newInstance();
jsonDocumentEvents.forEach(event -> collectFrom(collectorToMerge, event));
if(collectorToMerge.isEmpty())

if (collectorToMerge.isEmpty())
{
return;
}
Expand All @@ -164,4 +174,30 @@ private static final void collectFrom(final JSONDocumentChangedWebSocketEventCol

event.getIncludedTabsInfos().forEach(tabInfo -> collector.mergeFrom(windowId, documentId, tabInfo));
}

public IAutoCloseable temporaryCollectOnThisThread()
{
if (THREAD_LOCAL_COLLECTOR.get() != null)
{
throw new AdempiereException("A thread level collector was already set");
}

final JSONDocumentChangedWebSocketEventCollector collector = JSONDocumentChangedWebSocketEventCollector.newInstance();
THREAD_LOCAL_COLLECTOR.set(collector);
return new IAutoCloseable()
{
@Override
public String toString()
{
return "AutoCloseable[" + collector + "]";
}

@Override
public void close()
{
THREAD_LOCAL_COLLECTOR.set(null);
sendAllAndClear(collector, websocketSender);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

import javax.annotation.PostConstruct;

import org.adempiere.ad.dao.cache.CacheInvalidateMultiRequest;
import org.adempiere.ad.dao.cache.CacheInvalidateRequest;
import org.adempiere.util.lang.IAutoCloseable;
import org.compiere.util.CacheMgt;
import org.compiere.util.ICacheResetListener;
import org.slf4j.Logger;
Expand All @@ -14,6 +16,7 @@
import org.springframework.stereotype.Component;

import de.metas.logging.LogManager;
import de.metas.ui.web.window.events.DocumentWebsocketPublisher;
import lombok.NonNull;

/*
Expand Down Expand Up @@ -53,13 +56,13 @@ public class DocumentCacheInvalidationDispatcher implements ICacheResetListener
private DocumentCollection documents;

private final Executor async;

public DocumentCacheInvalidationDispatcher()
{
final CustomizableThreadFactory asyncThreadFactory = new CustomizableThreadFactory(DocumentCacheInvalidationDispatcher.class.getSimpleName());
asyncThreadFactory.setDaemon(true);
async = Executors.newSingleThreadExecutor(asyncThreadFactory);

async = Executors.newSingleThreadExecutor(asyncThreadFactory);
}

@PostConstruct
Expand All @@ -69,16 +72,22 @@ private void postConstruct()
}

@Override
public int reset(@NonNull final CacheInvalidateRequest request)
public int reset(@NonNull final CacheInvalidateMultiRequest request)
{
// FIXME: atm if we are reseting async, the events are no longer aggregated because there is no trx.
// async.execute(() -> resetNow(request));
resetNow(request);

async.execute(() -> resetNow(request));
return 1; // not relevant
}

public void resetNow(final CacheInvalidateRequest request)
private void resetNow(final CacheInvalidateMultiRequest request)
{
final DocumentWebsocketPublisher websocketPublisher = documents.getWebsocketPublisher();
try (IAutoCloseable c = websocketPublisher.temporaryCollectOnThisThread())
{
request.getRequests().forEach(this::resetNow);
}
}

private void resetNow(final CacheInvalidateRequest request)
{
logger.debug("Got {}", request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ public DocumentPrint createDocumentPrint(final DocumentPath documentPath)
.reportData(processExecutionResult.getReportData())
.build();
}

public DocumentWebsocketPublisher getWebsocketPublisher()
{
return websocketPublisher;
}

/**
* Invalidates all root documents identified by tableName/recordId and notifies frontend (via websocket).
Expand Down