diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 17c21f6e3a0e9..45f5da387692e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -747,7 +747,19 @@ class DataFrame private[sql]( * Returns a new [[DataFrame]] by adding a column. * @group dfops */ - def withColumn(colName: String, col: Column): DataFrame = select(Column("*"), col.as(colName)) + def withColumn(colName: String, col: Column): DataFrame = { + val resolver = sqlContext.analyzer.resolver + val replaced = schema.exists(f => resolver(f.name, colName)) + if (replaced) { + val colNames = schema.map { field => + val name = field.name + if (resolver(name, colName)) col.as(colName) else Column(name) + } + select(colNames :_*) + } else { + select(Column("*"), col.as(colName)) + } + } /** * Returns a new [[DataFrame]] with a column renamed. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3250ab476aeb4..b9b6a400ae195 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -473,6 +473,14 @@ class DataFrameSuite extends QueryTest { assert(df.schema.map(_.name).toSeq === Seq("key", "value", "newCol")) } + test("replace column using withColumn") { + val df2 = TestSQLContext.sparkContext.parallelize(Array(1, 2, 3)).toDF("x") + val df3 = df2.withColumn("x", df2("x") + 1) + checkAnswer( + df3.select("x"), + Row(2) :: Row(3) :: Row(4) :: Nil) + } + test("withColumnRenamed") { val df = testData.toDF().withColumn("newCol", col("key") + 1) .withColumnRenamed("value", "valueRenamed")