Skip to content

Commit

Permalink
SPARK-1729. Use foreach instead of map for all Options.
Browse files Browse the repository at this point in the history
  • Loading branch information
harishreedharan committed Jul 14, 2014
1 parent 8136aa6 commit 9fd0da7
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
val sequenceNumber = seqBase + seqCounter.incrementAndGet()
val processor = new TransactionProcessor(channel, sequenceNumber,
n, transactionTimeout, backOffInterval, this)
transactionExecutorOpt.map(executor => {
transactionExecutorOpt.foreach(executor => {
executor.submit(processor)
})
processorMap.put(sequenceNumber, processor)
// Wait until a batch is available - will be an error if
processor.getEventBatch
// Wait until a batch is available - will be an error if error message is non-empty
val batch = processor.getEventBatch
if (batch.getErrorMsg != null && !batch.getErrorMsg.equals("")) {
processorMap.put(sequenceNumber, processor)
}

batch
}

/**
Expand Down Expand Up @@ -93,7 +97,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
Option(removeAndGetProcessor(sequenceNumber)).map(processor => {
Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
processor.batchProcessed(success)
})
}
Expand All @@ -112,7 +116,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
* Shuts down the executor used to process transactions.
*/
def shutdown() {
transactionExecutorOpt.map(executor => {
transactionExecutorOpt.foreach(executor => {
executor.shutdownNow()
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SparkSink extends AbstractSink with Configurable {
// dependencies which are being excluded in the build. In practice,
// Netty dependencies are already available on the JVM as Flume would have pulled them in.
serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
serverOpt.map(server => {
serverOpt.foreach(server => {
LOG.info("Starting Avro server for sink: " + getName)
server.start()
})
Expand All @@ -93,10 +93,10 @@ class SparkSink extends AbstractSink with Configurable {

override def stop() {
LOG.info("Stopping Spark Sink: " + getName)
handler.map(callbackHandler => {
handler.foreach(callbackHandler => {
callbackHandler.shutdown()
})
serverOpt.map(server => {
serverOpt.foreach(server => {
LOG.info("Stopping Avro Server for sink: " + getName)
server.close()
server.join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
eventBatch.setErrorMsg("Something went wrong. Channel was " +
"unable to create a transaction!")
}
txOpt.map(tx => {
txOpt.foreach(tx => {
tx.begin()
val events = new util.ArrayList[SparkSinkEvent](maxBatchSize)
val loop = new Breaks
Expand Down Expand Up @@ -145,7 +145,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
LOG.error("Error while processing transaction.", e)
eventBatch.setErrorMsg(e.getMessage)
try {
txOpt.map(tx => {
txOpt.foreach(tx => {
rollbackAndClose(tx, close = true)
})
} finally {
Expand All @@ -163,7 +163,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
*/
private def processAckOrNack() {
batchAckLatch.await(transactionTimeout, TimeUnit.SECONDS)
txOpt.map(tx => {
txOpt.foreach(tx => {
if (batchSuccess) {
try {
tx.commit()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private[streaming] class FlumePollingReceiver(
override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver")
receiverExecutor.shutdownNow()
connections.map(connection => {
connections.foreach(connection => {
connection.transceiver.close()
})
channelFactory.releaseExternalResources()
Expand Down

0 comments on commit 9fd0da7

Please sign in to comment.