Upgrade to sbt 1.3.10
Duhemm committed Apr 24, 2020
1 parent a5e8257 commit 64120b7
Showing 95 changed files with 2,236 additions and 2,031 deletions.
778 changes: 444 additions & 334 deletions build.sbt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.18
+sbt.version=1.3.10
26 changes: 13 additions & 13 deletions project/plugins.sbt
@@ -6,16 +6,16 @@ resolvers ++= Seq(
"Twitter Maven" at "https://maven.twttr.com"
)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
addSbtPlugin("com.fortysevendeg" % "sbt-microsites" % "0.3.3")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "18.9.0")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.14")
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.1.1")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.3")
addSbtPlugin("com.47deg" % "sbt-microsites" % "1.1.4")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.13")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "20.4.0")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.7.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.6.3")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.2")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.7")
@@ -21,7 +21,8 @@ import collection.JavaConverters._
import cascading.tuple.Fields

package object avro {
-def writePackedAvro[T](pipe: TypedPipe[T], path: String)(implicit mf: Manifest[T],
+def writePackedAvro[T](pipe: TypedPipe[T], path: String)(implicit
+mf: Manifest[T],
st: AvroSchemaType[T],
conv: TupleConverter[T],
set: TupleSetter[T],
@@ -31,7 +32,8 @@ package object avro {
pipe.write(sink)
}

-def writeUnpackedAvro[T <: Product](pipe: TypedPipe[T], path: String, schema: Schema)(implicit mf: Manifest[T],
+def writeUnpackedAvro[T <: Product](pipe: TypedPipe[T], path: String, schema: Schema)(implicit
+mf: Manifest[T],
conv: TupleConverter[T],
set: TupleSetter[T],
flow: FlowDef,
@@ -120,7 +120,8 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M])
override protected def prepareBinaryWritable(): GenericWritable[M] =
new GenericWritable(conv)

-override def sourceConfInit(fp: FlowProcess[JobConf],
+override def sourceConfInit(
+fp: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
conf: JobConf): Unit = {

@@ -131,7 +132,8 @@ class LzoGenericScheme[M](@transient conv: BinaryConverter[M], clazz: Class[M])
DelegateCombineFileInputFormat.setDelegateInputFormat(conf, classOf[MultiInputFormat[_]])
}

-override def sinkConfInit(fp: FlowProcess[JobConf],
+override def sinkConfInit(
+fp: FlowProcess[JobConf],
tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]],
conf: JobConf): Unit = {
LzoGenericScheme.setConverter(conv, conf, SinkConfigBinaryConverterProvider.ProviderConfKey)
@@ -16,7 +16,6 @@ limitations under the License.

package com.twitter.scalding.commons.source

-
import com.twitter.elephantbird.mapreduce.io.BinaryConverter
import com.twitter.scalding._

@@ -54,7 +54,8 @@ object VersionedKeyValSource {

class VersionedKeyValSource[K, V](val path: String, val sourceVersion: Option[Long], val sinkVersion: Option[Long],
val maxFailures: Int, val versionsToKeep: Int)(
-implicit @transient codec: Injection[(K, V), (Array[Byte], Array[Byte])]) extends Source
+implicit
+@transient codec: Injection[(K, V), (Array[Byte], Array[Byte])]) extends Source
with Mappable[(K, V)]
with TypedSink[(K, V)] {

@@ -253,7 +254,8 @@ class RichPipeEx(pipe: Pipe) extends java.io.Serializable {

// VersionedKeyValSource always merges with the most recent complete
// version
-def writeIncremental[K, V](src: VersionedKeyValSource[K, V], fields: Fields, reducers: Int = 1)(implicit monoid: Monoid[V],
+def writeIncremental[K, V](src: VersionedKeyValSource[K, V], fields: Fields, reducers: Int = 1)(implicit
+monoid: Monoid[V],
flowDef: FlowDef,
mode: Mode) = {
def appendToken(pipe: Pipe, token: Int) =
@@ -37,7 +37,8 @@ object KMeans {
scale(1.0 / count, vec)
}

-private def closest[Id](from: Vector[Double],
+private def closest[Id](
+from: Vector[Double],
centroids: TraversableOnce[(Id, Vector[Double])]): (Id, Vector[Double]) =
centroids
// compute the distance to each center
@@ -55,7 +56,8 @@
* the new clusters
* and the new list of labeled vectors
*/
-def kmeansStep(k: Int,
+def kmeansStep(
+k: Int,
s: Stat,
clusters: ValuePipe[List[LabeledVector]],
points: TypedPipe[LabeledVector]): Execution[(ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector])] = {
@@ -112,13 +114,15 @@ object KMeans {
* Run the full k-means algorithm by flatMapping the above function into itself
* while the number of vectors that changed is not zero
*/
-def kmeans(k: Int,
+def kmeans(
+k: Int,
clusters: ValuePipe[List[LabeledVector]],
points: TypedPipe[LabeledVector]): Execution[(Int, ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector])] = {

val key = StatKey("changed", "scalding.kmeans")

-def go(s: Stat,
+def go(
+s: Stat,
c: ValuePipe[List[LabeledVector]],
p: TypedPipe[LabeledVector],
step: Int): Execution[(Int, ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector])] =
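For orientation, a hedged sketch of driving the iteration defined above: only KMeans.kmeans and its signature come from this file, the rest is illustrative and written as if inside the same scope, so LabeledVector, ValuePipe, TypedPipe, and Execution are already visible.

// Hedged sketch: run the clustering to convergence and keep only the number of
// steps it took; the inputs are hypothetical placeholders of the required types.
def clusterStepCount(
  initialClusters: ValuePipe[List[LabeledVector]],
  labeledPoints: TypedPipe[LabeledVector]): Execution[Int] =
  KMeans
    .kmeans(k = 10, clusters = initialClusters, points = labeledPoints)
    .map { case (iterations, _, _) => iterations } // discard the converged clusters here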
@@ -171,7 +171,8 @@ object Hdfs {
def default: Hdfs = Hdfs(true, new Configuration)
}

-case class HadoopTest(@transient conf: Configuration,
+case class HadoopTest(
+@transient conf: Configuration,
@transient buffers: Source => Option[Buffer[Tuple]])
extends HadoopMode with TestMode {

12 changes: 7 additions & 5 deletions scalding-core/src/main/scala/com/twitter/scalding/Config.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.io.serializer.{ Serialization => HSerialization }
import com.twitter.chill.{ ExternalizerCodec, ExternalizerInjection, Externalizer, KryoInstantiator }
import com.twitter.chill.config.{ ScalaMapConfig, ConfiguredInstantiator }
import com.twitter.bijection.{ Base64String, Injection }
-import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile, HadoopCachedFile}
+import com.twitter.scalding.filecache.{ CachedFile, DistributedCacheFile, HadoopCachedFile }

import cascading.pipe.assembly.AggregateBy
import cascading.flow.{ FlowListener, FlowStepListener, FlowProps, FlowStepStrategy }
@@ -63,7 +63,8 @@ abstract class Config extends Serializable {
* @return new Config with cached files
*/
def addDistributedCacheFiles(cachedFiles: CachedFile*): Config =
-cachedFiles.foldLeft(this) { case (config, file) =>
+cachedFiles.foldLeft(this) {
+case (config, file) =>
file match {
case hadoopFile: HadoopCachedFile =>
Config.addDistributedCacheFile(hadoopFile.sourceUri, config)
@@ -116,7 +117,6 @@ abstract class Config extends Serializable {
} catch { case err: Throwable => Failure(err) }
}

-
/*
* Used in joins to determine how much of the "right hand side" of
* the join to keep in memory
@@ -207,12 +207,14 @@ abstract class Config extends Serializable {
* with a class to serialize to bootstrap the process:
* Left((classOf[serialization.KryoHadoop], myInstance))
*/
-def setSerialization(kryo: Either[(Class[_ <: KryoInstantiator], KryoInstantiator), Class[_ <: KryoInstantiator]],
+def setSerialization(
+kryo: Either[(Class[_ <: KryoInstantiator], KryoInstantiator), Class[_ <: KryoInstantiator]],
userHadoop: Seq[Class[_ <: HSerialization[_]]] = Nil): Config = {

// Hadoop and Cascading should come first
val first: Seq[Class[_ <: HSerialization[_]]] =
-Seq(classOf[org.apache.hadoop.io.serializer.WritableSerialization],
+Seq(
+classOf[org.apache.hadoop.io.serializer.WritableSerialization],
classOf[cascading.tuple.hadoop.TupleSerialization],
classOf[serialization.WrappedSerialization[_]])
// this must come last
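As a hedged illustration of the two Config entry points touched in this file, setSerialization and addDistributedCacheFiles, the sketch below chains them from Config.default; serialization.KryoHadoop is the Kryo instantiator shipped with scalding-core, while the CachedFile argument is left as a hypothetical parameter rather than constructed here.

import com.twitter.scalding._
import com.twitter.scalding.filecache.CachedFile

// Hedged sketch: register Kryo via the KryoHadoop instantiator and attach a
// distributed-cache file. `file` stands in for a CachedFile obtained elsewhere.
def jobConfig(file: CachedFile): Config =
  Config.default
    .setSerialization(Right(classOf[serialization.KryoHadoop]))
    .addDistributedCacheFiles(file)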
@@ -37,7 +37,8 @@ object CumulativeSum {
val pipe: TypedPipe[(K, (U, V))]) {
/** Takes a sortable field and a monoid and returns the cumulative sum of that monoid **/
def cumulativeSum(
-implicit sg: Semigroup[V],
+implicit
+sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
pipe.group
@@ -59,7 +60,8 @@
* partitions for a single key to go through a single scan.
*/
def cumulativeSum[S](partition: U => S)(
-implicit ordS: Ordering[S],
+implicit
+ordS: Ordering[S],
sg: Semigroup[V],
ordU: Ordering[U],
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
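A hedged usage sketch of the enrichment defined above; the import path assumes CumulativeSum lives in com.twitter.scalding.typed (where this file sits in scalding-core), and the input pipe is made up for illustration.

import com.twitter.scalding.typed.TypedPipe
import com.twitter.scalding.typed.CumulativeSum._

// (userId, (dayOfYear, count)): hypothetical daily counts per user.
val daily: TypedPipe[(String, (Int, Long))] =
  TypedPipe.from(Seq(("alice", (1, 3L)), ("alice", (2, 5L)), ("bob", (1, 2L))))

// Running total per key, ordered by the sortable field, as the signature above
// describes; Semigroup[Long] and the orderings are resolved implicitly.
val runningTotals = daily.cumulativeSum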
