Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
52bafa2
add gitignore
yegetables Jul 21, 2022
4cb5f6a
ADD: tikv/client-java
yegetables Jul 21, 2022
3b52a83
change shade package name
yegetables Jul 21, 2022
cad1c57
add TiConfigurationConverter
yegetables Jul 21, 2022
e3f4375
replace scala code use ClientSession(hold org.tikv.common.TiSession)
yegetables Jul 21, 2022
aa1eafb
fix compiler error
yegetables Jul 22, 2022
9903557
fix core test error
yegetables Jul 22, 2022
f5888ea
grpc exception
yegetables Jul 23, 2022
f3dce0e
delete pd
yegetables Jul 23, 2022
4441f5d
change package name ExtendedDateTime
yegetables Jul 23, 2022
617f600
use upstream exception
yegetables Jul 24, 2022
4a78253
remove catalogTest
yegetables Jul 25, 2022
056a53d
remove tisession init
yegetables Jul 25, 2022
4d8414b
Merge remote-tracking branch 'upstream/master' into normalize
yegetables Jul 26, 2022
a1ce3ed
merge master
yegetables Jul 26, 2022
d265afd
fix init ClientSession nullpoint
yegetables Jul 26, 2022
c64ecfe
replace tisession to clientSession
yegetables Jul 26, 2022
3694959
replace session/tisession to clientSession
yegetables Jul 26, 2022
5c409a5
merge master
yegetables Jul 26, 2022
e052330
restore ut ci
yegetables Jul 26, 2022
16c7b39
recover TIRDD args
yegetables Jul 26, 2022
563007b
fix nullpoint error
yegetables Jul 26, 2022
29078be
fix Handle
yegetables Jul 27, 2022
db5bcbb
Use RangeSplitter
yegetables Jul 27, 2022
6bcb59d
remove exception.
yegetables Jul 27, 2022
cd4d8dd
try not updateLeader
yegetables Jul 27, 2022
9dec854
Merge remote-tracking branch 'upstream/master' into normalize
yegetables Jul 28, 2022
b0c3eb2
remove exception.ExceptionName prefix
yegetables Jul 29, 2022
c50c1ce
fix create clientsession reuse old
yegetables Jul 29, 2022
870aece
fix txn sleep time
yegetables Jul 29, 2022
827395a
merge master
yegetables Jul 29, 2022
2c1b131
fix package shade name
yegetables Jul 29, 2022
5fe3fc1
fix merge error
yegetables Jul 29, 2022
d07e56a
remove twophaseCommitter
yegetables Jul 30, 2022
3b5f33d
fix timeout txn test bug
yegetables Aug 1, 2022
2c9b73d
fix fmt
yegetables Aug 1, 2022
b1ef700
merge master
yegetables Aug 4, 2022
bb7f3da
fix merge bug
yegetables Aug 4, 2022
e5e52ae
ignore batchGetRetryTest
yegetables Aug 4, 2022
8d79860
[Design] Tispark support insert using SQL (#2464)
TrafalgarRicardoLu Aug 8, 2022
d54cf57
remove unuse update
yegetables Aug 8, 2022
0b17797
remove unuse update
yegetables Aug 8, 2022
e3c383b
Merge branch 'master' into test-other
yegetables Aug 8, 2022
aa75619
remove unuse update
yegetables Aug 9, 2022
6653ce5
fix Ticonfiguration comments
yegetables Aug 11, 2022
2241cb3
add design-doc normalize-client
yegetables Aug 11, 2022
27cf6f5
Modify ClientSessionw to singleton mode
yegetables Aug 11, 2022
9d8fb87
change name ConvertUpstream -> ConvertUpstreamUtils
yegetables Aug 11, 2022
efe4d14
fix clientsession singleton mode
yegetables Aug 11, 2022
ec0521f
Update 2022-08-11-normalize-client.md
yegetables Aug 12, 2022
b661f4f
fix doc class name outdated
yegetables Aug 12, 2022
55a98ea
remove unnecessary tests
yegetables Aug 12, 2022
5faf820
fmt
yegetables Aug 12, 2022
b0d31b2
remove unusedconf
yegetables Aug 12, 2022
704d020
fix:doc
yegetables Aug 12, 2022
1c02974
fix:doc
yegetables Aug 12, 2022
d67098d
fix getTikvSession -> getTiKVSession
yegetables Aug 15, 2022
dfeb314
Only do auth check for tables in TiDB (#2489)
TrafalgarRicardoLu Aug 13, 2022
746c555
change method name minTiKVVersion to isTiKVVersionGreatEqualThanVer…
yegetables Aug 15, 2022
237c6d8
test why fmt error
yegetables Aug 15, 2022
1cb5e8c
Merge branch 'master' into test-other
yegetables Aug 15, 2022
d598c02
test why fmt error
yegetables Aug 15, 2022
817381f
fix fmt
yegetables Aug 15, 2022
102e280
fit fmt
yegetables Aug 15, 2022
b644476
Update docs/design/2022-08-11-normalize-client.md
yegetables Aug 15, 2022
096dd9c
Update docs/design/2022-08-11-normalize-client.md
yegetables Aug 15, 2022
b5cc4f6
Update docs/design/2022-08-11-normalize-client.md
yegetables Aug 15, 2022
ed5e6fe
fit fmt
yegetables Aug 15, 2022
68d4f3a
megre mastre
yegetables Aug 18, 2022
833be1f
Merge branch 'master' into test-other
xuanyu66 Aug 23, 2022
8c342bf
Merge remote-tracking branch 'upstream/master' into test-other
yegetables Aug 23, 2022
b9f8eb7
Update docs/design/2022-08-11-normalize-client.md
yegetables Aug 23, 2022
4d45e54
Modify some documentation about configuration items
yegetables Aug 23, 2022
a839b80
Add note about testing "batch get retry test"
yegetables Aug 23, 2022
cadbec8
fix build error
yegetables Aug 24, 2022
51a41e6
ClientSession add getCatalog method
yegetables Aug 24, 2022
a301ba8
remove tikv.client tests
yegetables Aug 24, 2022
ffa6f92
remove tikv.client tests
yegetables Aug 24, 2022
435f0b6
Merge branch 'master' into test-other
Daemonxiao Aug 30, 2022
7f3590a
merge
yegetables Aug 30, 2022
81f5426
Merge branch 'master' into test-other
shiyuhang0 Sep 5, 2022
3799480
flush tiflash cache
yegetables Sep 6, 2022
b669eda
Merge remote-tracking branch 'upstream/master' into test-other
yegetables Sep 8, 2022
2701a36
fix merge import
yegetables Sep 8, 2022
9b6f7ed
Merge branch 'master' into test-other
yegetables Sep 8, 2022
d4294ba
Merge branch 'master' into test-other
Daemonxiao Sep 13, 2022
ae6c445
fix bug
Daemonxiao Sep 15, 2022
d0315fe
Update ClientSession construct
Daemonxiao Sep 15, 2022
a413c04
fmt
Daemonxiao Sep 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.pingcap.tikv.hostmap;

import com.pingcap.tikv.util.HostMapping;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
Expand All @@ -25,6 +24,7 @@
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.HostMapping;

public class UriHostMapping implements HostMapping {

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiPartition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.pingcap.tispark

import com.pingcap.tikv.util.RangeSplitter.RegionTask
import org.tikv.common.util.RangeSplitter.RegionTask
import org.apache.spark.Partition

class TiPartition(val idx: Int, val tasks: Seq[RegionTask], val appId: String) extends Partition {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/com/pingcap/tispark/TiSparkInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.pingcap.tispark

import com.pingcap.tikv.exception.TiInternalException
import org.slf4j.LoggerFactory
import org.tikv.common.exception.TiInternalException

object TiSparkInfo {
private final val logger = LoggerFactory.getLogger(getClass.getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
package com.pingcap.tispark.accumulator

import java.util

import com.pingcap.tikv.event.CacheInvalidateEvent
import org.apache.spark.util.AccumulatorV2
import org.tikv.common.event.CacheInvalidateEvent

import scala.collection.JavaConversions._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

package com.pingcap.tispark.handler

import com.pingcap.tikv.event.CacheInvalidateEvent
import com.pingcap.tikv.event.CacheInvalidateEvent.CacheType
import com.pingcap.tikv.region.RegionManager
import org.tikv.common.event.CacheInvalidateEvent.CacheType
import com.pingcap.tispark.listener.CacheInvalidateListener
import org.slf4j.LoggerFactory
import org.tikv.common.event.CacheInvalidateEvent
import org.tikv.common.region.RegionManager

/**
* A CacheInvalidateEventHandler as it's name indicates what this class will do.
Expand All @@ -44,8 +44,8 @@ class CacheInvalidateEventHandler(regionManager: RegionManager) {
case CacheType.REGION_STORE =>
// Used for updating region/store cache in the given regionManager
if (event.shouldUpdateRegion()) {
logger.info(s"Invalidating region ${event.getRegion.getId} cache at driver.")
val region = regionManager.getRegionByKey(event.getRegion.getStartKey)
logger.info(s"Invalidating region ${event.getRegionId} cache at driver.")
val region = regionManager.getRegionById(event.getRegionId)
if (region != null) {
regionManager.invalidateRegion(region)
}
Expand All @@ -58,16 +58,16 @@ class CacheInvalidateEventHandler(regionManager: RegionManager) {
case CacheType.LEADER =>
// Used for updating leader information cached in the given regionManager
logger.info(
s"Invalidating leader of region:${event.getRegion.getId} store:${event.getStoreId} cache at driver.")
val region = regionManager.getRegionByKey(event.getRegion.getStartKey)
s"Invalidating leader of region:${event.getRegionId} store:${event.getStoreId} cache at driver.")
val region = regionManager.getRegionById(event.getRegionId)
if (region != null) {
regionManager.updateLeader(region, event.getStoreId)
regionManager.invalidateRegion(region)
}

case CacheType.REQ_FAILED =>
logger.info(s"Request failed cache invalidation for region ${event.getRegion.getId}")
val region = regionManager.getRegionByKey(event.getRegion.getStartKey)
logger.info(s"Request failed cache invalidation for region ${event.getRegionId}")
val region = regionManager.getRegionById(event.getRegionId)
if (region != null) {
regionManager.onRequestFail(region)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

package com.pingcap.tispark.listener

import com.pingcap.tikv.event.CacheInvalidateEvent
import com.pingcap.tikv.region.RegionManager
import com.pingcap.tispark.accumulator.CacheInvalidateAccumulator
import com.pingcap.tispark.handler.CacheInvalidateEventHandler
import org.apache.spark.SparkContext
import org.slf4j.LoggerFactory
import org.tikv.common.event.CacheInvalidateEvent
import org.tikv.common.region.RegionManager

class CacheInvalidateListener()
extends Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package com.pingcap.tispark.statistics

import com.google.common.primitives.UnsignedLong
import org.tikv.shade.com.google.common.primitives.UnsignedLong
import com.pingcap.tikv.expression.{ByItem, ColumnRef, ComparisonBinaryExpression, Constant}
import com.pingcap.tikv.key.Key
import com.pingcap.tikv.meta.TiDAGRequest.PushDownType
Expand All @@ -27,6 +27,7 @@ import com.pingcap.tikv.row.Row
import com.pingcap.tikv.statistics._
import com.pingcap.tikv.types.{BytesType, IntegerType}
import org.slf4j.LoggerFactory
import org.tikv.common.meta.TiTimestamp

import scala.collection.JavaConversions._
import scala.collection.mutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@

package com.pingcap.tispark.statistics

import com.google.common.cache.CacheBuilder
import com.pingcap.tikv.catalog.Catalog
import com.pingcap.tikv.meta.{TiColumnInfo, TiDAGRequest, TiIndexInfo, TiTableInfo}
import com.pingcap.tikv.row.Row
import com.pingcap.tikv.statistics._
import com.pingcap.tikv.types.DataType
import com.pingcap.tikv.{Snapshot, TiSession}
import com.pingcap.tikv.{ClientSession, Snapshot}
import com.pingcap.tispark.statistics.StatisticsHelper.shouldUpdateHistogram
import com.pingcap.tispark.statistics.estimate.{DefaultTableSizeEstimator, TableSizeEstimator}
import org.slf4j.LoggerFactory
import org.tikv.shade.com.google.common.cache.CacheBuilder

import scala.collection.JavaConversions._
import scala.collection.mutable
Expand Down Expand Up @@ -70,7 +70,7 @@ object StatisticsManager {
.newBuilder()
.build[java.lang.Long, TableStatistics]
protected var initialized: Boolean = false
private var session: TiSession = _
private var clientSession: ClientSession = _
private var snapshot: Snapshot = _
private var catalog: Catalog = _
private var dbPrefix: String = _
Expand Down Expand Up @@ -157,7 +157,7 @@ object StatisticsManager {
// load count, modify_count, version info
loadMetaToTblStats(tblId, tblStatistic)
val req = StatisticsHelper
.buildHistogramsRequest(histTable, tblId, session.getTimestamp)
.buildHistogramsRequest(histTable, tblId, clientSession.getTiKVSession.getTimestamp)

val rows = readDAGRequest(req, histTable.getId)
if (rows.isEmpty) return
Expand Down Expand Up @@ -201,7 +201,10 @@ object StatisticsManager {

private def loadMetaToTblStats(tableId: Long, tableStatistics: TableStatistics): Unit = {
val req =
StatisticsHelper.buildMetaRequest(metaTable, tableId, session.getTimestamp)
StatisticsHelper.buildMetaRequest(
metaTable,
tableId,
clientSession.getTiKVSession.getTimestamp)

val rows = readDAGRequest(req, metaTable.getId)
if (rows.isEmpty) return
Expand All @@ -219,7 +222,10 @@ object StatisticsManager {
tableId: Long,
requests: Seq[StatisticsDTO]): Seq[StatisticsResult] = {
val req =
StatisticsHelper.buildBucketRequest(bucketTable, tableId, session.getTimestamp)
StatisticsHelper.buildBucketRequest(
bucketTable,
tableId,
clientSession.getTiKVSession.getTimestamp)

val rows = readDAGRequest(req, bucketTable.getId)
if (rows.isEmpty) return Nil
Expand Down Expand Up @@ -260,28 +266,28 @@ object StatisticsManager {

def setEstimator(estimator: TableSizeEstimator): Unit = tableSizeEstimator = estimator

def initStatisticsManager(tiSession: TiSession): Unit =
def initStatisticsManager(clientSession: ClientSession): Unit =
if (!initialized) {
synchronized {
if (!initialized) {
initialize(tiSession)
initialize(clientSession)
initialized = true
}
}
}

protected def initialize(tiSession: TiSession): Unit = {
session = tiSession
snapshot = tiSession.createSnapshot()
catalog = tiSession.getCatalog
dbPrefix = tiSession.getConf.getDBPrefix
protected def initialize(clientSession: ClientSession): Unit = {
this.clientSession = clientSession
this.snapshot = clientSession.createSnapshot()
this.catalog = clientSession.getCatalog
this.dbPrefix = clientSession.getConf.getDBPrefix
// An estimator used to calculate table size.
tableSizeEstimator = DefaultTableSizeEstimator
this.tableSizeEstimator = DefaultTableSizeEstimator
val mysqlDB = catalog.getDatabaseFromCache(s"${dbPrefix}mysql")
metaTable = catalog.getTableFromCache(mysqlDB, "stats_meta")
histTable = catalog.getTableFromCache(mysqlDB, "stats_histograms")
bucketTable = catalog.getTableFromCache(mysqlDB, "stats_buckets")
statisticsMap.invalidateAll()
this.metaTable = catalog.getTableFromCache(mysqlDB, "stats_meta")
this.histTable = catalog.getTableFromCache(mysqlDB, "stats_histograms")
this.bucketTable = catalog.getTableFromCache(mysqlDB, "stats_buckets")
this.statisticsMap.invalidateAll()
}

def reset(): Unit = initialized = false
Expand Down
23 changes: 10 additions & 13 deletions core/src/main/scala/com/pingcap/tispark/telemetry/TeleMsg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package com.pingcap.tispark.telemetry

import com.pingcap.tikv.util.ConcreteBackOffer
import com.pingcap.tikv.{TiConfiguration, TiSession, TwoPhaseCommitter}
import com.pingcap.tikv.{ClientSession, TiConfiguration}
import com.pingcap.tispark.utils.{SystemInfoUtil, TiUtil}
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.tikv.common.util.ConcreteBackOffer
import org.tikv.txn.TwoPhaseCommitter
import java.util.UUID

/**
Expand All @@ -45,15 +46,15 @@ class TeleMsg(sparkSession: SparkSession) {
try {
val conf = TiConfiguration.createDefault(pdAddr.get)
TiUtil.sparkConfToTiConfWithoutPD(SparkSession.active.sparkContext.getConf, conf)
val tiSession = TiSession.getInstance(conf)
val snapShot = tiSession.createSnapshot()
val clientSession = ClientSession.getInstance(conf)
val snapShot = clientSession.createSnapshot()
val value = snapShot.get(TRACK_ID.getBytes("UTF-8"))

if (value.nonEmpty)
return new String(value, "UTF-8")

val uuid = TRACK_ID_PREFIX + UUID.randomUUID().toString
putKeyValue(TRACK_ID, uuid, conf, tiSession)
putKeyValue(TRACK_ID, uuid, clientSession)
uuid
} catch {
case e: Throwable =>
Expand All @@ -62,14 +63,10 @@ class TeleMsg(sparkSession: SparkSession) {
}
}

private def putKeyValue(
key: String,
value: String,
conf: TiConfiguration,
tiSession: TiSession): Unit = {
val startTS = tiSession.getTimestamp.getVersion
private def putKeyValue(key: String, value: String, clientSession: ClientSession): Unit = {
val startTS = clientSession.getTiKVSession.getTimestamp.getVersion
try {
val twoPhaseCommitter = new TwoPhaseCommitter(conf, startTS)
val twoPhaseCommitter = new TwoPhaseCommitter(clientSession.getTiKVSession, startTS)
val backOffer = ConcreteBackOffer.newCustomBackOff(1000)
twoPhaseCommitter.prewritePrimaryKey(
backOffer,
Expand All @@ -78,7 +75,7 @@ class TeleMsg(sparkSession: SparkSession) {
twoPhaseCommitter.commitPrimaryKey(
backOffer,
key.getBytes("UTF-8"),
tiSession.getTimestamp.getVersion)
clientSession.getTiKVSession.getTimestamp.getVersion)
} catch {
case e: Throwable =>
logger.warn("Failed to set telemetry ID to TiKV.", e.getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package com.pingcap.tispark.telemetry

import com.pingcap.tispark.utils.{HttpClientUtil, TiUtil}
import com.pingcap.tispark.TiSparkVersion
import org.apache.spark.sql.SparkSession
import com.fasterxml.jackson.databind.ObjectMapper
import com.pingcap.tikv.TiConfiguration
import com.pingcap.tispark.TiSparkVersion
import com.pingcap.tispark.auth.TiAuthorization
import com.pingcap.tispark.utils.{HttpClientUtil, TiUtil}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.slf4j.LoggerFactory
import scalaj.http.HttpResponse
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package com.pingcap.tispark.utils
import com.pingcap.tikv.TiConfiguration
import com.pingcap.tikv.datatype.TypeMapping
import com.pingcap.tikv.hostmap.UriHostMapping
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo, TiTimestamp}
import com.pingcap.tikv.region.TiStoreType
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo}
import com.pingcap.tikv.types._
import com.pingcap.tispark._
import org.apache.commons.lang3.StringUtils
Expand All @@ -30,6 +29,9 @@ import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, sql}
import org.slf4j.LoggerFactory
import org.tikv.common.meta
import org.tikv.common.meta.TiTimestamp
import org.tikv.common.region.TiStoreType
import org.tikv.kvproto.Kvrpcpb.{CommandPri, IsolationLevel}

import java.time.{Instant, LocalDate, ZoneId}
Expand Down
Loading