Skip to content

Commit

Permalink
Update logical type suplier scalafix
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Jan 30, 2024
1 parent 1825299 commit cbd11fb
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,41 @@ package fix.v0_14_0
import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import com.spotify.scio.parquet.avro.LogicalTypeSupplier
import com.spotify.scio.values.SCollection
import org.apache.avro.specific.SpecificRecordBase
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

object FixLogicalTypeSuppliers {
implicit val c: Coder[GenericRecord] = ???
val sc = ScioContext()

sc.parquetAvroFile[SpecificRecordBase](
sc.parquetAvroFile[GenericRecord](
"input",
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
))

sc.parquetAvroFile[SpecificRecordBase](
sc.parquetAvroFile[GenericRecord](
"input",
null,
null,
ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier]
(AvroReadSupport.AVRO_DATA_SUPPLIER, classOf[LogicalTypeSupplier])
))

sc.parquetAvroFile[SpecificRecordBase](
sc.parquetAvroFile[GenericRecord](
"input",
conf = ParquetConfiguration.of(
AvroReadSupport.AVRO_DATA_SUPPLIER -> classOf[LogicalTypeSupplier],
"foo" -> "bar"
))

sc.parquetAvroFile[SpecificRecordBase](
sc.parquetAvroFile[GenericRecord](
"input",
null,
null,
Expand All @@ -45,7 +49,7 @@ object FixLogicalTypeSuppliers {
"foo" -> "bar"
))

val data: SCollection[SpecificRecordBase] = ???
val data: SCollection[GenericRecord] = ???
data.saveAsParquetAvroFile(
"output",
conf = ParquetConfiguration.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,33 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.avro._
import com.spotify.scio.values.SCollection
import org.apache.avro.specific.SpecificRecordBase
import org.apache.beam.sdk.extensions.smb.AvroLogicalTypeSupplier
import com.spotify.scio.avro._
import com.spotify.scio.coders.Coder
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport, AvroWriteSupport}

object FixLogicalTypeSuppliers {
implicit val c: Coder[GenericRecord] = ???
val sc = ScioContext()

sc.parquetAvroFile[SpecificRecordBase]("input")
sc.parquetAvroFile[GenericRecord]("input")

sc.parquetAvroFile[SpecificRecordBase]("input", null, null)
sc.parquetAvroFile[GenericRecord]("input", null, null)

sc.parquetAvroFile[SpecificRecordBase]("input", conf = ParquetConfiguration.of("foo" -> "bar"))
sc.parquetAvroFile[GenericRecord]("input", conf = ParquetConfiguration.of("foo" -> "bar"))

sc.parquetAvroFile[SpecificRecordBase]("input", null, null, ParquetConfiguration.of("foo" -> "bar"))
sc.parquetAvroFile[GenericRecord]("input", null, null, ParquetConfiguration.of("foo" -> "bar"))

val data: SCollection[SpecificRecordBase] = ???
val data: SCollection[GenericRecord] = ???
data.saveAsParquetAvroFile("output")

data.saveAsParquetAvroFile("output", conf = ParquetConfiguration.of("foo" -> "bar"))

val conf = new Configuration()




conf.setClass("someClass", classOf[String], classOf[CharSequence])
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,56 +7,81 @@ object FixLogicalTypeSupplier {
val ParquetConfigurationMatcher: SymbolMatcher = SymbolMatcher.normalized(
"com/spotify/scio/parquet/package/ParquetConfiguration#of"
)

val SetClassMatcher: SymbolMatcher = SymbolMatcher.normalized(
"org/apache/hadoop/conf/Configuration#setClass"
)

private val ParquetAvroPrefix = "com/spotify/scio/parquet/avro/syntax"
private val ParquetAvroReadMatcher = SymbolMatcher.normalized(s"$ParquetAvroPrefix/ScioContextOps#parquetAvroFile")
private val ParquetAvroWriteMatcher = SymbolMatcher.normalized(s"$ParquetAvroPrefix/SCollectionOps#saveAsParquetAvroFile")
val JavaClassMatcher: SymbolMatcher = SymbolMatcher.normalized("java/lang/Class")

private val ParquetAvroPrefix = "com/spotify/scio/parquet/avro"
val LogicalTypeSupplierMatcher: SymbolMatcher = SymbolMatcher.normalized(
s"$ParquetAvroPrefix/LogicalTypeSupplier",
"org/apache/beam/sdk/extensions/smb/AvroLogicalTypeSupplier"
)

private val ParquetAvroMatcher = SymbolMatcher.normalized(
s"$ParquetAvroPrefix/syntax/ScioContextOps#parquetAvroFile",
s"$ParquetAvroPrefix/syntax/SCollectionOps#saveAsParquetAvroFile"
)
}

class FixLogicalTypeSupplier extends SemanticRule("FixLogicalTypeSupplier") {
import FixLogicalTypeSupplier._

private def updateIOArgs(fnArgs: Seq[Term])(implicit doc: SemanticDocument): Seq[Term] = {
def removeTypeSupplier(confArgs: Seq[Term]): Option[Term] = {
val newConfiguration = confArgs.filterNot {
case q"$l -> classOf[LogicalTypeSupplier]" => true
case q"$l -> classOf[AvroLogicalTypeSupplier]" => true
case _ => false
}.toList

if (newConfiguration.isEmpty) {
None
} else {
Some(q"ParquetConfiguration.of(..$newConfiguration)")
}
private def isLogicalTypeSupplier(term: Term)(implicit doc: SemanticDocument): Boolean =
term match {
case q"classOf[$tpe]" => LogicalTypeSupplierMatcher.matches(tpe.symbol)
case _ =>
term.symbol.info
.map(_.signature)
.collect { case MethodSignature(_, _, returnedType) => returnedType }
.collect { case TypeRef(_, sym, tpe :: Nil) if JavaClassMatcher.matches(sym) => tpe }
.collect { case TypeRef(_, sym, _) => sym }
.exists(LogicalTypeSupplierMatcher.matches)
}

private def parquetConfigurationArgs(
confArgs: List[Term]
)(implicit doc: SemanticDocument): List[Term] = confArgs.filterNot {
case q"($_, $rhs)" => isLogicalTypeSupplier(rhs)
case q"$_ -> $rhs" => isLogicalTypeSupplier(rhs)
case _ => false
}

private def updateIOArgs(fnArgs: List[Term])(implicit doc: SemanticDocument): List[Term] = {
fnArgs.flatMap {
case Term.Assign(lhs, q"$fn(..$confArgs)") if ParquetConfigurationMatcher.matches(fn.symbol) =>
removeTypeSupplier(confArgs).map(c => Term.Assign(lhs, c))
case q"$lhs = $fn(..$confArgs)" if ParquetConfigurationMatcher.matches(fn.symbol) =>
val filtered = parquetConfigurationArgs(confArgs)
if (filtered.isEmpty) None else Some(q"$lhs = ParquetConfiguration.of(..$filtered)")
case q"$fn(..$confArgs)" if ParquetConfigurationMatcher.matches(fn.symbol) =>
removeTypeSupplier(confArgs)
case a => Some(a)
val filtered = parquetConfigurationArgs(confArgs)
if (filtered.isEmpty) None else Some(q"ParquetConfiguration.of(..$filtered)")
case a =>
Some(a)
}
}


override def fix(implicit doc: SemanticDocument): Patch = {
doc.tree.collect {
case method@q"$fn(..$args)"
if (ParquetAvroReadMatcher.matches(fn.symbol) || ParquetAvroWriteMatcher.matches(fn.symbol)) && args.nonEmpty =>
val newArgs = updateIOArgs(args).toList
case method @ q"$fn(..$args)" if ParquetAvroMatcher.matches(fn.symbol) =>
val newArgs = updateIOArgs(args)
Patch.replaceTree(method, q"$fn(..$newArgs)".syntax)
case method@q"$lhs.$fn(..$args)"
if SetClassMatcher.matches(fn.symbol) && args.collect {
case q"classOf[LogicalTypeSupplier]" => true
case q"classOf[AvroLogicalTypeSupplier]" => true
}.nonEmpty =>
Patch.removeTokens(method.tokens)
case method @ q"$_.$fn($_, $theClass, $xface)" if SetClassMatcher.matches(fn.symbol) =>
if (isLogicalTypeSupplier(theClass) || isLogicalTypeSupplier(xface)) {
Patch.removeTokens(method.tokens)
} else {
Patch.empty
}
case importer"com.spotify.scio.parquet.avro.{..$importees}" =>
importees.collect {
case i @ importee"LogicalTypeSupplier" => Patch.removeImportee(i)
case _ => Patch.empty
}.asPatch
case importer"org.apache.beam.sdk.extensions.smb.{..$importees}" =>
importees.collect {
case i @ importee"AvroLogicalTypeSupplier" => Patch.removeImportee(i)
case _ => Patch.empty
}.asPatch
}.asPatch
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ final class ConsistenceJoinNames extends SemanticRule("ConsistenceJoinNames") {
val updatedArgs = renameNamedArgs(args)
Patch.replaceTree(t, q"$qual.$updatedFn(..$updatedArgs)".syntax)
case t @ q"$qual.$fn(..$args)" =>
println(fn.symbol)
Patch.empty
}
}.asPatch
Expand Down

0 comments on commit cbd11fb

Please sign in to comment.