Explicitly declare procedure return type as Unit.
CR-4266: lee
Igor Karp committed May 6, 2014
1 parent 9c8e7f8 commit 64092b2
Showing 24 changed files with 56 additions and 58 deletions.
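The change is mechanical: every method written with Scala's procedure syntax (def f() { ... }) is rewritten with an explicit Unit result type and an equals sign (def f(): Unit = { ... }), and the matching scalastyle check is enabled so new code keeps the explicit form. A minimal before/after sketch of the pattern (illustrative only, not code from this repository):

// Before: procedure syntax. The Unit result type and '=' are implicit.
class Greeter {
  def greet(name: String) {
    println(s"hello $name")
  }
}

// After: the result type is declared explicitly, the form this commit adopts.
class ExplicitGreeter {
  def greet(name: String): Unit = {
    println(s"hello $name")
  }
}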
@@ -51,7 +51,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
* Note that load() will consume a thread for each kafka topic
* (there is no async api in kafka 0.8.1).
*/
- def load() {
+ def load(): Unit = {
val finder = decoderFinder()

readers =
@@ -64,7 +64,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
}

/** terminate all readers */
- def close() {
+ def close(): Unit = {
readers.foreach{_.close()}
}

@@ -75,7 +75,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
* write the events to storage.
*/
private def loadKeyValues(reader: KafkaReader[ArrayRecordColumns],
-                           decoder: KafkaKeyValues) {
+                           decoder: KafkaKeyValues): Unit = {
val stream = reader.stream()
stream.subscribe { record =>
val id = typeTaggedToString(record.id, decoder.metaData.idType)
@@ -112,7 +112,7 @@ class AvroKafkaLoader[K](rootConfig: Config, storage: WriteableStore) // format:
/** We have written one record's worth of data to storage. Per the batch policy, commit our
* position in kafka and notify any watchers.
*/
- private def recordComplete(reader: KafkaReader[_], updates: Seq[ColumnUpdate[K]]) {
+ private def recordComplete(reader: KafkaReader[_], updates: Seq[ColumnUpdate[K]]): Unit = {
// TODO, commit is relatively slow. let's commit/notify only every N items and/or after a timeout
reader.commit() // record the progress reading this kafka topic into zookeeper

@@ -88,14 +88,14 @@ class KafkaReader[T: Decoder](topic: String, rootConfig: Config = ConfigFactory.
/** Store the current reader position in zookeeper. On restart (e.g. after a crash),
* the reader will begin at the stored position for this topic and consumerGroup.
*/
- def commit() {
+ def commit(): Unit = {
connection.commitOffsets // KAFKA should have () on this side-effecting function
}

/** Close the connection, allowing another reader in the same consumerGroup to take
* over reading from this topic/partition.
*/
- def close() {
+ def close(): Unit = {
connection.shutdown()
}

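The "// KAFKA should have () on this side-effecting function" remark above (and "should have parens on close" further down) refers to the Scala convention that the explicit-Unit style reinforces: a parameterless def without parentheses reads as a pure accessor, while a method with side effects should be declared and called with (). A small illustration of that convention (not project code):

class Counter {
  private var count = 0
  def value: Int = count       // no parens: pure accessor, safe to call repeatedly
  def increment(): Unit = {    // parens and Unit: a side effect, visible at definition and call site
    count += 1
  }
}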
@@ -34,24 +34,24 @@ class KafkaWriter[T: Encoder](topic: String, rootConfig: Config) extends Log{
}

/** write an Observable stream to kafka. */
- def writeStream(stream: Observable[T]) {
+ def writeStream(stream: Observable[T]): Unit = {
stream.subscribe { datum =>
writeElement(datum)
}
}

/** write a collection of items to kafka. Note that this blocks the calling thread until it is done. */
- def write(data: Iterable[T]) {
+ def write(data: Iterable[T]): Unit = {
data foreach writeElement
}

/** close the underlying kafka producer connection */
- def close() {
+ def close(): Unit = {
producer.close // KAFKA should have parens on close
}

/** write a single item to a kafka topic. */
- private def writeElement(item: T) {
+ private def writeElement(item: T): Unit = {
log.trace(s"writing $item")
val encoded = writer.toBytes(item)
val message = new KeyedMessage[String, Array[Byte]](topic, encoded)
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
object ConfigureLog4j {

/** configure log4j logging based on a .conf file */
- def configure(config: Config) {
+ def configure(config: Config): Unit = {
val log4jConfig = config.getConfig("log4j")
val file = log4jConfig.getString("file")
val append = log4jConfig.getBoolean("append")
8 changes: 4 additions & 4 deletions kafka/src/main/scala/nest/sparkle/util/Watched.scala
@@ -57,13 +57,13 @@ trait Watched[T] extends Log {


/** client filtering request */
- private def processWatch(watch: Watch[T]) {
+ private def processWatch(watch: Watch[T]): Unit = {
watches += watch
watch.fullReport.onNext(WatchStarted())
}

/** send new event on the data stream to any matching filters */
- private def processEvent(event: T) {
+ private def processEvent(event: T): Unit = {
validSubscriptions().foreach { subscribe =>
subscribe.filter match {
case Some(filter) if !filter(event) => // skip, filter didn't match
@@ -75,12 +75,12 @@ trait Watched[T] extends Log {
}

/** source data stream had an error, notify watchers */
- private def handleError(error: Throwable) {
+ private def handleError(error: Throwable): Unit = {
validSubscriptions().foreach { _.fullReport.onError(error) }
}

/** source data stream is finished, notify watchers */
- private def complete() {
+ private def complete(): Unit = {
validSubscriptions().foreach { _.fullReport.onCompleted() }
}

2 changes: 1 addition & 1 deletion scalastyle-config.xml
@@ -142,5 +142,5 @@
<check level="warning" class="org.scalastyle.scalariform.BlockImportChecker" enabled="false"/>
<check level="warning" class="org.scalastyle.scalariform.ForBraceChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.NotImplementedErrorUsage" enabled="false"/>
<check level="warning" class="org.scalastyle.scalariform.ProcedureDeclarationChecker" enabled="false"/>
<check level="warning" class="org.scalastyle.scalariform.ProcedureDeclarationChecker" enabled="true"/>
</scalastyle>
@@ -46,7 +46,7 @@ object DirectoryDataRegistry {
/** Internal API for the DirectoryDataRegistryActor proxy. Includes both the public and protected
* proxied actor messages. */
protected trait DirectoryDataRegistryApi extends DataRegistry {
- protected[legacy] def fileChange(change:WatchPath.Change)
+ protected[legacy] def fileChange(change:WatchPath.Change): Unit
}

/** A data registry backed by a filesystem subdirectory */
@@ -61,7 +61,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
private val loadedSets = LruCache[DataSetOld](maxCapacity = defaultMaxCapacity)
private val watcher = WatchPath(path, glob)

- def preStart() {
+ def preStart(): Unit = {
val initialFiles = watcher.watch(self.fileChange)
val fileNames = initialFiles.await.map(_.toString)
files ++= fileNames
@@ -86,7 +86,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
}

/** called when the filesystem watcher notices a change */
- protected[legacy] def fileChange(change:WatchPath.Change) {
+ protected[legacy] def fileChange(change:WatchPath.Change): Unit = {
import WatchPath._

def localPath(fullPath:Path):String = {
@@ -105,7 +105,7 @@ class DirectoryDataRegistryActor(path:Path, glob:String = "**")
}
}

- def postStop() {
+ def postStop(): Unit = {
TypedActor(system).stop(watcher)
}

4 changes: 2 additions & 2 deletions sparkle/src/main/scala/nest/sparkle/loader/FilesLoader.scala
@@ -77,7 +77,7 @@ protected class FilesLoader(loadPath: String, store: WriteableStore, strip: Int)
}

/** called when a file is changed in the directory we're watching */
- private def fileChange(change: Change, store: WriteableStore) {
+ private def fileChange(change: Change, store: WriteableStore): Unit = {
change match {
case Added(path) =>
loadFile(path, store)
@@ -88,7 +88,7 @@ protected class FilesLoader(loadPath: String, store: WriteableStore, strip: Int)
}
}

- private def loadFile(fullPath: Path, store: WriteableStore) {
+ private def loadFile(fullPath: Path, store: WriteableStore): Unit = {
fullPath match {
case ParseableFile(format) if Files.isRegularFile(fullPath) =>
log.info(s"Started loading $fullPath into the sparkle store")
@@ -18,8 +18,6 @@ import rx.lang.scala.Observable
import nest.sparkle.store.Event

trait StreamLoader {
- protected def storeEvents[T,U](columnPath:String, events:Observable[Event[T,U]]) {
-   ???
- }
+ protected def storeEvents[T,U](columnPath:String, events:Observable[Event[T,U]]): Unit
}
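Note that this hunk does more than add a result type: the ??? stub body is removed, so storeEvents becomes abstract and every concrete StreamLoader must now supply its own implementation. A minimal sketch of what an implementer could look like (hypothetical class and behavior, not part of this commit):

import rx.lang.scala.Observable
import nest.sparkle.store.Event

class LoggingStreamLoader extends StreamLoader {
  /** print each arriving event; a real loader would write it to a WriteableStore */
  protected def storeEvents[T, U](columnPath: String, events: Observable[Event[T, U]]): Unit = {
    events.subscribe { event => println(s"$columnPath -> $event") }
  }
}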

@@ -31,5 +31,5 @@ trait WriteableStore {
* This erases any existing data in the store. The store is in the same state
* as if it was just created.
*/
- def format()
+ def format(): Unit
}
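For abstract members like format(), a declaration without a result type is also procedure syntax: its result type silently defaults to Unit. Writing def format(): Unit makes the contract explicit for implementers, as in this sketch (hypothetical trait and class, not from the repository):

trait Formattable {
  def format(): Unit              // implementers must return Unit
}

class InMemoryStore extends Formattable {
  def format(): Unit = {
    println("erasing in-memory data")
  }
}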
@@ -49,7 +49,7 @@ object CassandraStore extends Log {
* @param contactHosts Cassandra host to create session for.
* @param keySpace keyspace to drop.
*/
- def dropKeySpace(contactHosts: Seq[String], keySpace: String) {
+ def dropKeySpace(contactHosts: Seq[String], keySpace: String): Unit = {
val clusterSession = getClusterSession(contactHosts)
try {
log.info(s"dropping keySpace: $keySpace")
Expand All @@ -65,7 +65,7 @@ object CassandraStore extends Log {
* @param contactHost Cassandra host to create session for.
* @param keySpace keyspace to drop.
*/
- def dropKeySpace(contactHost: String, keySpace: String) {
+ def dropKeySpace(contactHost: String, keySpace: String): Unit = {
dropKeySpace(Seq(contactHost), keySpace)
}

@@ -130,7 +130,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
*
* Blocks the calling thread until the session is closed
*/
- def close() { clusterSession.close() }
+ def close(): Unit = { clusterSession.close() }

/** Return the dataset for the provided dataSet path (fooSet/barSet/mySet).
*
@@ -167,7 +167,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
*
* This call is synchronous.
*/
- def format() {
+ def format(): Unit = {
format(session)
}

@@ -177,7 +177,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
* @param session The session to use. This shadows the instance variable
* because the instance variable may not be initialized yet.
*/
- private def useKeySpace(session: Session) {
+ private def useKeySpace(session: Session): Unit = {
val keySpacesRows = session.executeAsync(s"""
SELECT keyspace_name FROM system.schema_keyspaces""").observerableRows

@@ -193,7 +193,7 @@ trait CassandraStore extends Store with WriteableStore with Log {
}

/** create a keyspace (db) in cassandra */
- private def createKeySpace(session: Session, keySpace: String) {
+ private def createKeySpace(session: Session, keySpace: String): Unit = {
session.execute(s"""
CREATE KEYSPACE $keySpace
with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"""
@@ -206,7 +206,7 @@
* The session's keyspace itself must already exist.
* Any existing tables are deleted.
*/
- protected def format(session: Session) {
+ protected def format(session: Session): Unit = {
dropTables(session)

SparseColumnWriter.createColumnTables(session).await
@@ -7,7 +7,7 @@ import java.io.Closeable
/** Bundle a cassandra cluster connection and session on that cluster together,
* so we can close them together. */
case class ClusterSession(cluster:Cluster, session:Session) extends Closeable {
- def close() {
+ def close(): Unit = {
cluster.close()
session.close()
}
@@ -137,7 +137,7 @@ object ColumnCatalog {
*
* @param session Session to use.
*/
- def create(session: Session) {
+ def create(session: Session): Unit = {
session.execute(s"""
CREATE TABLE IF NOT EXISTS $catalogTable (
columnPath text,
@@ -159,7 +159,7 @@ object DataSetCatalog {
*
* @param session Session to use.
*/
- def create(session:Session) {
+ def create(session:Session): Unit = {
session.execute(s"""
CREATE TABLE IF NOT EXISTS $tableName (
parentPath text,
@@ -49,7 +49,7 @@ object ObservableResultSet {
* rowChunk() is called once for each available group ('chunk') of resultSet rows. It
* recursively calls itself to process the next fetched set of rows until there are no more rows left.
*/
- def rowChunk() {
+ def rowChunk(): Unit = {
if (!subscriber.isUnsubscribed) {
val iterator = resultSet.iterator().asScala
val availableNow = resultSet.getAvailableWithoutFetching()
@@ -22,7 +22,7 @@ trait SparkleTestConfig {
/** setup logging for sparkle. Triggered automatically when the caller accesses
* rootConfig. Idempotent.
*/
- def initializeLogging(root: Config) {
+ def initializeLogging(root: Config): Unit = {
if (!loggingInitialized) {
val sparkleConfig = root.getConfig("sparkle-time-server")
ConfigureLogback.configureLogging(sparkleConfig)
@@ -63,7 +63,7 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
* Normally, there'll be a dashboard at this page (either the default sparkle dashboard,
* or one provided by the user with the --root command line option).
*/
- def launchDesktopBrowser() {
+ def launchDesktopBrowser(): Unit = {
val uri = new URI(s"http://localhost:$webPort/")
import system.dispatcher
RepeatingRequest.get(uri + "health").onComplete {
@@ -80,7 +80,7 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
*
* This call will block until the server is ready to accept incoming requests.
*/
- private def startServer(serviceActor: ActorRef, port: Int)(implicit system: ActorSystem) {
+ private def startServer(serviceActor: ActorRef, port: Int)(implicit system: ActorSystem): Unit = {
if (config.getBoolean("auto-start")) {
implicit val timeout = Timeout(10.seconds)
val started = IO(Http) ? Http.Bind(serviceActor, interface = "0.0.0.0", port = port)
@@ -89,14 +89,14 @@ protected class ServerLaunch(val rootConfig: Config)(implicit val system: ActorS
}

/** Erase and reformat the storage system if requested */
- private def possiblyErase() {
+ private def possiblyErase(): Unit = {
if (config.getBoolean("erase-store")) {
writeableStore.format()
}
}

/** launch a FilesLoader for each configured directory */
- private def startFilesLoader() {
+ private def startFilesLoader(): Unit = {
if (config.getBoolean("files-loader.auto-start")) {
val strip = config.getInt("files-loader.directory-strip")
config.getStringList("files-loader.directories").asScala.foreach { pathString =>
2 changes: 1 addition & 1 deletion sparkle/src/main/scala/nest/sparkle/tools/Exporter.scala
@@ -87,7 +87,7 @@ case class Exporter(config: Config)
processDataSet(dataSet)
}

- def close() {
+ def close(): Unit = {
store.close()
}

@@ -28,7 +28,7 @@ object ConfigureLogback extends Log {

/** configure logging based on the .conf file */
var configured = false
- def configureLogging(sparkleConfig: Config) {
+ def configureLogging(sparkleConfig: Config): Unit = {
if (!configured) {
slf4j.LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) match {
case rootLogger: Logger => configureLogBack(sparkleConfig, rootLogger)
@@ -39,7 +39,7 @@ object ConfigureLogback extends Log {
}

/** configure file based logger for logback, based on settings in the .conf file */
- private def configureLogBack(config: Config, rootLogger: Logger) {
+ private def configureLogBack(config: Config, rootLogger: Logger): Unit = {
val context = slf4j.LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext]
val logConfig = config.getConfig("logback")
val file = logConfig.getString("file")
4 changes: 2 additions & 2 deletions sparkle/src/main/scala/nest/sparkle/util/Debug.scala
@@ -17,9 +17,9 @@ package nest.sparkle.util
object Debug {

/** handy for inserting a pause e.g. while debugging memory usage */
- def waitForKey() {
+ def waitForKey(): Unit = {
Console.println("press any key to continue")
- System.in.read();
+ System.in.read()
Console.println("..continuing")
}

