add metadata field to StructField and Attribute
mengxr committed Sep 17, 2014
1 parent 5044e49 commit c194d5e
Showing 3 changed files with 25 additions and 18 deletions.
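Before the diffs, a minimal usage sketch (not part of the commit) of what the new field enables; it assumes the constructors exactly as changed below and the catalyst package paths of this Spark version, with IntegerType/StringType only as example data types:

  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField}

  // metadata defaults to Map.empty, so existing constructor calls keep compiling
  val plain = StructField("id", IntegerType, nullable = false)

  // per-field annotations are a plain Map[String, Any] of simple, JSON-serializable values
  val annotated = StructField("name", StringType, nullable = true,
    metadata = Map("comment" -> "display name", "maxLength" -> 32))

  // AttributeReference gains the same optional field in its first parameter list
  val attr = AttributeReference("name", StringType, nullable = true,
    metadata = Map("comment" -> "display name"))()

Extractor patterns over StructField do need updating, as the JsonRDD hunks at the end show.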
@@ -40,6 +40,7 @@ abstract class NamedExpression extends Expression {
   def name: String
   def exprId: ExprId
   def qualifiers: Seq[String]
+  def metadata: Map[String, Any] = Map.empty
 
   def toAttribute: Attribute
 
@@ -112,9 +113,13 @@ case class Alias(child: Expression, name: String)
  * qualified way. Consider the examples tableName.name, subQueryAlias.name.
  * tableName and subQueryAlias are possible qualifiers.
  */
-case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
-    (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
-  extends Attribute with trees.LeafNode[Expression] {
+case class AttributeReference(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    override val metadata: Map[String, Any] = Map.empty)(
+    val exprId: ExprId = NamedExpression.newExprId,
+    val qualifiers: Seq[String] = Nil) extends Attribute with trees.LeafNode[Expression] {
 
   override def references = AttributeSet(this :: Nil)
 
@@ -131,7 +136,8 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
     h
   }
 
-  override def newInstance = AttributeReference(name, dataType, nullable)(qualifiers = qualifiers)
+  override def newInstance =
+    AttributeReference(name, dataType, nullable, metadata)(qualifiers = qualifiers)
 
   /**
    * Returns a copy of this [[AttributeReference]] with changed nullability.
@@ -140,7 +146,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
     if (nullable == newNullability) {
       this
     } else {
-      AttributeReference(name, dataType, newNullability)(exprId, qualifiers)
+      AttributeReference(name, dataType, newNullability, metadata)(exprId, qualifiers)
     }
   }
 
@@ -151,7 +157,7 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolean = true)
     if (newQualifiers == qualifiers) {
       this
     } else {
-      AttributeReference(name, dataType, nullable)(exprId, newQualifiers)
+      AttributeReference(name, dataType, nullable, metadata)(exprId, newQualifiers)
     }
   }
 
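Because every copy path above now forwards metadata explicitly, annotations attached to an attribute survive the usual tree rewrites. A sketch under the assumption that the copy helpers partially shown above are the usual withNullability/withQualifiers members (their full signatures are cut off in this view):

  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.catalyst.types.DoubleType

  val scored = AttributeReference("score", DoubleType, nullable = false,
    metadata = Map("unit" -> "points"))()

  val relaxed   = scored.withNullability(true)        // metadata still Map("unit" -> "points")
  val qualified = relaxed.withQualifiers("t" :: Nil)  // metadata preserved across the qualifier change
  val fresh     = qualified.newInstance                // fresh exprId, same metadata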
@@ -296,8 +296,14 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
  * @param name The name of this field.
  * @param dataType The data type of this field.
  * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field, which is a map from string to simple type that can be
+ *                 serialized to JSON automatically.
  */
-case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean,
+    metadata: Map[String, Any] = Map.empty) {
 
   private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
     builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
@@ -307,7 +313,7 @@ case class StructField(name: String, dataType: DataType, nullable: Boolean) {
 
 object StructType {
   protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
-    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
 }
 
 case class StructType(fields: Seq[StructField]) extends DataType {
@@ -342,7 +348,7 @@ case class StructType(fields: Seq[StructField]) extends DataType {
   }
 
   protected[sql] def toAttributes =
-    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
 
   def treeString: String = {
     val builder = new StringBuilder
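With fromAttributes and toAttributes both forwarding the new field, a schema now round-trips through attributes without losing per-field metadata. A sketch; note that both helpers are protected[sql], so this only compiles from code inside the org.apache.spark.sql package:

  import org.apache.spark.sql.catalyst.types.{DoubleType, StructField, StructType}

  val schema = StructType(Seq(
    StructField("score", DoubleType, nullable = true, metadata = Map("unit" -> "points"))))

  // attributes created from the schema carry the metadata, and converting back keeps it
  val roundTripped = StructType.fromAttributes(schema.toAttributes)
  // roundTripped.fields.head.metadata == Map("unit" -> "points")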
13 changes: 4 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -112,18 +112,15 @@ private[sql] object JsonRDD extends Logging {
         }
       }.flatMap(field => field).toSeq
 
-      StructType(
-        (topLevelFields ++ structFields).sortBy {
-          case StructField(name, _, _) => name
-        })
+      StructType((topLevelFields ++ structFields).sortBy(_.name))
     }
 
     makeStruct(resolved.keySet.toSeq, Nil)
   }
 
   private[sql] def nullTypeToStringType(struct: StructType): StructType = {
     val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable) => {
+      case StructField(fieldName, dataType, nullable, _) => {
         val newType = dataType match {
           case NullType => StringType
           case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
@@ -158,9 +155,7 @@ private[sql] object JsonRDD extends Logging {
             StructField(name, dataType, true)
           }
         }
-        StructType(newFields.toSeq.sortBy {
-          case StructField(name, _, _) => name
-        })
+        StructType(newFields.toSeq.sortBy(_.name))
       }
       case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
         ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
@@ -385,7 +380,7 @@ private[sql] object JsonRDD extends Logging {
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
-      case (StructField(name, dataType, _), i) =>
+      case (StructField(name, dataType, _, _), i) =>
        row.update(i, json.get(name).flatMap(v => Option(v)).map(
          enforceCorrectType(_, dataType)).orNull)
     }
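The JsonRDD edits show the one source-level ripple of the change: StructField now has four constructor parameters, so extractor patterns must either ignore the new field with a wildcard or be dropped in favor of plain field access, as the sortBy calls above now do. A small sketch of both options (the helper names here are illustrative only):

  import org.apache.spark.sql.catalyst.types.{StructField, StructType}

  // Option 1: extend the pattern with a wildcard for the new metadata component.
  def describe(schema: StructType): Unit = schema.fields.foreach {
    case StructField(name, dataType, _, _) => println(s"$name: ${dataType.simpleString}")
  }

  // Option 2: when only one component is needed, skip the extractor entirely.
  def sortedNames(schema: StructType): Seq[String] = schema.fields.sortBy(_.name).map(_.name)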