
Commit

restore original version
zhangjiajin committed Jul 31, 2015
1 parent 09dc409 commit a5d649d
Showing 3 changed files with 44 additions and 291 deletions.
123 changes: 14 additions & 109 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -40,16 +40,6 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       minCount: Long,
       maxPatternLength: Int,
       prefixes: List[Int],
-<<<<<<< HEAD
-      database: Array[(Array[Int], Int)]): Iterator[(List[Int], Long)] = {
-    if ((prefixes.nonEmpty && prefixes.filter(_ != -1).length == maxPatternLength) ||
-      database.length < minCount) { return Iterator.empty }
-    val frequentItemAndCounts = getFreqPrefixAndCounts(minCount, prefixes, database)
-    frequentItemAndCounts.iterator.flatMap { case (prefix, count) =>
-      val newProjected = project(database, prefix)
-      Iterator.single((prefix, count)) ++
-        run(minCount, maxPatternLength, prefix, newProjected)
-=======
       database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
     if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
     val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
@@ -59,131 +49,46 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       val newProjected = project(filteredDatabase, item)
       Iterator.single((newPrefixes, count)) ++
         run(minCount, maxPatternLength, newPrefixes, newProjected)
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
     }
   }
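To make the kept recursion concrete, here is an illustrative trace (not part of the patch; it assumes the lines collapsed by the hunk header above filter the database to frequent items and build newPrefixes by prepending, as the visible identifiers suggest):

    // Illustrative: minCount = 2, maxPatternLength = 2, prefixes = Nil,
    // database = Seq(Array(1, 2), Array(1, 2), Array(2))
    // Frequent items: 1 -> 2, 2 -> 3.
    //   item 1: emit (List(1), 2); project to Seq(Array(2), Array(2)) and recurse,
    //           which emits (List(2, 1), 2) -- prefixes grow head-first.
    //   item 2: emit (List(2), 3); every projected suffix is empty, so the
    //           recursion stops there.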

   /**
    * Calculate suffix sequence immediately after the first occurrence of an item.
-   * @param element the last element of prefix
-   * @param sequenceAndFlag sequence to extract suffix from
    * @param item item to get suffix after
    * @param sequence sequence to extract suffix from
    * @return suffix sequence
    */
-  def getSuffix(
-      element: Array[Int],
-      sequenceAndFlag: (Array[Int], Int)): (Array[Int], Int) = {
-    val (originalSequence, flag) = sequenceAndFlag
-    val sequence =
-      if (element.length > 1 && flag == 1) {
-        element.take(element.length - 1) ++ originalSequence
-      } else if (element.length == 1 && flag == 1) {
-        val firstPosition = originalSequence.indexOf( -1 )
-        if (firstPosition != -1) {
-          originalSequence.drop(firstPosition + 1)
-        } else {
-          return (Array(), 0)
-        }
-      } else {
-        originalSequence
-      }
-    var found = false
-    var currentIndex = -1
-    var nextIndex = 0
-    while (nextIndex != -1 && !found) {
-      nextIndex = sequence.indexOf(-1, currentIndex + 1)
-      found = element.toSet.subsetOf(
-        sequence.slice(currentIndex + 1, nextIndex).toSet)
-      if (!found) currentIndex = nextIndex
-    }
-    if (found) {
-      val itemPosition = sequence.indexOf(element.last, currentIndex)
-      if (sequence.apply(itemPosition + 1) == -1) {
-        (sequence.drop(itemPosition + 2), 0)
-      } else {
-        (sequence.drop(itemPosition + 1), 1)
-      }
-    } else {
-      (Array(), 0)
-    }
-  }
   def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
     val index = sequence.indexOf(item)
     if (index == -1) {
       Array()
     } else {
       sequence.drop(index + 1)
     }
   }
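A quick usage sketch of the restored getSuffix (made-up values):

    getSuffix(2, Array(1, 2, 3, 2, 4))  // Array(3, 2, 4): suffix after the first 2
    getSuffix(9, Array(1, 2, 3))        // Array(): 9 never occurs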

-<<<<<<< HEAD
-  private def project(
-      database: Array[(Array[Int], Int)],
-      prefix: List[Int]): Array[(Array[Int], Int)] = {
-    val lastElement = prefix.toArray.drop(prefix.lastIndexOf(-1) + 1)
-=======
   def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
     database
-      .map(getSuffix(lastElement, _))
-      .filter(_._1.nonEmpty)
       .map(getSuffix(prefix, _))
       .filter(_.nonEmpty)
   }
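Worked through on illustrative data:

    // project(Seq(Array(1, 2, 3), Array(2, 3), Array(3)), 2)
    // getSuffix yields Array(3), Array(3), Array(); the empty suffix is
    // filtered out, leaving Seq(Array(3), Array(3)).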

   /**
-   * Generates frequent prefixes by filtering the input data using minimal count level.
    * Generates frequent items by filtering the input data using minimal count level.
    * @param minCount the minimum count for an item to be frequent
-   * @param prefix prefix
-   * @param suffixes suffixes
-   * @return freq prefix to count map
    * @param database database of sequences
    * @return freq item to count map
    */
-  private def getFreqPrefixAndCounts(
   private def getFreqItemAndCounts(
       minCount: Long,
-<<<<<<< HEAD
-      prefix: List[Int],
-      suffixes: Array[(Array[Int], Int)]): mutable.Map[List[Int], Long] = {
-    val counts = mutable.Map[List[Int], Long]().withDefaultValue(0L)
-    val singleItemSet = suffixes.map { case (suffix, flag) =>
-      if (flag == 0) suffix else suffix.drop(suffix.indexOf(-1) + 1)
-    }.flatMap(_.filter(_ != -1).distinct)
-      .groupBy(item => item).map(x => (x._1, x._2.length.toLong))
-    singleItemSet.filter(_._2 >= minCount).foreach { case (item, count) =>
-      if (prefix.nonEmpty) counts(prefix :+ -1 :+ item) = count else counts(List(item)) = count
-    }
-    if (prefix.nonEmpty) {
-      val lastElement = prefix.drop(prefix.lastIndexOf(-1) + 1).toArray
-      val multiItemSet = mutable.Map[Int, Long]().withDefaultValue(0L)
-      suffixes.map { case (suffix, flag) =>
-        if (flag == 0) suffix else lastElement ++ suffix
-      }.foreach { suffix =>
-        singleItemSet.keys.foreach { item =>
-          if (!lastElement.contains(item)) {
-            val element = lastElement :+ item
-            if (isSubElement(suffix, element)) {
-              multiItemSet(item) += 1L
-            }
-          }
-        }
-      }
-=======
       database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
     // TODO: use PrimitiveKeyOpenHashMap
     val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
     database.foreach { sequence =>
       sequence.distinct.foreach { item =>
         counts(item) += 1L
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
       }
-      multiItemSet.filter(_._2 >= minCount).foreach { case (item, count) =>
-        if (prefix.nonEmpty) {
-          counts(prefix :+ item) = count
-        } else {
-          counts(List(item)) = count
-        }
-      }
-    }
-    counts
-  }
-
-  private def isSubElement(sequence: Array[Int], element: Array[Int]): Boolean = {
-    var found = false
-    var currentIndex = -1
-    var nextIndex = 0
-    while (nextIndex != -1 && !found) {
-      nextIndex = sequence.indexOf(-1, currentIndex + 1)
-      found = element.toSet.subsetOf(
-        sequence.slice(currentIndex + 1, nextIndex).toSet)
-      if (!found) currentIndex = nextIndex
-    }
-    found
-  }
     }
     counts.filter(_._2 >= minCount)
   }
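On illustrative input:

    // getFreqItemAndCounts(minCount = 2,
    //   database = Seq(Array(1, 2), Array(1, 3), Array(1, 2, 2)))
    // distinct items per sequence keep Array(1, 2, 2) from counting 2 twice:
    // counts are 1 -> 3, 2 -> 2, 3 -> 1; filtering leaves Map(1 -> 3, 2 -> 2).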
}
68 changes: 0 additions & 68 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -102,43 +102,6 @@ class PrefixSpan private (
     if (sequences.getStorageLevel == StorageLevel.NONE) {
       logWarning("Input data is not cached.")
     }
-<<<<<<< HEAD
-    val sortedSequences = sortSequences(sequences)
-    val minCount = getMinCount(sortedSequences)
-    val lengthOnePatternsAndCounts =
-      getFreqItemAndCounts(minCount, sortedSequences).collect()
-    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
-      lengthOnePatternsAndCounts.map(_._1), sortedSequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
-      .groupByKey()
-      .map(x => (x._1.toArray, x._2.toArray))
-    val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
-    val lengthOnePatternsAndCountsRdd =
-      sequences.sparkContext.parallelize(
-        lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-    val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-    allPatterns
-  }
-
-  private def sortSequences(sequences: RDD[Array[Int]]): RDD[Array[Int]] = {
-    sequences.map { sequence =>
-      val sortedArray: ArrayBuffer[Int] = ArrayBuffer()
-      var currentIndex = -1
-      var nextIndex = 0
-      while (nextIndex != -1) {
-        nextIndex = sequence.indexOf(-1, currentIndex + 1)
-        if (nextIndex != -1) {
-          sortedArray ++= sequence.slice(currentIndex, nextIndex).sorted
-        } else {
-          sortedArray ++= sequence.drop(currentIndex).sorted
-        }
-        currentIndex = nextIndex
-      }
-      sortedArray.toArray
-    }
-  }
-=======

     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
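For concreteness (illustrative numbers):

    // 1000 sequences, minSupport = 0.25  =>  math.ceil(1000 * 0.25).toLong == 250L
    // the explicit 0L branch also skips the sequences.count() job when minSupport == 0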
@@ -193,7 +156,6 @@ class PrefixSpan private (
       .map { case (pattern, count) => (pattern.toArray, count) }
   }

->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0

   /**
    * Partitions the prefix-suffix pairs by projected database size.
@@ -227,38 +189,16 @@
    */
   private def extendPrefixes(
       minCount: Long,
-<<<<<<< HEAD
-      sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
-    sequences.flatMap(_.filter(_ != -1).distinct.map((_, 1L)))
-=======
       prefixSuffixPairs: RDD[(List[Int], Array[Int])])
     : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {

     // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
     // Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
     val prefixItemPairAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)

-<<<<<<< HEAD
-  /**
-   * Get the frequent prefixes' projected database.
-   * @param frequentPrefixes frequent prefixes
-   * @param sequences sequences data
-   * @return prefixes and projected database
-   */
-  private def getPrefixAndProjectedDatabase(
-      frequentPrefixes: Array[Int],
-      sequences: RDD[Array[Int]]): RDD[(Array[Int], (Array[Int], Int))] = {
-    sequences.flatMap { sequence =>
-      frequentPrefixes.map { item =>
-        val sub = LocalPrefixSpan.getSuffix(Array(item), (sequence, 0))
-        (Array(item), sub)
-      }.filter(_._2._1.nonEmpty)
-    }
-=======
     // Map from prefix to set of possible next items from suffix
     val prefixToNextItems = prefixItemPairAndCounts
       .keys
@@ -287,7 +227,6 @@
     }

     (extendedPrefixAndCounts, extendedPrefixAndSuffix)
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
   }
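The counting step above, traced on made-up pairs with minCount = 2:

    // pairs: (List(1), Array(2, 3)), (List(1), Array(2)), (List(2), Array(3))
    // flatMap:     ((List(1), 2), 1), ((List(1), 3), 1), ((List(1), 2), 1), ((List(2), 3), 1)
    // reduceByKey: ((List(1), 2), 2), ((List(1), 3), 1), ((List(2), 3), 1)
    // filter >= 2: only ((List(1), 2), 2) survives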

   /**
@@ -298,20 +237,13 @@
    */
   private def getPatternsInLocal(
       minCount: Long,
-<<<<<<< HEAD
-      data: RDD[(Array[Int], Array[(Array[Int], Int)])]): RDD[(Array[Int], Long)] = {
-    data.flatMap { case (prefix, projDB) =>
-      LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB)
-        .map { case (pattern: List[Int], count: Long) => (pattern.toArray, count) }
-=======
       data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
       case (prefix, projDB) =>
         LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
           .map { case (pattern: List[Int], count: Long) =>
             (pattern.reverse, count)
           }
->>>>>>> 83670fc9e6fc9c7a6ae68dfdd3f9335ea72f4ab0
     }
   }
 }
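The double reversal in the kept branch is a convention rather than a computation; a sketch with hypothetical values, assuming driver-side prefixes are stored in natural order:

    val prefix = List(1, 2)                 // hypothetical driver-side prefix
    val local  = 3 :: prefix.reverse        // List(3, 2, 1): O(1) prepend inside run
    assert(local.reverse == List(1, 2, 3))  // pattern.reverse restores natural order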
