Skip to content

Commit

Permalink
Collect small patterns to local
Browse files Browse the repository at this point in the history
  • Loading branch information
Feynman Liang committed Jul 30, 2015
1 parent 4ddf479 commit a61943d
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ class PrefixSpan private (
}

// Process the small projected databases locally
val remainingResults = getPatternsInLocal(minCount, pairsForLocal.groupByKey())
val remainingResults = getPatternsInLocal(
minCount, sc.parallelize(pairsForLocal, 1).groupByKey())

(sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
.map { case (pattern, count) => (pattern.toArray, count) }
Expand All @@ -163,7 +164,7 @@ class PrefixSpan private (
* greater than [[maxLocalProjDBSize]]
*/
private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
: (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
: (Array[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
val prefixToSuffixSize = prefixSuffixPairs
.aggregateByKey(0)(
seqOp = { case (count, suffix) => count + suffix.length },
Expand All @@ -175,7 +176,7 @@ class PrefixSpan private (
.toSet
val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
(small, large)
(small.collect(), large)
}

/**
Expand Down

0 comments on commit a61943d

Please sign in to comment.