Skip to content

Commit

Permalink
remove minPatternsBeforeLocalProcessing, add maxSuffixesBeforeLocalPr…
Browse files Browse the repository at this point in the history
…ocessing.
  • Loading branch information
zhangjiajin committed Jul 18, 2015
1 parent b07e20c commit d2250b7
Showing 1 changed file with 46 additions and 9 deletions.
55 changes: 46 additions & 9 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PrefixSpan private (
private var minSupport: Double,
private var maxPatternLength: Int) extends Logging with Serializable {

private val minPatternsBeforeLocalProcessing: Int = 20
private val maxSuffixesBeforeLocalProcessing: Long = 10000

/**
* Constructs a default instance with default parameters
Expand Down Expand Up @@ -91,20 +91,25 @@ class PrefixSpan private (
lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
var patternsCount: Long = lengthOnePatternsAndCounts.count()
var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
var currentPrefixSuffixPairs = prefixSuffixPairs
var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
splitPrefixSuffixPairs(prefixSuffixPairs)
largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
var patternLength: Int = 1
while (patternLength < maxPatternLength &&
patternsCount <= minPatternsBeforeLocalProcessing &&
currentPrefixSuffixPairs.count() != 0) {
largePrefixSuffixPairs.count() != 0) {
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs)
getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
patternsCount = nextPatternAndCounts.count()
currentPrefixSuffixPairs = nextPrefixSuffixPairs
largePrefixSuffixPairs.unpersist()
val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
largePrefixSuffixPairs = splitedPrefixSuffixPairs._2
largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1
allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
patternLength = patternLength + 1
}
if (patternLength < maxPatternLength && patternsCount > 0) {
val projectedDatabase = currentPrefixSuffixPairs
if (smallPrefixSuffixPairs.count() > 0) {
val projectedDatabase = smallPrefixSuffixPairs
.map(x => (x._1.toSeq, x._2))
.groupByKey()
.map(x => (x._1.toArray, x._2.toArray))
Expand All @@ -114,6 +119,38 @@ class PrefixSpan private (
allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
}


/**
 * Splits prefix-suffix pairs into two RDDs according to the total suffix length
 * accumulated for each prefix:
 *  - "small": prefixes whose summed suffix length is at most
 *    `maxSuffixesBeforeLocalProcessing`;
 *  - "large": prefixes whose summed suffix length exceeds that threshold.
 *
 * The per-prefix classification is collected to the driver and then used to filter
 * the input RDD, so the input is traversed once for the classification and once more
 * for each returned RDD that is later evaluated.
 *
 * @param prefixSuffixPairs prefix (length n) and suffix pairs
 * @return (small, large) prefix-suffix pairs, each as RDD[(prefix, suffix)]
 */
private def splitPrefixSuffixPairs(
    prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
  (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = {
  // Read the threshold into a local so the closures below reference a plain Long
  // rather than a field of the enclosing instance.
  val threshold = maxSuffixesBeforeLocalProcessing
  val prefixesByIsSmall = prefixSuffixPairs
    // Accumulate as Long: summing Int lengths can overflow for a very large projected
    // database, turning the total negative and misclassifying the prefix as "small".
    .map { case (prefix, suffix) => (prefix, suffix.length.toLong) }
    .reduceByKey(_ + _)
    .map { case (prefix, totalSuffixLength) => (totalSuffixLength <= threshold, Set(prefix)) }
    .reduceByKey(_ ++ _)
    .collect()
    .toMap
  // A missing key means no prefix fell into that class; an empty set filters everything out.
  val smallPrefixes = prefixesByIsSmall.getOrElse(true, Set.empty[ArrayBuffer[Int]])
  val largePrefixes = prefixesByIsSmall.getOrElse(false, Set.empty[ArrayBuffer[Int]])
  val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
  val large = prefixSuffixPairs.filter { case (prefix, _) => largePrefixes.contains(prefix) }
  (small, large)
}

/**
* Get the pattern and counts, and prefix suffix pairs
* @param minCount minimum count
Expand Down Expand Up @@ -205,7 +242,7 @@ class PrefixSpan private (
data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = {
data.flatMap {
case (prefix, projDB) =>
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB)
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
.map { case (pattern: List[Int], count: Long) =>
(pattern.toArray.reverse.to[ArrayBuffer], count)
}
Expand Down

0 comments on commit d2250b7

Please sign in to comment.