Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support index upsert #819

Merged
merged 18 commits into from
Jun 26, 2019
Merged

Support index upsert #819

merged 18 commits into from
Jun 26, 2019

Conversation

zhexuany
Copy link
Contributor

In this PR, we support pk and index upsert.

@marsishandsome
Copy link
Collaborator

marsishandsome commented Jun 14, 2019

the following test fails

class TestSuite extends BaseDataSourceTest("test_datasource_basic2") {
  private val row3 = Row(3)
  private val row4 = Row(4)

  private val schema = StructType(
    List(
      StructField("i", IntegerType)
    )
  )

  override def beforeAll(): Unit = {
    super.beforeAll()

    dropTable()
    jdbcUpdate(s"create table $dbtable(i int primary key)")
  }

  test("Test Write Append") {
    val data: RDD[Row] = sc.makeRDD(List(row3, row4))
    val df = sqlContext.createDataFrame(data, schema)

    df.write
      .format("tidb")
      .options(tidbOptions)
      .option("database", database)
      .option("table", table)
      .mode("append")
      .save()

    testTiDBSelect(Seq(row3, row4))
  }

  override def afterAll(): Unit =
    try {
      dropTable()
    } finally {
      super.afterAll()
    }
}

error message

== Results ==
!== Correct Answer - 2 ==   == Spark Answer - 1 ==
![3]                        [4]
![4]                        

@zhexuany
Copy link
Contributor Author

@marsishandsome The test case is due to a bug in encodeRow.
Before

  // We could not set nil value into kv.
    if (values.length == 0) {
      return new byte[] {Codec.NULL_FLAG};
    }

After

  // We could not set nil value into kv.
    if (cdo.toBytes().length == 0) {
      return new byte[] {Codec.NULL_FLAG};
    }

@zhexuany zhexuany changed the title Support index upset Support index upsert Jun 17, 2019
@zhexuany zhexuany force-pushed the support_index_upset branch 2 times, most recently from c5b699d to 46ed424 Compare June 19, 2019 04:28
@zhexuany
Copy link
Contributor Author

/run-all-tests

@zhexuany zhexuany force-pushed the support_index_upset branch 2 times, most recently from bc90f7a to b657a9a Compare June 19, 2019 17:02
Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebase #843 to let build success

Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls update documents in the same PR

@zhexuany
Copy link
Contributor Author

/run-all-tests

Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls add some test in DeduplicateSuite to test the following code

 uniqueIndices.foreach { index =>
      {
        mutableRdd = mutableRdd
          .map { wrappedRow =>
            val indexKey = buildUniqueIndexKey(wrappedRow.row, index)
            (indexKey, wrappedRow)
          }
          .groupByKey()
          .map(_._2.head)
      }
    }

Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would u pls provide a test design document?

@@ -428,6 +471,39 @@ class TiBatchWrite(@transient val df: DataFrame,
}
}

private def generateDataToBeRemovedRdd(rdd: RDD[WrappedRow], startTs: TiTimestamp) = {
rdd
.map { wrappedRow =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change rdd.map to rdd.mapPartitions, cause if use rdd.map the following code will be called rdd.size times

val snapshot = TiSessionCache.getSession(tiConf).createSnapshot(startTs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is planned to be done when we support batch get.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be done without batch get.

core/src/main/scala/com/pingcap/tispark/TiBatchWrite.scala Outdated Show resolved Hide resolved
Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly LGTM


If `replace` is false, then
* if primary key or unique index exists in db, data having conflicts expects an expection.
* if no same primary key or unique index exists, data will be inserted.

| SaveMode | Support | Semantics |
| -------- | ------- | --------- |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls update this line

TiSpark's `Append` means upsert. If primary key is same, data will be updated; if no same primary key exists, data will be inserted.

Copy link
Collaborator

@marsishandsome marsishandsome left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@zhexuany
Copy link
Contributor Author

/run-all-tests

@windtalker
Copy link
Collaborator

LGTM

@zhexuany zhexuany merged commit 65926ce into pingcap:master Jun 26, 2019
@zhexuany zhexuany deleted the support_index_upset branch June 26, 2019 07:18
birdstorm pushed a commit that referenced this pull request Jun 26, 2019
wfxxh pushed a commit to wanfangdata/tispark that referenced this pull request Jun 30, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants