Hotfix: issue 150 #151

Merged 8 commits on Feb 27, 2019
9 changes: 5 additions & 4 deletions README.md
@@ -48,7 +48,7 @@ You can link against this library (for Spark 1.4+) in your program at the follow
Using SBT:

```
-libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.4"
+libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.5"
```

Using Maven:
@@ -57,15 +57,15 @@ Using Maven:
<dependency>
<groupId>org.zouzias</groupId>
<artifactId>spark-lucenerdd_2.11</artifactId>
-<version>0.3.4</version>
+<version>0.3.5</version>
</dependency>
```

This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
For example, to include it when starting the spark shell:

```
-$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.4
+$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.11:0.3.5
```

Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
@@ -76,7 +76,8 @@ The project has the following compatibility with Apache Spark:

Artifact | Release Date | Spark compatibility | Notes | Status
------------------------- | --------------- | -------------------------- | ----- | ----
-0.3.5-SNAPSHOT | | >= 2.3.1, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.6-SNAPSHOT | | >= 2.3.1, JVM 8 | [develop](https://github.com/zouzias/spark-lucenerdd/tree/develop) | Under Development
+0.3.5 | 2019-02-7 | >= 2.4.0, JVM 8 | [tag v0.3.5](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.5) | Released
0.3.4 | 2018-11-27 | >= 2.4.0, JVM 8 | [tag v0.3.4](https://github.com/zouzias/spark-lucenerdd/tree/v0.3.4) | Released
0.2.8 | 2017-05-30 | 2.1.x, JVM 7 | [tag v0.2.8](https://github.com/zouzias/spark-lucenerdd/tree/v0.2.8) | Released
0.1.0 | 2016-09-26 | 1.4.x, 1.5.x, 1.6.x| [tag v0.1.0](https://github.com/zouzias/spark-lucenerdd/tree/v0.1.0) | Cross-released with 2.10/2.11
13 changes: 7 additions & 6 deletions src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala
@@ -496,17 +496,18 @@ object LuceneRDD extends Versionable
"Query Partition columns must be non-empty for block linkage")


-val partColumn = "__PARTITION_COLUMN__"
+val partColumnLeft = "__PARTITION_COLUMN_LEFT__"
+val partColumnRight = "__PARTITION_COLUMN_RIGHT__"

// Prepare input DataFrames for cogroup operation.
// Keyed them on queryPartColumns and entityPartColumns
// I.e., Query/Entity DataFrame are now of type (String, Row)
-val blocked = entities.withColumn(partColumn,
-concat(entityPartColumns.map(entities.col): _*))
-.rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
-val blockedQueries = queries.withColumn(partColumn,
+val blocked = entities.withColumn(partColumnLeft,
concat(entityPartColumns.map(entities.col): _*))
-.rdd.keyBy(x => x.getString(x.fieldIndex(partColumn)))
+.rdd.keyBy(x => x.getString(x.fieldIndex(partColumnLeft)))
+val blockedQueries = queries.withColumn(partColumnRight,
+concat(queryPartColumns.map(queries.col): _*)).drop(queryPartColumns: _*)
+.rdd.keyBy(x => x.getString(x.fieldIndex(partColumnRight)))

// Cogroup queries and entities. Map over each
// CoGrouped partition and instantiate Lucene index on partitioned
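
The keying step in this hunk can be illustrated without Spark. The sketch below is a plain-Scala approximation, not the library's implementation: `Record`, `blockKey`, and `cogroupByBlock` are hypothetical names, and in-memory `Seq` and `Map` values stand in for the DataFrames and the RDD cogroup. It only shows the idea that each side is keyed on its own blocking columns before matching keys are grouped together.

```scala
object BlockingSketch {
  // Hypothetical stand-in for a DataFrame row: column name -> value.
  type Record = Map[String, String]

  // Concatenate the blocking columns into one key, loosely mirroring concat(...) in the diff.
  // Assumption: every blocking column is present in the record.
  def blockKey(row: Record, cols: Seq[String]): String =
    cols.map(row).mkString

  // Key each side on its own blocking columns, then pair up the groups that share a key.
  def cogroupByBlock(
      entities: Seq[Record],
      entityCols: Seq[String],
      queries: Seq[Record],
      queryCols: Seq[String]
  ): Map[String, (Seq[Record], Seq[Record])] = {
    val left = entities.groupBy(blockKey(_, entityCols))
    val right = queries.groupBy(blockKey(_, queryCols))
    (left.keySet ++ right.keySet).map { k =>
      k -> (left.getOrElse(k, Seq.empty), right.getOrElse(k, Seq.empty))
    }.toMap
  }
}
```

With entities keyed on a column such as `email` and queries keyed on a differently named column, every block pairs the entity rows and query rows that share the same concatenated key; a key present on only one side yields an empty sequence for the other.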
@@ -37,12 +37,21 @@ class BlockingLinkageSpec extends FlatSpec
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

-val people: Array[Person] = Array("fear", "death", "water", "fire", "house")
+val peopleLeft: Array[Person] = Array("fear", "death", "water", "fire", "house")
.zipWithIndex.map { case (str, index) =>
val email = if (index % 2 == 0) "yes@gmail.com" else "no@gmail.com"
Person(str, index, email)
}
-val df = sc.parallelize(people).repartition(2).toDF()
+
+val peopleRight: Array[Person] = Array("fear", "death", "water", "fire", "house")
+.zipWithIndex.map { case (str, index) =>
+val email = if (index % 2 == 0) "yes@gmail.com" else "no@gmail.com"
+Person(str, index, email)
+}
+
+val leftDF = sc.parallelize(peopleLeft).repartition(2).toDF()
+val rightDF = sc.parallelize(peopleRight).repartition(3).toDF()
+

val linker: Row => String = { row =>
val name = row.getString(row.fieldIndex("name"))
@@ -51,10 +60,10 @@ class BlockingLinkageSpec extends FlatSpec
}


-val linked = LuceneRDD.blockEntityLinkage(df, df, linker,
+val linked = LuceneRDD.blockEntityLinkage(leftDF, rightDF, linker,
Array("email"), Array("email"))

-val linkedCount, dfCount = (linked.count, df.count())
+val linkedCount, dfCount = (linked.count, leftDF.count())

linkedCount should equal(dfCount)
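
A note on the assertion in this hunk: in Scala, `val a, b = expr` is shorthand for `val a = expr; val b = expr`, so `linkedCount` and `dfCount` each appear to hold the whole tuple, and the equality check compares two identical tuples. A minimal sketch of the difference, using placeholder numbers instead of the Spark counts (the object and value names are hypothetical):

```scala
object MultiValSketch {
  // Placeholder counts standing in for linked.count and leftDF.count().
  val counts: (Long, Long) = (5L, 7L)

  // `val a, b = e` is shorthand for `val a = e; val b = e`,
  // so both names are bound to the whole tuple.
  val bothA, bothB = counts

  // A tuple pattern destructures the pair into its two components.
  val (first, second) = counts
}
```

Here `bothA == bothB` holds even though the two counts differ, whereas `first` and `second` carry the individual values. If the intent of the test is to compare the two counts, a tuple pattern such as `val (linkedCount, dfCount) = ...` would express that.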
