diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 899078a759f31..e6752332cdeeb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -149,7 +149,8 @@ class PrefixSpan private ( } // Process the small projected databases locally - val remainingResults = getPatternsInLocal(minCount, pairsForLocal.groupByKey()) + val remainingResults = getPatternsInLocal( + minCount, sc.parallelize(pairsForLocal, 1).groupByKey()) (sc.parallelize(resultsAccumulator, 1) ++ remainingResults) .map { case (pattern, count) => (pattern.toArray, count) } @@ -163,7 +164,7 @@ class PrefixSpan private ( * greater than [[maxLocalProjDBSize]] */ private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])]) - : (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = { + : (Array[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = { val prefixToSuffixSize = prefixSuffixPairs .aggregateByKey(0)( seqOp = { case (count, suffix) => count + suffix.length }, @@ -175,7 +176,7 @@ class PrefixSpan private ( .toSet val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) } val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) } - (small, large) + (small.collect(), large) } /**