Commit 4058bbf

send test changes again

sanchez575 committed Apr 10, 2015
1 parent 6675074 commit 4058bbf
Showing 7 changed files with 449 additions and 431 deletions.
14 changes: 7 additions & 7 deletions deploy/README.md
@@ -9,13 +9,13 @@ http://spark.apache.org/docs/latest/ec2-scripts.html), and can be called as
either:

```shell
-./sampleclean-ec2 CREDENTIALS_DIRECTORY SPARK-EC2-ARG [SPARK-EC2-ARG ...]
+./sampleclean-ec2.sh CREDENTIALS_DIRECTORY SPARK-EC2-ARG [SPARK-EC2-ARG ...]
```

or:

```shell
-./sampleclean-ec2 SPARK-EC2-ARG [SPARK-EC2-ARG ...]
+./sampleclean-ec2.sh SPARK-EC2-ARG [SPARK-EC2-ARG ...]
```

In the latter case, `CREDENTIALS_DIRECTORY` will be set to the value of the
@@ -32,7 +32,7 @@ credentials needed for using AWS and EC2:
* (optional) The `sampleclean1.eecs.berkeley.edu-san.key` file containing the
private key for the ssl certificate.

-For help with `SPARK-EC2-ARGs`, run `./sampleclean-ec2 --help`.
+For help with `SPARK-EC2-ARGs`, run `./sampleclean-ec2.sh --help`.


Launching a cluster
@@ -42,11 +42,11 @@ For example, to launch a cluster named sampleclean with 8 slaves, then run the
crowd server:
```shell
# Alternatively, use a pre-saved ami with --master-ami AMI_ID
-./sampleclean-ec2 ~/.ssh/aws/sampleclean/ -s 8 -t m1.large launch sampleclean
+./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ -s 8 -t m1.large launch sampleclean
# ... lots of output ...
-./sampleclean-ec2 ~/.ssh/aws/sampleclean/ get-master sampleclean
+./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ get-master sampleclean
# ... get the master's hostname ...
-./sampleclean-ec2 ~/.ssh/aws/sampleclean/ login sampleclean
+./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ login sampleclean
> workon sampleclean
> cd $PROJECT_HOME
> ./run.sh -d
@@ -59,7 +59,7 @@ Running code on the cluster
To actually get code running on a cluster you've launched, you'll need to:

* ssh into the cluster:
-  `./sampleclean-ec2 ~/.ssh/aws/sampleclean/ login CLUSTER_NAME`
+  `./sampleclean-ec2.sh ~/.ssh/aws/sampleclean/ login CLUSTER_NAME`

* Copy data to the master's hdfs instance:
`/root/ephemeral-hdfs/bin/hadoop dfs -put LOCAL/PATH/TO/data HDFS/PATH/TO/data`
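Once the data is in ephemeral HDFS, a driver program on the master can read it back by path. A minimal sketch, assuming a placeholder path matching the `dfs -put` destination above (the object name and path are illustrative, not part of this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: count the lines of a file previously copied into
// ephemeral HDFS with `hadoop dfs -put`. The path is a placeholder.
object ReadFromHdfs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReadFromHdfs"))
    val data = sc.textFile("HDFS/PATH/TO/data")
    println(s"loaded ${data.count()} lines")
    sc.stop()
  }
}
```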
53 changes: 50 additions & 3 deletions src/test/scala/sampleclean/LocalSCContext.scala
@@ -1,18 +1,65 @@
package sampleclean

import org.apache.spark.{SparkConf, SparkContext}
+import sampleclean.api.SampleCleanContext

/**
* Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
* after each test.
*/
-trait LocalSCContext {
+trait LocalSCContext extends Serializable {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
-  def withSpark[T](f: SparkContext => T): T = {
+  def withSampleCleanContext[T](f: SampleCleanContext => T): T = {
val conf = new SparkConf()
.set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext("local", "test", conf)
val scc = new SampleCleanContext(sc)
try {
-      f(sc)
+      f(scc)
} finally {
sc.stop()
}
}

def withSingleAttribute[T](sample:Int,f: SampleCleanContext => T): T = {
val conf = new SparkConf()
.set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext("local", "test", conf)
val scc = new SampleCleanContext(sc)
val context = List("id", "col0")
val contextString = context.mkString(" String,") + " String"

val hiveContext = scc.getHiveContext()
scc.closeHiveSession()
hiveContext.hql("DROP TABLE IF EXISTS test")
hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString))
hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dupsAttr' OVERWRITE INTO TABLE test")
scc.initializeConsistent("test", "test_sample", "id", sample)

try {
f(scc)
} finally {
sc.stop()
}
}

def withFullRecords[T](sample:Int, f: SampleCleanContext => T): T = {
val conf = new SparkConf()
.set("spark.driver.allowMultipleContexts","true")
val sc = new SparkContext("local", "test", conf)
val scc = new SampleCleanContext(sc)
val context = List("id") ++ (0 until 20).toList.map("col" + _.toString)

val contextString = context.mkString(" String,") + " String"
val hiveContext = scc.getHiveContext()
scc.closeHiveSession()
hiveContext.hql("DROP TABLE IF EXISTS test")
hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(contextString))
hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dups' OVERWRITE INTO TABLE test")
scc.initializeConsistent("test", "test_sample", "id", sample)

try {
f(scc)
} finally {
sc.stop()
}
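The three fixtures above hand each test a fresh `SampleCleanContext` and stop the underlying `SparkContext` even when the test body throws. A minimal usage sketch (the suite name and assertion are illustrative, not part of this commit):

```scala
package sampleclean

import org.scalatest.FunSuite

// Hypothetical suite mixing in the fixtures defined above.
class ExampleSuite extends FunSuite with LocalSCContext {
  test("full-record fixture exposes the sampled table") {
    // withFullRecords loads the 20-column test CSV into Hive,
    // initializes the consistent sample "test_sample", runs the body,
    // and finally stops the SparkContext.
    withFullRecords(1, { scc =>
      assert(scc.getFullTable("test_sample").count() > 0)
    })
  }
}
```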
99 changes: 43 additions & 56 deletions src/test/scala/sampleclean/blockerMatcherSuite.scala
@@ -2,84 +2,71 @@ package sampleclean

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite
import sampleclean.api.SampleCleanContext
import sampleclean.clean.deduplication.join.{BlockerMatcherJoinSequence, BlockerMatcherSelfJoinSequence, BroadcastJoin}
import sampleclean.clean.deduplication.matcher.{AllMatcher, Matcher}
import sampleclean.clean.featurize.AnnotatedSimilarityFeaturizer.WeightedJaccardSimilarity
import sampleclean.clean.featurize.Tokenizer.DelimiterTokenizer


-class blockerMatcherSuite extends FunSuite with Serializable {
-  val conf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName("SCUnitTest")
-    .set("spark.driver.allowMultipleContexts","true")
-  val sc = new SparkContext(conf)
-  val scc = new SampleCleanContext(sc)
-
-  test("matcher"){
-
-    val row1 = Row("a","b","c")
-    val row2 = Row("d","e","f")
-    var m:Matcher = null
+class BlockerMatcherSuite extends FunSuite with LocalSCContext {

+  val colNames = (0 until 20).toList.map("col" + _.toString)
+  val tok = new DelimiterTokenizer(" ")
+  val sampleTableName = "test_sample"

-    m = new AllMatcher(scc, "test_sample")
+  test("matcher") {
+    withSampleCleanContext { scc =>
+      val row1 = Row("a", "b", "c")
+      val row2 = Row("d", "e", "f")
+      var m: Matcher = null

-    // does not include equal rows
-    val cartesian = Set((row2,row1),(row1,row2))
-    assert(m.selfCartesianProduct(Set(row1,row1)) == List())
-    assert(m.selfCartesianProduct(Set(row1,row2)).toSet == cartesian)
+      m = new AllMatcher(scc, sampleTableName)
+
+      // does not include equal rows
+      val cartesian = Set((row2, row1), (row1, row2))
+      assert(m.selfCartesianProduct(Set(row1, row1)) == List())
+      assert(m.selfCartesianProduct(Set(row1, row2)).toSet == cartesian)

-    val candidates1: RDD[Set[Row]] = sc.parallelize(Seq(Set(row1,row2)))
-    val candidates2: RDD[(Row,Row)] = sc.parallelize(Seq((row1,row2)))
-    // TODO should they be the same?
-    //assert(m.matchPairs(candidates1).collect() == m.matchPairs(candidates2).collect())
-
-    // TODO asynchronous matchers
+      val candidates1: RDD[Set[Row]] = scc.getSparkContext().parallelize(Seq(Set(row1, row2)))
+      val candidates2: RDD[(Row, Row)] = scc.getSparkContext().parallelize(Seq((row1, row2)))
+      // TODO should they be the same?
+      //assert(m.matchPairs(candidates1).collect() == m.matchPairs(candidates2).collect())
+
+      // TODO asynchronous matchers
+    }
}

-  val context = (0 until 20).toList.map("col" + _.toString)
-  val colNames = context
-  val colNamesString = context.mkString(" String,") + " String"
-  val hiveContext = scc.getHiveContext()
-  scc.closeHiveSession()
-  hiveContext.hql("DROP TABLE IF EXISTS test")
-  hiveContext.hql("CREATE TABLE IF NOT EXISTS test(%s) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'".format(colNamesString))
-  hiveContext.hql("LOAD DATA LOCAL INPATH './src/test/resources/csvJaccard100dups' OVERWRITE INTO TABLE test")
-  val tok = new DelimiterTokenizer(" ")

test("self join sequence"){
scc.initializeConsistent("test", "test_sample", "col0", 1)

val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.5)
val bJoin = new BroadcastJoin(sc,blocker,false)
val rdd = scc.getFullTable("test_sample")
val matcher = new AllMatcher(scc, "test_sample")
test("self join sequence") {
withFullRecords (1,{ scc =>

val blockMatch = new BlockerMatcherSelfJoinSequence(scc, "test_sample",bJoin,List(matcher))
assert(blockMatch.blockAndMatch(rdd).count() == 100)
val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.5)
val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false)
val rdd = scc.getFullTable(sampleTableName)
val matcher = new AllMatcher(scc, sampleTableName)

}
val blockMatch = new BlockerMatcherSelfJoinSequence(scc, sampleTableName, bJoin, List(matcher))
assert(blockMatch.blockAndMatch(rdd).count() == 100)
})

}

test("join sequence"){
scc.closeHiveSession()
val (clean,dirty) = scc.initializeConsistent("test", "test_sample2", "col0", 2)
test("sample join sequence") {
withFullRecords(2, { scc =>

val blocker = new WeightedJaccardSimilarity(colNames,context,tok,0.465)
val bJoin = new BroadcastJoin(sc,blocker,false)
val rdd1 = scc.getFullTable("test_sample2")
val rdd2 = dirty
val matcher = new AllMatcher(scc, "test_sample2")
val blocker = new WeightedJaccardSimilarity(colNames, scc.getTableContext(sampleTableName), tok, 0.465)
val bJoin = new BroadcastJoin(scc.getSparkContext(), blocker, false)
val rdd1 = scc.getFullTable(sampleTableName)
val rdd2 = scc.getCleanSample(sampleTableName)
val matcher = new AllMatcher(scc, sampleTableName)

val blockMatch = new BlockerMatcherJoinSequence(scc, "test_sample2",bJoin,List(matcher))
assert(blockMatch.blockAndMatch(rdd2,rdd1).count() >= 40 * 2 + rdd2.count())
blockMatch.updateContext(context.map(x => x + x))
assert(blocker.context == context.map(x => x + x))
val blockMatch = new BlockerMatcherJoinSequence(scc, sampleTableName, bJoin, List(matcher))
assert(blockMatch.blockAndMatch(rdd2, rdd1).count() >= 40 * 2 + rdd2.count())
blockMatch.updateContext(scc.getTableContext(sampleTableName).map(x => x + x))
assert(blocker.context == scc.getTableContext(sampleTableName).map(x => x + x))
})
}
}
