From 4058bbfd433eef1f0164c217bfa3bf31fa0a12e1 Mon Sep 17 00:00:00 2001 From: Juan Manuel Sanchez Date: Thu, 9 Apr 2015 17:15:51 -0700 Subject: [PATCH] send test changes again --- deploy/README.md | 14 +- .../scala/sampleclean/LocalSCContext.scala | 53 ++- .../sampleclean/blockerMatcherSuite.scala | 99 ++--- .../sampleclean/entityResolutionSuite.scala | 163 ++++---- .../scala/sampleclean/featurizerSuite.scala | 2 +- src/test/scala/sampleclean/joinsSuite.scala | 381 +++++++++--------- .../scala/sampleclean/recordDedupSuite.scala | 168 ++++---- 7 files changed, 449 insertions(+), 431 deletions(-) diff --git a/deploy/README.md b/deploy/README.md index c0d553e..67a2549 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -9,13 +9,13 @@ http://spark.apache.org/docs/latest/ec2-scripts.html), and can be called as either: ```shell -./sampleclean-ec2 CREDENTIALS_DIRECTORY SPARK-EC2-ARG [SPARK-EC2-ARG ...] +./sampleclean-ec2.sh CREDENTIALS_DIRECTORY SPARK-EC2-ARG [SPARK-EC2-ARG ...] ``` or: ```shell -./sampleclean-ec2 SPARK-EC2-ARG [SPARK-EC2-ARG ...] +./sampleclean-ec2.sh SPARK-EC2-ARG [SPARK-EC2-ARG ...] ``` In the latter case, `CREDENTIALS_DIRECTORY` will be set to the value of the @@ -32,7 +32,7 @@ credentials needed for using AWS and EC2: * (optional) The `sampleclean1.eecs.berkeley.edu-san.key` file containing the private key for the ssl certificate. -For help with `SPARK-EC2-ARGs`, run `./sampleclean-ec2 --help`. +For help with `SPARK-EC2-ARGs`, run `./sampleclean-ec2.sh --help`. Launching a cluster @@ -42,11 +42,11 @@ For example, to launch a cluster named sampleclean with 8 slaves, then run the crowd server: ```shell # Alternatively, use a pre-saved ami with --master-ami AMI_ID -./sampleclean-ec2 ~/.ssh/aws/sampleclean/ -s 8 -t m1.large launch sampleclean +./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ -s 8 -t m1.large launch sampleclean # ... lots of output ... -./sampleclean-ec2 ~/.ssh/aws/sampleclean/ get-master sampleclean +./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ get-master sampleclean # ... get the master's hostname ... -./sampleclean-ec2 ~/.ssh/aws/sampleclean/ login sampleclean +./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ login sampleclean > workon sampleclean > cd $PROJECT_HOME > ./run.sh -d @@ -59,7 +59,7 @@ Running code on the cluster To actually get code running on a cluster you've launched, you'll need to: * ssh into the cluster: - `./sampleclean-ec2 ~/.ssh/aws/sampleclean/ login CLUSTER_NAME` + `./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ login CLUSTER_NAME` * Copy data to the master's hdfs instance: `/root/ephemeral-hdfs/bin/hadoop dfs -put LOCAL/PATH/TO/data HDFS/PATH/TO/data` diff --git a/src/test/scala/sampleclean/LocalSCContext.scala b/src/test/scala/sampleclean/LocalSCContext.scala index f4465a0..b62635f 100644 --- a/src/test/scala/sampleclean/LocalSCContext.scala +++ b/src/test/scala/sampleclean/LocalSCContext.scala @@ -1,18 +1,65 @@ package sampleclean import org.apache.spark.{SparkConf, SparkContext} +import sampleclean.api.SampleCleanContext /** * Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped * after each test. */ -trait LocalSCContext { +trait LocalSCContext extends Serializable{ /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. 
*/ - def withSpark[T](f: SparkContext => T): T = { + def withSampleCleanContext[T](f: SampleCleanContext => T): T = { val conf = new SparkConf() + .set("spark.driver.allowMultipleContexts","true") val sc = new SparkContext("local", "test", conf) + val scc = new SampleCleanContext(sc) try { - f(sc) + f(scc) + } finally { + sc.stop() + } + } + + def withSingleAttribute[T](sample:Int,f: SampleCleanContext => T): T = { + val conf = new SparkConf() + .set("spark.driver.allowMultipleContexts","true") + val sc = new SparkContext("local", "test", conf) + val scc = new SampleCleanContext(sc) + val context = List("id", "col0") + val contextString = context.mkString(" String,") + " String" + + val hiveContext = scc.getHiveContext() + scc.closeHiveSession() + hiveContext.hql("DROP TABLE IF EXISTS test") + hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString)) + hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dupsAttr' OVERWRITE INTO TABLE test") + scc.initializeConsistent("test", "test_sample", "id", sample) + + try { + f(scc) + } finally { + sc.stop() + } + } + + def withFullRecords[T](sample:Int, f: SampleCleanContext => T): T = { + val conf = new SparkConf() + .set("spark.driver.allowMultipleContexts","true") + val sc = new SparkContext("local", "test", conf) + val scc = new SampleCleanContext(sc) + val context = List("id") ++ (0 until 20).toList.map("col" + _.toString) + + val contextString = context.mkString(" String,") + " String" + val hiveContext = scc.getHiveContext() + scc.closeHiveSession() + hiveContext.hql("DROP TABLE IF EXISTS test") + hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString)) + hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dups' OVERWRITE INTO TABLE test") + scc.initializeConsistent("test", "test_sample", "id", sample) + + try { + f(scc) } finally { sc.stop() } diff --git a/src/test/scala/sampleclean/blockerMatcherSuite.scala b/src/test/scala/sampleclean/blockerMatcherSuite.scala index 064b8ca..cda88f2 100644 --- a/src/test/scala/sampleclean/blockerMatcherSuite.scala +++ b/src/test/scala/sampleclean/blockerMatcherSuite.scala @@ -2,84 +2,71 @@ package sampleclean import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.FunSuite -import sampleclean.api.SampleCleanContext import sampleclean.clean.deduplication.join.{BlockerMatcherJoinSequence, BlockerMatcherSelfJoinSequence, BroadcastJoin} import sampleclean.clean.deduplication.matcher.{AllMatcher, Matcher} import sampleclean.clean.featurize.AnnotatedSimilarityFeaturizer.WeightedJaccardSimilarity import sampleclean.clean.featurize.Tokenizer.DelimiterTokenizer -class blockerMatcherSuite extends FunSuite with Serializable { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("SCUnitTest") - .set("spark.driver.allowMultipleContexts","true") - val sc = new SparkContext(conf) - val scc = new SampleCleanContext(sc) - - - test("matcher"){ - - val row1 = Row("a","b","c") - val row2 = Row("d","e","f") - var m:Matcher = null +class BlockerMatcherSuite extends FunSuite with LocalSCContext { + val colNames = (0 until 20).toList.map("col" + _.toString) + val tok = new DelimiterTokenizer(" ") + val sampleTableName = "test_sample" - m = new AllMatcher(scc, "test_sample") + test("matcher") 
{ + withSampleCleanContext { scc => + val row1 = Row("a", "b", "c") + val row2 = Row("d", "e", "f") + var m: Matcher = null - // does not include equal rows - val cartesian = Set((row2,row1),(row1,row2)) - assert(m.selfCartesianProduct(Set(row1,row1)) == List()) - assert(m.selfCartesianProduct(Set(row1,row2)).toSet == cartesian) + m = new AllMatcher(scc, sampleTableName) + // does not include equal rows + val cartesian = Set((row2, row1), (row1, row2)) + assert(m.selfCartesianProduct(Set(row1, row1)) == List()) + assert(m.selfCartesianProduct(Set(row1, row2)).toSet == cartesian) - val candidates1: RDD[Set[Row]] = sc.parallelize(Seq(Set(row1,row2))) - val candidates2: RDD[(Row,Row)] = sc.parallelize(Seq((row1,row2))) - // TODO should they be the same? - //assert(m.matchPairs(candidates1).collect() == m.matchPairs(candidates2).collect()) - // TODO asynchronous matchers + val candidates1: RDD[Set[Row]] = scc.getSparkContext().parallelize(Seq(Set(row1, row2))) + val candidates2: RDD[(Row, Row)] = scc.getSparkContext().parallelize(Seq((row1, row2))) + // TODO should they be the same? + //assert(m.matchPairs(candidates1).collect() == m.matchPairs(candidates2).collect()) + // TODO asynchronous matchers + } } - val context = (0 until 20).toList.map("col" + _.toString) - val colNames = context - val colNamesString = context.mkString(" String,") + " String" - val hiveContext = scc.getHiveContext() - scc.closeHiveSession() - hiveContext.hql("DROP TABLE IF EXISTS test") - hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(colNamesString)) - hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dups' OVERWRITE INTO TABLE test") - val tok = new DelimiterTokenizer(" ") - test("self join sequence"){ - scc.initializeConsistent("test", "test_sample", "col0", 1) - val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5) - val bJoin = new BroadcastJoin(sc,blocker,false) - val rdd = scc.getFullTable("test_sample") - val matcher = new AllMatcher(scc, "test_sample") + test("self join sequence") { + withFullRecords (1,{ scc => - val blockMatch = new BlockerMatcherSelfJoinSequence(scc, "test_sample",bJoin,List(matcher)) - assert(blockMatch.blockAndMatch(rdd).count() == 100) + val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) + val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false) + val rdd = scc.getFullTable(sampleTableName) + val matcher = new AllMatcher(scc, sampleTableName) - } + val blockMatch = new BlockerMatcherSelfJoinSequence(scc, sampleTableName, bJoin, List(matcher)) + assert(blockMatch.blockAndMatch(rdd).count() == 100) + }) + +} - test("join sequence"){ - scc.closeHiveSession() - val (clean,dirty) = scc.initializeConsistent("test", "test_sample2", "col0", 2) + test("sample join sequence") { + withFullRecords(2, { scc => - val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.465) - val bJoin = new BroadcastJoin(sc,blocker,false) - val rdd1 = scc.getFullTable("test_sample2") - val rdd2 = dirty - val matcher = new AllMatcher(scc, "test_sample2") + val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.465) + val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false) + val rdd1 = scc.getFullTable(sampleTableName) + val rdd2 = scc.getCleanSample(sampleTableName) + val matcher = new AllMatcher(scc, sampleTableName) - val blockMatch = new 
BlockerMatcherJoinSequence(scc, "test_sample2",bJoin,List(matcher)) - assert(blockMatch.blockAndMatch(rdd2,rdd1).count() >= 40 * 2 + rdd2.count()) - blockMatch.updateContext(context.map(x => x + x)) - assert(blocker.context == context.map(x => x + x)) + val blockMatch = new BlockerMatcherJoinSequence(scc, sampleTableName, bJoin, List(matcher)) + assert(blockMatch.blockAndMatch(rdd2, rdd1).count() >= 40 * 2 + rdd2.count()) + blockMatch.updateContext(scc.getTableContext(sampleTableName).map(x => x + x)) + assert(blocker.context == scc.getTableContext(sampleTableName).map(x => x + x)) + }) } } diff --git a/src/test/scala/sampleclean/entityResolutionSuite.scala b/src/test/scala/sampleclean/entityResolutionSuite.scala index b0cd33b..a651df7 100644 --- a/src/test/scala/sampleclean/entityResolutionSuite.scala +++ b/src/test/scala/sampleclean/entityResolutionSuite.scala @@ -15,123 +15,116 @@ import sampleclean.clean.featurize.AnnotatedSimilarityFeaturizer.WeightedJaccard import sampleclean.clean.featurize.Tokenizer.DelimiterTokenizer -class entityResolutionSuite extends FunSuite with Serializable{ - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("SCUnitTest") - .set("spark.driver.allowMultipleContexts","true") - val sc = new SparkContext(conf) - val scc = new SampleCleanContext(sc) - val context = List("id","col0") +class EntityResolutionSuite extends FunSuite with LocalSCContext { val attr = "col0" val colNames = List(attr) - val contextString = context.mkString(" String,") + " String" - val hiveContext = scc.getHiveContext() - scc.closeHiveSession() - hiveContext.hql("DROP TABLE IF EXISTS test") - hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString)) - hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dupsAttr' OVERWRITE INTO TABLE test") - val sampleTableName = "test_sample" - scc.initializeConsistent("test", sampleTableName, "id", 1) val tok = new DelimiterTokenizer(" ") + val sampleTableName = "test_sample" + - def defaultBM(sim: AnnotatedSimilarityFeaturizer): BlockerMatcherSelfJoinSequence = { - val bJoin = new BroadcastJoin(sc,sim,false) - val matcher = new AllMatcher(scc, sampleTableName) - new BlockerMatcherSelfJoinSequence(scc, sampleTableName,bJoin,List(matcher)) + def defaultBM(scc:SampleCleanContext,sim: AnnotatedSimilarityFeaturizer): BlockerMatcherSelfJoinSequence = { + val bJoin = new BroadcastJoin(scc.getSparkContext(), sim, false) + val matcher = new AllMatcher(scc, sampleTableName) + new BlockerMatcherSelfJoinSequence(scc, sampleTableName, bJoin, List(matcher)) } - test("function validations"){ - // Initialize algorithm - val similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) - val params = new AlgorithmParameters() - params.put("attr",attr) - params.put("mergeStrategy","mostFrequent") + test("exec") { + withSingleAttribute (1,{scc => + // Initialize algorithm + val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) + val params = new AlgorithmParameters() + params.put("attr", attr) + params.put("mergeStrategy", "mostFrequent") - val ER = new EntityResolution(params,scc,sampleTableName,defaultBM(similarity)) + val ER = new EntityResolution(params, scc, sampleTableName, defaultBM(scc, similarity)) - // Within exec() function - ER.setTableParameters(sampleTableName) - assert(ER.attrCol == 3 && ER.hashCol == 0) - val sampleTableRDD = 
scc.getCleanSample(sampleTableName) - val attrCountGroup = sampleTableRDD.map(x => - (x(ER.attrCol).asInstanceOf[String], - x(ER.hashCol).asInstanceOf[String])). - groupByKey() - val attrCountRdd = attrCountGroup.map(x => Row(x._1, x._2.size.toLong)) - val vertexRDD = attrCountGroup.map(x => (x._1.hashCode().toLong, - (x._1, x._2.toSet))) + // Within exec() function + ER.setTableParameters(sampleTableName) + assert(ER.attrCol == 3 && ER.hashCol == 0) + val sampleTableRDD = scc.getCleanSample(sampleTableName) + val attrCountGroup = sampleTableRDD.map(x => + (x(ER.attrCol).asInstanceOf[String], + x(ER.hashCol).asInstanceOf[String])). + groupByKey() + val attrCountRdd = attrCountGroup.map(x => Row(x._1, x._2.size.toLong)) + val vertexRDD = attrCountGroup.map(x => (x._1.hashCode().toLong, + (x._1, x._2.toSet))) - ER.components.updateContext(List(attr,"count")) + ER.components.updateContext(List(attr, "count")) - val edgeRDD: RDD[(Long, Long, Double)] = scc.getSparkContext().parallelize(List()) - ER.graphXGraph = GraphXInterface.buildGraph(vertexRDD, edgeRDD) + val edgeRDD: RDD[(Long, Long, Double)] = scc.getSparkContext().parallelize(List()) + ER.graphXGraph = GraphXInterface.buildGraph(vertexRDD, edgeRDD) - assert(sampleTableRDD.count() == 201) - assert(attrCountGroup.count() == 201) - assert(attrCountRdd.count() == 201) - assert(vertexRDD.count() == 201) - assert(edgeRDD.count() == 0) + assert(sampleTableRDD.count() == 201) + assert(attrCountGroup.count() == 201) + assert(attrCountRdd.count() == 201) + assert(vertexRDD.count() == 201) + assert(edgeRDD.count() == 0) - ER.components.printPipeline() - ER.components.setOnReceiveNewMatches(ER.apply) + ER.components.printPipeline() + ER.components.setOnReceiveNewMatches(ER.apply) - val candidates = ER.components.blockAndMatch(attrCountRdd) - assert(candidates.count() == 100) - ER.apply(candidates) + val candidates = ER.components.blockAndMatch(attrCountRdd) + assert(candidates.count() == 100) + ER.apply(candidates) + + assert(scc.getCleanSampleAttr(sampleTableName, "col0").map(x => (x.getString(1), x.getString(0))).groupByKey().count() == 101) + }) - assert(scc.getCleanSampleAttr(sampleTableName,"col0").map(x => (x.getString(1),x.getString(0))).groupByKey().count() == 101) - } - test("execution validation"){ - // Initialize algorithm - scc.resetSample(sampleTableName) - val params = new AlgorithmParameters() - params.put("attr",attr) - params.put("mergeStrategy","mostFrequent") + test("api") { + withSingleAttribute (1,{ scc => + // Initialize algorithm + //scc.resetSample(sampleTableName) + val params = new AlgorithmParameters() + params.put("attr", attr) + params.put("mergeStrategy", "mostFrequent") - val similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) + val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) - val ER = new EntityResolution(params,scc,sampleTableName,defaultBM(similarity)) + val ER = new EntityResolution(params, scc, sampleTableName, defaultBM(scc, similarity)) - assert(scc.getCleanSampleAttr(sampleTableName,"col0").map(x => (x.getString(1),x.getString(0))).groupByKey().count() == 201) - ER.exec() - assert(scc.getCleanSampleAttr(sampleTableName,"col0").map(x => (x.getString(1),x.getString(0))).groupByKey().count() == 101) + assert(scc.getCleanSampleAttr(sampleTableName, "col0").map(x => (x.getString(1), x.getString(0))).groupByKey().count() == 201) + ER.exec() + assert(scc.getCleanSampleAttr(sampleTableName, "col0").map(x => 
(x.getString(1), x.getString(0))).groupByKey().count() == 101) + }) } - test("overhead"){ - scc.resetSample(sampleTableName) - val params = new AlgorithmParameters() - - var t0 = System.nanoTime() - params.put("attr",attr) - params.put("mergeStrategy","mostFrequent") - val similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) - val ER = new EntityResolution(params,scc,sampleTableName,defaultBM(similarity)) - val t01 = System.nanoTime() - ER.exec() - var t1 = System.nanoTime() + test("overhead") { + withSingleAttribute (1,{ scc => + //scc.resetSample(sampleTableName) + val params = new AlgorithmParameters() - println("Exec() in algorithm lasted " + (t1-t01).toDouble/1000000000 + " seconds.") - println("Whole cleaning algorithm lasted " + (t1-t0).toDouble/1000000000 + " seconds.") + var t0 = System.nanoTime() + params.put("attr", attr) + params.put("mergeStrategy", "mostFrequent") + val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName).drop(2), tok, 0.5) + val ER = new EntityResolution(params, scc, sampleTableName, defaultBM(scc, similarity)) + val t01 = System.nanoTime() + ER.exec() + var t1 = System.nanoTime() - val rowRDDLarge = sc.textFile("./src/test/resources/csvJaccard100dupsAttr").map(x => Row.fromSeq(x.split(",", -1).toSeq)) + println("Exec() in algorithm lasted " + (t1 - t01).toDouble / 1000000000 + " seconds.") + println("Whole cleaning algorithm lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.") - t0 = System.nanoTime() - val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5) - val bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - t1 = System.nanoTime() + val rowRDDLarge = scc.getSparkContext().textFile("./src/test/resources/csvJaccard100dupsAttr").map(x => Row.fromSeq(x.split(",", -1).toSeq)) - println("Join lasted " + (t1-t0).toDouble/1000000000 + " seconds.") + t0 = System.nanoTime() + val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName).drop(2), tok, 0.5) + val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + t1 = System.nanoTime() + println("Join lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.") + }) } + } diff --git a/src/test/scala/sampleclean/featurizerSuite.scala b/src/test/scala/sampleclean/featurizerSuite.scala index 9b13fac..c63a320 100644 --- a/src/test/scala/sampleclean/featurizerSuite.scala +++ b/src/test/scala/sampleclean/featurizerSuite.scala @@ -7,7 +7,7 @@ import sampleclean.clean.featurize.Tokenizer._ import sampleclean.clean.featurize._ -class featurizerSuite extends FunSuite with Serializable { +class FeaturizerSuite extends FunSuite with Serializable { test("Tokenizer"){ val str = """ a`b~c!d@e#f$g%h^i&j*k(l)m_n-o+p=q[r}s\tñu|v;w:x"y'z/aa?bb>cc Row.fromSeq(r.split(":").toSeq)) - - // 100 duplicates with Jaccard similarity = 0.5 - rowRDDLarge = sc.textFile(path + "/dirtyJaccard100dups").map(Row(_)) - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.51) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // weighted Jaccard is ~0.465 - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.466) - bJoin = new 
BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.465) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - - // 100 duplicates with Overlap similarity = 5 - rowRDDLarge = sc.textFile(path + "/dirtyOverlap100dups").map(Row(_)) - blocker = new WeightedOverlapSimilarity(colNames,context,tok,6) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedOverlapSimilarity(colNames,context,tok,5) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // weighted Overlap is 10 - blocker = new WeightedOverlapSimilarity(colNames,context,tok,10.1) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedOverlapSimilarity(colNames,context,tok,10) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // 100 duplicates with Dice similarity = 0.8 - rowRDDLarge = sc.textFile(path + "/dirtyDice100dups").map(Row(_)) - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.81) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.8) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // weighted Dice is ~0.776 - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.777) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.776) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // 100 duplicates with Cosine similarity = 0.5 - rowRDDLarge = sc.textFile(path + "/dirtyCosine100dups").map(Row(_)) - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.51) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.5) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // weighted Cosine is ~0.473 - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.474) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 0) - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.473) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - - // 100 duplicates with Edit (Levenshtein) similarity = 10 - rowRDDLarge = sc.textFile(path + "/dirtyEdit100dups").map(Row(_)) - blocker = new EditBlocking(colNames,context,tok,9) - bJoin = new PassJoin(sc,blocker) - val p = bJoin.join(rowRDDLarge,rowRDDLarge) - assert(p.count() == 0) - blocker = new EditBlocking(colNames,context,tok,10) - bJoin = new PassJoin(sc,blocker) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) + withSampleCleanContext { scc => + val sc = scc.getSparkContext() + val context = List("record") + val colNames = List("record") + val tok = new DelimiterTokenizer(" ") + val path = "./src/test/resources" + var blocker: AnnotatedSimilarityFeaturizer = null + var bJoin: SimilarityJoin = null + var rowRDDLarge: RDD[Row] = 
null + //rowRDDLarge = sc.textFile(path + "/dirtyJaccard100dups").map(r => Row.fromSeq(r.split(":").toSeq)) + + // 100 duplicates with Jaccard similarity = 0.5 + rowRDDLarge = sc.textFile(path + "/dirtyJaccard100dups").map(Row(_)) + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.51) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.5) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // weighted Jaccard is ~0.465 + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.466) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.465) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + + // 100 duplicates with Overlap similarity = 5 + rowRDDLarge = sc.textFile(path + "/dirtyOverlap100dups").map(Row(_)) + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 6) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 5) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // weighted Overlap is 10 + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 10.1) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 10) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // 100 duplicates with Dice similarity = 0.8 + rowRDDLarge = sc.textFile(path + "/dirtyDice100dups").map(Row(_)) + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.81) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.8) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // weighted Dice is ~0.776 + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.777) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.776) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // 100 duplicates with Cosine similarity = 0.5 + rowRDDLarge = sc.textFile(path + "/dirtyCosine100dups").map(Row(_)) + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.51) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.5) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // weighted Cosine is ~0.473 + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.474) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 0) + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.473) + bJoin = new BroadcastJoin(sc, blocker, true) + 
assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + + // 100 duplicates with Edit (Levenshtein) similarity = 10 + rowRDDLarge = sc.textFile(path + "/dirtyEdit100dups").map(Row(_)) + blocker = new EditBlocking(colNames, context, tok, 9) + bJoin = new PassJoin(sc, blocker) + val p = bJoin.join(rowRDDLarge, rowRDDLarge) + assert(p.count() == 0) + blocker = new EditBlocking(colNames, context, tok, 10) + bJoin = new PassJoin(sc, blocker) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + } } test("broadcast sample join accuracy"){ - val context = List("record") - val colNames = List("record") - val tok = new DelimiterTokenizer(" ") - val path = "./src/test/resources" - var blocker: AnnotatedSimilarityFeaturizer = null - var bJoin: SimilarityJoin = null - var rowRDDLarge: RDD[Row] = null - var rowRDDSmall: RDD[Row] = null - - - // 100 duplicates with Jaccard similarity = 0.5 - rowRDDLarge = sc.textFile(path + "/dirtyJaccard100dups").map(Row(_)) - rowRDDSmall = rowRDDLarge.sample(false,0.5).cache() - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.51) - bJoin = new BroadcastJoin(sc,blocker,false) - // returns 0 dups + original pairs - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5) - bJoin = new BroadcastJoin(sc,blocker,false) - // returns ~50 dups * 2 + original pairs - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - // weighted Jaccard is ~0.465 - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.466) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.465) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - - // 100 duplicates with Overlap similarity = 5 - rowRDDLarge = sc.textFile(path + "/dirtyOverlap100dups").map(Row(_)) - rowRDDSmall = rowRDDLarge.sample(false,0.5).cache() - blocker = new WeightedOverlapSimilarity(colNames,context,tok,6) - bJoin = new BroadcastJoin(sc,blocker,false) - // about half of self-pairs won't be considered duplicates - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= (rowRDDSmall.count()/2 - 8)) - blocker = new WeightedOverlapSimilarity(colNames,context,tok,5) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()/2) - - // weighted Overlap is 10 - blocker = new WeightedOverlapSimilarity(colNames,context,tok,10.1) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= (rowRDDSmall.count()/2 - 8)) - blocker = new WeightedOverlapSimilarity(colNames,context,tok,10) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()/2) - - // 100 duplicates with Dice similarity = 0.8 - rowRDDLarge = sc.textFile(path + "/dirtyDice100dups").map(Row(_)) - rowRDDSmall = rowRDDLarge.sample(false,0.5).cache() - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.81) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.8) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - 
// weighted Dice is ~0.776 - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.777) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedDiceSimilarity(colNames,context,tok,0.776) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - // 100 duplicates with Cosine similarity = 0.5 - rowRDDLarge = sc.textFile(path + "/dirtyCosine100dups").map(Row(_)) - rowRDDSmall = rowRDDLarge.sample(false,0.5).cache() - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.51) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.5) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - // weighted Cosine is ~0.473 - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.474) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new WeightedCosineSimilarity(colNames,context,tok,0.473) - bJoin = new BroadcastJoin(sc,blocker,true) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) - - // 100 duplicates with Edit (Levenshtein) similarity = 10 - rowRDDLarge = sc.textFile(path + "/dirtyEdit100dups").map(Row(_)) - rowRDDSmall = rowRDDLarge.sample(false,0.5).cache() - blocker = new EditBlocking(colNames,context,tok,9) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() == rowRDDSmall.count()) - blocker = new EditBlocking(colNames,context,tok,10) - bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDSmall,rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + withSampleCleanContext { scc => + val sc = scc.getSparkContext() + val context = List("record") + val colNames = List("record") + val tok = new DelimiterTokenizer(" ") + val path = "./src/test/resources" + var blocker: AnnotatedSimilarityFeaturizer = null + var bJoin: SimilarityJoin = null + var rowRDDLarge: RDD[Row] = null + var rowRDDSmall: RDD[Row] = null + + + // 100 duplicates with Jaccard similarity = 0.5 + rowRDDLarge = sc.textFile(path + "/dirtyJaccard100dups").map(Row(_)) + rowRDDSmall = rowRDDLarge.sample(false, 0.5).cache() + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.51) + bJoin = new BroadcastJoin(sc, blocker, false) + // returns 0 dups + original pairs + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.5) + bJoin = new BroadcastJoin(sc, blocker, false) + // returns ~50 dups * 2 + original pairs + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + // weighted Jaccard is ~0.465 + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.466) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedJaccardSimilarity(colNames, context, tok, 0.465) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + + // 100 duplicates with Overlap similarity = 5 + rowRDDLarge = sc.textFile(path + "/dirtyOverlap100dups").map(Row(_)) + rowRDDSmall = 
rowRDDLarge.sample(false, 0.5).cache() + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 6) + bJoin = new BroadcastJoin(sc, blocker, false) + // about half of self-pairs won't be considered duplicates + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= (rowRDDSmall.count() / 2 - 8)) + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 5) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count() / 2) + + // weighted Overlap is 10 + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 10.1) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= (rowRDDSmall.count() / 2 - 8)) + blocker = new WeightedOverlapSimilarity(colNames, context, tok, 10) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count() / 2) + + // 100 duplicates with Dice similarity = 0.8 + rowRDDLarge = sc.textFile(path + "/dirtyDice100dups").map(Row(_)) + rowRDDSmall = rowRDDLarge.sample(false, 0.5).cache() + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.81) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.8) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + // weighted Dice is ~0.776 + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.777) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedDiceSimilarity(colNames, context, tok, 0.776) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + // 100 duplicates with Cosine similarity = 0.5 + rowRDDLarge = sc.textFile(path + "/dirtyCosine100dups").map(Row(_)) + rowRDDSmall = rowRDDLarge.sample(false, 0.5).cache() + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.51) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.5) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + // weighted Cosine is ~0.473 + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.474) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new WeightedCosineSimilarity(colNames, context, tok, 0.473) + bJoin = new BroadcastJoin(sc, blocker, true) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + + // 100 duplicates with Edit (Levenshtein) similarity = 10 + rowRDDLarge = sc.textFile(path + "/dirtyEdit100dups").map(Row(_)) + rowRDDSmall = rowRDDLarge.sample(false, 0.5).cache() + blocker = new EditBlocking(colNames, context, tok, 9) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() == rowRDDSmall.count()) + blocker = new EditBlocking(colNames, context, tok, 10) + bJoin = new BroadcastJoin(sc, blocker, false) + assert(bJoin.join(rowRDDSmall, rowRDDLarge).count() >= 40 * 2 + rowRDDSmall.count()) + } } diff --git 
a/src/test/scala/sampleclean/recordDedupSuite.scala b/src/test/scala/sampleclean/recordDedupSuite.scala index c402641..85f8587 100644 --- a/src/test/scala/sampleclean/recordDedupSuite.scala +++ b/src/test/scala/sampleclean/recordDedupSuite.scala @@ -14,106 +14,100 @@ import sampleclean.clean.featurize.AnnotatedSimilarityFeaturizer.WeightedJaccard import sampleclean.clean.featurize.Tokenizer.DelimiterTokenizer -class recordDedupSuite extends FunSuite with Serializable { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName("SCUnitTest") - .set("spark.driver.allowMultipleContexts","true") - val sc = new SparkContext(conf) - val scc = new SampleCleanContext(sc) - val context = List("id") ++ (0 until 20).toList.map("col" + _.toString) - val colNames = context.drop(1) - val contextString = context.mkString(" String,") + " String" - val hiveContext = scc.getHiveContext() - scc.closeHiveSession() - hiveContext.hql("DROP TABLE IF EXISTS test") - hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString)) - hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dups' OVERWRITE INTO TABLE test") +class RecordDedupSuite extends FunSuite with LocalSCContext { + + val colNames = (0 until 20).toList.map("col" + _.toString) val sampleTableName = "test_sample" - scc.initializeConsistent("test", sampleTableName, "id", 1) val tok = new DelimiterTokenizer(" ") - def defaultBM(sim: AnnotatedSimilarityFeaturizer): BlockerMatcherJoinSequence = { - val bJoin = new BroadcastJoin(sc,sim,false) + def defaultBM(scc: SampleCleanContext,sim: AnnotatedSimilarityFeaturizer): BlockerMatcherJoinSequence = { + val bJoin = new BroadcastJoin(scc.getSparkContext(),sim,false) val matcher = new AllMatcher(scc, sampleTableName) new BlockerMatcherJoinSequence(scc, sampleTableName,bJoin,List(matcher)) } - test("function validations"){ - // Initialize algorithm - val similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) - val blockerMatcher = defaultBM(similarity) - val params = new AlgorithmParameters() - - val RD = new RecordDeduplication(params,scc,sampleTableName,blockerMatcher) - RD.setTableParameters(sampleTableName) - assert(RD.hashCol == 0) - - // Within exec() function - val sampleTableRDD = scc.getCleanSample(sampleTableName) - val fullTableRDD = scc.getFullTable(sampleTableName) - assert(sampleTableRDD.count() == 201 && fullTableRDD.count() == 201) - - val filteredPairs = blockerMatcher.blockAndMatch(sampleTableRDD,fullTableRDD) - assert(filteredPairs.count() == 100) - - val dupCounts = filteredPairs.map{case (fullRow, sampleRow) => - (sampleRow(RD.hashCol).asInstanceOf[String],1)} - .reduceByKey(_ + _) - .map(x => (x._1,x._2+1)) - - scc.updateTableDuplicateCounts(sampleTableName, dupCounts) - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 100) + test("exec"){ + withFullRecords (1,{ scc => + // Initialize algorithm + val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) + val blockerMatcher = defaultBM(scc, similarity) + val params = new AlgorithmParameters() + + val RD = new RecordDeduplication(params, scc, sampleTableName, blockerMatcher) + RD.setTableParameters(sampleTableName) + assert(RD.hashCol == 0) + + // Within exec() function + val sampleTableRDD = scc.getCleanSample(sampleTableName) + val fullTableRDD = scc.getFullTable(sampleTableName) + 
assert(sampleTableRDD.count() == 201 && fullTableRDD.count() == 201) + + val filteredPairs = blockerMatcher.blockAndMatch(sampleTableRDD, fullTableRDD) + assert(filteredPairs.count() == 100) + + val dupCounts = filteredPairs.map { case (fullRow, sampleRow) => + (sampleRow(RD.hashCol).asInstanceOf[String], 1) + } + .reduceByKey(_ + _) + .map(x => (x._1, x._2 + 1)) + + scc.updateTableDuplicateCounts(sampleTableName, dupCounts) + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 100) + }) } - test("execution validation"){ - // Initialize algorithm - scc.resetSample(sampleTableName) - val params = new AlgorithmParameters() - - var similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) - var RD = new RecordDeduplication(params,scc,sampleTableName,defaultBM(similarity)) - RD.setTableParameters(sampleTableName) - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 0) - RD.synchronousExecAndRead() - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 100) - - scc.resetSample(sampleTableName) - similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.51) - RD = new RecordDeduplication(params,scc,sampleTableName,defaultBM(similarity)) - RD.setTableParameters(sampleTableName) - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 0) - RD.synchronousExecAndRead() - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 0) + test("api"){ + withFullRecords (1,{ scc => + // Initialize algorithm + scc.resetSample(sampleTableName) + val params = new AlgorithmParameters() + + var similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) + var RD = new RecordDeduplication(params, scc, sampleTableName, defaultBM(scc, similarity)) + RD.setTableParameters(sampleTableName) + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 0) + RD.synchronousExecAndRead() + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 100) + + scc.resetSample(sampleTableName) + similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.51) + RD = new RecordDeduplication(params, scc, sampleTableName, defaultBM(scc, similarity)) + RD.setTableParameters(sampleTableName) + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 0) + RD.synchronousExecAndRead() + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 0) + }) } test("overhead"){ - scc.resetSample(sampleTableName) - val params = new AlgorithmParameters() - - var t0 = System.nanoTime() - val similarity = new WeightedJaccardSimilarity(colNames,scc.getTableContext(sampleTableName),tok,0.5) - val RD = new RecordDeduplication(params,scc,sampleTableName,defaultBM(similarity)) - RD.setTableParameters(sampleTableName) - val t01 = System.nanoTime() - RD.synchronousExecAndRead() - val t02 = System.nanoTime() - assert(scc.getCleanSampleAttr(sampleTableName,"dup").filter(x => x.getInt(1) > 1).count() == 100) - var t1 = System.nanoTime() - - println("Exec() in algorithm lasted " + (t02-t01).toDouble/1000000000 + " seconds.") - println("Whole cleaning algorithm lasted " + (t1-t0).toDouble/1000000000 + " seconds.") - - val rowRDDLarge = 
sc.textFile("./src/test/resources/csvJaccard100dups").map(x => Row.fromSeq(x.split(",", -1).toSeq)) - - t0 = System.nanoTime() - val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5) - val bJoin = new BroadcastJoin(sc,blocker,false) - assert(bJoin.join(rowRDDLarge,rowRDDLarge).count() == 100) - t1 = System.nanoTime() - - println("Join lasted " + (t1-t0).toDouble/1000000000 + " seconds.") + withFullRecords (1,{ scc => + scc.resetSample(sampleTableName) + val params = new AlgorithmParameters() + + var t0 = System.nanoTime() + val similarity = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5) + val RD = new RecordDeduplication(params, scc, sampleTableName, defaultBM(scc, similarity)) + RD.setTableParameters(sampleTableName) + val t01 = System.nanoTime() + RD.synchronousExecAndRead() + val t02 = System.nanoTime() + assert(scc.getCleanSampleAttr(sampleTableName, "dup").filter(x => x.getInt(1) > 1).count() == 100) + var t1 = System.nanoTime() + + println("Exec() in algorithm lasted " + (t02 - t01).toDouble / 1000000000 + " seconds.") + println("Whole cleaning algorithm lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.") + + val rowRDDLarge = scc.getSparkContext().textFile("./src/test/resources/csvJaccard100dups").map(x => Row.fromSeq(x.split(",", -1).toSeq)) + + t0 = System.nanoTime() + val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName).drop(2), tok, 0.5) + val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false) + assert(bJoin.join(rowRDDLarge, rowRDDLarge).count() == 100) + t1 = System.nanoTime() + + println("Join lasted " + (t1 - t0).toDouble / 1000000000 + " seconds.") + })