
Greatly improved linking performance #400

Open
robvadai opened this issue Jun 23, 2022 · 0 comments
Labels
Hacktoberfest https://hacktoberfest.digitalocean.com/

Comments


robvadai commented Jun 23, 2022

Is your feature request related to a problem? Please describe.
Linking performance on large datasets is not terrible, but I was sure we could do better. In my case, 20M queries had to be run against 3M Lucene-indexed records, which took 30 minutes on 4 m4.4xlarge EC2 instances on AWS EMR.

Describe the solution you'd like
I have an RDD of record IDs that I can relate back to my record metadata. All I care about is each record ID and its Lucene query, which will link that record to a Lucene document (i.e. find a match).
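
A minimal sketch of the input shape described above (the Record type, the name field, and toQueries are hypothetical illustrations, not part of spark-lucenerdd):

  import org.apache.spark.rdd.RDD

  // Hypothetical record type; only the ID and whatever drives the query matter here.
  case class Record(id: Long, name: String)

  // Key each record by the Lucene query string that should link it.
  // Many records may end up sharing the same query string.
  def toQueries(records: RDD[Record]): RDD[(String, Long)] =
    records.map(r => (s"name:${r.name}", r.id))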

Describe alternatives you've considered
Elasticsearch (given it is Lucene-based), but I prefer the ephemeral nature of Spark jobs, i.e. no database maintenance, support or operational overhead.

Implementation

This alternative link method implementation reduced the runtime above from 30 minutes to 6 (!).

  def link2[T1: ClassTag](other: RDD[(String, T1)], topK: Int = DefaultTopK)
  : RDD[(Iterable[T1], List[Row])] = {

    // Merges per-partition top-K result lists into a global top-K per query.
    val topKMonoid = new TopKMonoid[Row](topK)(SparkScoreDoc.descending)

    // Group the metadata by query string, collect to the driver and broadcast,
    // so that every Lucene partition can answer every query locally.
    val collectedQueries = other.groupByKey().collect()
    val queriesB = partitionsRDD.context.broadcast(collectedQueries)

    // Run every query against every Lucene partition, keeping only the
    // partition-local top-K results for each query.
    val resultsByPart = partitionsRDD.mapPartitions { partitions =>
      partitions.flatMap { partition =>
        queriesB.value.map { case (queryString, identifiers) =>
          (identifiers, topKMonoid.build(partition.query(queryString, topK)))
        }
      }
    }

    // Combine the partition-local top-Ks into a single top-K per identifier group.
    resultsByPart.reduceByKey(topKMonoid.plus).mapValues(_.items)
  }

Basically, the 'other' RDD is now a pair of Lucene query strings and arbitrary metadata; in my case, simply Long values (IDs of records that are stored elsewhere).

Given that many records can share the same Lucene query string, what you get back is a pair RDD where the left side has all the metadata as an Iterable (again, in my case, just a collection of record IDs) and the right side is the matches.
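
A hedged usage sketch, reusing the hypothetical toQueries and records from above (luceneRDD is assumed to be an existing LuceneRDD, and link2 is the method proposed in this issue):

  val queries: RDD[(String, Long)] = toQueries(records)

  // Each element pairs all record IDs sharing a query string with that
  // query's global top-K matches.
  val linked: RDD[(Iterable[Long], List[Row])] = luceneRDD.link2(queries, topK = 10)

  // Fan the shared match lists back out to one element per record ID.
  val perRecord: RDD[(Long, List[Row])] =
    linked.flatMap { case (ids, matches) => ids.map(id => (id, matches)) }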

Basically, this requires the developer to come up with identifiers, so there is no need to use .zipWithIndex: https://github.com/zouzias/spark-lucenerdd/blob/develop/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala#L217

This is an early-stage implementation, but please consider adding it to the codebase.

zouzias added the Hacktoberfest (https://hacktoberfest.digitalocean.com/) label on Oct 6, 2023