[CARBONDATA-3628] Support alter hive table add array and map type column
Support adding array and map data type columns via ALTER TABLE

This closes apache#3529
IceMimosa authored and h00424960 committed Dec 30, 2019
1 parent 7fabf9f commit c1db16b
Showing 5 changed files with 39 additions and 18 deletions.
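
Before the individual file diffs, a minimal, hypothetical Scala sketch of the behaviour being enabled. It mirrors the ALTER TABLE statements exercised by the new tests; the standalone driver object, the local master, and the plain Hive-enabled SparkSession are illustrative assumptions (the test suite runs these statements through its Carbon-enabled session), and the table and column names are borrowed from the new test case.

// Minimal, hypothetical sketch (not part of the commit) showing the kind of DDL the
// change enables. Assumes a Hive-enabled Spark 2.2+ session; table and column names
// are taken from the new test case in AlterTableTestCase.
import org.apache.spark.sql.SparkSession

object AlterHiveAddComplexColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("alter-hive-add-complex-columns")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("drop table if exists alter_hive")
    spark.sql("create table alter_hive(name string) stored as rcfile")
    // Newly exercised by this change: adding map and array columns to a hive table
    spark.sql("alter table alter_hive add columns (var map<string, string>)")
    spark.sql("alter table alter_hive add columns (loves array<string>)")
    spark.sql("insert into alter_hive select 'abc', map('age', '10'), array('a', 'b')")
    spark.sql("select * from alter_hive").show(truncate = false)

    spark.stop()
  }
}

The diffs below swap the Carbon-to-Spark type conversion in DDLStrategy and CarbonMergerRDD over to CarbonSparkDataSourceUtil.convertCarbonToSparkDataType and extend AlterTableTestCase with map and array columns on plain and partitioned hive tables.
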
@@ -1022,7 +1022,31 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
       assert(exception.getMessage.contains("Unsupported alter operation on hive table"))
     } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       sql("alter table alter_hive add columns(add string)")
       sql("insert into alter_hive select 'abc','banglore'")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("insert into alter_hive select 'abc','banglore',map('age','10','birth','2020')")
+      checkAnswer(
+        sql("select * from alter_hive"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020")))
+      )
     }
   }
 
+  test("Alter table add column for hive partitioned table for spark version above 2.1") {
+    sql("drop table if exists alter_hive")
+    sql("create table alter_hive(name string) stored as rcfile partitioned by (dt string)")
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      sql("alter table alter_hive add columns(add string)")
+      sql("alter table alter_hive add columns (var map<string, string>)")
+      sql("alter table alter_hive add columns (loves array<string>)")
+      sql(
+        s"""
+           |insert into alter_hive partition(dt='par')
+           |select 'abc', 'banglore', map('age', '10', 'birth', '2020'), array('a', 'b', 'c')
+        """.stripMargin)
+      checkAnswer(
+        sql("select * from alter_hive where dt='par'"),
+        Seq(Row("abc", "banglore", Map("age" -> "10", "birth" -> "2020"), Seq("a", "b", "c"), "par"))
+      )
+    }
+  }

@@ -146,8 +146,7 @@ class SDKwriterTestCase extends QueryTest with BeforeAndAfterEach {
   }
 
   def deleteFile(path: String, extension: String): Unit = {
-    val file: CarbonFile = FileFactory
-      .getCarbonFile(path, FileFactory.getFileType(path))
+    val file: CarbonFile = FileFactory.getCarbonFile(path)
 
     for (eachDir <- file.listFiles) {
       if (!eachDir.isDirectory) {

@@ -88,14 +88,13 @@ class QueryTest extends PlanTest with Suite {
 
   protected def checkAnswer(carbon: String, hive: String, uniqueIdentifier: String): Unit = {
     val path = TestQueryExecutor.hiveresultpath + "/" + uniqueIdentifier
-    if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-      val objinp = new ObjectInputStream(FileFactory
-        .getDataInputStream(path, FileFactory.getFileType(path)))
+    if (FileFactory.isFileExist(path)) {
+      val objinp = new ObjectInputStream(FileFactory.getDataInputStream(path))
       val rows = objinp.readObject().asInstanceOf[Array[Row]]
       objinp.close()
       QueryTest.checkAnswer(sql(carbon), rows) match {
         case Some(errorMessage) => {
-          FileFactory.deleteFile(path, FileFactory.getFileType(path))
+          FileFactory.deleteFile(path)
           writeAndCheckAnswer(carbon, hive, path)
         }
         case None =>
@@ -107,8 +106,7 @@
 
   private def writeAndCheckAnswer(carbon: String, hive: String, path: String): Unit = {
     val rows = sql(hive).collect()
-    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path, FileFactory
-      .getFileType(path)))
+    val obj = new ObjectOutputStream(FileFactory.getDataOutputStream(path))
     obj.writeObject(rows)
     obj.close()
     checkAnswer(sql(carbon), rows)

@@ -19,7 +19,7 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.IOException
 import java.util
-import java.util.{Collections, List, Map}
+import java.util.{Collections, List}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
@@ -32,10 +32,11 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.sql.util.{CarbonException, SparkTypeConverter}
+import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
@@ -64,8 +65,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{ByteArrayOrdering, DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.load.{DataLoadProcessBuilderOnSpark, PrimtiveOrdering, StringOrdering}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
 class CarbonMergerRDD[K, V](
     @transient private val ss: SparkSession,
@@ -680,7 +681,7 @@ class CarbonMergerRDD[K, V](
         partitionNames = null,
         splits = allSplits)
       val objectOrdering: Ordering[Object] = createOrderingForColumn(rangeColumn)
-      val sparkDataType = Util.convertCarbonToSparkDataType(dataType)
+      val sparkDataType = CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dataType)
       // Change string type to support all types
       val sampleRdd = scanRdd
         .map(row => (row.get(0, sparkDataType), null))

@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.strategy
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -35,11 +36,9 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.util.{CarbonReflectionUtils, DataMapUtil, FileUtils, SparkUtil}
 
-import org.apache.carbondata.common.exceptions.DeprecatedFeatureException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.Util
 
 /**
  * Carbon strategies for ddl commands
@@ -176,8 +175,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         val structField = (alterTableAddColumnsModel.dimCols ++ alterTableAddColumnsModel.msrCols)
           .map {
             a =>
-              StructField(a.column,
-                Util.convertCarbonToSparkDataType(DataTypeUtil.valueOf(a.dataType.get)))
+              StructField(a.column, CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(
+                DataTypeUtil.valueOf(a.dataType.get)))
           }
         val identifier = TableIdentifier(
           alterTableAddColumnsModel.tableName,
