
count distinct using a scramble table is slower than using the original table; VerdictDB is much slower than Spark #400

Open
maxbeyond opened this issue Oct 16, 2021 · 0 comments

@maxbeyond

I want to count the distinct values of one column. I used three different methods: Spark SQL, VerdictDB on the original table, and VerdictDB on the scramble table. (I made a few changes to the Hello.scala example; the code is at the end.) The results are:

spark time: 0.06 seconds
verdictDB scramble time: 4.67 seconds
verdictDB original time: 2.04 seconds
verdictDB scramble time: 3.13 seconds

I don't understand why VerdictDB is even slower than plain Spark. Do you have any thoughts?
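One caveat about the Spark number: spark.sql only builds a lazy plan, and in my code below the show() call runs after the timer stops, so the 0.06 seconds may mostly be planning time. A sketch of forcing execution inside the timed region (collect() is just illustrative; any Spark action would do):

  // Force the Spark query to execute before stopping the timer;
  // spark.sql(...) alone only builds a lazy logical plan.
  val spark_timer = new MyTimer("spark")
  val spark_rs = spark.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  spark_rs.collect() // action: triggers the actual scan and aggregation
  spark_timer.stop()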

Also, there are some pull requests trying to support VerdictDB on PySpark, but they have not been merged into the master branch. I am wondering whether PySpark support is planned.

I appreciate any help!

package example

import org.apache.spark.sql.SparkSession
import org.verdictdb.VerdictContext
import org.verdictdb.connection.SparkConnection
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType,StringType,StructType,StructField}

// Simple wall-clock timer: starts on construction, prints elapsed seconds on stop().
class MyTimer(val text: String) {
  private val start_time = System.nanoTime()

  def stop(): Unit = {
    val elapsed_time = (System.nanoTime() - start_time) / 1e9d
    printf("%s time: %.2f seconds\n", text, elapsed_time)
  }
}

object Hello extends App {
  val config = new SparkConf()
  config.set("spark.sql.storeAssignmentPolicy", "LEGACY")

  val spark = SparkSession
    .builder()
    .config(config)
    .appName("VerdictDB basic example")
    .enableHiveSupport()
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  import spark.implicits._
  val verdict = VerdictContext.fromSparkSession(spark)

  // prepare data
  prepareData(spark, verdict)

  val sqlDF = spark.sql("SELECT * FROM caida.sales")
  sqlDF.show(5)


  val spark_timer = new MyTimer("spark");
  val spark_rs = spark.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  spark_timer.stop()
  spark_rs.show()

  val count_distinct_timer = new MyTimer("verdictDB scramble");
  val rs = verdict.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  count_distinct_timer.stop()
  // rs.show()

  verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
  val count_distinct_timer2 = new MyTimer("verdictDB original");
  val rs2 = verdict.sql("select groupby, count(distinct metric_value) from caida.sales GROUP BY groupby")
  count_distinct_timer2.stop()

  verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
  val count_distinct_timer3 = new MyTimer("verdictDB scramble");
  val rs3 = verdict.sql("select groupby, count(distinct metric_value) from caida.sales_scramble GROUP BY groupby")
  count_distinct_timer3.stop()
  
  def prepareData(spark: SparkSession, verdict: VerdictContext): Unit = {
    // create a schema and a table
    spark.sql("DROP SCHEMA IF EXISTS caida CASCADE")
    spark.sql("CREATE SCHEMA IF NOT EXISTS caida")
    spark.sql("CREATE TABLE IF NOT EXISTS caida.sales (groupby string, metric_value string)")

    verdict.sql("BYPASS DROP TABLE IF EXISTS caida.sales_scramble")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbmeta CASCADE")

    val input_files = "s3://sketch-public/input/1m.csv"

    val caida_schema = StructType(Array(
        StructField("srcip", StringType, true),
        StructField("dstip", StringType, true),
        StructField("proto", StringType, true),
        StructField("srcport", StringType, true),
        StructField("dstport", StringType, true),
        StructField("length", StringType, true)
    ))

    val df = spark.read.format("csv")
          .option("sep", ",")
          .schema(caida_schema)
          .option("header", "false")
          .load(input_files)
    
    df.createOrReplaceTempView("dfView")

    spark.sql("INSERT INTO caida.sales (SELECT dstip as groupby, CONCAT(srcip, '|', srcport, '|', dstport, '|', length) as metric_value FROM dfView)")

    val scramble_timer = new MyTimer("scramble");
    verdict.sql("CREATE SCRAMBLE caida.sales_scramble FROM caida.sales METHOD HASH HASHCOLUMN metric_value")
    scramble_timer.stop()
  }
}
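In case it is useful for debugging, the scramble metadata can also be listed (assuming the documented SHOW SCRAMBLES command behaves the same on the Spark connection):

  // List the scrambles VerdictDB has registered in its metadata schema;
  // the result is a Spark DataFrame, like other verdict.sql(...) results.
  val scrambles = verdict.sql("SHOW SCRAMBLES")
  scrambles.show()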