Commit 9e96cd7

Merge pull request delta-io#11 from delta-io/master
update fork
JassAbidi committed Jul 9, 2021
2 parents cad3c3d + 86bbe99 commit 9e96cd7
Showing 49 changed files with 2,297 additions and 1,267 deletions.
9 changes: 7 additions & 2 deletions build/sbt-launch-lib.bash
@@ -38,7 +38,12 @@ dlog () {

 acquire_sbt_jar () {
   SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
-  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  # Download sbt from mirror URL if the environment variable is provided
+  if [[ "${SBT_VERSION}" == "0.13.18" ]] && [[ -n "${SBT_MIRROR_JAR_URL}" ]]; then
+    URL1="${SBT_MIRROR_JAR_URL}"
+  else
+    URL1="https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar"
+  fi
   JAR=build/sbt-launch-${SBT_VERSION}.jar

   sbt_jar=$JAR
@@ -47,7 +52,7 @@ acquire_sbt_jar () {
   # Download sbt launch jar if it hasn't been downloaded yet
   if [ ! -f "${JAR}" ]; then
     # Download
-    printf "Attempting to fetch sbt\n"
+    printf 'Attempting to fetch sbt from %s\n' "${URL1}"
     JAR_DL="${JAR}.part"
    if [ $(command -v curl) ]; then
      curl --fail --location --silent ${URL1} > "${JAR_DL}" &&\
8 changes: 2 additions & 6 deletions contribs/src/main/scala/io/delta/storage/IBMCOSLogStore.scala
@@ -61,14 +61,10 @@ class IBMCOSLogStore(sparkConf: SparkConf, hadoopConf: Configuration)
     if (exists && overwrite == false) {
       throw new FileAlreadyExistsException(path.toString)
     } else {
-      // create is atomic
+      // write is atomic when overwrite == false
       val stream = fs.create(path, overwrite)
       try {
-        var writeSize = 0L
-        actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(action => {
-          stream.write(action)
-          writeSize += action.length
-        })
+        actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
         stream.close()
       } catch {
         case e: IOException if isPreconditionFailure(e) =>
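The reworded comment pins down the store's concurrency contract: a write with overwrite == false either creates the file atomically or fails with a precondition error, which the catch clause above surfaces as FileAlreadyExistsException. A minimal sketch of how a caller might lean on that contract (the commitVersion helper is illustrative, not part of this change; the write signature follows Delta's Scala LogStore trait):

import java.nio.file.FileAlreadyExistsException
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.storage.LogStore

// Hypothetical helper: returns false when a concurrent writer won the race.
def commitVersion(store: LogStore, path: Path, actions: Iterator[String]): Boolean = {
  try {
    store.write(path, actions, overwrite = false) // atomic when overwrite == false
    true
  } catch {
    case _: FileAlreadyExistsException => false // another writer already committed this version
  }
}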
2 changes: 1 addition & 1 deletion core/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala
@@ -76,7 +76,7 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
       new DeltaSqlParser(parser)
     }
     extensions.injectResolutionRule { session =>
-      new DeltaAnalysis(session, session.sessionState.conf)
+      new DeltaAnalysis(session)
     }
     extensions.injectCheckRule { session =>
       new DeltaUnsupportedOperationsCheck(session)
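For orientation, this extension is activated through Spark's standard extension hook, so the rule change above takes effect for any session built like the following sketch (app name and master are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-demo") // illustrative
  .master("local[*]")    // illustrative
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()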
10 changes: 9 additions & 1 deletion core/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala
@@ -20,6 +20,8 @@ import scala.collection.JavaConverters._
 import scala.collection.Map

 import org.apache.spark.sql.delta.{DeltaErrors, PreprocessTableMerge}
+import org.apache.spark.sql.delta.DeltaViewHelper
+import org.apache.spark.sql.delta.commands.MergeIntoCommand
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.AnalysisHelper

@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.internal.SQLConf

 /**
  * Builder to specify how to merge data from source DataFrame into the target Delta table.
@@ -217,8 +220,13 @@ class DeltaMergeBuilder private(
     if (!resolvedMergeInto.resolved) {
       throw DeltaErrors.analysisException("Failed to resolve\n", plan = Some(resolvedMergeInto))
     }
+    val strippedMergeInto = resolvedMergeInto.copy(
+      target = DeltaViewHelper.stripTempViewForMerge(resolvedMergeInto.target, SQLConf.get)
+    )
     // Preprocess the actions and verify
-    val mergeIntoCommand = PreprocessTableMerge(sparkSession.sessionState.conf)(resolvedMergeInto)
+    val mergeIntoCommand =
+      PreprocessTableMerge(sparkSession.sessionState.conf)(strippedMergeInto)
+        .asInstanceOf[MergeIntoCommand]
     sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand)
     mergeIntoCommand.run(sparkSession)
   }
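This rewritten tail of the builder is what runs behind DeltaTable's merge API. An illustrative end-to-end invocation (spark, updatesDF, the path, and the column names are assumed for the example):

import io.delta.tables.DeltaTable

DeltaTable.forPath(spark, "/tmp/delta/events").as("t") // illustrative path
  .merge(updatesDF.as("s"), "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute() // resolves, strips temp views from the target, then runs MergeIntoCommand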
2 changes: 2 additions & 0 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -618,6 +618,8 @@ object DeltaTable {
       new DeltaTable(
         sparkSession.table(tableName),
         DeltaTableV2(sparkSession, new Path(tbl.location), Some(tbl), Some(tableName)))
+    } else if (DeltaTableUtils.isValidPath(tableId)) {
+      forPath(sparkSession, tableId.table)
     } else {
       throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(table = Some(tableId)))
     }
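With the new isValidPath branch, forName also accepts the path-style identifier that previously required forPath. A sketch of the user-visible effect (table name and path are illustrative):

import io.delta.tables.DeltaTable

val byName = DeltaTable.forName(spark, "default.events")            // catalog table, as before
val byPath = DeltaTable.forName(spark, "delta.`/tmp/delta/events`") // now falls through to forPath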
26 changes: 22 additions & 4 deletions core/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/deltaMerge.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical

 import java.util.Locale

+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf

 import org.apache.spark.sql.AnalysisException
@@ -227,7 +228,9 @@ case class DeltaMergeInto(
     condition: Expression,
     matchedClauses: Seq[DeltaMergeIntoMatchedClause],
     notMatchedClauses: Seq[DeltaMergeIntoInsertClause],
-    migrateSchema: Boolean) extends Command with SupportsSubquery {
+    migrateSchema: Boolean,
+    finalSchema: Option[StructType])
+  extends Command with SupportsSubquery {

   (matchedClauses ++ notMatchedClauses).foreach(_.verifyActions())

@@ -271,12 +274,13 @@ object DeltaMergeInto {
       condition,
       whenClauses.collect { case x: DeltaMergeIntoMatchedClause => x },
       whenClauses.collect { case x: DeltaMergeIntoInsertClause => x },
-      migrateSchema = false)
+      migrateSchema = false,
+      finalSchema = Some(target.schema))
   }

   def resolveReferences(merge: DeltaMergeInto, conf: SQLConf)(
       resolveExpr: (Expression, LogicalPlan) => Expression): DeltaMergeInto = {
-    val DeltaMergeInto(target, source, condition, matchedClauses, notMatchedClause, _) = merge
+    val DeltaMergeInto(target, source, condition, matchedClauses, notMatchedClause, _, _) = merge

     // We must do manual resolution as the expressions in different clauses of the MERGE have
     // visibility of the source, the target or both. Additionally, the resolution logic operates
@@ -403,10 +407,24 @@ object DeltaMergeInto {
     }
     val containsStarAction =
       (matchedClauses ++ notMatchedClause).flatMap(_.actions).exists(_.isInstanceOf[UnresolvedStar])
+
+    val migrateSchema = canAutoMigrate && containsStarAction
+
+    val finalSchema = if (migrateSchema) {
+      // The implicit conversions flag allows any type to be merged from source to target if Spark
+      // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
+      // enforces Parquet-level write compatibility, which would mean an INT source can't be merged
+      // into a LONG target.
+      SchemaMergingUtils.mergeSchemas(target.schema, source.schema, allowImplicitConversions = true)
+    } else {
+      target.schema
+    }
+
     val resolvedMerge = DeltaMergeInto(
       target, source, resolvedCond,
       resolvedMatchedClauses, resolvedNotMatchedClause,
-      migrateSchema = canAutoMigrate && containsStarAction)
+      migrateSchema = migrateSchema,
+      finalSchema = Some(finalSchema))

     // It's possible that pre-resolved expressions (e.g. `sourceDF("key") = targetDF("key")`) have
     // attribute references that are not present in the output attributes of the children (i.e.,
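Putting the two new fields together: finalSchema is only widened when the auto-migrate flag is on and the merge contains a star action. A hedged illustration of triggering that path (the config key is Delta's documented auto-merge flag; spark, sourceDF, and the path are assumed):

// canAutoMigrate above reads this flag:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

// With star actions (updateAll/insertAll) and an extra column in the source,
// mergeSchemas widens the target schema instead of failing analysis.
io.delta.tables.DeltaTable.forPath(spark, "/tmp/delta/events").as("t")
  .merge(sourceDF.as("s"), "t.key = s.key")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()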