Commit

Merge cd22230 into e5fe1e5

yangzai committed Jul 15, 2021
2 parents e5fe1e5 + cd22230 commit dd5f7ab

Showing 7 changed files with 504 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -62,6 +62,7 @@ lazy val joda = module(project) in file("modules/joda")
lazy val magnolia = module(project) in file("modules/magnolia") dependsOn `generic-base`
lazy val `scala-xml` = module(project) in file("modules/scala-xml")
lazy val scalaz = module(project) in file("modules/scalaz")
lazy val spark = module(project) in file("modules/spark")
lazy val squants = module(project) in file("modules/squants")
lazy val sttp = module(project) in file("modules/sttp")
lazy val yaml = module(project) in file("modules/yaml")
233 changes: 233 additions & 0 deletions modules/spark/README.md
@@ -0,0 +1,233 @@
# Spark module for PureConfig

Adds support for selected [Spark](http://spark.apache.org/) classes to PureConfig.

## Add pureconfig-spark to your project

In addition to [core PureConfig](https://github.com/pureconfig/pureconfig), you'll need:

```scala
libraryDependencies += "com.github.pureconfig" %% "pureconfig-spark" % "0.16.0"
```

Also, `pureconfig-spark` depends on `spark-sql` with `provided` scope, since Spark libraries are generally supplied at runtime.
This module has been tested on Spark 3, but it should also work with Spark 2.4, since the basic data type APIs should stay the same.
Please note that only Scala 2.12 is supported across all Spark versions.
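
For example, a typical sbt setup might look like the following (a sketch only; the Spark version here is an assumption and should match the one provided by your cluster):

```scala
libraryDependencies ++= Seq(
  "com.github.pureconfig" %% "pureconfig-spark" % "0.16.0",
  "org.apache.spark"      %% "spark-sql"        % "3.1.2" % "provided"
)
```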

To use the Spark module you need to import:
```scala
import pureconfig.module.spark._
```

## Supported classes

* `org.apache.spark.sql.types.DataType`
* `org.apache.spark.sql.types.StructType`
* `org.apache.spark.sql.types.Metadata`
* `org.apache.spark.sql.types.StructField` (derivable)
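
For instance, `Metadata` can be read on its own from its JSON representation (a minimal sketch; the `Meta` wrapper and the `source` key are made up for illustration):

```scala
import org.apache.spark.sql.types.Metadata
import pureconfig._
import pureconfig.generic.auto._
import pureconfig.module.spark.sql._

// hypothetical wrapper, only to give the Metadata value a config key
case class Meta(metadata: Metadata)

val metaRes = ConfigSource.string(
  """metadata = "{\"source\": \"hr\"}"
    |""".stripMargin).load[Meta]
// expected to be Right(Meta({"source":"hr"}))
```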

## Example

### Custom HOCON schema to/from Spark schema
Set up custom schema case classes and converters between the custom schema and the Spark schema.
```scala
import org.apache.spark.sql.types._
import pureconfig._
import pureconfig.generic.auto._
import pureconfig.module.spark.sql._
import com.typesafe.config.ConfigRenderOptions

case class MySchema(name: String, fields: List[StructField], someOtherSetting: Option[String])

def mySchemaToSparkSchema(schema: MySchema): StructType =
  StructType(schema.fields)

def sparkSchemaToMySchema(name: String, schema: StructType): MySchema =
  MySchema(name, schema.fields.toList, None)

val renderOpt = ConfigRenderOptions.defaults.setOriginComments(false)
```

Convert the custom schema to a Spark schema and back. The resulting string schema should match the original source.
```scala
val mySchemaRes = ConfigSource.string(
  """name: Employee,
    |fields: [
    | { name: name, data-type: string }, #types are case-insensitive and some types have variations/truncations
    | { name: age, data-type: integer, nullable = false, metadata = "{\"k\": \"v\"}" }, #also note that `nullable` and `metadata` are optional fields with Spark defaults
    | { name: salary, data-type: "decimal(6,2)" },
    | { name: address, data-type: "line1 string, line2 string" } #outer `struct` is optional
    |]
    |""".stripMargin).load[MySchema]
// mySchemaRes: ConfigReader.Result[MySchema] = Right(
// MySchema(
// "Employee",
// List(
// StructField("name", StringType, true, {}),
// StructField("age", IntegerType, false, {"k":"v"}),
// StructField("salary", DecimalType(6, 2), true, {}),
// StructField(
// "address",
// StructType(
// StructField("line1", StringType, true, {}),
// StructField("line2", StringType, true, {})
// ),
// true,
// {}
// )
// ),
// None
// )
// )

val sparkSchemaRes = mySchemaRes.map(mySchemaToSparkSchema)
// sparkSchemaRes: Either[error.ConfigReaderFailures, StructType] = Right(
// StructType(
// StructField("name", StringType, true, {}),
// StructField("age", IntegerType, false, {"k":"v"}),
// StructField("salary", DecimalType(6, 2), true, {}),
// StructField(
// "address",
// StructType(
// StructField("line1", StringType, true, {}),
// StructField("line2", StringType, true, {})
// ),
// true,
// {}
// )
// )
// )

val mySchemaRes2 =
  for {
    mySchema <- mySchemaRes
    sparkSchema <- sparkSchemaRes
  } yield sparkSchemaToMySchema(mySchema.name, sparkSchema)
// mySchemaRes2: Either[error.ConfigReaderFailures, MySchema] = Right(
// MySchema(
// "Employee",
// List(
// StructField("name", StringType, true, {}),
// StructField("age", IntegerType, false, {"k":"v"}),
// StructField("salary", DecimalType(6, 2), true, {}),
// StructField(
// "address",
// StructType(
// StructField("line1", StringType, true, {}),
// StructField("line2", StringType, true, {})
// ),
// true,
// {}
// )
// ),
// None
// )
// )

val stringSchemaRes = mySchemaRes2.map(ConfigWriter[MySchema].to(_).render(renderOpt))
// stringSchemaRes: Either[error.ConfigReaderFailures, String] = Right(
// """{
// "fields" : [
// {
// "data-type" : "STRING",
// "metadata" : "{}",
// "name" : "name",
// "nullable" : true
// },
// {
// "data-type" : "INT",
// "metadata" : "{\"k\":\"v\"}",
// "name" : "age",
// "nullable" : false
// },
// {
// "data-type" : "DECIMAL(6,2)",
// "metadata" : "{}",
// "name" : "salary",
// "nullable" : true
// },
// {
// "data-type" : "STRUCT<`line1`: STRING, `line2`: STRING>",
// "metadata" : "{}",
// "name" : "address",
// "nullable" : true
// }
// ],
// "name" : "Employee"
// }
// """
// )
```

Note: the optional `containsNull`/`nullable`/`metadata` fields of an `ArrayType` or of `StructField`s nested within a `StructType` will be lost on encoding, as Spark does not include them in its DDL representation.
```scala
case class MyConfig(field: StructField, obj: DataType, arr: DataType)
val meta = Metadata.fromJson("{\"k\": \"v\"}")
// meta: Metadata = {"k":"v"}

val myConfigString = ConfigWriter[MyConfig].to(MyConfig(
  StructField("a", StringType, nullable = false, metadata = meta), //nullable/metadata will be kept within HOCON structure
  StructType(StructField("b", StringType, nullable = false, metadata = meta) :: Nil), //nullable/metadata will be lost from DDL string encoding
  ArrayType(StringType, containsNull = false) //containsNull will be lost
)).render(renderOpt)
// myConfigString: String = """{
// "arr" : "ARRAY<STRING>",
// "field" : {
// "data-type" : "STRING",
// "metadata" : "{\"k\":\"v\"}",
// "name" : "a",
// "nullable" : false
// },
// "obj" : "STRUCT<`b`: STRING>"
// }
// """

//lost optional values will be set to their defaults
val myConfigRes = ConfigSource.string(myConfigString).load[MyConfig]
// myConfigRes: ConfigReader.Result[MyConfig] = Right(
// MyConfig(
// StructField("a", StringType, false, {"k":"v"}),
// StructType(StructField("b", StringType, true, {})),
// ArrayType(StringType, true)
// )
// )
```

### Full schema encoded as HOCON String field to/from Spark schema
You can also read Spark schemas directly as `StructType`, instead of narrowing a `DataType` yourself.
Do note that, unlike with `DataType`, the list of fields cannot be wrapped in an outer `STRUCT<...>`.
```scala
case class Config(schema: StructType)

val configRes = ConfigSource.string(
  """
    |# "struct<a:int,b:string,c:struct<c1:int,c2:double>>" will not work here
    |schema = "a int, b string, c struct<c1:int,c2:double>"
    |""".stripMargin).load[Config]
// configRes: ConfigReader.Result[Config] = Right(
// Config(
// StructType(
// StructField("a", IntegerType, true, {}),
// StructField("b", StringType, true, {}),
// StructField(
// "c",
// StructType(
// StructField("c1", IntegerType, true, {}),
// StructField("c2", DoubleType, true, {})
// ),
// true,
// {}
// )
// )
// )
// )

val stringSchemaRes2 = configRes.map(ConfigWriter[Config].to(_).render(renderOpt))
// stringSchemaRes2: Either[error.ConfigReaderFailures, String] = Right(
// """{
// "schema" : "`a` INT,`b` STRING,`c` STRUCT<`c1`: INT, `c2`: DOUBLE>"
// }
// """
// )
```
17 changes: 17 additions & 0 deletions modules/spark/build.sbt
@@ -0,0 +1,17 @@
import Dependencies.Version._

name := "pureconfig-spark"

crossScalaVersions := Seq(scala212) //Spark does not support Scala 2.13 yet

libraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "3.1.2" % "provided")
mdocLibraryDependencies ++= Seq("org.apache.spark" %% "spark-sql" % "3.1.2")

osgiSettings

OsgiKeys.exportPackage := Seq("pureconfig.module.spark.*")
OsgiKeys.privatePackage := Seq()
OsgiKeys.importPackage := Seq(
s"""scala.*;version="[${scalaBinaryVersion.value}.0,${scalaBinaryVersion.value}.50)"""",
"*"
)
103 changes: 103 additions & 0 deletions modules/spark/docs/README.md
@@ -0,0 +1,103 @@
# Spark module for PureConfig

Adds support for selected [Spark](http://spark.apache.org/) classes to PureConfig.

## Add pureconfig-spark to your project

In addition to [core PureConfig](https://github.com/pureconfig/pureconfig), you'll need:

```scala
libraryDependencies += "com.github.pureconfig" %% "pureconfig-spark" % "@VERSION@"
```

Also, `pureconfig-spark` depends on `spark-sql` with `provided` scope, since Spark libraries are generally supplied at runtime.
This module has been tested on Spark 3, but it should also work with Spark 2.4, since the basic data type APIs should stay the same.
Please note that only Scala 2.12 is supported across all Spark versions.

To use the Spark module you need to import:
```scala
import pureconfig.module.spark._
```

## Supported classes

* `org.apache.spark.sql.types.DataType`
* `org.apache.spark.sql.types.StructType`
* `org.apache.spark.sql.types.Metadata`
* `org.apache.spark.sql.types.StructField` (derivable)

## Example

### Custom HOCON schema to/from Spark schema
Set up custom schema case classes and converters between the custom schema and the Spark schema.
```scala mdoc:silent
import org.apache.spark.sql.types._
import pureconfig._
import pureconfig.generic.auto._
import pureconfig.module.spark.sql._
import com.typesafe.config.ConfigRenderOptions

case class MySchema(name: String, fields: List[StructField], someOtherSetting: Option[String])

def mySchemaToSparkSchema(schema: MySchema): StructType =
  StructType(schema.fields)

def sparkSchemaToMySchema(name: String, schema: StructType): MySchema =
  MySchema(name, schema.fields.toList, None)

val renderOpt = ConfigRenderOptions.defaults.setOriginComments(false)
```

Convert the custom schema to a Spark schema and back. The resulting string schema should match the original source.
```scala mdoc
val mySchemaRes = ConfigSource.string(
  """name: Employee,
    |fields: [
    | { name: name, data-type: string }, #types are case-insensitive and some types have variations/truncations
    | { name: age, data-type: integer, nullable = false, metadata = "{\"k\": \"v\"}" }, #also note that `nullable` and `metadata` are optional fields with Spark defaults
    | { name: salary, data-type: "decimal(6,2)" },
    | { name: address, data-type: "line1 string, line2 string" } #outer `struct` is optional
    |]
    |""".stripMargin).load[MySchema]

val sparkSchemaRes = mySchemaRes.map(mySchemaToSparkSchema)

val mySchemaRes2 =
  for {
    mySchema <- mySchemaRes
    sparkSchema <- sparkSchemaRes
  } yield sparkSchemaToMySchema(mySchema.name, sparkSchema)

val stringSchemaRes = mySchemaRes2.map(ConfigWriter[MySchema].to(_).render(renderOpt))
```

Note: the optional `containsNull`/`nullable`/`metadata` fields of an `ArrayType` or of `StructField`s nested within a `StructType` will be lost on encoding, as Spark does not include them in its DDL representation.
```scala mdoc
case class MyConfig(field: StructField, obj: DataType, arr: DataType)
val meta = Metadata.fromJson("{\"k\": \"v\"}")

val myConfigString = ConfigWriter[MyConfig].to(MyConfig(
  StructField("a", StringType, nullable = false, metadata = meta), //nullable/metadata will be kept within HOCON structure
  StructType(StructField("b", StringType, nullable = false, metadata = meta) :: Nil), //nullable/metadata will be lost from DDL string encoding
  ArrayType(StringType, containsNull = false) //containsNull will be lost
)).render(renderOpt)

//lost optional values will be set to their defaults
val myConfigRes = ConfigSource.string(myConfigString).load[MyConfig]
```

### Full schema encoded as HOCON String field to/from Spark schema
You can also read Spark schemas directly as `StructType`, instead of narrowing a `DataType` yourself.
Do note that, unlike with `DataType`, the list of fields cannot be wrapped in an outer `STRUCT<...>`.
```scala mdoc
case class Config(schema: StructType)

val configRes = ConfigSource.string(
  """
    |# "struct<a:int,b:string,c:struct<c1:int,c2:double>>" will not work here
    |schema = "a int, b string, c struct<c1:int,c2:double>"
    |""".stripMargin).load[Config]

val stringSchemaRes2 = configRes.map(ConfigWriter[Config].to(_).render(renderOpt))
```
@@ -0,0 +1,20 @@
package pureconfig.module.spark

import scala.util.Try

import org.apache.spark.sql.types.{DataType, Metadata, StructType}

import pureconfig.ConfigConvert

/** `ConfigConvert` instances for `spark-sql` data structures.
  */
package object sql {
  implicit val dataTypeConvert: ConfigConvert[DataType] =
    ConfigConvert.viaNonEmptyStringTry[DataType](s => Try(DataType.fromDDL(s)), _.sql)

  implicit val structTypeConvert: ConfigConvert[StructType] =
    ConfigConvert.viaNonEmptyStringTry[StructType](s => Try(StructType.fromDDL(s)), _.toDDL)

  implicit val metadataConvert: ConfigConvert[Metadata] =
    ConfigConvert.viaNonEmptyStringTry[Metadata](s => Try(Metadata.fromJson(s)), _.json)
}
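
As a rough usage sketch (not taken from the module's tests), these instances round-trip values through the Spark-provided string forms, `sql`/`toDDL`/`json` for writing and `fromDDL`/`fromJson` for reading:

```scala
import com.typesafe.config.ConfigValueFactory
import org.apache.spark.sql.types._

import pureconfig.{ConfigReader, ConfigWriter}
import pureconfig.module.spark.sql._

// writing renders the Spark string forms
ConfigWriter[DataType].to(DecimalType(6, 2)) // should yield the string "DECIMAL(6,2)"
ConfigWriter[Metadata].to(Metadata.fromJson("""{"k": "v"}""")) // should yield the compact JSON string {"k":"v"}

// reading parses the same forms back
ConfigReader[StructType].from(ConfigValueFactory.fromAnyRef("a INT, b STRING"))
```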
