-
Notifications
You must be signed in to change notification settings - Fork 200
/
BaseReplicationDownloader.java
380 lines (302 loc) · 13 KB
/
BaseReplicationDownloader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
// This software is released into the Public Domain. See copying.txt for details.
package org.openstreetmap.osmosis.replication.v0_6;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.logging.Logger;
import org.openstreetmap.osmosis.core.OsmosisRuntimeException;
import org.openstreetmap.osmosis.core.OsmosisConstants;
import org.openstreetmap.osmosis.core.task.common.RunnableTask;
import org.openstreetmap.osmosis.core.util.FileBasedLock;
import org.openstreetmap.osmosis.core.util.PropertiesPersister;
import org.openstreetmap.osmosis.replication.common.ReplicationSequenceFormatter;
import org.openstreetmap.osmosis.replication.common.ReplicationState;
import org.openstreetmap.osmosis.replication.common.ServerStateReader;
import org.openstreetmap.osmosis.replication.v0_6.impl.ReplicationDownloaderConfiguration;
import org.openstreetmap.osmosis.xml.common.CompressionMethod;
import org.openstreetmap.osmosis.xml.v0_6.XmlChangeReader;
/**
* This class downloads a set of replication files from a HTTP server and tracks the progress of
* which files have already been processed. The actual processing of changeset files is performed by
* sub-classes. This class forms the basis of a replication mechanism.
*
* @author Brett Henderson
*/
public abstract class BaseReplicationDownloader implements RunnableTask {

    private static final Logger LOG = Logger.getLogger(BaseReplicationDownloader.class.getName());
    private static final String LOCK_FILE = "download.lock";
    private static final String CONFIG_FILE = "configuration.txt";
    private static final String LOCAL_STATE_FILE = "state.txt";
    private static final String CUSTOM_SERVER_STATE_FILE = "custom.state.txt";

    /** Connect and read timeout applied to each replication file download (15 minutes). */
    private static final int DOWNLOAD_TIMEOUT_MILLIS = 15 * 60 * 1000;

    /** Size in bytes of the stream buffers and of the copy buffer used while downloading. */
    private static final int DOWNLOAD_BUFFER_SIZE = 65536;

    private final File workingDirectory;
    private final ReplicationSequenceFormatter sequenceFormatter;
    private final ServerStateReader serverStateReader;
    private final boolean single;


    /**
     * Creates a new instance.
     *
     * @param workingDirectory
     *            The directory containing configuration and tracking files.
     * @param single
     *            Set to true if you want to only replicate a single diff file from the server
     */
    public BaseReplicationDownloader(File workingDirectory, boolean single) {
        this.workingDirectory = workingDirectory;
        this.single = single;

        sequenceFormatter = new ReplicationSequenceFormatter(9, 3);
        serverStateReader = new ServerStateReader();
    }


    /**
     * Provides sub-classes with access to the working directory.
     *
     * @return The working directory for the task.
     */
    protected File getWorkingDirectory() {
        return workingDirectory;
    }


    /**
     * Downloads the file from the server with the specified name and writes it
     * to a local temporary file. If the download fails after the temporary file
     * has been created, the partially written file is deleted before the error
     * is propagated.
     *
     * @param fileName
     *            The name of the file to download.
     * @param baseUrl
     *            The url of the directory containing change files.
     * @return The temporary file containing the downloaded data.
     * @throws OsmosisRuntimeException
     *             if the file URL cannot be built or the file cannot be read from the server.
     */
    private File downloadReplicationFile(String fileName, URL baseUrl) {
        URL changesetUrl;

        try {
            changesetUrl = new URL(baseUrl, fileName);
        } catch (MalformedURLException e) {
            throw new OsmosisRuntimeException("The server file URL could not be created.", e);
        }

        File outputFile = null;
        boolean downloaded = false;

        try {
            // Open an input stream for the changeset file on the server.
            URLConnection connection = changesetUrl.openConnection();
            connection.setReadTimeout(DOWNLOAD_TIMEOUT_MILLIS);
            connection.setConnectTimeout(DOWNLOAD_TIMEOUT_MILLIS);
            connection.setRequestProperty("User-Agent", "Osmosis/" + OsmosisConstants.VERSION);

            try (BufferedInputStream source =
                    new BufferedInputStream(connection.getInputStream(), DOWNLOAD_BUFFER_SIZE)) {
                // Create a temporary file to write the data to. This is only done once the
                // server connection has been established successfully.
                outputFile = File.createTempFile("change", null);

                // Open an output stream for the destination file.
                try (BufferedOutputStream sink =
                        new BufferedOutputStream(new FileOutputStream(outputFile), DOWNLOAD_BUFFER_SIZE)) {
                    // Download the file.
                    byte[] buffer = new byte[DOWNLOAD_BUFFER_SIZE];

                    for (int bytesRead = source.read(buffer); bytesRead > 0; bytesRead = source.read(buffer)) {
                        sink.write(buffer, 0, bytesRead);
                    }
                }
            }

            downloaded = true;

            return outputFile;

        } catch (IOException e) {
            throw new OsmosisRuntimeException(
                    "Unable to read the changeset file " + fileName + " from the server.", e);
        } finally {
            // Do not leave a partially written temporary file behind when the download fails.
            if (!downloaded && outputFile != null && !outputFile.delete()) {
                LOG.warning("Unable to delete file " + outputFile.getName());
            }
        }
    }


