passionke/starry

What's Starry?

Starry brings an amazingly low 1-2 ms response time to Spark when you deploy a Spark application in local mode.

Why Starry?

Spark supports complex SQL, so if you need an in-memory database, Spark with Starry becomes a practical option. We also use Starry to deploy ML prediction services.

Maven repository

<dependency>
  <groupId>com.github.passionke</groupId>
  <artifactId>starry</artifactId>
  <version>1.0</version>
</dependency>
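
If you build with sbt, the equivalent dependency, using the same coordinates as the Maven snippet above, would be:

libraryDependencies += "com.github.passionke" % "starry" % "1.0"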

Quick tutorial

Starry enhances the SparkContext and the Spark SQL engine, so use StarrySparkContext instead of the original SparkContext.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.LocalBasedStrategies

val sparkConf = new SparkConf()
sparkConf.setMaster("local[*]")
sparkConf.setAppName("aloha")
sparkConf
  .set("spark.default.parallelism", "1")
  .set("spark.sql.shuffle.partitions", "1")
  .set("spark.sql.codegen.wholeStage", "false")
  .set("spark.sql.extensions", "org.apache.spark.sql.StarrySparkSessionExtension")

// StarrySparkContext is Starry's drop-in replacement for SparkContext.
val sparkContext = new StarrySparkContext(sparkConf)

// Now you have an enhanced SparkSession. Use it just as usual.
val sparkSession: SparkSession =
  SparkSession.builder
    .sparkContext(sparkContext)
    .getOrCreate

// Starry also provides some extraStrategies optimized for local mode.
// Use LocalBasedStrategies to register them.
LocalBasedStrategies.register(sparkSession)
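
As a quick sanity check, you can run a small query against the enhanced session. The Point case class and the timing code below are illustrative only, not part of Starry:

import sparkSession.implicits._

case class Point(x: Int, y: Int) // illustrative case class, not part of Starry

val ds = sparkSession.createDataset(Seq(Point(1, 2), Point(3, 4)))
ds.createOrReplaceTempView("points")

// Time a simple SQL query. With Starry's local strategies registered,
// this should come back in a few milliseconds.
val start = System.nanoTime()
val rows = sparkSession.sql("SELECT x + y AS s FROM points").collect()
println(s"got ${rows.length} rows in ${(System.nanoTime() - start) / 1e6} ms")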

Tips

Use createDataset(Seq[T]) instead of createDataset(RDD[T]) to build your data. For example:

// Do this:
val strList = JSONArray.fromObject(param("data", "[]")).map(f => StringFeature(f.toString))
import sparkSession.implicits._
val res = sparkSession.createDataset(strList).selectExpr(sql).toJSON.collect().mkString(",")

// Do not do this:
val strList = JSONArray.fromObject(param("data", "[]")).map(f => StringFeature(f.toString))
val rdd = sparkSession.sparkContext.parallelize(strList, perRequestCoreNum)
import sparkSession.implicits._
val res = sparkSession.createDataset(rdd).selectExpr(sql).toJSON.collect().mkString(",")
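
If your data already lives in an RDD, one option is to collect it to a local Seq on the driver first. This is a sketch that assumes the data fits in driver memory and reuses rdd from the example above:

import sparkSession.implicits._

// Collect the RDD to the driver so the Dataset is built from a local Seq
// rather than an RDD (only sensible when the data fits in driver memory).
val localSeq = rdd.collect().toSeq
val res = sparkSession.createDataset(localSeq).selectExpr(sql).toJSON.collect().mkString(",")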

If you want to load data from HDFS, you can try code like the following:

// Unregister Starry's strategies from the SparkSession,
// then load the data into memory with the default Spark strategies.
val df = sparkSession.read.load(tablePath)
LocalBasedStrategies.unRegister(sparkSession)
val rows = df.collectAsList()

// Register the Starry strategies again.
LocalBasedStrategies.register(sparkSession)
// Create the DataFrame from a List, not an RDD.
sparkSession.createDataFrame(rows, df.schema).createOrReplaceTempView(tableName)

When you query the data again, you will find it amazingly fast.
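
To avoid repeating this pattern, you could wrap it in a small helper. The sketch below uses a hypothetical function name, cacheHdfsTable, which is not part of Starry:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.LocalBasedStrategies

// Hypothetical helper wrapping the unregister / load / re-register pattern above.
def cacheHdfsTable(sparkSession: SparkSession, tablePath: String, tableName: String): Unit = {
  val df = sparkSession.read.load(tablePath)

  // Fall back to the default Spark strategies while scanning HDFS ...
  LocalBasedStrategies.unRegister(sparkSession)
  val rows = df.collectAsList()

  // ... then re-register Starry's local strategies and expose the in-memory
  // copy as a temp view built from a local List, not an RDD.
  LocalBasedStrategies.register(sparkSession)
  sparkSession.createDataFrame(rows, df.schema).createOrReplaceTempView(tableName)
}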
