[SPARK-3569][SQL] Add metadata field to StructField
Add `metadata: Metadata` to `StructField` to store extra information about columns. `Metadata` is a simple wrapper over `Map[String, Any]`, with value types restricted to Boolean, Long, Double, String, Metadata, and arrays of those types. SerDe is via JSON.

Metadata is preserved through simple operations like `SELECT`.
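
A minimal sketch of the new API (a hedged illustration, not part of the commit; it assumes the catalyst classes introduced here, `org.apache.spark.sql.catalyst.util.{Metadata, MetadataBuilder}`, and the `json`/`fromJson` helpers they expose, with hypothetical keys and values):

import org.apache.spark.sql.catalyst.types.{DoubleType, StructField}
import org.apache.spark.sql.catalyst.util.{Metadata, MetadataBuilder}

// Build an immutable Metadata value; value types are limited to
// Boolean, Long, Double, String, Metadata, and arrays of those.
val md: Metadata = new MetadataBuilder()
  .putString("desc", "age in years")   // hypothetical key/value
  .putLong("maxValue", 150L)           // hypothetical key/value
  .build()

// Attach it to a column through the new StructField parameter.
val field = StructField("age", DoubleType, nullable = false, metadata = md)

// SerDe is via JSON: a round trip yields an equal Metadata value.
assert(Metadata.fromJson(md.json) == md)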

cc: marmbrus, liancheng

Author: Xiangrui Meng <meng@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#2701 from mengxr/structfield-metadata and squashes the following commits:

dedda56 [Xiangrui Meng] merge remote
5ef930a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
c35203f [Xiangrui Meng] Merge pull request #1 from marmbrus/pr/2701
886b85c [Michael Armbrust] Expose Metadata and MetadataBuilder through the public scala and java packages.
589f314 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
1e2abcf [Xiangrui Meng] change default value of metadata to None in python
611d3c2 [Xiangrui Meng] move metadata from Expr to NamedExpr
ddfcfad [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
a438440 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
4266f4d [Xiangrui Meng] add StructField.toString back for backward compatibility
3f49aab [Xiangrui Meng] remove StructField.toString
24a9f80 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into structfield-metadata
473a7c5 [Xiangrui Meng] merge master
c9d7301 [Xiangrui Meng] organize imports
1fcbf13 [Xiangrui Meng] change metadata type in StructField for Scala/Java
60cc131 [Xiangrui Meng] add doc and header
60614c7 [Xiangrui Meng] add metadata
e42c452 [Xiangrui Meng] merge master
93518fb [Xiangrui Meng] support metadata in python
905bb89 [Xiangrui Meng] java conversions
618e349 [Xiangrui Meng] make tests work in scala
61b8e0f [Xiangrui Meng] merge master
7e5a322 [Xiangrui Meng] do not output metadata in StructField.toString
c41a664 [Xiangrui Meng] merge master
d8af0ed [Xiangrui Meng] move tests to SQLQuerySuite
67fdebb [Xiangrui Meng] add test on join
d65072e [Xiangrui Meng] remove Map.empty
367d237 [Xiangrui Meng] add test
c194d5e [Xiangrui Meng] add metadata field to StructField and Attribute
mengxr authored and marmbrus committed Nov 1, 2014
1 parent 59e626c commit 1d4f355
Showing 20 changed files with 573 additions and 56 deletions.
15 changes: 11 additions & 4 deletions python/pyspark/sql.py
@@ -313,12 +313,15 @@ class StructField(DataType):
     """

-    def __init__(self, name, dataType, nullable):
+    def __init__(self, name, dataType, nullable, metadata=None):
         """Creates a StructField
         :param name: the name of this field.
         :param dataType: the data type of this field.
         :param nullable: indicates whether values of this field
                          can be null.
+        :param metadata: metadata of this field, which is a map from string
+                         to simple type that can be serialized to JSON
+                         automatically

         >>> (StructField("f1", StringType, True)
         ...  == StructField("f1", StringType, True))
@@ -330,6 +333,7 @@ def __init__(self, name, dataType, nullable):
         self.name = name
         self.dataType = dataType
         self.nullable = nullable
+        self.metadata = metadata or {}

     def __repr__(self):
         return "StructField(%s,%s,%s)" % (self.name, self.dataType,
@@ -338,13 +342,15 @@ def __repr__(self):
     def jsonValue(self):
         return {"name": self.name,
                 "type": self.dataType.jsonValue(),
-                "nullable": self.nullable}
+                "nullable": self.nullable,
+                "metadata": self.metadata}

     @classmethod
     def fromJson(cls, json):
         return StructField(json["name"],
                            _parse_datatype_json_value(json["type"]),
-                           json["nullable"])
+                           json["nullable"],
+                           json["metadata"])


 class StructType(DataType):
@@ -423,7 +429,8 @@ def _parse_datatype_json_string(json_string):
     ...     StructField("simpleArray", simple_arraytype, True),
     ...     StructField("simpleMap", simple_maptype, True),
     ...     StructField("simpleStruct", simple_structtype, True),
-    ...     StructField("boolean", BooleanType(), False)])
+    ...     StructField("boolean", BooleanType(), False),
+    ...     StructField("withMeta", DoubleType(), False, {"name": "age"})])
     >>> check_datatype(complex_structtype)
     True
     >>> # Complex ArrayType.
@@ -46,7 +46,7 @@ object ScalaReflection {
   /** Returns a Sequence of attributes for the given case class type. */
   def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
     case Schema(s: StructType, _) =>
-      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+      s.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
   }

   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType, NativeType}
+import org.apache.spark.sql.catalyst.util.Metadata

 abstract class Expression extends TreeNode[Expression] {
   self: Product =>
@@ -43,7 +43,7 @@ abstract class Generator extends Expression {
   override type EvaluatedType = TraversableOnce[Row]

   override lazy val dataType =
-    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable))))
+    ArrayType(StructType(output.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))))

   override def nullable = false

@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util.Metadata

 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
@@ -43,6 +44,9 @@ abstract class NamedExpression extends Expression {

   def toAttribute: Attribute

+  /** Returns the metadata when an expression is a reference to another expression with metadata. */
+  def metadata: Metadata = Metadata.empty
+
   protected def typeSuffix =
     if (resolved) {
       dataType match {
@@ -88,10 +92,16 @@ case class Alias(child: Expression, name: String)

   override def dataType = child.dataType
   override def nullable = child.nullable
+  override def metadata: Metadata = {
+    child match {
+      case named: NamedExpression => named.metadata
+      case _ => Metadata.empty
+    }
+  }

   override def toAttribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable)(exprId, qualifiers)
+      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifiers)
     } else {
       UnresolvedAttribute(name)
     }
@@ -108,15 +118,20 @@ case class Alias(child: Expression, name: String)
  * @param name The name of this attribute, should only be used during analysis or for debugging.
  * @param dataType The [[DataType]] of this attribute.
  * @param nullable True if null is a valid value for this attribute.
+ * @param metadata The metadata of this attribute.
  * @param exprId A globally unique id used to check if different AttributeReferences refer to the
  *               same attribute.
  * @param qualifiers a list of strings that can be used to referred to this attribute in a fully
  *                   qualified way. Consider the examples tableName.name, subQueryAlias.name.
  *                   tableName and subQueryAlias are possible qualifiers.
  */
-case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
-    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
-  extends Attribute with trees.LeafNode[Expression] {
+case class AttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Metadata = Metadata.empty)(
+    val exprId: ExprId = NamedExpression.newExprId,
+    val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {

   override def equals(other: Any) = other match {
     case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
@@ -128,10 +143,12 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     var h = 17
     h = h * 37 + exprId.hashCode()
     h = h * 37 + dataType.hashCode()
+    h = h * 37 + metadata.hashCode()
     h
   }

-  override def newInstance() = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance() =
+    AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)

   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -140,7 +157,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
     }
   }

@@ -159,7 +176,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     if (newQualifiers.toSet == qualifiers.toSet) {
       this
     } else {
-      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
     }
   }

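The `NamedExpression`/`Alias` changes above are what carry metadata through a projection: resolving an `Alias` copies its child's metadata into the resulting `AttributeReference`. A hedged sketch of exercising this end to end (not part of the commit), assuming the 1.2-era `SQLContext` API (`applySchema`, `registerTempTable`) and an existing `sqlContext` and `rowRDD: RDD[Row]`:

import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.MetadataBuilder

val md = new MetadataBuilder().putString("desc", "age in years").build()  // hypothetical key
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false, metadata = md)))

// sqlContext: SQLContext and rowRDD: RDD[Row] are assumed to exist.
val people = sqlContext.applySchema(rowRDD, schema)
people.registerTempTable("people")

// A simple SELECT should preserve the column's metadata.
val selected = sqlContext.sql("SELECT age FROM people")
assert(selected.schema.fields.head.metadata == md)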
@@ -24,16 +24,16 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
 import scala.util.parsing.combinator.RegexParsers

-import org.json4s.JsonAST.JValue
 import org.json4s._
+import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.sql.catalyst.ScalaReflectionLock
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.util.Metadata
 import org.apache.spark.util.Utils

-
 object DataType {
   def fromJson(json: String): DataType = parseDataType(parse(json))

@@ -70,10 +70,11 @@ object DataType {

   private def parseStructField(json: JValue): StructField = json match {
     case JSortedObject(
+        ("metadata", metadata: JObject),
         ("name", JString(name)),
         ("nullable", JBool(nullable)),
         ("type", dataType: JValue)) =>
-      StructField(name, parseDataType(dataType), nullable)
+      StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
   }

   @deprecated("Use DataType.fromJson instead", "1.2.0")
@@ -388,24 +389,34 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
  * @param name The name of this field.
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g, in selection.
  */
-case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    metadata: Metadata = Metadata.empty) {

   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
     DataType.buildFormattedString(dataType, s"$prefix |", builder)
   }

+  // override the default toString to be compatible with legacy parquet files.
+  override def toString: String = s"StructField($name,$dataType,$nullable)"
+
   private[sql] def jsonValue: JValue = {
     ("name" -> name) ~
     ("type" -> dataType.jsonValue) ~
-    ("nullable" -> nullable)
+    ("nullable" -> nullable) ~
+    ("metadata" -> metadata.jsonValue)
   }
 }

 object StructType {
   protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
-    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 }

 case class StructType(fields: Seq[StructField]) extends DataType {
@@ -439,7 +450,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
   }

   protected[sql] def toAttributes =
-    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())

   def treeString: String = {
     val builder = new StringBuilder
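One consequence of the `jsonValue`/`parseStructField` changes above: a schema carrying metadata survives a full JSON round trip. A sketch under stated assumptions (not part of the commit; it assumes `DataType` also exposes a public `json` serializer to pair with `DataType.fromJson`):

import org.apache.spark.sql.catalyst.types.{DataType, DoubleType, StructField, StructType}
import org.apache.spark.sql.catalyst.util.MetadataBuilder

val md = new MetadataBuilder().putBoolean("isPii", false).build()  // hypothetical key
val schema = StructType(Seq(StructField("age", DoubleType, nullable = false, metadata = md)))

// Each field's JSON now includes a "metadata" entry, so parsing the
// serialized schema reconstructs an equal StructType, metadata included.
assert(DataType.fromJson(schema.json) == schema)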