add testing framework for larger dataset
sanchez575 committed Apr 10, 2015
1 parent 4058bbf commit 094ac53
Showing 3 changed files with 48 additions and 11 deletions.
17 changes: 17 additions & 0 deletions project/Build.scala
@@ -0,0 +1,17 @@
import sbt._
import sbt.Keys._


object ActivatorSparkBuild extends Build {

  lazy val activatorspark = Project(
    id = "Activator-Spark",
    base = file("."),
    settings = Defaults.coreDefaultSettings ++ Seq(
      // Must run the examples and tests in separate JVMs to avoid mysterious
      // scala.reflect.internal.MissingRequirementError errors. (TODO)
      fork in Test := true,
      // Must run Spark tests sequentially because they compete for port 4040!
      parallelExecution in Test := false,
      javaOptions in Test ++= Seq("-Xms1000M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")))
}
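
For reference, the same three test settings could also be written in bare build.sbt form (sbt 0.13.x key syntax) instead of a project/Build.scala — a minimal sketch, not part of this commit:

fork in Test := true

parallelExecution in Test := false

javaOptions in Test ++= Seq("-Xms1000M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")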
22 changes: 22 additions & 0 deletions src/test/scala/sampleclean/LocalSCContext.scala
@@ -64,6 +64,28 @@ trait LocalSCContext extends Serializable{
      sc.stop()
    }
  }

  def withFullRecordsLarge[T](sample: Int, f: SampleCleanContext => T): T = {
    val conf = new SparkConf()
      .set("spark.driver.allowMultipleContexts", "true")
    val sc = new SparkContext("local", "test", conf)
    val scc = new SampleCleanContext(sc)
    // Column names: "id" plus col0..col19.
    val context = List("id") ++ (0 until 20).toList.map("col" + _.toString)
    // Builds "id String,col0 String,...,col19 String" for the CREATE TABLE DDL below.
    val contextString = context.mkString(" String,") + " String"
    val hiveContext = scc.getHiveContext()
    scc.closeHiveSession()
    // Rebuild the fixture table from the csvJaccard100000dups test resource.
    hiveContext.hql("DROP TABLE IF EXISTS test")
    hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString))
    hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100000dups' OVERWRITE INTO TABLE test")
    scc.initializeConsistent("test", "test_sample", "id", sample)

    try {
      f(scc)
    } finally {
      sc.stop()
    }
  }
}
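
A hypothetical caller, sketched to show how the new loader is meant to be used; it assumes only names visible in this commit ("test_sample" created by initializeConsistent, and resetSample as called in the suite below):

import org.scalatest.FunSuite

class FullRecordsLargeSmokeTest extends FunSuite with LocalSCContext {
  test("loads the csvJaccard100000dups fixture into a sample") {
    // 1 is the sampling argument forwarded to initializeConsistent
    withFullRecordsLarge(1, { scc =>
      // "test_sample" is the sample table created inside the loader
      scc.resetSample("test_sample")
    })
  }
}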


20 changes: 9 additions & 11 deletions src/test/scala/sampleclean/recordDedupSuite.scala
@@ -2,7 +2,6 @@ package sampleclean

import org.apache.spark.SparkContext._
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite
import sampleclean.api.SampleCleanContext
import sampleclean.clean.algorithm.AlgorithmParameters
@@ -85,28 +84,27 @@ class RecordDedupSuite extends FunSuite with LocalSCContext {
    scc.resetSample(sampleTableName)
    val params = new AlgorithmParameters()

-   var t0 = System.nanoTime()
+   val t0 = System.nanoTime()
    val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5)
    val RD = new RecordDeduplication(params, scc, sampleTableName, defaultBM(scc, similarity))
    RD.setTableParameters(sampleTableName)
-   val t01 = System.nanoTime()
+   val t1 = System.nanoTime()
    RD.synchronousExecAndRead()
-   val t02 = System.nanoTime()
+   val t2 = System.nanoTime()
    assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 100)
-   var t1 = System.nanoTime()
-
-   println("Exec() in algorithm lasted " + (t02 - t01).toDouble / 1000000000 + " seconds.")
-   println("Whole cleaning algorithm lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.")
+   val t3 = System.nanoTime()

    val rowRDDLarge = scc.getSparkContext().textFile("./src/test/resources/csvJaccard100dups").map(x => Row.fromSeq(x.split(",", -1).toSeq))

-   t0 = System.nanoTime()
+   val t4 = System.nanoTime()
    val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName).drop(2), tok, 0.5)
    val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false)
    assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100)
-   t1 = System.nanoTime()
+   val t5 = System.nanoTime()

-   println("Join lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.")
+   println("Exec() in algorithm lasted " + (t2 - t1).toDouble / 1000000000 + " seconds.")
+   println("Whole cleaning algorithm lasted " + (t3 - t0).toDouble / 1000000000 + " seconds.")
+   println("Join lasted " + (t5 - t4).toDouble / 1000000000 + " seconds.")
  })


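Aside: the t0..t5 bookkeeping above could be folded into a small helper — a sketch under the same Scala test setup, not part of this commit:

// Runs a block, prints the elapsed wall-clock seconds, and returns the result.
def timed[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(label + " lasted " + (System.nanoTime() - start).toDouble / 1000000000 + " seconds.")
  result
}

// Usage: timed("Join") { bJoin.join(rowRDDLarge, rowRDDLarge).count() }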
