From 87fa021afaa184dfbf7eafcae0beb494697c40e2 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 16:19:48 -0700 Subject: [PATCH] Improve extend prefix readability --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 64 +++++++++++-------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 8a15a867910a2..5b8da9665366b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -103,7 +103,7 @@ class PrefixSpan private ( // Convert min support to a min number of transactions for this dataset val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong - // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold + // (Frequent item -> occurrence count) pairs; all items here satisfy the `minSupport` threshold val freqItemCounts = sequences .flatMap(seq => seq.distinct.map(item => (item, 1L))) .reduceByKey(_ + _) @@ -113,8 +113,9 @@ class PrefixSpan private ( val itemSuffixPairs = { val freqItems = freqItemCounts.keys.collect().toSet sequences.flatMap { seq => + val filteredSeq = seq.filter(freqItems.contains(_)) freqItems.flatMap { item => - val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_))) + val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq) candidateSuffix match { case suffix if !suffix.isEmpty => Some((List(item), suffix)) case _ => None @@ -123,7 +124,8 @@ class PrefixSpan private ( } } - // Accumulator for the computed results to be returned + // Accumulator for the computed results to be returned, initialized to the frequent items (i.e. 
+ // frequent length-one prefixes) var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2)) // Remaining work to be locally and distributively processed respectfully @@ -133,7 +135,7 @@ class PrefixSpan private ( // projected database sizes <= `maxLocalProjDBSize`) while (pairsForDistributed.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = - getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed) + extendPrefixes(minCount, pairsForDistributed) pairsForDistributed.unpersist() val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs) pairsForDistributed = largerPairsPart @@ -151,7 +153,6 @@ class PrefixSpan private ( /** * Partitions the prefix-suffix pairs by projected database size. - * * @param prefixSuffixPairs prefix (length n) and suffix pairs, * @return prefix-suffix pairs partitioned by whether their projected database size is <= or * greater than [[maxLocalProjDBSize]] @@ -173,44 +174,57 @@ class PrefixSpan private ( } /** - * Get the pattern and counts, and prefix suffix pairs + * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes + * and remaining work. * @param minCount minimum count - * @param prefixSuffixPairs prefix (length n) and suffix pairs, - * @return pattern (length n+1) and counts, and prefix (length n+1) and suffix pairs - * (RDD[pattern, count], RDD[prefix, suffix ]) + * @param prefixSuffixPairs prefix (length N) and suffix pairs, + * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended + * prefix, corresponding suffix) pairs. 
*/ - private def getPatternCountsAndPrefixSuffixPairs( + private def extendPrefixes( minCount: Long, prefixSuffixPairs: RDD[(List[Int], Array[Int])]) : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = { - val prefixAndFrequentItemAndCounts = prefixSuffixPairs + + // (length N prefix, item from suffix) pairs and their corresponding number of occurrences + // Every (prefix, item) pair kept here occurs in at least `minCount` of the projected suffixes + val prefixItemPairAndCounts = prefixSuffixPairs .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) } .reduceByKey(_ + _) .filter(_._2 >= minCount) - val patternAndCounts = prefixAndFrequentItemAndCounts - .map { case ((prefix, item), count) => (item :: prefix, count) } - val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts + + // Map from prefix to set of possible next items from suffix + val prefixToNextItems = prefixItemPairAndCounts .keys .groupByKey() .mapValues(_.toSet) .collect() .toMap - val nextPrefixSuffixPairs = prefixSuffixPairs - .filter(x => prefixToFrequentNextItemsMap.contains(x._1)) + + + // Frequent patterns with length N+1 and their corresponding counts + val extendedPrefixAndCounts = prefixItemPairAndCounts + .map { case ((prefix, item), count) => (item :: prefix, count) } + + // Remaining work, all prefixes will have length N+1 + val extendedPrefixAndSuffix = prefixSuffixPairs + .filter(x => prefixToNextItems.contains(x._1)) .flatMap { case (prefix, suffix) => - val frequentNextItems = prefixToFrequentNextItemsMap(prefix) - val filteredSuffix = suffix.filter(frequentNextItems.contains(_)) - frequentNextItems.flatMap { item => - val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix) - if (suffix.isEmpty) None - else Some(item :: prefix, suffix) + val frequentNextItems = prefixToNextItems(prefix) + val filteredSuffix = suffix.filter(frequentNextItems.contains(_)) + frequentNextItems.flatMap { item => + LocalPrefixSpan.getSuffix(item, filteredSuffix) match { 
case suffix if !suffix.isEmpty => Some(item :: prefix, suffix) + case _ => None + } + } } - } - (patternAndCounts, nextPrefixSuffixPairs) + + (extendedPrefixAndCounts, extendedPrefixAndSuffix) } /** - * calculate the patterns in local. + * Calculate the patterns locally. * @param minCount the absolute minimum count * @param data prefixes and projected sequences data data * @return patterns