/
DocumentWebsocketPublisher.java
206 lines (175 loc) · 6.26 KB
/
DocumentWebsocketPublisher.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package de.metas.ui.web.window.events;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.adempiere.ad.trx.api.ITrx;
import org.adempiere.ad.trx.api.ITrxListenerManager.TrxEventTiming;
import org.adempiere.ad.trx.api.ITrxManager;
import org.adempiere.ad.trx.api.OnTrxMissingPolicy;
import org.adempiere.exceptions.AdempiereException;
import org.adempiere.util.Services;
import org.adempiere.util.lang.IAutoCloseable;
import org.springframework.stereotype.Component;
import de.metas.ui.web.websocket.WebsocketSender;
import de.metas.ui.web.window.datatypes.DocumentId;
import de.metas.ui.web.window.datatypes.DocumentPath;
import de.metas.ui.web.window.datatypes.WindowId;
import de.metas.ui.web.window.datatypes.json.JSONDocument;
import de.metas.ui.web.window.descriptor.DetailId;
import lombok.NonNull;
/*
* #%L
* metasfresh-webui-api
* %%
* Copyright (C) 2017 metas GmbH
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 2 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-2.0.html>.
* #L%
*/
/**
* Publishes document related events to websocket endpoints.
*
* @author metas-dev <dev@metasfresh.com>
*
*/
@Component
public class DocumentWebsocketPublisher
{
private final ThreadLocal<JSONDocumentChangedWebSocketEventCollector> THREAD_LOCAL_COLLECTOR = new ThreadLocal<>();
private final WebsocketSender websocketSender;
public DocumentWebsocketPublisher(@NonNull final WebsocketSender websocketSender)
{
this.websocketSender = websocketSender;
}
private void forCollector(final Consumer<JSONDocumentChangedWebSocketEventCollector> consumer)
{
//
// Get the collector to use
final JSONDocumentChangedWebSocketEventCollector collector;
final boolean autoflush;
final ITrxManager trxManager = Services.get(ITrxManager.class);
final ITrx trx = trxManager.getThreadInheritedTrx(OnTrxMissingPolicy.ReturnTrxNone);
final JSONDocumentChangedWebSocketEventCollector threadLocalCollector = THREAD_LOCAL_COLLECTOR.get();
if (threadLocalCollector != null)
{
collector = threadLocalCollector;
autoflush = false;
}
else if(trxManager.isActive(trx))
{
collector = trx.getProperty(JSONDocumentChangedWebSocketEventCollector.class.getName(), () -> createCollectorAndBind(trx, websocketSender));
autoflush = false;
}
else
{
collector = JSONDocumentChangedWebSocketEventCollector.newInstance();
autoflush = true;
}
//
// Call the consumer
consumer.accept(collector);
//
// Autoflush if needed
if (autoflush)
{
sendAllAndClear(collector, websocketSender);
}
}
private static JSONDocumentChangedWebSocketEventCollector createCollectorAndBind(final ITrx trx, final WebsocketSender websocketSender)
{
final JSONDocumentChangedWebSocketEventCollector collector = JSONDocumentChangedWebSocketEventCollector.newInstance();
trx.getTrxListenerManager()
.newEventListener(TrxEventTiming.AFTER_COMMIT)
.registerHandlingMethod(transaction -> sendAllAndClear(collector, websocketSender));
return collector;
}
private static void sendAllAndClear(final JSONDocumentChangedWebSocketEventCollector collector, final WebsocketSender websocketSender)
{
final List<JSONDocumentChangedWebSocketEvent> events = collector.getEventsAndClear();
websocketSender.convertAndSend(events);
}
public void staleByDocumentPath(final DocumentPath documentPath)
{
forCollector(collector -> collector.staleByDocumentPath(documentPath));
}
public void staleRootDocument(final WindowId windowId, final DocumentId documentId)
{
forCollector(collector -> collector.staleRootDocument(windowId, documentId));
}
public void staleTab(final WindowId windowId, final DocumentId documentId, final DetailId tabId)
{
forCollector(collector -> collector.staleTab(windowId, documentId, tabId));
}
public void staleTabs(final WindowId windowId, final DocumentId documentId, final Set<DetailId> tabIds)
{
forCollector(collector -> collector.staleTabs(windowId, documentId, tabIds));
}
public void staleIncludedDocument(final WindowId windowId, final DocumentId documentId, final DetailId tabId, final DocumentId rowId)
{
forCollector(collector -> collector.staleIncludedDocument(windowId, documentId, tabId, rowId));
}
public void convertAndPublish(final List<JSONDocument> jsonDocumentEvents)
{
if (jsonDocumentEvents == null || jsonDocumentEvents.isEmpty())
{
return;
}
final JSONDocumentChangedWebSocketEventCollector collectorToMerge = JSONDocumentChangedWebSocketEventCollector.newInstance();
jsonDocumentEvents.forEach(event -> collectFrom(collectorToMerge, event));
if (collectorToMerge.isEmpty())
{
return;
}
forCollector(collector -> collector.mergeFrom(collectorToMerge));
}
private static final void collectFrom(final JSONDocumentChangedWebSocketEventCollector collector, final JSONDocument event)
{
final WindowId windowId = event.getWindowId();
if (windowId == null)
{
return;
}
// Included document => nothing to publish about it
if (event.getTabId() != null)
{
return;
}
final DocumentId documentId = event.getId();
event.getIncludedTabsInfos().forEach(tabInfo -> collector.mergeFrom(windowId, documentId, tabInfo));
}
public IAutoCloseable temporaryCollectOnThisThread()
{
if (THREAD_LOCAL_COLLECTOR.get() != null)
{
throw new AdempiereException("A thread level collector was already set");
}
final JSONDocumentChangedWebSocketEventCollector collector = JSONDocumentChangedWebSocketEventCollector.newInstance();
THREAD_LOCAL_COLLECTOR.set(collector);
return new IAutoCloseable()
{
@Override
public String toString()
{
return "AutoCloseable[" + collector + "]";
}
@Override
public void close()
{
THREAD_LOCAL_COLLECTOR.set(null);
sendAllAndClear(collector, websocketSender);
}
};
}
}