    /**
     * Hands a downloaded replication file to the sub-class for processing and
     * deletes the file afterwards, regardless of whether processing succeeded.
     *
     * @param replicationFile
     *            The local file containing the downloaded replication data.
     * @param replicationState
     *            The replication state associated with the file.
     */
    private void processReplicationFile(File replicationFile, ReplicationState replicationState) {
        try {
            // Send the contents of the replication file to the sink but suppress the complete
            // and release methods.
            XmlChangeReader xmlReader = new XmlChangeReader(replicationFile, true, CompressionMethod.GZip);

            // Delegate to the sub-class to process the xml.
            processChangeset(xmlReader, replicationState);

        } finally {
            if (!replicationFile.delete()) {
                LOG.warning("Unable to delete file " + replicationFile.getName());
            }
        }
    }


    /**
     * Determines the maximum timestamp of data to be downloaded during this invocation. This may be
     * overridden by sub-classes, but the sub-classes must call this implementation first and then
     * limit the maximum timestamp further if needed. A sub-class may never increase the maximum
     * timestamp beyond that calculated by this method.
     *
     * @param configuration
     *            The configuration.
     * @param serverTimestamp
     *            The timestamp of the latest data on the server.
     * @param localTimestamp
     *            The timestamp of the most recently downloaded data.
     * @return The maximum timestamp for this invocation.
     */
    protected Date calculateMaximumTimestamp(ReplicationDownloaderConfiguration configuration, Date serverTimestamp,
            Date localTimestamp) {
        Date maximumTimestamp;

        maximumTimestamp = serverTimestamp;

        // Limit the duration according to the maximum defined in the configuration.
        // A non-positive maximum interval leaves the server timestamp unrestricted.
        if (configuration.getMaxInterval() > 0) {
            if ((serverTimestamp.getTime() - localTimestamp.getTime())
                    > configuration.getMaxInterval()) {
                maximumTimestamp = new Date(localTimestamp.getTime() + configuration.getMaxInterval());
            }
        }

        LOG.finer("Maximum timestamp is " + maximumTimestamp);

        return maximumTimestamp;
    }


    /**
     * Downloads and processes replication files one sequence number at a time,
     * starting after the initial local state and stopping when the server
     * sequence number is reached, the maximum timestamp is reached, or a single
     * file has been processed while in single mode.
     *
     * @param configuration
     *            The configuration.
     * @param serverState
     *            The state that replication should advance towards.
     * @param initialLocalState
     *            The local state at the start of this invocation.
     * @return The state of the last file processed, or the initial local state if nothing was
     *         processed.
     */
    private ReplicationState download(ReplicationDownloaderConfiguration configuration, ReplicationState serverState,
            ReplicationState initialLocalState) {
        URL baseUrl;
        ReplicationState localState;
        Date maximumDownloadTimestamp;

        localState = initialLocalState;

        // Determine the location of download files.
        baseUrl = configuration.getBaseUrl();

        // Determine the maximum timestamp that can be downloaded.
        maximumDownloadTimestamp =
                calculateMaximumTimestamp(configuration, serverState.getTimestamp(), localState.getTimestamp());
        LOG.fine("The maximum timestamp to be downloaded is " + maximumDownloadTimestamp + ".");

        // Download all files and send their contents to the sink.
        while (localState.getSequenceNumber() < serverState.getSequenceNumber()) {
            File replicationFile;
            long sequenceNumber;
            ReplicationState fileReplicationState;

            // Check to see if our local state has already reached the maximum
            // allowable timestamp. This will typically occur if a job is run
            // again before new data becomes available, or if an implementation
            // of this class (eg. ReplicationFileMerger) is waiting for a full
            // time period of data to become available before processing.
            if (localState.getTimestamp().compareTo(maximumDownloadTimestamp) >= 0) {
                break;
            }

            // Calculate the next sequence number.
            sequenceNumber = localState.getSequenceNumber() + 1;
            LOG.finer("Processing replication sequence " + sequenceNumber + ".");

            // Get the state associated with the next file.
            fileReplicationState = serverStateReader.getServerState(baseUrl, sequenceNumber);

            // Ensure that the next state is within the allowable timestamp
            // range. We must stop if the next data takes us beyond the maximum
            // timestamp. This will either occur if a maximum download time
            // duration limit has been imposed, or if a time-aligned boundary
            // has been reached.
            if (fileReplicationState.getTimestamp().compareTo(maximumDownloadTimestamp) > 0) {
                // We will always allow at least one replication interval
                // through to deal with the case where a single interval exceeds
                // the maximum duration. This can happen if the source data has
                // a long time gap between two intervals due to system downtime.
                if (localState.getSequenceNumber() != initialLocalState.getSequenceNumber()) {
                    break;
                }
            }

            // Download the next replication file to a temporary file.
            replicationFile =
                    downloadReplicationFile(sequenceFormatter.getFormattedName(sequenceNumber, ".osc.gz"), baseUrl);

            // Process the file and send its contents to the sink.
            processReplicationFile(replicationFile, fileReplicationState);

            // Update the local state to reflect the file state just processed.
            localState = fileReplicationState;

            // If single is set to true it means that we only want to get a single replication file
            // and not up to the current one.
            if (single) {
                break;
            }
        }

        return localState;
    }


