Skip to content

Commit

Permalink
Improved arguments parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
jto authored and clairemcginty committed Aug 15, 2018
1 parent 369994c commit 1f0e3cd
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ val sparkeyVersion = "2.3.0"
val tensorFlowVersion = "1.8.0"
val zoltarVersion = "0.4.0"
val grpcVersion = "1.7.0"
val caseappVersion = "2.0.0-M3"

lazy val mimaSettings = Seq(
mimaPreviousArtifacts :=
Expand Down Expand Up @@ -324,7 +325,8 @@ lazy val scioCore: Project = Project(
"com.google.protobuf" % "protobuf-java" % protobufVersion,
"me.lyh" %% "protobuf-generic" % protobufGenericVersion,
"org.apache.xbean" % "xbean-asm5-shaded" % asmVersion,
"io.grpc" % "grpc-all" % grpcVersion exclude("io.opencensus", "opencensus-api")
"io.grpc" % "grpc-all" % grpcVersion exclude("io.opencensus", "opencensus-api"),
"com.github.alexarchambault" %% "case-app" % caseappVersion
)
).dependsOn(
scioAvro,
Expand Down
47 changes: 47 additions & 0 deletions scio-core/src/main/scala/com/spotify/scio/ScioContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,58 @@ private object RunnerContext {

/** Factory helpers that turn raw command line arguments into a [[ScioContext]] plus parsed arguments. */
object ContextAndArgs {
  // scalastyle:off regex
  // scalastyle:off cyclomatic.complexity
  /**
   * Build a [[ScioContext]] and untyped [[Args]] from command line arguments.
   *
   * @param args raw command line arguments, handed to `ScioContext.parseArguments`
   * @return the newly created context together with the parsed [[Args]]
   */
  def apply(args: Array[String]): (ScioContext, Args) = {
    val (pipelineOptions, parsedArgs) = ScioContext.parseArguments[PipelineOptions](args)
    (new ScioContext(pipelineOptions, Nil), parsedArgs)
  }

  import caseapp._
  import caseapp.core.help._

  /**
   * Build a [[ScioContext]] and a typed argument value `T` parsed by case-app.
   *
   * The command line is split in two: tokens that do not start with one or two dashes, and
   * dashed tokens whose name (the text before any `=`) is declared by `Parser[T]` or is
   * `help`/`usage`, are parsed by case-app; every other token is forwarded to
   * [[ContextAndArgs.apply]].
   *
   * Parse errors are printed to standard error and the JVM exits with status 1. When a help
   * flag is set, help text is printed to standard output and the JVM exits with status 0.
   *
   * @param args raw command line arguments
   * @tparam T type of the typed arguments; needs case-app `Parser` and `Help` instances
   * @return the context created from the forwarded tokens and the parsed `T`
   */
  def typed[T: Parser : Help](args: Array[String]): (ScioContext, T) = {
    // Only option names that T declares (plus the built-in help flags) are handed to case-app.
    // NOTE(review): names are compared as declared by the parser; confirm how case-app
    // formats multi-word field names on the command line.
    val helpFlags = List("help", "usage")
    val knownNames =
      Parser[T].args.flatMap(arg => arg.name +: arg.extraNames).map(_.name) ++ helpFlags

    val Dashed = "^-{1,2}(.+)$".r
    val (customArgs, remainingArgs) = args.partition {
      // Dashed option: keep it for case-app only when its name is one T knows about.
      case Dashed(body) => knownNames.contains(body.takeWhile(_ != '='))
      // Tokens without leading dashes always go to case-app.
      case _ => true
    }

    CaseApp.detailedParseWithHelp[T](customArgs) match {
      case Left(error) =>
        Console.err.println(error.message)
        sys.exit(1)

      // NOTE(review): the second and third elements are bound as `usage` and `help`, exactly
      // as before; confirm this matches the tuple order documented for
      // `CaseApp.detailedParseWithHelp`.
      case Right((result, usage, help, _)) =>
        if (help) {
          Console.out.println(Help[T].help)
          sys.exit(0)
        } else if (usage) {
          Console.out.println(Help[T].help)
          // Also print the help of every registered PipelineOptions interface.
          PipelineOptionsFactory.getRegisteredOptions.asScala.foreach { optionsClass =>
            PipelineOptionsFactory.printHelp(Console.out, optionsClass)
          }
          sys.exit(0)
        } else {
          result match {
            case Right(t) =>
              val (ctx, _) = ContextAndArgs(remainingArgs)
              (ctx, t)
            case Left(error) =>
              Console.err.println(error.message)
              sys.exit(1)
          }
        }
    }
  }
  // scalastyle:on regex
  // scalastyle:on cyclomatic.complexity
}

/** Companion object for [[ScioContext]]. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2016 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Example: Minimal Word Count with Typed Arguments
// Usage:

// `sbt runMain "com.spotify.scio.examples.MinimalWordCountTypedArguments
// --project=[PROJECT] --runner=DataflowRunner --zone=[ZONE]
// --input=gs://apache-beam-samples/shakespeare/kinglear.txt
// --output=gs://[BUCKET]/[PATH]/minimal_wordcount"`
package com.spotify.scio.examples

import com.spotify.scio._
import com.spotify.scio.examples.common.ExampleData
import caseapp._

object MinimalWordCountTypedArguments {

  // Typed command line arguments for this example, parsed by case-app through
  // `ContextAndArgs.typed`. `input` falls back to `ExampleData.KING_LEAR`; `output` has no
  // default value. `@ProgName` is the program name shown in the generated help text, so it
  // names this object rather than the untyped `MinimalWordCount` example.
  @AppName("Scio Examples")
  @AppVersion(BuildInfo.version)
  @ProgName("com.spotify.scio.examples.MinimalWordCountTypedArguments")
  case class Arguments(
    @HelpMessage("Path of the file to read from")
    @ExtraName("i")
    input: String = ExampleData.KING_LEAR,
    @HelpMessage("Path of the file to write to")
    @ExtraName("o")
    output: String)

  def main(cmdlineArgs: Array[String]): Unit = {
    // Parse command line arguments, create `ScioContext` and typed `Arguments`.
    // `ScioContext` is the entry point to a Scio pipeline. User arguments, e.g.
    // `--input=gs://[BUCKET]/[PATH]/input.txt`, are accessed as fields of `Arguments`.
    val (sc, args) = ContextAndArgs.typed[Arguments](cmdlineArgs)

    // Open text file as `SCollection[String]`. The input can be either a single file or a
    // wildcard matching multiple files.
    sc.textFile(args.input)
      // Split input lines, filter out empty tokens and expand into a collection of tokens
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
      // Count occurrences of each unique `String` to get `(String, Long)`
      .countByValue
      // Map `(String, Long)` tuples into strings
      .map(t => t._1 + ": " + t._2)

      // Save result as text files under the output path
      .saveAsTextFile(args.output)

    // Close the context and execute the pipeline
    sc.close()
  }
}

0 comments on commit 1f0e3cd

Please sign in to comment.