Working on temporal sequences
Feynman Liang committed Jul 31, 2015
1 parent f1114b9 commit 6787716
Showing 3 changed files with 102 additions and 23 deletions.
mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -41,13 +41,15 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       maxPatternLength: Int,
       prefixes: List[Int],
       database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
-    if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
+    if (prefixes.count(_ == DELIMITER) == maxPatternLength || database.isEmpty) {
+      return Iterator.empty
+    }
     val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
     val filteredDatabase = database.map { suffix =>
       suffix.filter(item => item == DELIMITER || frequentItemAndCounts.contains(item))
     }
     frequentItemAndCounts.iterator.flatMap { case (item, count) =>
-      val newPrefixes = item :: prefixes
+      val newPrefixes = DELIMITER :: item :: prefixes
       val newProjected = project(filteredDatabase, item)
       Iterator.single((newPrefixes, count)) ++
         run(minCount, maxPatternLength, newPrefixes, newProjected)
@@ -65,7 +67,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
     if (index == -1) {
       Array()
     } else {
-      // drop until we get to the next delimiter (or end of sequence)
+      // in case index is inside an itemset, drop until we get to the next delimiter (or end of seq)
      sequence.drop(index).dropWhile(_ != DELIMITER).drop(1)
     }
   }
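LocalPrefixSpan now measures a pattern's length in itemsets, i.e. by counting DELIMITERs in the prefix, and every extension prepends a DELIMITER along with the new item. Below is a minimal standalone sketch of that encoding and of the projection step in getSuffix, assuming DELIMITER == -1; the object and method names are illustrative, not from the patch.

    object DelimiterEncodingSketch {
      val DELIMITER = -1

      // <(a)(abc)> with a = 1, b = 2, c = 3: itemsets separated by DELIMITER.
      val sequence: Array[Int] = Array(1, DELIMITER, 1, 2, 3)

      // Prefixes are stored reversed and delimiter-terminated, so the number of
      // DELIMITERs equals the number of itemsets. This is what the new check
      // prefixes.count(_ == DELIMITER) == maxPatternLength relies on.
      val prefix: List[Int] = List(DELIMITER, 2, DELIMITER, 1) // <(1)(2)>, reversed
      val patternLength: Int = prefix.count(_ == DELIMITER)    // 2

      // Mirrors the projection in getSuffix: if item matches inside an itemset,
      // drop through the next DELIMITER so the suffix starts on a boundary.
      def suffixAfter(item: Int, sequence: Array[Int]): Array[Int] = {
        val index = sequence.indexOf(item)
        if (index == -1) Array()
        else sequence.drop(index).dropWhile(_ != DELIMITER).drop(1)
      }

      def main(args: Array[String]): Unit = {
        println(suffixAfter(1, sequence).mkString(",")) // prints 1,2,3
        println(patternLength)                          // prints 2
      }
    }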
26 changes: 13 additions & 13 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -91,9 +91,8 @@ class PrefixSpan private (

   /**
    * Find the complete set of sequential patterns in the input sequences.
-   * @param sequences a dataset of sequences. Items in a sequence are represented by non-negative
-   *                  integers and delimited by [[DELIMITER]]. Non-temporal sequences
-   *                  are supported by placing more than one item between delimiters.
+   * @param sequences ordered sequences of itemsets. Items are represented by non-negative integers.
+   *                  Each itemset has one or more items and is delimited by [[DELIMITER]].
    * @return a set of sequential pattern pairs,
    *         the key of pair is pattern (a list of elements),
    *         the value of pair is the pattern's count.
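For concreteness, a small sketch of the documented input format, assuming DELIMITER == -1 (the example values are illustrative, not from the patch): itemsets are separated by a single DELIMITER, with no leading or trailing delimiter, matching how the test suite encodes its fixtures.

    // The itemset sequence <(1 2)(3)(4 5)> encodes as:
    val sequence: Array[Int] = Array(1, 2, -1, 3, -1, 4, 5)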
@@ -124,22 +123,23 @@ class PrefixSpan private (
         freqItems.flatMap { item =>
           val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
           candidateSuffix match {
-            case suffix if !suffix.isEmpty => Some((List(item), suffix))
+            case suffix if !suffix.isEmpty => Some((List(DELIMITER, item), suffix))
             case _ => None
           }
         }
       }
     }
     // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
     // frequent length-one prefixes)
-    var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+    var resultsAccumulator = freqItemCounts.map(x => (List(DELIMITER, x._1), x._2))

     // Remaining work to be locally and distributively processed respectively
     var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)

     // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
-    // projected database sizes <= `maxLocalProjDBSize`)
-    while (pairsForDistributed.count() != 0) {
+    // projected database sizes <= `maxLocalProjDBSize`) or `maxPatternLength` is reached
+    var patternLength = 1
+    while (pairsForDistributed.count() != 0 || patternLength < maxPatternLength) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         extendPrefixes(minCount, pairsForDistributed)
       pairsForDistributed.unpersist()
@@ -148,14 +148,15 @@ class PrefixSpan private (
       pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
       pairsForLocal ++= smallerPairsPart
       resultsAccumulator ++= nextPatternAndCounts.collect()
+      patternLength += 1 // pattern length grows one per iteration
     }

     // Process the small projected databases locally
     val remainingResults = getPatternsInLocal(
       minCount, sc.parallelize(pairsForLocal, 1).groupByKey())

     (sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
-      .map { case (pattern, count) => (pattern.toArray, count) }
+      .map { case (pattern, count) => (pattern.reverse.toArray, count) }
   }


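A worked sketch of the reversed-prefix bookkeeping that the new pattern.reverse.toArray relies on (illustrative values, assuming DELIMITER == -1): prefixes grow head-first during the search and are flipped into pattern order exactly once, at the end.

    val DELIMITER = -1
    val prefix1 = List(DELIMITER, 1)         // length-1 prefix <(1)>, stored reversed
    val prefix2 = DELIMITER :: 2 :: prefix1  // extend with item 2: List(-1, 2, -1, 1)
    // One final reversal recovers pattern order, delimiter-terminated: <(1)(2)>
    println(prefix2.reverse.toArray.mkString(",")) // prints 1,-1,2,-1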
@@ -209,10 +210,9 @@ class PrefixSpan private (
       .collect()
       .toMap

-
     // Frequent patterns with length N+1 and their corresponding counts
     val extendedPrefixAndCounts = prefixItemPairAndCounts
-      .map { case ((prefix, item), count) => (item :: prefix, count) }
+      .map { case ((prefix, item), count) => (DELIMITER :: item :: prefix, count) }

     // Remaining work, all prefixes will have length N+1
     val extendedPrefixAndSuffix = prefixSuffixPairs
@@ -222,7 +222,7 @@ class PrefixSpan private (
         val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
         frequentNextItems.flatMap { item =>
           LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
-            case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
+            case suffix if !suffix.isEmpty => Some(DELIMITER :: item :: prefix, suffix)
             case _ => None
           }
         }
@@ -242,9 +242,9 @@ class PrefixSpan private (
       data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
       case (prefix, projDB) =>
-        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
+        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB)
           .map { case (pattern: List[Int], count: Long) =>
-            (pattern.reverse, count)
+            (pattern, count)
           }
     }
   }
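Putting the pieces together, a hedged end-to-end usage sketch; it assumes a no-arg PrefixSpan constructor and an in-scope SparkContext sc, as in the test suite, and the input values are made up.

    import org.apache.spark.mllib.fpm.PrefixSpan
    import org.apache.spark.rdd.RDD

    // Two delimiter-encoded sequences: <(1 2)(3)> and <(1)(3)>.
    val sequences: RDD[Array[Int]] = sc.parallelize(Seq(
      Array(1, 2, -1, 3),
      Array(1, -1, 3)))

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)

    // Each result is a (pattern, count) pair; after this commit, patterns come
    // back already reversed into pattern order and delimiter-terminated.
    val patterns: RDD[(Array[Int], Long)] = prefixSpan.run(sequences)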
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -68,7 +68,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4), 4L),
       (Array(4, 5), 2L),
       (Array(5), 3L)
-    )
+    ).map { case (seq, count) => (insertDelimiter(seq), count) }
     compareResults(expectedValue1, result1.collect())

     prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
@@ -79,7 +79,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(3, 4), 3L),
       (Array(4), 4L),
       (Array(5), 3L)
-    )
+    ).map { case (seq, count) => (insertDelimiter(seq), count) }
     compareResults(expectedValue2, result2.collect())

     prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
@@ -99,15 +99,92 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4), 4L),
       (Array(4, 5), 2L),
       (Array(5), 3L)
-    )
+    ).map { case (seq, count) => (insertDelimiter(seq), count) }
     compareResults(expectedValue3, result3.collect())
   }

+  test("PrefixSpan non-temporal sequences") {
+    val sequences = Array(
+      "a,abc,ac,d,cf",
+      "ad,c,bc,ae",
+      "ef,ab,df,c,b",
+      "e,g,af,c,b,c")
+    val coder = Array('a', 'b', 'c', 'd', 'e', 'f', 'g').zip(Array(1, 2, 3, 4, 5, 6, 7)).toMap
+    val intSequences = sequences.map(_.split(",").flatMap(-1 +: _.toArray.map(coder)).drop(1))
+    val data = sc.parallelize(intSequences, 2).cache()
+    val prefixspan = new PrefixSpan()
+      .setMinSupport(0.5)
+      .setMaxPatternLength(5)
+
+    val results = prefixspan.run(data)
+    val expectedValue4 = Array(
+      "a:4",
+      "b:4",
+      "c:4",
+      "d:3",
+      "e:3",
+      "f:3",
+      "a,a:2",
+      "a,b:4",
+      "a,bc:2",
+      "a,bc,a:2",
+      "a,b,a:2",
+      "a,b,c:2",
+      "ab:2",
+      "ab,c:2",
+      "ab,d:2",
+      "ab,d,c:2",
+      "ab,f:2",
+      "a,c:4",
+      "a,c,a:2",
+      "a,c,b:3",
+      "a,c,c:3",
+      "a,d:2",
+      "a,d,c:2",
+      "a,f:2",
+      "b,a:2",
+      "b,c:3",
+      "bc:2",
+      "bc,a:2",
+      "b,d:2",
+      "b,d,c:2",
+      "b,f:2",
+      "c,a:2",
+      "c,b:3",
+      "c,c:3",
+      "d,b:2",
+      "d,c:3",
+      "d,c,b:2",
+      "e,a:2",
+      "e,a,b:2",
+      "e,a,c:2",
+      "e,a,c,b:2",
+      "e,b:2",
+      "e,b,c:2",
+      "e,c:2",
+      "e,c,b:2",
+      "e,f:2",
+      "e,f,b:2",
+      "e,f,c:2",
+      "e,f,c,b:2",
+      "f,b:2",
+      "f,b,c:2",
+      "f,c:2",
+      "f,c,b:2")
+    val intExpectedValue = expectedValue4
+      .map(_.split(":"))
+      .map { x => (x(0).split(",").flatMap(-1 +: _.toArray.map(coder)), x(1).toLong) }
+    compareResults(intExpectedValue, results.collect())
+  }
+
   private def compareResults(
-    expectedValue: Array[(Array[Int], Long)],
-    actualValue: Array[(Array[Int], Long)]): Unit = {
-    assert(expectedValue.map(x => (x._1.toSeq, x._2)).toSet ===
-      actualValue.map(x => (x._1.toSeq, x._2)).toSet)
+      expectedValue: Array[(Array[Int], Long)],
+      actualValue: Array[(Array[Int], Long)]): Unit = {
+    val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet
+    val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
+    println(s"missing expected:\n${expectedSet.diff(actualSet)}")
+    println(s"extra actual:\n${actualSet.diff(expectedSet)}")
+    assert(expectedSet === actualSet)
   }

   private def insertDelimiter(sequence: Array[Int]): Array[Int] = {
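The body of insertDelimiter is collapsed in this view. Below is a plausible reconstruction (a guess consistent with the delimiter-terminated patterns run() now returns, not the patch's actual code), followed by a trace of the string-fixture encoding used in the new test.

    // Hypothetical reconstruction: append DELIMITER after each item so every
    // item becomes its own itemset, e.g. Array(1, 2) => Array(1, -1, 2, -1).
    private def insertDelimiter(sequence: Array[Int]): Array[Int] = {
      sequence.flatMap(item => Array(item, -1))
    }

    // Trace of the fixture encoding (taken from the test itself), for "a,abc"
    // with coder a -> 1, b -> 2, c -> 3:
    //   "a,abc".split(",")                   => Array("a", "abc")
    //   .flatMap(-1 +: _.toArray.map(coder)) => Array(-1, 1, -1, 1, 2, 3)
    //   .drop(1)                             => Array(1, -1, 1, 2, 3)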
