Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compiler warnings #4934

Merged
merged 4 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ ThisBuild / tpolecatDevModeOptions ~= { opts =>
ScalacOptions.privateWarnDeadCode,
ScalacOptions.privateWarnValueDiscard,
ScalacOptions.warnDeadCode,
ScalacOptions.warnNonUnitStatement,
ScalacOptions.warnValueDiscard
)

Expand All @@ -163,7 +164,6 @@ ThisBuild / tpolecatDevModeOptions ~= { opts =>
Scalac.privateBackendParallelism,
Scalac.privateWarnMacrosOption,
Scalac.release8,
Scalac.targetOption,
Scalac.warnConfOption,
Scalac.warnMacrosOption
)
Expand Down Expand Up @@ -710,7 +710,6 @@ lazy val `scio-google-cloud-platform`: Project = project
description := "Scio add-on for Google Cloud Platform",
libraryDependencies ++= Seq(
// compile
"com.chuusai" %% "shapeless" % shapelessVersion,
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"com.google.api" % "gax" % gaxVersion,
"com.google.api" % "gax-grpc" % gaxVersion,
Expand Down Expand Up @@ -1032,7 +1031,6 @@ lazy val `scio-parquet`: Project = project
).reduce(_ | _),
libraryDependencies ++= Seq(
// compile
"com.chuusai" %% "shapeless" % shapelessVersion,
"com.google.auth" % "google-auth-library-oauth2-http" % googleAuthVersion,
"com.google.cloud.bigdataoss" % "util-hadoop" % s"hadoop2-$bigdataossVersion",
"com.google.protobuf" % "protobuf-java" % protobufVersion,
Expand Down
3 changes: 0 additions & 3 deletions project/ScalacOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ object Scalac {

val release8 = ScalacOptions.release("8")

// JVM
val targetOption = ScalacOptions.other("-target:jvm-1.8")

// Warn
val privateWarnMacrosOption = ScalacOptions.privateWarnOption(
"macros:after",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ object AvroIO {
}

object AvroTyped {
private[scio] def writeTransform[T <: HasAvroAnnotation: TypeTag: Coder]()
private[scio] def writeTransform[T <: HasAvroAnnotation: TypeTag]()
: BAvroIO.TypedWrite[T, Void, GenericRecord] = {
val avroT = AvroType[T]
BAvroIO
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ package object types {
* @doc("user age") age: Int)
* }}}
*/
@nowarn("msg=parameter value value in class doc is never used")
@nowarn("msg=parameter value in class doc is never used")
class doc(value: String) extends StaticAnnotation
}
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class ScioContext private[scio] (

if (_counters.nonEmpty) {
val counters = _counters.toArray
this.parallelize(Seq(0)).withName("Initialize counters").map { _ =>
this.parallelize(Seq(0)).withName("Initialize counters").tap { _ =>
counters.foreach(_.inc(0))
}
}
Expand Down
37 changes: 27 additions & 10 deletions scio-core/src/main/scala/com/spotify/scio/coders/BeamCoders.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,13 @@ private[scio] object BeamCoders {
Some(unwrap(options, coder))
.collect { case c: StructuredCoder[_] => c }
.map(_.getComponents.asScala.toList)
.collect { case (c1: BCoder[K]) :: (c2: BCoder[V]) :: Nil =>
val k = Coder.beam(unwrap(options, c1))
val v = Coder.beam(unwrap(options, c2))
k -> v
.collect {
case (c1: BCoder[K @unchecked]) ::
(c2: BCoder[V @unchecked]) ::
Nil =>
val k = Coder.beam(unwrap(options, c1))
val v = Coder.beam(unwrap(options, c2))
k -> v
}
.getOrElse {
throw new IllegalArgumentException(
Expand All @@ -63,17 +66,27 @@ private[scio] object BeamCoders {
}
}

/** Get key coder from an `SCollection[(K, V)]`. */
def getKeyCoder[K, V](coll: SCollection[(K, V)]): Coder[K] = getTupleCoders[K, V](coll)._1

/** Get value coder from an `SCollection[(K, V)]`. */
def getValueCoder[K, V](coll: SCollection[(K, V)]): Coder[V] = getTupleCoders[K, V](coll)._2

def getTuple3Coders[A, B, C](coll: SCollection[(A, B, C)]): (Coder[A], Coder[B], Coder[C]) = {
val options = CoderOptions(coll.context.options)
val coder = coll.internal.getCoder
Some(unwrap(options, coder))
.collect { case c: StructuredCoder[_] => c }
.map(_.getComponents.asScala.toList)
.collect { case (c1: BCoder[A]) :: (c2: BCoder[B]) :: (c3: BCoder[C]) :: Nil =>
val a = Coder.beam(unwrap(options, c1))
val b = Coder.beam(unwrap(options, c2))
val c = Coder.beam(unwrap(options, c3))
(a, b, c)
.collect {
case (c1: BCoder[A @unchecked]) ::
(c2: BCoder[B @unchecked]) ::
(c3: BCoder[C @unchecked]) ::
Nil =>
val a = Coder.beam(unwrap(options, c1))
val b = Coder.beam(unwrap(options, c2))
val c = Coder.beam(unwrap(options, c3))
(a, b, c)
}
.getOrElse {
throw new IllegalArgumentException(
Expand All @@ -91,7 +104,11 @@ private[scio] object BeamCoders {
.collect { case c: StructuredCoder[_] => c }
.map(_.getComponents.asScala.toList)
.collect {
case (c1: BCoder[A]) :: (c2: BCoder[B]) :: (c3: BCoder[C]) :: (c4: BCoder[D]) :: Nil =>
case (c1: BCoder[A @unchecked]) ::
(c2: BCoder[B @unchecked]) ::
(c3: BCoder[C @unchecked]) ::
(c4: BCoder[D @unchecked]) ::
Nil =>
val a = Coder.beam(unwrap(options, c1))
val b = Coder.beam(unwrap(options, c2))
val c = Coder.beam(unwrap(options, c3))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.spotify.scio.coders.Coder
import com.google.common.{hash => g}
import org.apache.beam.sdk.coders.CustomCoder

class GuavaBloomFilterCoder[T](implicit val funnel: g.Funnel[T])
class GuavaBloomFilterCoder[T](implicit val funnel: g.Funnel[_ >: T])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BLoomFilter[String] can be created from a Funnel[CharSequence] (expected funnel is <? extends T>).
Associated coder should also be able to be constructed with such funnel

extends CustomCoder[g.BloomFilter[T]] {
override def encode(value: BloomFilter[T], outStream: OutputStream): Unit =
value.writeTo(outStream)
Expand All @@ -32,6 +32,6 @@ class GuavaBloomFilterCoder[T](implicit val funnel: g.Funnel[T])
}

trait GuavaCoders {
implicit def guavaBFCoder[T](implicit x: g.Funnel[T]): Coder[g.BloomFilter[T]] =
implicit def guavaBFCoder[T](implicit x: g.Funnel[_ >: T]): Coder[g.BloomFilter[T]] =
Coder.beam(new GuavaBloomFilterCoder[T])
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ trait JavaCoders extends JavaBeanCoders {
Coder.transform(c)(bc => Coder.beam(bcoders.ListCoder.of(bc)))

implicit def jArrayListCoder[T](implicit c: Coder[T]): Coder[java.util.ArrayList[T]] =
Coder.xmap(jlistCoder[T])(new java.util.ArrayList(_), identity)
Coder.xmap(jListCoder[T])(new java.util.ArrayList(_), identity)

implicit def jMapCoder[K, V](implicit ck: Coder[K], cv: Coder[V]): Coder[java.util.Map[K, V]] =
Coder.transform(ck)(bk => Coder.transform(cv)(bv => Coder.beam(bcoders.MapCoder.of(bk, bv))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package com.spotify.scio.estimators

import com.spotify.scio.coders.{BeamCoders, Coder}
import com.spotify.scio.coders.Coder
import com.spotify.scio.util.TupleFunctions._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.{transforms => beam}
Expand Down Expand Up @@ -60,7 +60,7 @@ case class ApproximateUniqueCounter[T](sampleSize: Int) extends ApproxDistinctCo
.asInstanceOf[SCollection[Long]]

override def estimateDistinctCountPerKey[K](in: SCollection[(K, T)]): SCollection[(K, Long)] = {
implicit val keyCoder: Coder[K] = BeamCoders.getTupleCoders(in)._1
implicit val keyCoder: Coder[K] = in.keyCoder
in.toKV
.applyTransform(beam.ApproximateUnique.perKey[K, T](sampleSize))
.map(klToTuple)
Expand All @@ -83,7 +83,7 @@ case class ApproximateUniqueCounterByError[T](maximumEstimationError: Double = 0
.asInstanceOf[SCollection[Long]]

override def estimateDistinctCountPerKey[K](in: SCollection[(K, T)]): SCollection[(K, Long)] = {
implicit val keyCoder: Coder[K] = BeamCoders.getTupleCoders(in)._1
implicit val keyCoder: Coder[K] = in.keyCoder
in.toKV
.applyTransform(beam.ApproximateUnique.perKey[K, T](maximumEstimationError))
.map(klToTuple)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.spotify.scio.schemas.{ArrayType, MapType, RawRecord, Schema, Type}
import org.apache.beam.sdk.schemas.JavaBeanSchema
import org.apache.beam.sdk.schemas.Schema.{FieldType, LogicalType}

import scala.annotation.nowarn
import scala.reflect.ClassTag

trait JavaInstances {
Expand Down Expand Up @@ -66,6 +67,9 @@ trait JavaInstances {
): Schema[java.util.Map[K, V]] =
MapType(ks, vs, identity, identity)

@nowarn(
"msg=evidence parameter evidence.* of type com.spotify.scio.IsJavaBean\\[.\\] in .* is never used"
)
implicit def javaBeanSchema[T: IsJavaBean: ClassTag]: RawRecord[T] =
RawRecord[T](new JavaBeanSchema())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ sealed private[scio] trait JobInputSource[T] {
val asIterable: Try[Iterable[T]]
}

final private[scio] case class TestStreamInputSource[T: Coder](
final private[scio] case class TestStreamInputSource[T](
stream: TestStream[T]
) extends JobInputSource[T] {
override val asIterable: Try[Iterable[T]] = Failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private[scio] object Functions {
override def partitionFor(elem: T, numPartitions: Int): Int = g(elem)
}

abstract private class ReduceFn[T: Coder] extends CombineFn[T, JList[T], T] {
abstract private class ReduceFn[T] extends CombineFn[T, JList[T], T] {
override def createAccumulator(): JList[T] = new JArrayList[T]()

override def addInput(accumulator: JList[T], input: T): JList[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ abstract private[scio] class RandomValueSampler[K, V, R](val fractions: Map[K, D
protected var seed: Long = -1

// TODO: is it necessary to setSeed for each instance like Spark does?
@nowarn("msg=parameter value c in method startBundle is never used")
@nowarn("msg=parameter c in method startBundle is never used")
@StartBundle
def startBundle(c: DoFn[(K, V), (K, V)]#StartBundleContext): Unit =
rngs = fractions.iterator.map { case (k, v) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import com.spotify.scio.coders.{BeamCoders, Coder}
*/
class PairHashSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {

implicit private[this] val (keyCoder, valueCoder): (Coder[K], Coder[V]) =
(self.keyCoder, self.valueCoder)
// set as private to avoid conflict with PairSCollectionFunctions keyCoder/valueCoder
implicit private lazy val keyCoder: Coder[K] = BeamCoders.getKeyCoder(self)
implicit private lazy val valueCoder: Coder[V] = BeamCoders.getValueCoder(self)

/**
* Perform an inner join by replicating `rhs` to all workers. The right side should be tiny and
Expand All @@ -40,7 +41,7 @@ class PairHashSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {
def hashJoin[W](
rhs: SCollection[(K, W)]
): SCollection[(K, (V, W))] = {
implicit val wCoder = BeamCoders.getTupleCoders(rhs)._2
implicit val wCoder = rhs.valueCoder
hashJoin(rhs.asMultiMapSingletonSideInput)
}

Expand Down Expand Up @@ -87,7 +88,7 @@ class PairHashSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {
def hashLeftOuterJoin[W](
rhs: SCollection[(K, W)]
): SCollection[(K, (V, Option[W]))] = {
implicit val wCoder: Coder[W] = BeamCoders.getTupleCoders(rhs)._2
implicit val wCoder: Coder[W] = BeamCoders.getValueCoder(rhs)
hashLeftOuterJoin(rhs.asMultiMapSingletonSideInput)
}

Expand Down Expand Up @@ -123,7 +124,7 @@ class PairHashSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {
def hashFullOuterJoin[W](
rhs: SCollection[(K, W)]
): SCollection[(K, (Option[V], Option[W]))] = {
implicit val wCoder: Coder[W] = BeamCoders.getTupleCoders(rhs)._2
implicit val wCoder: Coder[W] = BeamCoders.getValueCoder(rhs)
hashFullOuterJoin(rhs.asMultiMapSingletonSideInput)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {

private[this] val context: ScioContext = self.context

implicit val (keyCoder, valueCoder): (Coder[K], Coder[V]) = BeamCoders.getTupleCoders(self)
implicit lazy val keyCoder: Coder[K] = BeamCoders.getKeyCoder(self)
implicit lazy val valueCoder: Coder[V] = BeamCoders.getValueCoder(self)

private[scio] def toKV: SCollection[KV[K, V]] =
self.map(kv => KV.of(kv._1, kv._2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.PrintStream
import java.lang.{Boolean => JBoolean, Double => JDouble, Iterable => JIterable}
import java.util.concurrent.ThreadLocalRandom
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{BeamCoders, Coder, CoderMaterializer}
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.estimators.{
ApproxDistinctCounter,
ApproximateUniqueCounter,
Expand Down Expand Up @@ -999,7 +999,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] {
def hashLookup[V](
that: SCollection[(T, V)]
): SCollection[(T, Iterable[V])] = {
implicit val vCoder = BeamCoders.getTupleCoders(that)._2
implicit val vCoder = that.valueCoder
this.transform { in =>
val side = that.asMultiMapSingletonSideInput
in.withSideInputs(side)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object RunPreReleaseIT {
val runId = args("runId")

try {
val jobs = List(parquet(runId), avro(runId), smb(runId), bigquery(runId))
val jobs = List(parquet(runId), avro(runId), smb(runId), bigquery())
Await.result(Future.sequence(jobs), 1.hour)
} catch {
case t: Throwable =>
Expand Down Expand Up @@ -139,7 +139,7 @@ object RunPreReleaseIT {
}
}

private def bigquery(runId: String): Future[Unit] = {
private def bigquery(): Future[Unit] = {
import com.spotify.scio.examples.extra.{TypedBigQueryTornadoes, TypedStorageBigQueryTornadoes}
log.info("Starting BigQuery tests... ")
val jobs = List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ package com.spotify.scio.examples.extra

import com.spotify.scio._
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import com.spotify.scio.examples.common.ExampleData
import com.spotify.scio.examples.extra.MagnolifyAvroExample.wordCountType
import org.apache.avro.generic.GenericRecord

object MagnolifyAvroExample {
// limit import scope to avoid polluting namespace
Expand All @@ -43,11 +46,14 @@ object MagnolifyAvroExample {
// --input=gs://apache-beam-samples/shakespeare/kinglear.txt
// --output=gs://[BUCKET]/[PATH]/wordcount-avro"`
object MagnolifyAvroWriteExample {

implicit val genericCoder: Coder[GenericRecord] =
avroGenericRecordCoder(wordCountType.schema)

def main(cmdlineArgs: Array[String]): Unit = {
import MagnolifyAvroExample._

val (sc, args) = ContextAndArgs(cmdlineArgs)
implicit def genericCoder = avroGenericRecordCoder(wordCountType.schema)
sc.textFile(args.getOrElse("input", ExampleData.KING_LEAR))
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object MetricsExample {
val gauge: Gauge = ScioMetrics.gauge[MetricsExample.type]("gauge")

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val (sc, _) = ContextAndArgs(cmdlineArgs)

// Create and initialize counters from ScioContext
sc.initCounter("ctxcount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement, StateId}
import org.apache.beam.sdk.values.KV

import scala.annotation.nowarn

object StatefulExample {
// States are persisted on a per-key-and-window basis
type DoFnT = DoFn[KV[String, Int], KV[String, (Int, Int)]]

@nowarn("msg=private val count in class StatefulDoFn is never used")
class StatefulDoFn extends DoFnT {
// Declare mutable state
@StateId("count") private val count = StateSpecs.value[JInt]()
Expand All @@ -52,7 +55,7 @@ object StatefulExample {
}

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)
val (sc, _) = ContextAndArgs(cmdlineArgs)

val input = for {
k <- Seq("a", "b")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object TapsExample {
t2 <- taps.textFile("macbeth.txt")
} yield {
// execution logic when both taps are available
val (sc, args) = ContextAndArgs(cmdlineArgs)
val (sc, _) = ContextAndArgs(cmdlineArgs)
val out = (t1.open(sc) ++ t2.open(sc))
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
Expand Down
Loading