Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

BATCH-1881: added forceSync flag to file writers

  • Loading branch information...
commit be7754bed477cc0d635bc661fbf6698e3191c9bc 1 parent 1449eb3
Dave Syer authored July 27, 2012
26  spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/FlatFileItemWriter.java
@@ -77,6 +77,8 @@
77 77
 
78 78
 	private boolean saveState = true;
79 79
 
  80
+	private boolean forceSync = false;
  81
+
80 82
 	private boolean shouldDeleteIfExists = true;
81 83
 
82 84
 	private boolean shouldDeleteIfEmpty = false;
@@ -110,6 +112,19 @@ public void afterPropertiesSet() throws Exception {
110 112
 	}
111 113
 
112 114
 	/**
  115
+	 * Flag to indicate that changes should be force-synced to disk on flush.
  116
+	 * Defaults to false, which means that even with a local disk changes could
  117
+	 * be lost if the OS crashes in between a write and a cache flush. Setting
  118
+	 * to true may result in slower performance for usage patterns involving many
  119
+	 * frequent writes.
  120
+	 * 
  121
+	 * @param forceSync the flag value to set
  122
+	 */
  123
+	public void setForceSync(boolean forceSync) {
  124
+		this.forceSync = forceSync;
  125
+	}
  126
+
  127
+	/**
113 128
 	 * Public setter for the line separator. Defaults to the System property
114 129
 	 * line.separator.
115 130
 	 * @param lineSeparator the line separator to set
@@ -562,7 +577,16 @@ public boolean isInitialized() {
562 577
 		 */
563 578
 		private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