    /**
     * Performs one replication run: loads the configuration, determines the
     * server state, downloads and processes any outstanding replication files
     * (or initialises local state on the first run), then persists the
     * resulting local state. {@link #processRelease()} is always invoked at
     * the end, whether or not the run succeeded.
     */
    private void runImpl() {
        try {
            ReplicationDownloaderConfiguration configuration;
            ReplicationState serverState;
            ReplicationState localState;
            PropertiesPersister localStatePersistor;

            // Instantiate utility objects.
            configuration = new ReplicationDownloaderConfiguration(new File(workingDirectory, CONFIG_FILE));

            // A custom server state file in the working directory takes precedence over the
            // state published by the server. The server is only queried when no such file
            // exists, so the custom state is never overwritten.
            File customServerStateFile = new File(workingDirectory, CUSTOM_SERVER_STATE_FILE);
            if (customServerStateFile.exists()) {
                serverState = new ReplicationState(new PropertiesPersister(customServerStateFile).loadMap());
                LOG.info(String.format("Reading custom server state. [%s]", serverState.toString()));
            } else {
                // Obtain the server state.
                serverState = serverStateReader.getServerState(configuration.getBaseUrl());
                LOG.info(String.format("Reading current server state. [%s]", serverState.toString()));
            }

            // Build the local state persister which is used for both loading and storing local state.
            localStatePersistor = new PropertiesPersister(new File(workingDirectory, LOCAL_STATE_FILE));

            // Begin processing.
            processInitialize(Collections.<String, Object>emptyMap());

            // If local state isn't available we need to copy server state to be the initial local state
            // then exit.
            if (localStatePersistor.exists()) {
                localState = new ReplicationState(localStatePersistor.loadMap());

                // Download and process the replication files.
                localState = download(configuration, serverState, localState);

            } else {
                localState = serverState;

                processInitializeState(localState);
            }

            // Commit downstream changes.
            processComplete();

            // Persist the local state.
            localStatePersistor.store(localState.store());

        } finally {
            processRelease();
        }
    }


    /**
     * This is called prior to any processing being performed. It allows any
     * setup activities to be performed.
     *
     * @param metaData
     *            The meta data associated with this processing request (empty
     *            in the current implementation).
     */
    protected abstract void processInitialize(Map<String, Object> metaData);


    /**
     * Invoked once during the first execution run to allow initialisation based on the initial
     * replication state downloaded from the server.
     *
     * @param initialState
     *            The first server state.
     */
    protected abstract void processInitializeState(ReplicationState initialState);


    /**
     * Processes the changeset.
     *
     * @param xmlReader
     *            The changeset reader initialised to point to the changeset file.
     * @param replicationState
     *            The replication state associated with the changeset file.
     */
    protected abstract void processChangeset(XmlChangeReader xmlReader, ReplicationState replicationState);


    /**
     * This is implemented by sub-classes and is called when all changesets have been processed.
     * This should perform any completion tasks such as committing changes to a database.
     */
    protected abstract void processComplete();


    /**
     * This is implemented by sub-classes and is called at the completion of all processing
     * regardless of whether it was successful or not. This should perform any cleanup tasks such as
     * closing files or releasing database connections.
     */
    protected abstract void processRelease();


    /**
     * {@inheritDoc}
     */
    @Override
    public void run() {
        FileBasedLock fileLock;

        fileLock = new FileBasedLock(new File(workingDirectory, LOCK_FILE));

        try {
            // Hold the lock on the working directory's lock file for the duration of the run.
            fileLock.lock();

            runImpl();

            fileLock.unlock();

        } finally {
            fileLock.close();
        }
    }
}