From 64092b272efcc669cd51d5d041a372da626dce86 Mon Sep 17 00:00:00 2001
From: Igor Karp
Date: Tue, 6 May 2014 15:40:39 -0700
Subject: [PATCH] Explicitly declare procedure return type as Unit.

CR-4266: lee
---
 .../sparkle/loader/kafka/AvroKafkaLoader.scala     |  8 ++++----
 .../nest/sparkle/loader/kafka/KafkaReader.scala    |  4 ++--
 .../nest/sparkle/loader/kafka/KafkaWriter.scala    |  8 ++++----
 .../scala/nest/sparkle/util/ConfigureLog4j.scala   |  2 +-
 .../src/main/scala/nest/sparkle/util/Watched.scala |  8 ++++----
 scalastyle-config.xml                              |  2 +-
 .../sparkle/legacy/DirectoryDataRegistry.scala     |  8 ++++----
 .../scala/nest/sparkle/loader/FilesLoader.scala    |  4 ++--
 .../scala/nest/sparkle/loader/StreamLoader.scala   |  4 +---
 .../scala/nest/sparkle/store/WriteableStore.scala  |  2 +-
 .../sparkle/store/cassandra/CassandraStore.scala   | 14 +++++++-------
 .../sparkle/store/cassandra/ClusterSession.scala   |  2 +-
 .../sparkle/store/cassandra/ColumnCatalog.scala    |  2 +-
 .../sparkle/store/cassandra/DataSetCatalog.scala   |  2 +-
 .../store/cassandra/ObservableResultSet.scala      |  2 +-
 .../nest/sparkle/test/SparkleTestConfig.scala      |  2 +-
 .../nest/sparkle/time/server/ServerLaunch.scala    |  8 ++++----
 .../main/scala/nest/sparkle/tools/Exporter.scala   |  2 +-
 .../scala/nest/sparkle/util/ConfigureLogback.scala |  4 ++--
 .../src/main/scala/nest/sparkle/util/Debug.scala   |  4 ++--
 .../scala/nest/sparkle/util/FileSystemWatch.scala  | 10 +++++-----
 .../src/main/scala/nest/sparkle/util/Managed.scala |  2 +-
 .../scala/nest/sparkle/util/ObservableFuture.scala |  4 ++--
 .../nest/sparkle/util/ObservableIterator.scala     |  6 +++---
 24 files changed, 56 insertions(+), 58 deletions(-)

diff --git a/kafka/src/main/scala/nest/sparkle/loader/kafka/AvroKafkaLoader.scala b/kafka/src/main/scala/nest/sparkle/loader/kafka/AvroKafkaLoader.scala
index 45c4425e..1d826537 100644
--- a/kafka/src/main/scala/nest/sparkle/loader/kafka/AvroKafkaLoader.scala
+++ b/kafka/src/main/scala/nest/sparkle/loader/kafka/AvroKafkaLoader.scala
@@ -51,7 +51,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
     * Note that load() will consume a thread for each kafka topic
     * (there is no async api in kafka 0.8.1).
     */
-  def load() {
+  def load(): Unit = {
     val finder = decoderFinder()
 
     readers =
@@ -64,7 +64,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
   }
 
   /** terminate all readers */
-  def close() {
+  def close(): Unit = {
     readers.foreach{_.close()}
   }
 
@@ -75,7 +75,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
     * write the events to storage.
     */
   private def loadKeyValues(reader: KafkaReader[ArrayRecordColumns],
-      decoder: KafkaKeyValues) {
+      decoder: KafkaKeyValues): Unit = {
     val stream = reader.stream()
     stream.subscribe { record =>
       val id = typeTaggedToString(record.id, decoder.metaData.idType)
@@ -112,7 +112,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
   /** We have written one record's worth of data to storage. Per the batch policy, commit our
     * position in kafka and notify any watchers.
     */
-  private def recordComplete(reader: KafkaReader[_], updates: Seq[ColumnUpdate[K]]) {
+  private def recordComplete(reader: KafkaReader[_], updates: Seq[ColumnUpdate[K]]): Unit = {
     // TODO, commit is relatively slow. let's commit/notify only every N items and/or after a timeout
     reader.commit() // record the progress reading this kafka topic into zookeeper
diff --git a/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaReader.scala b/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaReader.scala
index 081e3112..3be259cf 100644
--- a/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaReader.scala
+++ b/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaReader.scala
@@ -88,14 +88,14 @@ class KafkaReader[T: Decoder](topic: String, rootConfig: Config = ConfigFactory.
   /** Store the current reader position in zookeeper. On restart (e.g. after a crash),
     * the reader will begin at the stored position for this topic and consumerGroup.
     */
-  def commit() {
+  def commit(): Unit = {
     connection.commitOffsets // KAFKA should have () on this side-effecting function
   }
 
   /** Close the connection, allowing another reader in the same consumerGroup to take
     * over reading from this topic/partition.
     */
-  def close() {
+  def close(): Unit = {
     connection.shutdown()
   }
 
diff --git a/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaWriter.scala b/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaWriter.scala
index f63a75b0..62050084 100644
--- a/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaWriter.scala
+++ b/kafka/src/main/scala/nest/sparkle/loader/kafka/KafkaWriter.scala
@@ -34,24 +34,24 @@ class KafkaWriter[T: Encoder](topic: String, rootConfig: Config) extends Log{
   }
 
   /** write an Observable stream to kafka. */
-  def writeStream(stream: Observable[T]) {
+  def writeStream(stream: Observable[T]): Unit = {
     stream.subscribe { datum =>
       writeElement(datum)
     }
   }
 
   /** write a collection of items to kafka. Note that this blocks the calling thread until it is done. */
-  def write(data: Iterable[T]) {
+  def write(data: Iterable[T]): Unit = {
     data foreach writeElement
   }
 
   /** close the underlying kafka producer connection */
-  def close() {
+  def close(): Unit = {
     producer.close // KAFKA should have parens on close
   }
 
   /** write a single item to a kafka topic. */
-  private def writeElement(item: T) {
+  private def writeElement(item: T): Unit = {
     log.trace(s"writing $item")
     val encoded = writer.toBytes(item)
     val message = new KeyedMessage[String, Array[Byte]](topic, encoded)
diff --git a/kafka/src/main/scala/nest/sparkle/util/ConfigureLog4j.scala b/kafka/src/main/scala/nest/sparkle/util/ConfigureLog4j.scala
index aae76e4e..c000ba29 100644
--- a/kafka/src/main/scala/nest/sparkle/util/ConfigureLog4j.scala
+++ b/kafka/src/main/scala/nest/sparkle/util/ConfigureLog4j.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 object ConfigureLog4j {
 
   /** configure log4j logging based on a .conf file */
-  def configure(config: Config) {
+  def configure(config: Config): Unit = {
     val log4jConfig = config.getConfig("log4j")
     val file = log4jConfig.getString("file")
     val append = log4jConfig.getBoolean("append")
diff --git a/kafka/src/main/scala/nest/sparkle/util/Watched.scala b/kafka/src/main/scala/nest/sparkle/util/Watched.scala
index e2d9f7fb..a05e582a 100644
--- a/kafka/src/main/scala/nest/sparkle/util/Watched.scala
+++ b/kafka/src/main/scala/nest/sparkle/util/Watched.scala
@@ -57,13 +57,13 @@ trait Watched[T] extends Log {
 
 
   /** client filtering request */
-  private def processWatch(watch: Watch[T]) {
+  private def processWatch(watch: Watch[T]): Unit = {
     watches += watch
     watch.fullReport.onNext(WatchStarted())
   }
 
   /** send new event on the data stream to any matching filters */
-  private def processEvent(event: T) {
+  private def processEvent(event: T): Unit = {
     validSubscriptions().foreach { subscribe =>
       subscribe.filter match {
         case Some(filter) if !filter(event) => // skip, filter didn't match
@@ -75,12 +75,12 @@ trait Watched[T] extends Log {
   }
 
   /** source data stream had an error, notify watchers */
-  private def handleError(error: Throwable) {
+  private def handleError(error: Throwable): Unit = {
     validSubscriptions().foreach { _.fullReport.onError(error) }
   }
 
   /** source data stream is finished, notify watchers */
-  private def complete() {
+  private def complete(): Unit = {
     validSubscriptions().foreach { _.fullReport.onCompleted() }
   }
 
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 43ed5e62..5359b6de 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -142,5 +142,5 @@
-
+
diff --git a/sparkle/src/main/scala/nest/sparkle/legacy/DirectoryDataRegistry.scala b/sparkle/src/main/scala/nest/sparkle/legacy/DirectoryDataRegistry.scala
index bf000e45..829120d7 100644
--- a/sparkle/src/main/scala/nest/sparkle/legacy/DirectoryDataRegistry.scala
+++ b/sparkle/src/main/scala/nest/sparkle/legacy/DirectoryDataRegistry.scala
@@ -46,7 +46,7 @@ object DirectoryDataRegistry {
 /** Internal API for the DirectoryDataRegistryActor proxy. Includes both the public and protected
   * proxied actor messages. */
 protected trait DirectoryDataRegistryApi extends DataRegistry {
-  protected[legacy] def fileChange(change:WatchPath.Change)
+  protected[legacy] def fileChange(change:WatchPath.Change): Unit
 }
 
 /** A data registry backed by a filesystem subdirectory */
@@ -61,7 +61,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
   private val loadedSets = LruCache[DataSetOld](maxCapacity = defaultMaxCapacity)
   private val watcher = WatchPath(path, glob)
 
-  def preStart() {
+  def preStart(): Unit = {
     val initialFiles = watcher.watch(self.fileChange)
     val fileNames = initialFiles.await.map(_.toString)
     files ++= fileNames
@@ -86,7 +86,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
   }
 
   /** called when the filesystem watcher notices a change */
-  protected[legacy] def fileChange(change:WatchPath.Change) {
+  protected[legacy] def fileChange(change:WatchPath.Change): Unit = {
     import WatchPath._
 
     def localPath(fullPath:Path):String = {
@@ -105,7 +105,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
     }
   }
 
-  def postStop() {
+  def postStop(): Unit = {
     TypedActor(system).stop(watcher)
   }
 
diff --git a/sparkle/src/main/scala/nest/sparkle/loader/FilesLoader.scala b/sparkle/src/main/scala/nest/sparkle/loader/FilesLoader.scala
index 2e8fd7d1..fea3593a 100644
--- a/sparkle/src/main/scala/nest/sparkle/loader/FilesLoader.scala
+++ b/sparkle/src/main/scala/nest/sparkle/loader/FilesLoader.scala
@@ -77,7 +77,7 @@ protected class FilesLoader(loadPath: String, store: WriteableStore, strip: Int)
   }
 
   /** called when a file is changed in the directory we're watching */
-  private def fileChange(change: Change, store: WriteableStore) {
+  private def fileChange(change: Change, store: WriteableStore): Unit = {
     change match {
       case Added(path) =>
         loadFile(path, store)
@@ -88,7 +88,7 @@ protected class FilesLoader(loadPath: String, store: WriteableStore, strip: Int)
     }
   }
 
-  private def loadFile(fullPath: Path, store: WriteableStore) {
+  private def loadFile(fullPath: Path, store: WriteableStore): Unit = {
     fullPath match {
       case ParseableFile(format) if Files.isRegularFile(fullPath) =>
         log.info(s"Started loading $fullPath into the sparkle store")
diff --git a/sparkle/src/main/scala/nest/sparkle/loader/StreamLoader.scala b/sparkle/src/main/scala/nest/sparkle/loader/StreamLoader.scala
index bb1fbf3e..3d92f170 100644
--- a/sparkle/src/main/scala/nest/sparkle/loader/StreamLoader.scala
+++ b/sparkle/src/main/scala/nest/sparkle/loader/StreamLoader.scala
@@ -18,8 +18,6 @@ import rx.lang.scala.Observable
 import nest.sparkle.store.Event
 
 trait StreamLoader {
-  protected def storeEvents[T,U](columnPath:String, events:Observable[Event[T,U]]) {
-    ???
-  }
+  protected def storeEvents[T,U](columnPath:String, events:Observable[Event[T,U]]): Unit
 }
 
diff --git a/sparkle/src/main/scala/nest/sparkle/store/WriteableStore.scala b/sparkle/src/main/scala/nest/sparkle/store/WriteableStore.scala
index 6d74c60c..89c9d64a 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/WriteableStore.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/WriteableStore.scala
@@ -31,5 +31,5 @@ trait WriteableStore {
    * This erases any existing data in the store. The store is in the same state
    * as if it was just created.
    */
-  def format()
+  def format(): Unit
 }
diff --git a/sparkle/src/main/scala/nest/sparkle/store/cassandra/CassandraStore.scala b/sparkle/src/main/scala/nest/sparkle/store/cassandra/CassandraStore.scala
index c8aa29d7..96f7f41d 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/cassandra/CassandraStore.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/cassandra/CassandraStore.scala
@@ -49,7 +49,7 @@ object CassandraStore extends Log {
     * @param contactHosts Cassandra host to create session for.
     * @param keySpace keyspace to drop.
     */
-  def dropKeySpace(contactHosts: Seq[String], keySpace: String) {
+  def dropKeySpace(contactHosts: Seq[String], keySpace: String): Unit = {
     val clusterSession = getClusterSession(contactHosts)
     try {
       log.info(s"dropping keySpace: $keySpace")
@@ -65,7 +65,7 @@ object CassandraStore extends Log {
     * @param contactHost Cassandra host to create session for.
     * @param keySpace keyspace to drop.
     */
-  def dropKeySpace(contactHost: String, keySpace: String) {
+  def dropKeySpace(contactHost: String, keySpace: String): Unit = {
     dropKeySpace(Seq(contactHost), keySpace)
   }
 
@@ -130,7 +130,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
    *
    * Blocks the calling thread until the session is closed
    */
-  def close() { clusterSession.close() }
+  def close(): Unit = { clusterSession.close() }
 
   /** Return the dataset for the provided dataSet path (fooSet/barSet/mySet).
    *
@@ -167,7 +167,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
    *
    * This call is synchronous.
    */
-  def format() {
+  def format(): Unit = {
     format(session)
   }
 
@@ -177,7 +177,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
    * @param session The session to use. This shadows the instance variable
    *                because the instance variable may not be initialized yet.
    */
-  private def useKeySpace(session: Session) {
+  private def useKeySpace(session: Session): Unit = {
     val keySpacesRows = session.executeAsync(s"""
       SELECT keyspace_name FROM system.schema_keyspaces""").observerableRows
 
@@ -193,7 +193,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
   }
 
   /** create a keyspace (db) in cassandra */
-  private def createKeySpace(session: Session, keySpace: String) {
+  private def createKeySpace(session: Session, keySpace: String): Unit = {
     session.execute(s"""
       CREATE KEYSPACE $keySpace
       with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"""
@@ -206,7 +206,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
    * The session's keyspace itself must already exist.
    * Any existing tables are deleted.
    */
-  protected def format(session: Session) {
+  protected def format(session: Session): Unit = {
     dropTables(session)
 
     SparseColumnWriter.createColumnTables(session).await
diff --git a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ClusterSession.scala b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ClusterSession.scala
index 881b5162..99142461 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ClusterSession.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ClusterSession.scala
@@ -7,7 +7,7 @@ import java.io.Closeable
 
 /** Bundle a cassandra cluster connection and session on that cluster together,
   * so we can close them together.  */
 case class ClusterSession(cluster:Cluster, session:Session) extends Closeable {
-  def close() {
+  def close(): Unit = {
     cluster.close()
     session.close()
   }
diff --git a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ColumnCatalog.scala b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ColumnCatalog.scala
index d57c6410..d1c7adfa 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ColumnCatalog.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ColumnCatalog.scala
@@ -137,7 +137,7 @@ object ColumnCatalog {
    *
    * @param session Session to use.
    */
-  def create(session: Session) {
+  def create(session: Session): Unit = {
     session.execute(s"""
       CREATE TABLE IF NOT EXISTS $catalogTable (
         columnPath text,
diff --git a/sparkle/src/main/scala/nest/sparkle/store/cassandra/DataSetCatalog.scala b/sparkle/src/main/scala/nest/sparkle/store/cassandra/DataSetCatalog.scala
index 4a264881..6dc26757 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/cassandra/DataSetCatalog.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/cassandra/DataSetCatalog.scala
@@ -159,7 +159,7 @@ object DataSetCatalog {
    *
    * @param session Session to use.
   */
-  def create(session:Session) {
+  def create(session:Session): Unit = {
     session.execute(s"""
       CREATE TABLE IF NOT EXISTS $tableName (
         parentPath text,
diff --git a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ObservableResultSet.scala b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ObservableResultSet.scala
index 7f8a938e..87abfdbf 100644
--- a/sparkle/src/main/scala/nest/sparkle/store/cassandra/ObservableResultSet.scala
+++ b/sparkle/src/main/scala/nest/sparkle/store/cassandra/ObservableResultSet.scala
@@ -49,7 +49,7 @@ object ObservableResultSet {
      * rowChunk() is called once for each available group ('chunk') of resultSet rows.  It
      * recursively calls itself to process the next fetched set of rows until there are no more rows left.
      */
-    def rowChunk() {
+    def rowChunk(): Unit = {
       if (!subscriber.isUnsubscribed) {
         val iterator = resultSet.iterator().asScala
         val availableNow = resultSet.getAvailableWithoutFetching()
diff --git a/sparkle/src/main/scala/nest/sparkle/test/SparkleTestConfig.scala b/sparkle/src/main/scala/nest/sparkle/test/SparkleTestConfig.scala
index 02afcc58..32167765 100644
--- a/sparkle/src/main/scala/nest/sparkle/test/SparkleTestConfig.scala
+++ b/sparkle/src/main/scala/nest/sparkle/test/SparkleTestConfig.scala
@@ -22,7 +22,7 @@ trait SparkleTestConfig {
   /** setup logging for sparkle. Triggered automatically when the caller accesses
     * rootConfig. Idempotent.
     */
-  def initializeLogging(root: Config) {
+  def initializeLogging(root: Config): Unit = {
     if (!loggingInitialized) {
       val sparkleConfig = root.getConfig("sparkle-time-server")
       ConfigureLogback.configureLogging(sparkleConfig)
diff --git a/sparkle/src/main/scala/nest/sparkle/time/server/ServerLaunch.scala b/sparkle/src/main/scala/nest/sparkle/time/server/ServerLaunch.scala
index 7e7182c6..ad9f9bd9 100644
--- a/sparkle/src/main/scala/nest/sparkle/time/server/ServerLaunch.scala
+++ b/sparkle/src/main/scala/nest/sparkle/time/server/ServerLaunch.scala
@@ -63,7 +63,7 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
    * Normally, there'll be a dashboard at this page. (either the default sparkle dashboard,
    * or one provided by the user with the --root command line option.)
   */
-  def launchDesktopBrowser() {
+  def launchDesktopBrowser(): Unit = {
     val uri = new URI(s"http://localhost:$webPort/")
     import system.dispatcher
     RepeatingRequest.get(uri + "health").onComplete {
@@ -80,7 +80,7 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
    *
    * This call will block until the server is ready to accept incoming requests.
    */
-  private def startServer(serviceActor: ActorRef, port: Int)(implicit system: ActorSystem) {
+  private def startServer(serviceActor: ActorRef, port: Int)(implicit system: ActorSystem): Unit = {
     if (config.getBoolean("auto-start")) {
       implicit val timeout = Timeout(10.seconds)
       val started = IO(Http) ? Http.Bind(serviceActor, interface = "0.0.0.0", port = port)
@@ -89,14 +89,14 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
   }
 
   /** Erase and reformat the storage system if requested */
-  private def possiblyErase() {
+  private def possiblyErase(): Unit = {
     if (config.getBoolean("erase-store")) {
       writeableStore.format()
     }
   }
 
   /** launch a FilesLoader for each configured directory */
-  private def startFilesLoader() {
+  private def startFilesLoader(): Unit = {
     if (config.getBoolean("files-loader.auto-start")) {
       val strip = config.getInt("files-loader.directory-strip")
       config.getStringList("files-loader.directories").asScala.foreach { pathString =>
diff --git a/sparkle/src/main/scala/nest/sparkle/tools/Exporter.scala b/sparkle/src/main/scala/nest/sparkle/tools/Exporter.scala
index 11e5650e..87cfb5ee 100644
--- a/sparkle/src/main/scala/nest/sparkle/tools/Exporter.scala
+++ b/sparkle/src/main/scala/nest/sparkle/tools/Exporter.scala
@@ -87,7 +87,7 @@ case class Exporter(config: Config)
     processDataSet(dataSet)
   }
 
-  def close() {
+  def close(): Unit = {
     store.close()
   }
 
diff --git a/sparkle/src/main/scala/nest/sparkle/util/ConfigureLogback.scala b/sparkle/src/main/scala/nest/sparkle/util/ConfigureLogback.scala
index 61998f61..c9b29835 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/ConfigureLogback.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/ConfigureLogback.scala
@@ -28,7 +28,7 @@ object ConfigureLogback extends Log {
 
   /** configure logging based on the .conf file */
   var configured = false
-  def configureLogging(sparkleConfig: Config) {
+  def configureLogging(sparkleConfig: Config): Unit = {
     if (!configured) {
       slf4j.LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) match {
         case rootLogger: Logger => configureLogBack(sparkleConfig, rootLogger)
@@ -39,7 +39,7 @@ object ConfigureLogback extends Log {
   }
 
   /** configure file based logger for logback, based on settings in the .conf file */
-  private def configureLogBack(config: Config, rootLogger: Logger) {
+  private def configureLogBack(config: Config, rootLogger: Logger): Unit = {
     val context = slf4j.LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext]
     val logConfig = config.getConfig("logback")
     val file = logConfig.getString("file")
diff --git a/sparkle/src/main/scala/nest/sparkle/util/Debug.scala b/sparkle/src/main/scala/nest/sparkle/util/Debug.scala
index 1dc042a0..f499b7b4 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/Debug.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/Debug.scala
@@ -17,9 +17,9 @@ package nest.sparkle.util
 
 object Debug {
   /** handy for inserting a pause e.g. while debugging memory usage */
-  def waitForKey() {
+  def waitForKey(): Unit = {
     Console.println("press any key to continue")
-    System.in.read();
+    System.in.read()
     Console.println("..continuing")
   }
 
diff --git a/sparkle/src/main/scala/nest/sparkle/util/FileSystemWatch.scala b/sparkle/src/main/scala/nest/sparkle/util/FileSystemWatch.scala
index 7d20a625..6df2ffe1 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/FileSystemWatch.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/FileSystemWatch.scala
@@ -47,11 +47,11 @@ protected[util] class PathWatcherActor(root: Path, glob: String) extends PathWat
   }
 
   /** called internally when the fileSystem watcher notices a change */
-  protected def change(change: WatchPath.Change) {
+  protected def change(change: WatchPath.Change): Unit = {
     watchers.foreach { watcher => watcher(change) }
   }
 
-  def postStop() {
+  def postStop(): Unit = {
     fsWatcher.foreach { _.cancel() }
   }
 }
@@ -67,7 +67,7 @@ protected[util] class FileSystemWatch(report: WatchPath.Change => Unit, glob: St
   paths foreach watchPath
 
   /** watch an additional path */
-  private def watchPath(path: Path) {
+  private def watchPath(path: Path): Unit = {
     val key = path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
     watchKeys += (key -> path)
 
@@ -82,7 +82,7 @@ protected[util] class FileSystemWatch(report: WatchPath.Change => Unit, glob: St
   @volatile var done = false
 
   private val runner = new Runnable {
-    def run() {
+    def run(): Unit = {
       while (!done) {
         val key = watcher.take()
         val events = collectEvents(key)
@@ -102,7 +102,7 @@ protected[util] class FileSystemWatch(report: WatchPath.Change => Unit, glob: St
   }
 
   /** notify caller and internal state after an observed file system event */
-  def processEvents(watchedPath:Path, events: Iterable[WatchEvent[_]]) {
+  def processEvents(watchedPath:Path, events: Iterable[WatchEvent[_]]): Unit = {
     events.foreach { event =>
       event.kind match {
         case OVERFLOW => ??? // probably should rescan the directories
diff --git a/sparkle/src/main/scala/nest/sparkle/util/Managed.scala b/sparkle/src/main/scala/nest/sparkle/util/Managed.scala
index 011d70d0..6eabe079 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/Managed.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/Managed.scala
@@ -33,7 +33,7 @@ object Managed {  // LATER which SCALA ARM library to use?
 }
 
 class Resource[T <: Closeable](resource: T) {
-  def foreach(fn: T => Unit) {
+  def foreach(fn: T => Unit): Unit = {
     try {
       fn(resource)
     } finally {
diff --git a/sparkle/src/main/scala/nest/sparkle/util/ObservableFuture.scala b/sparkle/src/main/scala/nest/sparkle/util/ObservableFuture.scala
index cf6d5f5a..66889cc1 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/ObservableFuture.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/ObservableFuture.scala
@@ -36,11 +36,11 @@ object ObservableFuture {
   /** return a Future that will return a single sequence for the observable stream */
   def toFutureSeq: Future[Seq[T]] = {
     val promise = Promise[Seq[T]]()
-    def onNext(value:Seq[T]) {
+    def onNext(value:Seq[T]): Unit = {
       promise.complete(Success(value))
     }
 
-    def onError(error:Throwable) {
+    def onError(error:Throwable): Unit = {
       promise.complete(Failure(error))
     }
 
diff --git a/sparkle/src/main/scala/nest/sparkle/util/ObservableIterator.scala b/sparkle/src/main/scala/nest/sparkle/util/ObservableIterator.scala
index cfc01c02..8f89041d 100644
--- a/sparkle/src/main/scala/nest/sparkle/util/ObservableIterator.scala
+++ b/sparkle/src/main/scala/nest/sparkle/util/ObservableIterator.scala
@@ -38,7 +38,7 @@ object ObservableIterator extends Log {
     var running = false // true if the iteration thread is running
 
     /** start the background iterator if unstarted */
-    def start() {
+    def start(): Unit = {
       synchronized {
         if (!running) {
           startBackgroundIterator()
@@ -63,9 +63,9 @@ object ObservableIterator extends Log {
     /** a single execution thread reads from the iterable. iteration continues
       * until all the current subscribers unsubscribe.
       */
-    def startBackgroundIterator() {
+    def startBackgroundIterator(): Unit = {
       val runnable = new Runnable {
-        def run() { // run in a background thread.
+        def run(): Unit = { // run in a background thread.
           iterator.takeWhile { value =>
             currentSubscribers().foreach { _.onNext(value) }
             stillActiveSubscribers
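
Note on the pattern this patch fixes: in Scala, "procedure syntax" (a def whose
body follows the parameter list with no '=') implicitly gives the method the
result type Unit. It is easy to change a method's meaning by accident, since
adding or dropping the '=' silently switches between returning a value and
discarding it; procedure syntax was later deprecated in Scala 2.13 and dropped
in Scala 3. Below is a minimal, self-contained sketch of the before/after
forms; the object and method names are illustrative only, not taken from the
sparkle codebase:

    // Illustrative example only -- not part of the sparkle codebase.
    object ProcedureSyntaxDemo {
      // Before: procedure syntax. With no '=', the result type is implicitly
      // Unit, and the value computed by the body is silently discarded
      // (no warning by default).
      def saveOld(items: Seq[Int]) {
        items.sum // computed, then thrown away
      }

      // After: the form this patch standardizes on. The explicit ': Unit ='
      // makes the side-effecting intent visible to readers and style checkers.
      def save(items: Seq[Int]): Unit = {
        Console.println(s"saving ${items.sum}")
      }

      def main(args: Array[String]): Unit = {
        saveOld(Seq(1, 2, 3))
        save(Seq(1, 2, 3))
      }
    }

The scalastyle-config.xml hunk (its one-line change is elided above) presumably
enables the matching style rule; scalastyle ships a ProcedureDeclarationChecker
that flags procedure-syntax definitions, so future ones fail the style check
instead of depending on code review to catch them.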