Cherry pick to 1.0 (#329)
* [TISPARK-27] Fix KV error handler logic when encountering the zero store ID problem. (#324)

* Refix stale epoch handling (#325)

* Merge master

* Adjust retry policy factor and cache invalidation (#326)

* Fix map single table (#319)

* [TISPARK-25] Improve downgrade logic and efficiency (#310)
Novemser authored and ilovesoup committed Apr 25, 2018
1 parent f538882 commit 017d439
Showing 67 changed files with 976 additions and 348 deletions.
27 changes: 24 additions & 3 deletions README.md
@@ -31,12 +31,33 @@ Usage as below
import org.apache.spark.sql.TiContext
val ti = new TiContext(spark)
// Mapping all TiDB tables from database tpch as Spark SQL tables
// Map all TiDB tables from database tpch as Spark SQL tables
ti.tidbMapDatabase("tpch")
spark.sql("select count(*) from lineitem").show
```

## Metadata loading
If you are using spark-shell, you need to load schema information manually, as described above.

If you have too many tables, you can disable statistics (histogram) loading so that metadata loading is faster:

```
ti.tidbMapDatabase("tpch", autoLoadStatistics = true)
```

If you have two tables with the same name in different databases, you can prefix table names with the database name:

```
ti.tidbMapDatabase("tpch", dbNameAsPrefix = true)
```

If you have many tables but use only some of them, you can speed up metadata loading by mapping only the tables you need:

```
ti.tidbTable("tpch", "lineitem")
```
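
Putting these options together, the following is a minimal spark-shell sketch, assuming a TiDB database named `tpch` is reachable from your cluster configuration; the flag value is illustrative only:

```scala
import org.apache.spark.sql.TiContext

val ti = new TiContext(spark)

// Map the whole database but skip statistics (histogram) loading,
// which keeps metadata loading fast when there are many tables.
ti.tidbMapDatabase("tpch", autoLoadStatistics = false)

// Mapped tables are visible to Spark SQL under their original names.
spark.sql("select count(*) from lineitem").show
```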

## Current Version
```
ti.version
@@ -58,13 +79,13 @@ Below configurations can be put together with spark-defaults.conf or passed in the same way as other Spark configuration properties.
| spark.tispark.meta.reload_period_in_sec | 60 | Metastore reload period in seconds |
| spark.tispark.plan.allow_agg_pushdown | true | Whether to allow aggregation pushdown (disable it when TiKV nodes are busy) |
| spark.tispark.plan.allow_index_read | false | Whether to allow index read (which might put heavy pressure on TiKV) |
| spark.tispark.index.scan_batch_size | 2000000 | How many row keys per batch for concurrent index scan |
| spark.tispark.index.scan_batch_size | 20000 | How many row keys per batch for concurrent index scan |
| spark.tispark.index.scan_concurrency | 5 | Maximum number of threads retrieving row keys for index scan (shared among tasks inside each JVM) |
| spark.tispark.table.scan_concurrency | 512 | Maximum number of threads for table scan (shared among tasks inside each JVM) |
| spark.tispark.request.command.priority | "Low" | One of "Low", "Normal", "High"; affects the resources the request gets in TiKV. "Low" is recommended so that OLTP workloads are not disturbed |
| spark.tispark.coprocess.streaming | false | Whether to use streaming for response fetching |
| spark.tispark.plan.unsupported_pushdown_exprs | "" | A comma-separated list of expressions. If you run a very old version of TiKV, you may disable push-down of expressions it does not support |
| spark.tispark.plan.downgrade.index_threshold | 100000 | If the number of index scan handles for one region exceeds this limit in the original request, downgrade the request to a full table scan rather than the planned index scan |
| spark.tispark.plan.downgrade.index_threshold | 10000 | If the number of index scan ranges on one region exceeds this limit in the original request, downgrade this region's request to a table scan rather than the planned index scan |
| spark.tispark.type.unsupported_mysql_types | "time,enum,set,year,json" | A comma-separated list of MySQL types TiSpark does not currently support; refer to `Unsupported MySQL Type List` below |
| spark.tispark.request.timezone.offset | Local Timezone offset | An integer timezone offset from UTC in seconds (e.g. 28800 for GMT+8); this value is added to requests issued to TiKV |

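To pass these as ordinary Spark properties instead of editing spark-defaults.conf, one option is a sketch like the following when building the session; the property values are illustrative, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// Equivalent to placing the same keys in spark-defaults.conf.
val spark = SparkSession
  .builder()
  .appName("tispark-app")
  .config("spark.tispark.plan.allow_index_read", "true")
  .config("spark.tispark.request.command.priority", "Low")
  .config("spark.tispark.plan.downgrade.index_threshold", "10000")
  .getOrCreate()
```
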
1 change: 0 additions & 1 deletion core/src/main/scala/com/pingcap/tispark/TiDBRelation.scala
@@ -18,7 +18,6 @@ package com.pingcap.tispark
import com.pingcap.tikv.TiSession
import com.pingcap.tikv.exception.TiClientInternalException
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo, TiTimestamp}
import com.pingcap.tispark.statistics.StatisticsManager
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
3 changes: 1 addition & 2 deletions core/src/main/scala/com/pingcap/tispark/TiSessionCache.scala
@@ -18,7 +18,7 @@ package com.pingcap.tispark
import java.util.HashMap

import com.pingcap.tikv.{TiConfiguration, TiSession}
import com.pingcap.tispark.listener.CacheListenerManager
import com.pingcap.tispark.listener.CacheInvalidateListener

object TiSessionCache {
private val sessionCache: HashMap[String, TiSession] = new HashMap[String, TiSession]()
@@ -27,7 +27,6 @@ object TiSessionCache {
val session = sessionCache.get(appId)
if (session == null) {
val newSession = TiSession.create(conf)
newSession.injectCallBackFunc(CacheListenerManager.CACHE_ACCUMULATOR_FUNCTION)
sessionCache.put(appId, newSession)
newSession
} else {
12 changes: 3 additions & 9 deletions core/src/main/scala/com/pingcap/tispark/TiUtils.scala
@@ -25,7 +25,7 @@ import com.pingcap.tikv.meta.{TiColumnInfo, TiDAGRequest, TiTableInfo}
import com.pingcap.tikv.region.RegionStoreClient.RequestTypes
import com.pingcap.tikv.types._
import com.pingcap.tikv.{TiConfiguration, TiSession}
import com.pingcap.tispark.listener.CacheListenerManager
import com.pingcap.tispark.listener.CacheInvalidateListener
import com.pingcap.tispark.statistics.StatisticsManager
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal, NamedExpression}
@@ -209,20 +209,14 @@ object TiUtils {
val priority = CommandPri.valueOf(conf.get(TiConfigConst.REQUEST_COMMAND_PRIORITY))
tiConf.setCommandPriority(priority)
}

if (conf.contains(TiConfigConst.REGION_INDEX_SCAN_DOWNGRADE_THRESHOLD)) {
tiConf.setRegionIndexScanDowngradeThreshold(
conf.get(TiConfigConst.REGION_INDEX_SCAN_DOWNGRADE_THRESHOLD).toLong
)
}
tiConf
}

def sessionInitialize(session: SparkSession, tiSession: TiSession): Unit = {
session.experimental.extraStrategies ++= Seq(new TiStrategy(session.sqlContext))
session.udf.register("ti_version", () => TiSparkVersion.version)
CacheListenerManager.initCacheListener(session.sparkContext, tiSession.getRegionManager)
tiSession.injectCallBackFunc(CacheListenerManager.CACHE_ACCUMULATOR_FUNCTION)
CacheInvalidateListener.initCacheListener(session.sparkContext, tiSession.getRegionManager)
tiSession.injectCallBackFunc(CacheInvalidateListener.getInstance())
StatisticsManager.initStatisticsManager(tiSession, session)
}

@@ -20,7 +20,7 @@ package com.pingcap.tispark.handler
import com.pingcap.tikv.event.CacheInvalidateEvent
import com.pingcap.tikv.event.CacheInvalidateEvent.CacheType
import com.pingcap.tikv.region.RegionManager
import com.pingcap.tispark.listener.CacheListenerManager.CACHE_INVALIDATE_ACCUMULATOR
import com.pingcap.tispark.listener.CacheInvalidateListener
import org.slf4j.LoggerFactory

/**
@@ -67,7 +67,7 @@ class CacheInvalidateEventHandler(regionManager: RegionManager) {
logger.error(s"Updating cache failed:${e.getMessage}")
return
}
CACHE_INVALIDATE_ACCUMULATOR.remove(event)
CacheInvalidateListener.getInstance().CACHE_INVALIDATE_ACCUMULATOR.remove(event)
}
}

@@ -21,56 +21,63 @@ import com.pingcap.tikv.event.CacheInvalidateEvent
import com.pingcap.tikv.region.RegionManager
import com.pingcap.tispark.accumulator.CacheInvalidateAccumulator
import com.pingcap.tispark.handler.CacheInvalidateEventHandler
import com.pingcap.tispark.listener.CacheListenerManager._
import org.apache.log4j.Logger
import org.apache.spark.SparkContext

/**
* Initialize the cache invalidation framework for the given session.
*
* @param sc The SparkContext used for attaching a cache listener.
* @param regionManager The RegionManager used to invalidate the local cache.
*/
private class CacheListenerManager(sc: SparkContext, regionManager: RegionManager) {
def init(): Unit = {
if (sc != null && regionManager != null) {
sc.register(CACHE_INVALIDATE_ACCUMULATOR, CACHE_ACCUMULATOR_NAME)
sc.addSparkListener(
new PDCacheInvalidateListener(
CACHE_INVALIDATE_ACCUMULATOR,
CacheInvalidateEventHandler(regionManager)
)
)
}
class CacheInvalidateListener()
extends Serializable
with java.util.function.Function[CacheInvalidateEvent, Void] {
final val CACHE_ACCUMULATOR_NAME = "CacheInvalidateAccumulator"
final val CACHE_INVALIDATE_ACCUMULATOR = new CacheInvalidateAccumulator

override def apply(t: CacheInvalidateEvent): Void = {
// this operation shall be executed in executor nodes
CACHE_INVALIDATE_ACCUMULATOR.add(t)
null
}
}

object CacheListenerManager {
private var manager: CacheListenerManager = _
object CacheInvalidateListener {
private var manager: CacheInvalidateListener = _
private final val logger = Logger.getLogger(getClass.getName)
final val CACHE_ACCUMULATOR_NAME = "CacheInvalidateAccumulator"
final val CACHE_INVALIDATE_ACCUMULATOR = new CacheInvalidateAccumulator
final var CACHE_ACCUMULATOR_FUNCTION =
new java.util.function.Function[CacheInvalidateEvent, Void] {
override def apply(t: CacheInvalidateEvent): Void = {
// this operation shall be executed in executor nodes
CACHE_INVALIDATE_ACCUMULATOR.add(t)
null
}

def getInstance(): CacheInvalidateListener = {
if (manager == null) {
throw new RuntimeException("CacheListenerManager has not been initialized properly.")
}
manager
}

/**
* Initialize the cache invalidation framework for the given session.
*
* @param sc The SparkContext used for attaching a cache listener.
* @param regionManager The RegionManager used to invalidate the local cache.
*/
def initCacheListener(sc: SparkContext, regionManager: RegionManager): Unit = {
if (manager == null) {
synchronized {
if (manager == null) {
try {
manager = new CacheListenerManager(sc, regionManager)
manager.init()
manager = new CacheInvalidateListener()
init(sc, regionManager, manager)
} catch {
case e: Throwable => logger.trace(s"Init CacheListener failed:${e.getMessage}")
case e: Throwable => logger.error(s"Init CacheListener failed.", e)
}
}
}
}
}

def init(sc: SparkContext, regionManager: RegionManager, manager: CacheInvalidateListener): Unit = {
if (sc != null && regionManager != null) {
sc.register(manager.CACHE_INVALIDATE_ACCUMULATOR, manager.CACHE_ACCUMULATOR_NAME)
sc.addSparkListener(
new PDCacheInvalidateListener(
manager.CACHE_INVALIDATE_ACCUMULATOR,
CacheInvalidateEventHandler(regionManager)
)
)
}
}
}
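
For context, a minimal sketch of how this listener is wired up, mirroring `TiUtils.sessionInitialize` in this commit; `spark` and `tiSession` are assumed to be an existing `SparkSession` and an initialized `TiSession`:

```scala
import com.pingcap.tikv.TiSession
import com.pingcap.tispark.listener.CacheInvalidateListener
import org.apache.spark.sql.SparkSession

object CacheListenerWiring {
  // Hypothetical helper: not part of this commit, shown only to illustrate the call order.
  def wire(spark: SparkSession, tiSession: TiSession): Unit = {
    // Driver side: register the accumulator and attach the SparkListener
    // that drains it and invalidates the local region/store cache.
    CacheInvalidateListener.initCacheListener(spark.sparkContext, tiSession.getRegionManager)
    // Executor side: TiSession invokes this callback on cache-invalidation
    // events, which adds them to the accumulator.
    tiSession.injectCallBackFunc(CacheInvalidateListener.getInstance())
  }
}
```
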
@@ -93,7 +93,7 @@ class StatisticsManager(tiSession: TiSession) {
private final lazy val logger = LoggerFactory.getLogger(getClass.getName)
private final val statisticsMap = CacheBuilder
.newBuilder()
.build[Object, Object]
.build[java.lang.Long, TableStatistics]

/**
* Load statistics information maintained by TiDB into TiSpark.
@@ -130,7 +130,7 @@ class StatisticsManager(tiSession: TiSession) {

// use cached one for incremental update
val tblStatistic = if (statisticsMap.asMap.containsKey(tblId)) {
statisticsMap.getIfPresent(tblId).asInstanceOf[TableStatistics]
statisticsMap.getIfPresent(tblId)
} else {
new TableStatistics(tblId)
}
@@ -161,7 +161,7 @@ class StatisticsManager(tiSession: TiSession) {
// Update cache
results.foreach { putOrUpdateTblStats(tblStatistic, _) }

statisticsMap.put(tblId.asInstanceOf[Object], tblStatistic.asInstanceOf[Object])
statisticsMap.put(tblId, tblStatistic)
}

private def putOrUpdateTblStats(tblStatistic: TableStatistics, result: StatisticsResult): Unit =
@@ -232,7 +232,7 @@ class StatisticsManager(tiSession: TiSession) {
}

def getTableStatistics(id: Long): TableStatistics = {
statisticsMap.getIfPresent(id).asInstanceOf[TableStatistics]
statisticsMap.getIfPresent(id)
}
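
The cache is now built with explicit key and value types, which is what removes the `asInstanceOf` casts above; a standalone sketch of the same Guava pattern, with `TableStats` as a hypothetical stand-in for `TableStatistics`:

```scala
import com.google.common.cache.CacheBuilder

final case class TableStats(tableId: Long, rowCount: Long)

object TypedStatsCache {
  // Typed parameters mean getIfPresent returns TableStats directly, no cast needed.
  private val cache = CacheBuilder
    .newBuilder()
    .build[java.lang.Long, TableStats]()

  def put(stats: TableStats): Unit = cache.put(stats.tableId, stats)

  // Guava returns null on a miss, so wrap it in an Option.
  def get(id: Long): Option[TableStats] = Option(cache.getIfPresent(id))
}
```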

/**
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/TiContext.scala
@@ -129,13 +129,15 @@ class TiContext(val session: SparkSession) extends Serializable with Logging {
}
}

def tidbTable(dbName: String, tableName: String): DataFrame = {
// tidbMapTable does not check any meta information;
// it just registers the table for later use
def tidbMapTable(dbName: String, tableName: String): Unit = {
val tiRelation = new TiDBRelation(
tiSession,
new TiTableReference(dbName, tableName),
meta
)(sqlContext)
sqlContext.baseRelationToDataFrame(tiRelation)
sqlContext.baseRelationToDataFrame(tiRelation).createTempView(tableName)
}
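
For reference, a minimal spark-shell sketch of the renamed call, assuming `ti` is a `TiContext` and `spark` the active `SparkSession`, as in the README above:

```scala
// Registers the table as a temp view named "lineitem" without checking
// meta information; the view can then be queried through Spark SQL.
ti.tidbMapTable("tpch", "lineitem")
spark.sql("select count(*) from lineitem").show
```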

def tidbMapDatabase(dbName: String,