564 579
 			try {
565  
-				Writer writer = Channels.newWriter(fileChannel, encoding);
  580
+				final FileChannel channel = fileChannel;
  581
+				Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) {
  582
+					@Override
  583
+					public void flush() throws IOException {
  584
+						super.flush();
  585
+						if (forceSync) {
  586
+							channel.force(false);
  587
+						}
  588
+					}
  589
+				};
566 590
 				if (transactional) {
567 591
 					return new TransactionAwareBufferedWriter(writer, new Runnable() {
568 592
 						public void run() {
59  spring-batch-infrastructure/src/main/java/org/springframework/batch/item/xml/StaxEventItemWriter.java
@@ -23,6 +23,7 @@
23 23
 import java.io.OutputStreamWriter;
24 24
 import java.io.UnsupportedEncodingException;
25 25
 import java.io.Writer;
  26
+import java.nio.channels.Channels;
26 27
 import java.nio.channels.FileChannel;
27 28
 import java.util.List;
28 29
 import java.util.Map;
@@ -139,6 +140,8 @@
139 140
 
140 141
 	private boolean transactional = true;
141 142
 
  143
+	private boolean forceSync;
  144
+
142 145
 	public StaxEventItemWriter() {
143 146
 		setName(ClassUtils.getShortName(StaxEventItemWriter.class));
144 147
 	}
@@ -187,6 +190,19 @@ public void setTransactional(boolean transactional) {
187 190
 	}
188 191
 
189 192
 	/**
  193
+	 * Flag to indicate that changes should be force-synced to disk on flush.
  194
+	 * Defaults to false, which means that even with a local disk changes could
  195
+	 * be lost if the OS crashes in between a write and a cache flush. Setting
  196
+	 * to true may result in slower performance for usage patterns involving
  197
+	 * many frequent writes.
  198
+	 * 
  199
+	 * @param forceSync the flag value to set
  200
+	 */
  201
+	public void setForceSync(boolean forceSync) {
  202
+		this.forceSync = forceSync;
  203
+	}
  204
+
  205
+	/**
190 206
 	 * Get used encoding.
191 207
 	 * 
192 208
 	 * @return the encoding used
@@ -359,12 +375,14 @@ private void open(long position, boolean restarted) {
359 375
 
360 376
 		File file;
361 377
 		FileOutputStream os = null;
  378
+		FileChannel fileChannel = null;
362 379
 
363 380
 		try {
364 381
 			file = resource.getFile();
365 382
 			FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput);
366 383
 			Assert.state(resource.exists(), "Output resource must exist");
367 384
 			os = new FileOutputStream(file, true);
  385
+			fileChannel = os.getChannel();
368 386
 			channel = os.getChannel();
369 387
 			setPosition(position);
370 388
 		}
@@ -384,24 +402,33 @@ private void open(long position, boolean restarted) {
384 402
 		}
385 403
 		if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) {
386 404
 			// On restart we don't write the root element so we have to disable
387  
-			// structural validation (see: 
  405
+			// structural validation (see:
388 406
 			// http://jira.springframework.org/browse/BATCH-1681).
389 407
 			outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE);
390 408
 		}
391 409
 
392 410
 		try {
  411
+			final FileChannel channel = fileChannel;
  412
+			Writer writer = new BufferedWriter(new OutputStreamWriter(os, encoding)) {
  413
+				@Override
  414
+				public void flush() throws IOException {
  415
+					super.flush();
  416
+					if (forceSync) {
  417
+						channel.force(false);
  418
+					}
  419
+				}
  420
+			};
393 421
 			if (transactional) {
394  
-				bufferedWriter = new TransactionAwareBufferedWriter(new OutputStreamWriter(os, encoding),
395  
-						new Runnable() {
396  
-							public void run() {
397  
-								closeStream();
398  
-							}
399  
-						});
  422
+				bufferedWriter = new TransactionAwareBufferedWriter(writer, new Runnable() {
  423
+					public void run() {
  424
+						closeStream();
  425
+					}
  426
+				});
400 427
 			}
401 428
 			else {
402  
-				bufferedWriter = new BufferedWriter(new OutputStreamWriter(os, encoding));
  429
+				bufferedWriter = writer;
403 430
 			}
404  
-			delegateEventWriter = createXmlEventWriter(outputFactory,bufferedWriter);
  431
+			delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter);
405 432
 			eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
406 433
 			if (!restarted) {
407 434
 				startDocument(delegateEventWriter);
@@ -424,7 +451,7 @@ public void run() {
424 451
 	 * @return an xml writer
425 452
 	 * @throws XMLStreamException
426 453
 	 */
427  
-	protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory,Writer writer)
  454
+	protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
428 455
 			throws XMLStreamException {
429 456
 		return outputFactory.createXMLEventWriter(writer);
430 457
 	}
@@ -434,22 +461,20 @@ protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory,Wri
434 461
 	 * @return a factory for the xml output
435 462
 	 * @throws FactoryConfigurationError
436 463
 	 */
437  
-	protected XMLOutputFactory createXmlOutputFactory()
438  
-			throws FactoryConfigurationError {
  464
+	protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError {
439 465
 		return XMLOutputFactory.newInstance();
440 466
 	}
441  
-	
  467
+
442 468
 	/**
443 469
 	 * Subclasses can override to customize the event factory.
444 470
 	 * @return a factory for the xml events
445 471
 	 * @throws FactoryConfigurationError
446 472
 	 */
447  
-	protected XMLEventFactory createXmlEventFactory()
448  
-			throws FactoryConfigurationError {
  473
+	protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError {
449 474
 		XMLEventFactory factory = XMLEventFactory.newInstance();
450 475
 		return factory;
451 476
 	}
452  
-	
  477
+
453 478
 	/**
454 479
 	 * Subclasses can override to customize the stax result.
455 480
 	 * @return a result for writing to
@@ -613,7 +638,7 @@ public void write(List<? extends T> items) throws XmlMappingException, Exception
613 638
 			Assert.state(marshaller.supports(object.getClass()),
614 639
 					"Marshaller must support the class of the marshalled object");
615 640
 			Result result = createStaxResult();
616  
-			marshaller.marshal(object, result );
  641
+			marshaller.marshal(object, result);
617 642
 		}
618 643
 		try {
619 644
 			eventWriter.flush();
11  spring-batch-infrastructure/src/test/java/org/springframework/batch/item/file/FlatFileItemWriterTests.java
@@ -202,6 +202,17 @@ public void testWriteString() throws Exception {
202 202
 		assertEquals(TEST_STRING, lineFromFile);
203 203
 	}
204 204
 
  205
+	@Test
  206
+	public void testForcedWriteString() throws Exception {
  207
+		writer.setForceSync(true);
  208
+		writer.open(executionContext);
  209
+		writer.write(Collections.singletonList(TEST_STRING));
  210
+		writer.close();
  211
+		String lineFromFile = readLine();
  212
+
  213
+		assertEquals(TEST_STRING, lineFromFile);
  214
+	}
  215
+
205 216
 	/**
206 217
 	 * Regular usage of <code>write(String)</code> method
207 218
 	 * 
10  spring-batch-infrastructure/src/test/java/org/springframework/batch/item/xml/StaxEventItemWriterTests.java
@@ -90,6 +90,16 @@ public void testWriteAndFlush() throws Exception {
90 90
 		assertTrue("Wrong content: " + content, content.contains(TEST_STRING));
91 91
 	}
92 92
 
  93
+	@Test
  94
+	public void testWriteAndForceFlush() throws Exception {
  95
+		writer.setForceSync(true);
  96
+		writer.open(executionContext);
  97
+		writer.write(items);
  98
+		writer.close();
  99
+		String content = getOutputFileContent();
  100
+		assertTrue("Wrong content: " + content, content.contains(TEST_STRING));
  101
+	}
  102
+
93 103
 	/**
94 104
 	 * Restart scenario - content is appended to the output file after restart.
95 105
 	 */

0 notes on commit be7754b

Please sign in to comment.
Something went wrong with that request. Please try again.