
Commit

Improve extend prefix readability
Feynman Liang committed Jul 28, 2015
1 parent c2caa5c commit 87fa021
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -103,7 +103,7 @@ class PrefixSpan private (
// Convert min support to a min number of transactions for this dataset
val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong

// Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
// (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
val freqItemCounts = sequences
.flatMap(seq => seq.distinct.map(item => (item, 1L)))
.reduceByKey(_ + _)
@@ -113,8 +113,9 @@
val itemSuffixPairs = {
val freqItems = freqItemCounts.keys.collect().toSet
sequences.flatMap { seq =>
val filteredSeq = seq.filter(freqItems.contains(_))
freqItems.flatMap { item =>
val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
candidateSuffix match {
case suffix if !suffix.isEmpty => Some((List(item), suffix))
case _ => None
@@ -123,7 +124,8 @@
}
}

// Accumulator for the computed results to be returned
// Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
// frequent length-one prefixes)
var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))

// Remaining work to be locally and distributively processed, respectively
@@ -133,7 +135,7 @@
// projected database sizes <= `maxLocalProjDBSize`)
while (pairsForDistributed.count() != 0) {
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
extendPrefixes(minCount, pairsForDistributed)
pairsForDistributed.unpersist()
val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
pairsForDistributed = largerPairsPart
@@ -151,7 +153,6 @@

/**
* Partitions the prefix-suffix pairs by projected database size.
*
* @param prefixSuffixPairs prefix (length n) and suffix pairs,
* @return prefix-suffix pairs partitioned by whether their projected database size is <= or
* greater than [[maxLocalProjDBSize]]
@@ -173,44 +174,57 @@
}

/**
* Get the pattern and counts, and prefix suffix pairs
* Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
* and remaining work.
* @param minCount minimum count
* @param prefixSuffixPairs prefix (length n) and suffix pairs,
* @return pattern (length n+1) and counts, and prefix (length n+1) and suffix pairs
* (RDD[pattern, count], RDD[prefix, suffix ])
* @param prefixSuffixPairs prefix (length N) and suffix pairs,
* @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
* prefix, corresponding suffix) pairs.
*/
private def getPatternCountsAndPrefixSuffixPairs(
private def extendPrefixes(
minCount: Long,
prefixSuffixPairs: RDD[(List[Int], Array[Int])])
: (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
val prefixAndFrequentItemAndCounts = prefixSuffixPairs

// (length N prefix, item from suffix) pairs and their corresponding number of occurrences
// Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
val prefixItemPairAndCounts = prefixSuffixPairs
.flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
.reduceByKey(_ + _)
.filter(_._2 >= minCount)
val patternAndCounts = prefixAndFrequentItemAndCounts
.map { case ((prefix, item), count) => (item :: prefix, count) }
val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts

// Map from prefix to set of possible next items from suffix
val prefixToNextItems = prefixItemPairAndCounts
.keys
.groupByKey()
.mapValues(_.toSet)
.collect()
.toMap
val nextPrefixSuffixPairs = prefixSuffixPairs
.filter(x => prefixToFrequentNextItemsMap.contains(x._1))


// Frequent patterns with length N+1 and their corresponding counts
val extendedPrefixAndCounts = prefixItemPairAndCounts
.map { case ((prefix, item), count) => (item :: prefix, count) }

// Remaining work, all prefixes will have length N+1
val extendedPrefixAndSuffix = prefixSuffixPairs
.filter(x => prefixToNextItems.contains(x._1))
.flatMap { case (prefix, suffix) =>
val frequentNextItems = prefixToFrequentNextItemsMap(prefix)
val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
frequentNextItems.flatMap { item =>
val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix)
if (suffix.isEmpty) None
else Some(item :: prefix, suffix)
val frequentNextItems = prefixToNextItems(prefix)
val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
frequentNextItems.flatMap { item =>
LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
case _ => None
}
}
}
}
(patternAndCounts, nextPrefixSuffixPairs)

(extendedPrefixAndCounts, extendedPrefixAndSuffix)
}

/**
* calculate the patterns in local.
* Calculate the patterns in local.
* @param minCount the absolute minimum count
* @param data prefixes and projected sequences data
* @return patterns
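For readers skimming the diff, below is a minimal, Spark-free sketch of the prefix-extension step that the renamed `extendPrefixes` method performs. The object name, toy sequences, and the simplified `getSuffix` helper are illustrative assumptions rather than code from this commit; the real implementation operates on RDDs and delegates suffix projection to `LocalPrefixSpan.getSuffix`.

```scala
// Hypothetical illustration of the prefix-extension step, on plain collections.
object PrefixExtensionSketch {

  // Simplified stand-in for LocalPrefixSpan.getSuffix: the projected suffix of a
  // sequence w.r.t. `item` is everything after the first occurrence of `item`.
  def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
    val index = sequence.indexOf(item)
    if (index == -1) Array.empty[Int] else sequence.drop(index + 1)
  }

  def main(args: Array[String]): Unit = {
    val minCount = 2L

    // (length-1 prefix, projected suffix) pairs, analogous to `prefixSuffixPairs`.
    val prefixSuffixPairs: Seq[(List[Int], Array[Int])] = Seq(
      (List(1), Array(2, 3)),
      (List(1), Array(2)),
      (List(1), Array(3)))

    // Count (prefix, next item) pairs and keep only those meeting the support
    // threshold, mirroring the flatMap / reduceByKey / filter chain in the patch.
    val prefixItemPairAndCounts = prefixSuffixPairs
      .flatMap { case (prefix, suffix) => suffix.distinct.map(item => ((prefix, item), 1L)) }
      .groupBy { case (key, _) => key }
      .map { case (key, occurrences) => (key, occurrences.map(_._2).sum) }
      .filter { case (_, count) => count >= minCount }

    // Frequent length-2 prefixes, built in reverse order with `item :: prefix`.
    val extendedPrefixAndCounts = prefixItemPairAndCounts
      .map { case ((prefix, item), count) => (item :: prefix, count) }

    // Prints the frequent extended prefixes and their counts, e.g. List(2, 1) -> 2.
    println(extendedPrefixAndCounts)
  }
}
```

As in the patch, prefixes are accumulated in reverse via `item :: prefix`, so `List(2, 1)` stands for the pattern 1 followed by 2.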
