From 6e149fa3bd88a2347e635f03ab9ae5913e03beee Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 14:36:36 -0700
Subject: [PATCH 1/7] Fix splitPrefixSuffixPairs

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 30 ++++++++-----------
 .../spark/mllib/fpm/PrefixSpanSuite.scala     | 21 ++++++-------
 2 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index cbb514d467b4b..b70ff9815adc8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -58,7 +58,7 @@ class PrefixSpan private (
    */
   def setMinSupport(minSupport: Double): this.type = {
     require(minSupport >= 0 && minSupport <= 1,
-      "The minimum support value must be between 0 and 1, including 0 and 1.")
+      "The minimum support value must be in [0, 1].")
     this.minSupport = minSupport
     this
   }
@@ -126,23 +126,17 @@ class PrefixSpan private (
   private def splitPrefixSuffixPairs(
       prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
     (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = {
-    val suffixSizeMap = prefixSuffixPairs
-      .map(x => (x._1, x._2.length))
-      .reduceByKey(_ + _)
-      .map(x => (x._2 <= maxProjectedDBSizeBeforeLocalProcessing, Set(x._1)))
-      .reduceByKey(_ ++ _)
-      .collect
-      .toMap
-    val small = if (suffixSizeMap.contains(true)) {
-      prefixSuffixPairs.filter(x => suffixSizeMap(true).contains(x._1))
-    } else {
-      prefixSuffixPairs.filter(x => false)
-    }
-    val large = if (suffixSizeMap.contains(false)) {
-      prefixSuffixPairs.filter(x => suffixSizeMap(false).contains(x._1))
-    } else {
-      prefixSuffixPairs.filter(x => false)
-    }
+    val prefixToSuffixSize = prefixSuffixPairs
+      .aggregateByKey(0)(
+        seqOp = { case (count, suffix) => count + suffix.length },
+        combOp = { _ + _ })
+    val smallPrefixes = prefixToSuffixSize
+      .filter(_._2 <= maxProjectedDBSizeBeforeLocalProcessing)
+      .map(_._1)
+      .collect()
+      .toSet
+    val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
+    val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
     (small, large)
   }

diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 9f107c89f6d80..6dd2dc926acc5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -44,13 +44,6 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     val rdd = sc.parallelize(sequences, 2).cache()
 
-    def compareResult(
-        expectedValue: Array[(Array[Int], Long)],
-        actualValue: Array[(Array[Int], Long)]): Boolean = {
-      expectedValue.map(x => (x._1.toSeq, x._2)).toSet ==
-        actualValue.map(x => (x._1.toSeq, x._2)).toSet
-    }
-
     val prefixspan = new PrefixSpan()
       .setMinSupport(0.33)
       .setMaxPatternLength(50)
@@ -76,7 +69,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4, 5), 2L),
       (Array(5), 3L)
     )
-    assert(compareResult(expectedValue1, result1.collect()))
+    assert(compareResults(expectedValue1, result1.collect()))
 
     prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
     val result2 = prefixspan.run(rdd)
@@ -87,7 +80,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4), 4L),
       (Array(5), 3L)
     )
-    assert(compareResult(expectedValue2, result2.collect()))
+    assert(compareResults(expectedValue2, result2.collect()))
 
     prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
     val result3 = prefixspan.run(rdd)
@@ -107,6 +100,14 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4, 5), 2L),
       (Array(5), 3L)
     )
-    assert(compareResult(expectedValue3, result3.collect()))
+    assert(compareResults(expectedValue3, result3.collect()))
+  }
+
+  private def compareResults(
+      expectedValue: Array[(Array[Int], Long)],
+      actualValue: Array[(Array[Int], Long)]): Boolean = {
+    expectedValue.map(x => (x._1.toSeq, x._2)).toSet ==
+      actualValue.map(x => (x._1.toSeq, x._2)).toSet
   }
+
 }
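A minimal standalone sketch of the aggregateByKey pattern this fix adopts — summing projected-database (suffix) sizes per prefix in one pass instead of building a Boolean-keyed map. The SparkContext setup and the prefix/suffix data below are invented for illustration; this is not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object ProjDBSizeSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
        // Hypothetical (prefix, suffix) pairs, keyed the same way as in the patch.
        val pairs = sc.parallelize(Seq(
          (List(1), Array(2, 3, 4)),
          (List(1), Array(5)),
          (List(2), Array(6, 7))))
        val suffixSizes = pairs.aggregateByKey(0)(
          (count, suffix) => count + suffix.length,  // seqOp: fold one suffix into a partial sum
          _ + _)                                     // combOp: merge partial sums across partitions
        suffixSizes.collect().foreach(println)       // (List(1),4) and (List(2),2), in some order
        sc.stop()
      }
    }
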
From 01c9ae9aa3f09aa4ec058db024f6a2cb482570bb Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 14:39:42 -0700
Subject: [PATCH 2/7] Add getters

---
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index b70ff9815adc8..5e6322f2a05a1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -53,6 +53,12 @@ class PrefixSpan private (
    */
   def this() = this(0.1, 10)
 
+  /**
+   * Gets the minimal support (i.e. the frequency of occurrence before a pattern is considered
+   * frequent).
+   */
+  def getMinSupport(): Double = this.minSupport
+
   /**
    * Sets the minimal support level (default: `0.1`).
    */
@@ -63,10 +69,16 @@ class PrefixSpan private (
     this
   }
 
+  /**
+   * Gets the maximal pattern length (i.e. the length of the longest sequential pattern to consider).
+   */
+  def getMaxPatternLength(): Int = this.maxPatternLength
+
   /**
    * Sets maximal pattern length (default: `10`).
    */
   def setMaxPatternLength(maxPatternLength: Int): this.type = {
+    // TODO: support unbounded pattern length when maxPatternLength = 0
     require(maxPatternLength >= 1,
       "The maximum pattern length value must be greater than 0.")
     this.maxPatternLength = maxPatternLength
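The new accessors pair with the existing fluent setters; a quick illustrative snippet (values arbitrary, not from the patch):

    val prefixSpan = new PrefixSpan()  // defaults: minSupport = 0.1, maxPatternLength = 10
      .setMinSupport(0.33)
      .setMaxPatternLength(50)
    assert(prefixSpan.getMinSupport() == 0.33)
    assert(prefixSpan.getMaxPatternLength() == 50)
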
From cb2a4fc71d4874f3bb3cf0d8b0331e5b41f7cf45 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 14:50:31 -0700
Subject: [PATCH 3/7] Inline code for readability

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 64 ++++++++-----------
 1 file changed, 25 insertions(+), 39 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 5e6322f2a05a1..5c563262e184d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -97,14 +97,31 @@ class PrefixSpan private (
     if (sequences.getStorageLevel == StorageLevel.NONE) {
       logWarning("Input data is not cached.")
     }
-    val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
-    val prefixSuffixPairs = getPrefixSuffixPairs(
-      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+
+    // Convert min support to a min number of transactions for this dataset
+    val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
+
+    val itemCounts = sequences
+      .flatMap(_.distinct.map((_, 1L)))
+      .reduceByKey(_ + _)
+      .filter(_._2 >= minCount)
+
+    val prefixSuffixPairs = {
+      val frequentItems = itemCounts.map(_._1).collect()
+      val candidates = sequences.map { p =>
+        p.filter (frequentItems.contains(_) )
+      }
+      candidates.flatMap { x =>
+        frequentItems.map { y =>
+          val sub = LocalPrefixSpan.getSuffix(y, x)
+          (ArrayBuffer(y), sub)
+        }.filter(_._2.nonEmpty)
+      }
+    }
     prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
-      splitPrefixSuffixPairs(prefixSuffixPairs)
+
+    var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
@@ -115,6 +132,7 @@ class PrefixSpan private (
       smallPrefixSuffixPairs ++= smallerPairsPart
       allPatternAndCounts ++= nextPatternAndCounts
     }
+
     if (smallPrefixSuffixPairs.count() > 0) {
       val projectedDatabase = smallPrefixSuffixPairs
         .map(x => (x._1.toSeq, x._2))
@@ -189,29 +207,6 @@ class PrefixSpan private (
     (patternAndCounts, nextPrefixSuffixPairs)
   }
 
-  /**
-   * Get the minimum count (sequences count * minSupport).
-   * @param sequences input data set, contains a set of sequences,
-   * @return minimum count,
-   */
-  private def getMinCount(sequences: RDD[Array[Int]]): Long = {
-    if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
-  }
-
-  /**
-   * Generates frequent items by filtering the input data using minimal count level.
-   * @param minCount the absolute minimum count
-   * @param sequences original sequences data
-   * @return array of item and count pair
-   */
-  private def getFreqItemAndCounts(
-      minCount: Long,
-      sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
-    sequences.flatMap(_.distinct.map((_, 1L)))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
-  }
-
   /**
    * Get the frequent prefixes and suffix pairs.
    * @param frequentPrefixes frequent prefixes
    * @param sequences sequences data
    * @return prefixes and suffix pairs.
    */
   private def getPrefixSuffixPairs(
       frequentPrefixes: Array[Int],
       sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = {
-    val filteredSequences = sequences.map { p =>
-      p.filter (frequentPrefixes.contains(_) )
-    }
-    filteredSequences.flatMap { x =>
-      frequentPrefixes.map { y =>
-        val sub = LocalPrefixSpan.getSuffix(y, x)
-        (ArrayBuffer(y), sub)
-      }.filter(_._2.nonEmpty)
-    }
   }
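The inlined support-to-count conversion is simple enough to check by hand; a small illustration (the dataset size is invented):

    // minCount = ceil(n * minSupport); minSupport == 0 is special-cased to 0L
    // so that nothing is filtered out.
    val numSequences = 5L
    val minSupport = 0.33
    val minCount = if (minSupport == 0) 0L else math.ceil(numSequences * minSupport).toLong
    // ceil(5 * 0.33) = ceil(1.65) = 2: a pattern must occur in at least
    // 2 of the 5 sequences to be considered frequent.
    assert(minCount == 2L)
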
From da0091b3d4d9e9d7f058b645272e70e3256c1ac7 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 15:21:06 -0700
Subject: [PATCH 4/7] Use lists for prefixes to reuse data

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 48 +++++++------------
 1 file changed, 18 insertions(+), 30 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 5c563262e184d..79f8b651f83b3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -102,9 +102,10 @@ class PrefixSpan private (
     val itemCounts = sequences
-      .flatMap(_.distinct.map((_, 1L)))
+      .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
+    var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2))
 
     val prefixSuffixPairs = {
       val frequentItems = itemCounts.map(_._1).collect()
       val candidates = sequences.map { p =>
         p.filter (frequentItems.contains(_) )
       }
       candidates.flatMap { x =>
         frequentItems.map { y =>
           val sub = LocalPrefixSpan.getSuffix(y, x)
-          (ArrayBuffer(y), sub)
+          (List(y), sub)
         }.filter(_._2.nonEmpty)
       }
     }
-    prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-
-    var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2))
     var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
+
     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
@@ -135,9 +134,9 @@ class PrefixSpan private (
     if (smallPrefixSuffixPairs.count() > 0) {
       val projectedDatabase = smallPrefixSuffixPairs
-        .map(x => (x._1.toSeq, x._2))
+        // TODO aggregateByKey
         .groupByKey()
-        .map(x => (x._1.toArray, x._2.toArray))
+        .mapValues(_.toArray)
       val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
       allPatternAndCounts ++= nextPatternAndCounts
     }
@@ -154,8 +153,8 @@ class PrefixSpan private (
   private def splitPrefixSuffixPairs(
-      prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
-    (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
+    (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
     val prefixToSuffixSize = prefixSuffixPairs
       .aggregateByKey(0)(
         seqOp = { case (count, suffix) => count + suffix.length },
         combOp = { _ + _ })
@@ -179,14 +178,14 @@ class PrefixSpan private (
   private def getPatternCountsAndPrefixSuffixPairs(
       minCount: Long,
-      prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
-    (RDD[(ArrayBuffer[Int], Long)], RDD[(ArrayBuffer[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
+    (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
     val patternAndCounts = prefixAndFrequentItemAndCounts
-      .map { case ((prefix, item), count) => (prefix :+ item, count) }
+      .map { case ((prefix, item), count) => (item :: prefix, count) }
     val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts
       .keys
       .groupByKey()
@@ -201,23 +200,12 @@ class PrefixSpan private (
       frequentNextItems.flatMap { item =>
         val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix)
         if (suffix.isEmpty) None
-        else Some(prefix :+ item, suffix)
+        else Some(item :: prefix, suffix)
       }
     }
     (patternAndCounts, nextPrefixSuffixPairs)
   }
 
-  /**
-   * Get the frequent prefixes and suffix pairs.
-   * @param frequentPrefixes frequent prefixes
-   * @param sequences sequences data
-   * @return prefixes and suffix pairs.
-   */
-  private def getPrefixSuffixPairs(
-      frequentPrefixes: Array[Int],
-      sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = {
-  }
-
   /**
    * calculate the patterns in local.
    * @param minCount the absolute minimum count
    * @param data prefixes and projected sequences data
    * @return patterns
    */
   private def getPatternsInLocal(
       minCount: Long,
-      data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = {
+      data: RDD[(List[Int], Array[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
-      case (prefix, projDB) =>
-        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
-          .map { case (pattern: List[Int], count: Long) =>
-            (pattern.toArray.reverse.to[ArrayBuffer], count)
-          }
+      case (prefix, projDB) =>
+        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
+          .map { case (pattern: List[Int], count: Long) =>
+            (pattern.reverse, count)
+          }
     }
   }
 }
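The move from `ArrayBuffer :+ item` to `item :: prefix` is what makes prefix reuse possible: prepending to an immutable List allocates one cell and shares the whole tail, while extending a buffer copies it. A standalone sketch of the difference (illustrative only, not from the patch):

    // Prefixes are stored in reverse, so "extend by item" is a prepend:
    val prefix = List(3, 2, 1)        // represents the pattern <1 2 3>
    val extended = 4 :: prefix        // one new cell; the tail is shared, not copied
    assert(extended.tail eq prefix)   // `eq` checks reference identity

    import scala.collection.mutable.ArrayBuffer
    val buf = ArrayBuffer(1, 2, 3)
    val appended = buf :+ 4           // allocates and copies a fresh buffer
    assert(!(appended eq buf))
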
From 1235cfcc9367b546bcf564972a33b769f62da520 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 15:30:29 -0700
Subject: [PATCH 5/7] Use Iterable[Array[_]] over Array[Array[_]] for database

---
 .../spark/mllib/fpm/LocalPrefixSpan.scala     |  6 +--
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 37 +++++++++----------
 2 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index 7ead6327486cc..0ea792081086d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -40,7 +40,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       minCount: Long,
       maxPatternLength: Int,
       prefixes: List[Int],
-      database: Array[Array[Int]]): Iterator[(List[Int], Long)] = {
+      database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
     if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
     val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
     val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains))
@@ -67,7 +67,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
     }
   }
 
-  def project(database: Array[Array[Int]], prefix: Int): Array[Array[Int]] = {
+  def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
     database
       .map(getSuffix(prefix, _))
       .filter(_.nonEmpty)
@@ -81,7 +81,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
    */
   private def getFreqItemAndCounts(
       minCount: Long,
-      database: Array[Array[Int]]): mutable.Map[Int, Long] = {
+      database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
     // TODO: use PrimitiveKeyOpenHashMap
     val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
     database.foreach { sequence =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 79f8b651f83b3..bbdc75532ae6f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -45,7 +45,11 @@ class PrefixSpan private (
     private var minSupport: Double,
     private var maxPatternLength: Int) extends Logging with Serializable {
 
-  private val maxProjectedDBSizeBeforeLocalProcessing: Long = 10000
+  /**
+   * The maximum number of items allowed in a projected database before local processing. If a
+   * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
+   */
+  private val maxLocalProjDBSize: Long = 10000
 
   /**
    * Constructs a default instance with default parameters
   */
@@ -63,8 +67,7 @@ class PrefixSpan private (
    * Sets the minimal support level (default: `0.1`).
    */
   def setMinSupport(minSupport: Double): this.type = {
-    require(minSupport >= 0 && minSupport <= 1,
-      "The minimum support value must be in [0, 1].")
+    require(minSupport >= 0 && minSupport <= 1, "The minimum support value must be in [0, 1].")
     this.minSupport = minSupport
     this
   }
@@ -79,8 +82,7 @@ class PrefixSpan private (
    */
   def setMaxPatternLength(maxPatternLength: Int): this.type = {
     // TODO: support unbounded pattern length when maxPatternLength = 0
-    require(maxPatternLength >= 1,
-      "The maximum pattern length value must be greater than 0.")
+    require(maxPatternLength >= 1, "The maximum pattern length value must be greater than 0.")
     this.maxPatternLength = maxPatternLength
     this
   }
@@ -119,13 +121,13 @@ class PrefixSpan private (
         }.filter(_._2.nonEmpty)
       }
     }
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)
 
     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
       largePrefixSuffixPairs.unpersist()
-      val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
       largePrefixSuffixPairs = largerPairsPart
       largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
       smallPrefixSuffixPairs ++= smallerPairsPart
@@ -136,7 +138,6 @@ class PrefixSpan private (
     if (smallPrefixSuffixPairs.count() > 0) {
       val projectedDatabase = smallPrefixSuffixPairs
         // TODO aggregateByKey
         .groupByKey()
-        .mapValues(_.toArray)
       val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
       allPatternAndCounts ++= nextPatternAndCounts
     }
@@ -148,21 +149,19 @@ class PrefixSpan private (
 
   /**
-   * Split prefix suffix pairs to two parts:
-   * Prefixes with projected databases smaller than maxSuffixesBeforeLocalProcessing and
-   * Prefixes with projected databases larger than maxSuffixesBeforeLocalProcessing
+   * Partitions the prefix-suffix pairs by projected database size.
+   *
    * @param prefixSuffixPairs prefix (length n) and suffix pairs,
-   * @return small size prefix suffix pairs and big size prefix suffix pairs
-   *         (RDD[prefix, suffix], RDD[prefix, suffix ])
+   * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
+   *         greater than [[maxLocalProjDBSize]]
    */
-  private def splitPrefixSuffixPairs(
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
-    (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
+  private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
+    : (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
     val prefixToSuffixSize = prefixSuffixPairs
       .aggregateByKey(0)(
         seqOp = { case (count, suffix) => count + suffix.length },
         combOp = { _ + _ })
     val smallPrefixes = prefixToSuffixSize
-      .filter(_._2 <= maxProjectedDBSizeBeforeLocalProcessing)
-      .map(_._1)
+      .filter(_._2 <= maxLocalProjDBSize)
+      .keys
       .collect()
       .toSet
     val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
     val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
@@ -214,7 +213,7 @@ class PrefixSpan private (
    */
   private def getPatternsInLocal(
       minCount: Long,
-      data: RDD[(List[Int], Array[Array[Int]])]): RDD[(List[Int], Long)] = {
+      data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
       case (prefix, projDB) =>
        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
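Loosening the signatures to Iterable is what allows dropping the mapValues(_.toArray) copy above, since groupByKey already yields RDD[(K, Iterable[V])]. A tiny non-Spark sketch of the same idea (names invented):

    // A consumer that only iterates can accept Iterable and take either an
    // Array or a List (or Spark's grouped values) without a defensive copy.
    def totalItems(database: Iterable[Array[Int]]): Int = database.map(_.length).sum

    assert(totalItems(Array(Array(1, 2), Array(3))) == 3)
    assert(totalItems(List(Array(1, 2), Array(3))) == 3)
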
From c2caa5cb19e5c9e54dda288c9c1e7befb21feb64 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 15:54:30 -0700
Subject: [PATCH 6/7] Readability improvements and comments

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 64 ++++++++++---------
 1 file changed, 34 insertions(+), 30 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index bbdc75532ae6f..8a15a867910a2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -103,45 +103,49 @@ class PrefixSpan private (
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
 
-    val itemCounts = sequences
+    // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
+    val freqItemCounts = sequences
       .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
-    var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2))
 
-    val prefixSuffixPairs = {
-      val frequentItems = itemCounts.map(_._1).collect()
-      val candidates = sequences.map { p =>
-        p.filter (frequentItems.contains(_) )
-      }
-      candidates.flatMap { x =>
-        frequentItems.map { y =>
-          val sub = LocalPrefixSpan.getSuffix(y, x)
-          (List(y), sub)
-        }.filter(_._2.nonEmpty)
+    // Pairs of (length 1 prefix, suffix consisting of frequent items)
+    val itemSuffixPairs = {
+      val freqItems = freqItemCounts.keys.collect().toSet
+      sequences.flatMap { seq =>
+        freqItems.flatMap { item =>
+          val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
+          candidateSuffix match {
+            case suffix if !suffix.isEmpty => Some((List(item), suffix))
+            case _ => None
+          }
+        }
       }
     }
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)
 
-    while (largePrefixSuffixPairs.count() != 0) {
+    // Accumulator for the computed results to be returned
+    var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+
+    // Remaining work to be locally and distributively processed respectively
+    var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
+
+    // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
+    // projected database sizes <= `maxLocalProjDBSize`)
+    while (pairsForDistributed.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
-        getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
-      largePrefixSuffixPairs.unpersist()
+        getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
+      pairsForDistributed.unpersist()
       val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
-      largePrefixSuffixPairs = largerPairsPart
-      largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-      smallPrefixSuffixPairs ++= smallerPairsPart
-      allPatternAndCounts ++= nextPatternAndCounts
+      pairsForDistributed = largerPairsPart
+      pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
+      pairsForLocal ++= smallerPairsPart
+      resultsAccumulator ++= nextPatternAndCounts
     }
 
-    if (smallPrefixSuffixPairs.count() > 0) {
-      val projectedDatabase = smallPrefixSuffixPairs
-        // TODO aggregateByKey
-        .groupByKey()
-      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
-      allPatternAndCounts ++= nextPatternAndCounts
-    }
-    allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
+    // Process the small projected databases locally
+    resultsAccumulator ++= getPatternsInLocal(minCount, pairsForLocal.groupByKey())
+
+    resultsAccumulator.map { case (pattern, count) => (pattern.toArray, count) }
   }
@@ -177,8 +181,8 @@ class PrefixSpan private (
   private def getPatternCountsAndPrefixSuffixPairs(
       minCount: Long,
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
-    (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])])
+    : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
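This commit also introduces the match-with-guard idiom (Some for a non-empty suffix, None otherwise) inside flatMap, which fuses a map and a filter into one pass; a self-contained sketch with invented data:

    val candidates = List(Array(1, 2), Array.empty[Int], Array(3))
    val nonEmptySuffixes = candidates.flatMap {
      case suffix if !suffix.isEmpty => Some(suffix.toList)
      case _ => None
    }
    assert(nonEmptySuffixes == List(List(1, 2), List(3)))
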
From 87fa021afaa184dfbf7eafcae0beb494697c40e2 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 16:19:48 -0700
Subject: [PATCH 7/7] Improve extend prefix readability

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 64 +++++++++++--------
 1 file changed, 39 insertions(+), 25 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 8a15a867910a2..5b8da9665366b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -103,7 +103,7 @@ class PrefixSpan private (
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
 
-    // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
+    // (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
     val freqItemCounts = sequences
       .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
@@ -113,8 +113,9 @@ class PrefixSpan private (
     val itemSuffixPairs = {
       val freqItems = freqItemCounts.keys.collect().toSet
       sequences.flatMap { seq =>
+        val filteredSeq = seq.filter(freqItems.contains(_))
         freqItems.flatMap { item =>
-          val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
+          val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
           candidateSuffix match {
             case suffix if !suffix.isEmpty => Some((List(item), suffix))
             case _ => None
@@ -123,7 +124,8 @@ class PrefixSpan private (
       }
     }
 
-    // Accumulator for the computed results to be returned
+    // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
+    // frequent length-one prefixes)
     var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
 
     // Remaining work to be locally and distributively processed respectively
@@ -133,7 +135,7 @@ class PrefixSpan private (
     // projected database sizes <= `maxLocalProjDBSize`)
     while (pairsForDistributed.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
-        getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
+        extendPrefixes(minCount, pairsForDistributed)
       pairsForDistributed.unpersist()
       val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
       pairsForDistributed = largerPairsPart
@@ -151,7 +153,6 @@ class PrefixSpan private (
 
   /**
    * Partitions the prefix-suffix pairs by projected database size.
-   *
    * @param prefixSuffixPairs prefix (length n) and suffix pairs,
    * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
    *         greater than [[maxLocalProjDBSize]]
@@ -173,44 +174,57 @@ class PrefixSpan private (
   }
 
   /**
-   * Get the pattern and counts, and prefix suffix pairs
+   * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
+   * and remaining work.
    * @param minCount minimum count
-   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
-   * @return pattern (length n+1) and counts, and prefix (length n+1) and suffix pairs
-   *         (RDD[pattern, count], RDD[prefix, suffix ])
+   * @param prefixSuffixPairs prefix (length N) and suffix pairs,
+   * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
+   *         prefix, corresponding suffix) pairs.
*/ - private def getPatternCountsAndPrefixSuffixPairs( + private def extendPrefixes( minCount: Long, prefixSuffixPairs: RDD[(List[Int], Array[Int])]) : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = { - val prefixAndFrequentItemAndCounts = prefixSuffixPairs + + // (length N prefix, item from suffix) pairs and their corresponding number of occurrences + // Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport` + val prefixItemPairAndCounts = prefixSuffixPairs .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) } .reduceByKey(_ + _) .filter(_._2 >= minCount) - val patternAndCounts = prefixAndFrequentItemAndCounts - .map { case ((prefix, item), count) => (item :: prefix, count) } - val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts + + // Map from prefix to set of possible next items from suffix + val prefixToNextItems = prefixItemPairAndCounts .keys .groupByKey() .mapValues(_.toSet) .collect() .toMap - val nextPrefixSuffixPairs = prefixSuffixPairs - .filter(x => prefixToFrequentNextItemsMap.contains(x._1)) + + + // Frequent patterns with length N+1 and their corresponding counts + val extendedPrefixAndCounts = prefixItemPairAndCounts + .map { case ((prefix, item), count) => (item :: prefix, count) } + + // Remaining work, all prefixes will have length N+1 + val extendedPrefixAndSuffix = prefixSuffixPairs + .filter(x => prefixToNextItems.contains(x._1)) .flatMap { case (prefix, suffix) => - val frequentNextItems = prefixToFrequentNextItemsMap(prefix) - val filteredSuffix = suffix.filter(frequentNextItems.contains(_)) - frequentNextItems.flatMap { item => - val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix) - if (suffix.isEmpty) None - else Some(item :: prefix, suffix) + val frequentNextItems = prefixToNextItems(prefix) + val filteredSuffix = suffix.filter(frequentNextItems.contains(_)) + frequentNextItems.flatMap { item => + LocalPrefixSpan.getSuffix(item, filteredSuffix) match { + case suffix if !suffix.isEmpty => Some(item :: prefix, suffix) + case _ => None + } + } } - } - (patternAndCounts, nextPrefixSuffixPairs) + + (extendedPrefixAndCounts, extendedPrefixAndSuffix) } /** - * calculate the patterns in local. + * Calculate the patterns in local. * @param minCount the absolute minimum count * @param data prefixes and projected sequences data data * @return patterns