Permalink
Browse files

Handling IO Exceptions while compressing (producer side) or decompres…

…sing (consumer side) and rethrowing to the client
  • Loading branch information...
1 parent e039b8e commit 24956187434ac827a2840629fe688775b9ffb9b4 @nehanarkhede nehanarkhede committed Jul 22, 2011
Showing with 38 additions and 12 deletions.
  1. +38 −12 core/src/main/scala/kafka/message/CompressionUtils.scala
@@ -44,9 +44,12 @@ object CompressionUtils {
gzipOutput.write(messageByteBuffer.array)
} catch {
case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+ if(gzipOutput != null) gzipOutput.close();
+ if(outputStream != null) outputStream.close()
+ throw e
} finally {
- gzipOutput.close();
- outputStream.close();
+ if(gzipOutput != null) gzipOutput.close()
+ if(outputStream != null) outputStream.close()
}
val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
@@ -65,9 +68,16 @@ object CompressionUtils {
gzipOutput.write(messageByteBuffer.array)
} catch {
case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+ if(gzipOutput != null)
+ gzipOutput.close()
+ if(outputStream != null)
+ outputStream.close()
+ throw e
} finally {
- gzipOutput.close();
- outputStream.close();
+ if(gzipOutput != null)
+ gzipOutput.close()
+ if(outputStream != null)
+ outputStream.close()
}
val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
@@ -83,12 +93,20 @@ object CompressionUtils {
val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
val intermediateBuffer = new Array[Byte](1024)
- Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
- outputStream.write(intermediateBuffer, 0, dataRead)
+ try {
+ Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+ outputStream.write(intermediateBuffer, 0, dataRead)
+ }
+ }catch {
+ case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ throw e
+ } finally {
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
}
- gzipIn.close
- outputStream.close
val outputBuffer = ByteBuffer.allocate(outputStream.size)
outputBuffer.put(outputStream.toByteArray)
outputBuffer.rewind
@@ -100,12 +118,20 @@ object CompressionUtils {
val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
val intermediateBuffer = new Array[Byte](1024)
- Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
- outputStream.write(intermediateBuffer, 0, dataRead)
+ try {
+ Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+ outputStream.write(intermediateBuffer, 0, dataRead)
+ }
+ }catch {
+ case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ throw e
+ } finally {
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
}
- gzipIn.close
- outputStream.close
val outputBuffer = ByteBuffer.allocate(outputStream.size)
outputBuffer.put(outputStream.toByteArray)
outputBuffer.rewind

0 comments on commit 2495618

Please sign in to comment.