
Merge branch 'master' into achit/LOCO-Test-Bug-Fix
tovbinm authored Jan 9, 2020
2 parents 92c1e43 + a6aceb6 commit b4a2116
Showing 13 changed files with 380 additions and 114 deletions.
23 changes: 0 additions & 23 deletions .github/workflows/main.yml

This file was deleted.

@@ -215,14 +215,14 @@ trait RichNumericFeature {
f.transformWith(new SqrtTransformer[I]())

/**
* Square root transformer
* Log transformer
* @return transformed feature
*/
def log(base: Double): FeatureLike[Real] =
f.transformWith(new LogTransformer[I](base = base))

/**
* Square root transformer
* Power transformer
* @return transformed feature
*/
def power(power: Double): FeatureLike[Real] =
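This hunk only fixes the copy-pasted "Square root transformer" doc headers on the log and power transformers. For reference, a hedged usage sketch, where income is a hypothetical FeatureLike[Real] defined elsewhere:

val logIncome: FeatureLike[Real] = income.log(base = 10.0)    // LogTransformer, base 10
val squared:   FeatureLike[Real] = income.power(power = 2.0)  // PowerTransformer, exponent 2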
@@ -40,7 +40,7 @@ import com.twitter.algebird._
import com.twitter.algebird.Operators._
import org.apache.spark.mllib.feature.HashingTF
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Formats}
import org.json4s.{DefaultFormats, FieldSerializer, Formats}

import scala.util.Try

@@ -192,8 +192,13 @@ object FeatureDistribution {
override def plus(l: FeatureDistribution, r: FeatureDistribution): FeatureDistribution = l.reduce(r)
}

val FeatureDistributionSerializer = FieldSerializer[FeatureDistribution](
FieldSerializer.ignore("cardEstimate")
)

implicit val formats: Formats = DefaultFormats +
EnumEntrySerializer.json4s[FeatureDistributionType](FeatureDistributionType)
EnumEntrySerializer.json4s[FeatureDistributionType](FeatureDistributionType) +
FeatureDistributionSerializer
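The FieldSerializer registered above makes json4s drop the cardEstimate field when a FeatureDistribution is written to JSON. A minimal sketch of the same pattern, with a hypothetical Stats case class standing in for FeatureDistribution:

import org.json4s.{DefaultFormats, FieldSerializer, Formats}
import org.json4s.jackson.Serialization

// Hypothetical stand-in for FeatureDistribution; "sketch" plays the role of cardEstimate
case class Stats(name: String, count: Long, sketch: Option[String] = None)

val statsSerializer = FieldSerializer[Stats](FieldSerializer.ignore("sketch"))
implicit val formats: Formats = DefaultFormats + statsSerializer

Serialization.write(Stats("f1", 3L, Some("not serializable")))
// => {"name":"f1","count":3}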

/**
* Feature distributions to json
@@ -277,11 +282,11 @@ object FeatureDistribution {
* @return TextStats object containing a Map from a value to its frequency (histogram)
*/
private def cardinalityValues(values: ProcessedSeq): TextStats = {
val population = values match {
case Left(seq) => seq
case Right(seq) => seq.map(_.toString)
}
TextStats(population.groupBy(identity).map{case (key, value) => (key, value.size)})
TextStats(countStringValues(values.left.getOrElse(values.right.get)), Map.empty)
}

private def countStringValues[T](seq: Seq[T]): Map[String, Long] = {
seq.groupBy(identity).map { case (k, valSeq) => k.toString -> valSeq.size.toLong }
}
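countStringValues simply builds a histogram keyed by the string form of each element. A quick sketch of the same groupBy pattern with expected results:

// Standalone copy of the helper above, for illustration
def countStringValues[T](seq: Seq[T]): Map[String, Long] =
  seq.groupBy(identity).map { case (k, vs) => k.toString -> vs.size.toLong }

countStringValues(Seq("a", "b", "a"))  // Map("a" -> 2, "b" -> 1)
countStringValues(Seq(1.0, 1.0, 2.5))  // Map("1.0" -> 2, "2.5" -> 1)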

/**
@@ -72,7 +72,8 @@ class SmartTextMapVectorizer[T <: OPMap[String]]
textMap: T#Value, shouldCleanKeys: Boolean, shouldCleanValues: Boolean
): TextMapStats = {
val keyValueCounts = textMap.map{ case (k, v) =>
cleanTextFn(k, shouldCleanKeys) -> TextStats(Map(cleanTextFn(v, shouldCleanValues) -> 1))
cleanTextFn(k, shouldCleanKeys) ->
TextStats(Map(cleanTextFn(v, shouldCleanValues) -> 1), Map(cleanTextFn(v, shouldCleanValues).length -> 1))
}
TextMapStats(keyValueCounts)
}
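Each map key now carries a length histogram alongside its value histogram. A minimal sketch of what keyValueCounts holds for a toy input, with a simplified stand-in for TextStats and cleaning disabled:

// Simplified stand-in for the real TextStats, for illustration only
case class TextStats(valueCounts: Map[String, Long], lengthCounts: Map[Int, Long])

val textMap = Map("city" -> "Paris", "country" -> "France")
val keyValueCounts = textMap.map { case (k, v) =>
  k -> TextStats(Map(v -> 1L), Map(v.length -> 1L))
}
// keyValueCounts("city")    == TextStats(Map("Paris" -> 1), Map(5 -> 1))
// keyValueCounts("country") == TextStats(Map("France" -> 1), Map(6 -> 1))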
@@ -62,7 +62,7 @@ class SmartTextVectorizer[T <: Text](uid: String = UID[SmartTextVectorizer[T]])(
extends SequenceEstimator[T, OPVector](operationName = "smartTxtVec", uid = uid)
with PivotParams with CleanTextFun with SaveOthersParams
with TrackNullsParam with MinSupportParam with TextTokenizerParams with TrackTextLenParam
with HashingVectorizerParams with HashingFun with OneHotFun with MaxCardinalityParams {
with HashingVectorizerParams with HashingFun with OneHotFun with MaxCardinalityParams with MinLengthStdDevParams {

private implicit val textStatsSeqEnc: Encoder[Array[TextStats]] = ExpressionEncoder[Array[TextStats]]()

@@ -81,23 +81,28 @@ class SmartTextVectorizer[T <: Text](uid: String = UID[SmartTextVectorizer[T]])(
require(!dataset.isEmpty, "Input dataset cannot be empty")

val maxCard = $(maxCardinality)
val minLenStdDev = $(minLengthStdDev)
val shouldCleanText = $(cleanText)

implicit val testStatsMonoid: Semigroup[TextStats] = TextStats.monoid(maxCard)
val valueStats: Dataset[Array[TextStats]] = dataset.map(_.map(computeTextStats(_, shouldCleanText)).toArray)
val aggregatedStats: Array[TextStats] = valueStats.reduce(_ + _)

val (isCategorical, topValues) = aggregatedStats.map { stats =>
val isCategorical = stats.valueCounts.size <= maxCard
val (vectorizationMethods, topValues) = aggregatedStats.map { stats =>
val vecMethod: TextVectorizationMethod = stats match {
case _ if stats.valueCounts.size <= maxCard => TextVectorizationMethod.Pivot
case _ if stats.lengthStdDev <= minLenStdDev => TextVectorizationMethod.Ignore
case _ => TextVectorizationMethod.Hash
}
val topValues = stats.valueCounts
.filter { case (_, count) => count >= $(minSupport) }
.toSeq.sortBy(v => -v._2 -> v._1)
.take($(topK)).map(_._1)
isCategorical -> topValues
(vecMethod, topValues)
}.unzip

val smartTextParams = SmartTextVectorizerModelArgs(
isCategorical = isCategorical,
vectorizationMethods = vectorizationMethods,
topValues = topValues,
shouldCleanText = shouldCleanText,
shouldTrackNulls = $(trackNulls),
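This decision is the heart of the change: pivot low-cardinality text, ignore high-cardinality text whose value lengths barely vary (a hint that it is ID-like), and hash everything else. A standalone sketch of the rule, with simplified stand-ins for TextStats and TextVectorizationMethod:

// Simplified stand-ins, for illustration only
case class TextStats(valueCounts: Map[String, Long], lengthStdDev: Double)
sealed trait TextVectorizationMethod
case object Pivot extends TextVectorizationMethod
case object Ignore extends TextVectorizationMethod
case object Hash extends TextVectorizationMethod

// Same cascade as in fitFn above; maxCard and minLenStdDev mirror the two params
def chooseMethod(stats: TextStats, maxCard: Int, minLenStdDev: Double): TextVectorizationMethod =
  stats match {
    case _ if stats.valueCounts.size <= maxCard => Pivot
    case _ if stats.lengthStdDev <= minLenStdDev => Ignore
    case _ => Hash
  }

chooseMethod(TextStats(Map("CA" -> 5L, "NY" -> 3L), lengthStdDev = 0.5), maxCard = 100, minLenStdDev = 0.0)
// => Pivot: only 2 distinct values
val idStats = TextStats((1 to 500).map(i => f"id_$i%04d" -> 1L).toMap, lengthStdDev = 0.0)
chooseMethod(idStats, maxCard = 100, minLenStdDev = 0.0)
// => Ignore: 500 distinct values, but every value has the same length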
@@ -117,36 +122,40 @@ class SmartTextVectorizer[T <: Text](uid: String = UID[SmartTextVectorizer[T]])(
}

private def computeTextStats(text: T#Value, shouldCleanText: Boolean): TextStats = {
val valueCounts = text match {
case Some(v) => Map(cleanTextFn(v, shouldCleanText) -> 1)
case None => Map.empty[String, Int]
val (valueCounts, lengthCounts) = text match {
case Some(v) => (Map(cleanTextFn(v, shouldCleanText) -> 1L), Map(cleanTextFn(v, shouldCleanText).length -> 1L))
case None => (Map.empty[String, Long], Map.empty[Int, Long])
}
TextStats(valueCounts)
TextStats(valueCounts, lengthCounts)
}

private def makeVectorMetadata(smartTextParams: SmartTextVectorizerModelArgs): OpVectorMetadata = {
require(inN.length == smartTextParams.isCategorical.length)
require(inN.length == smartTextParams.vectorizationMethods.length)

val (categoricalFeatures, textFeatures) =
SmartTextVectorizer.partition[TransientFeature](inN, smartTextParams.isCategorical)
val groups = inN.toArray.zip(smartTextParams.vectorizationMethods).groupBy(_._2)
val textToPivot = groups.getOrElse(TextVectorizationMethod.Pivot, Array.empty).map(_._1)
val textToIgnore = groups.getOrElse(TextVectorizationMethod.Ignore, Array.empty).map(_._1)
val textToHash = groups.getOrElse(TextVectorizationMethod.Hash, Array.empty).map(_._1)
val allTextFeatures = textToHash ++ textToIgnore

// build metadata describing output
val shouldTrackNulls = $(trackNulls)
val shouldTrackLen = $(trackTextLen)
val unseen = Option($(unseenName))

val categoricalColumns = if (categoricalFeatures.nonEmpty) {
makeVectorColumnMetadata(shouldTrackNulls, unseen, smartTextParams.categoricalTopValues, categoricalFeatures)
val categoricalColumns = if (textToPivot.nonEmpty) {
makeVectorColumnMetadata(shouldTrackNulls, unseen, smartTextParams.categoricalTopValues, textToPivot)
} else Array.empty[OpVectorColumnMetadata]
val textColumns = if (textFeatures.nonEmpty) {

val textColumns = if (allTextFeatures.nonEmpty) {
if (shouldTrackLen) {
makeVectorColumnMetadata(textFeatures, makeHashingParams()) ++
textFeatures.map(_.toColumnMetaData(descriptorValue = OpVectorColumnMetadata.TextLenString)) ++
textFeatures.map(_.toColumnMetaData(isNull = true))
makeVectorColumnMetadata(textToHash, makeHashingParams()) ++
allTextFeatures.map(_.toColumnMetaData(descriptorValue = OpVectorColumnMetadata.TextLenString)) ++
allTextFeatures.map(_.toColumnMetaData(isNull = true))
}
else {
makeVectorColumnMetadata(textFeatures, makeHashingParams()) ++
textFeatures.map(_.toColumnMetaData(isNull = true))
makeVectorColumnMetadata(textToHash, makeHashingParams()) ++
allTextFeatures.map(_.toColumnMetaData(isNull = true))
}
} else Array.empty[OpVectorColumnMetadata]
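The old boolean partition is replaced by a three-way split: zip each feature with its assigned method, group by method, and look up each bucket. Only the hashed features receive hash columns, while the length and null-indicator columns cover hashed and ignored features alike. A minimal sketch of the grouping pattern, with plain strings standing in for TransientFeature:

val features = Array("desc", "userId", "state")
val methods  = Array("Hash", "Ignore", "Pivot")

val groups   = features.zip(methods).groupBy(_._2)
val toPivot  = groups.getOrElse("Pivot", Array.empty).map(_._1)   // Array("state")
val toIgnore = groups.getOrElse("Ignore", Array.empty).map(_._1)  // Array("userId")
val toHash   = groups.getOrElse("Hash", Array.empty).map(_._1)    // Array("desc")

// Hash columns come from toHash only; length and null-indicator columns cover toHash ++ toIgnore
val allTextFeatures = toHash ++ toIgnore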

@@ -156,53 +165,84 @@ class SmartTextVectorizer[T <: Text](uid: String = UID[SmartTextVectorizer[T]])(
}

object SmartTextVectorizer {
val MaxCardinality = 100
val MaxCardinality: Int = 100
val MinTextLengthStdDev: Double = 0
private[op] def partition[T: ClassTag](input: Array[T], condition: Array[Boolean]): (Array[T], Array[T]) = {
val all = input.zip(condition)
(all.collect { case (item, true) => item }.toSeq.toArray, all.collect { case (item, false) => item }.toSeq.toArray)
(all.collect { case (item, true) => item }, all.collect { case (item, false) => item })
}
}

/**
* Summary statistics of a text feature
*
* @param valueCounts counts of feature values
* @param valueCounts counts of feature values
* @param lengthCounts counts of token lengths
*/
private[op] case class TextStats(valueCounts: Map[String, Int]) extends JsonLike
private[op] case class TextStats(
valueCounts: Map[String, Long],
lengthCounts: Map[Int, Long]
) extends JsonLike {

val lengthSize = lengthCounts.values.sum
val lengthMean: Double = lengthCounts.foldLeft(0.0)((acc, el) => acc + el._1 * el._2) / lengthSize
val lengthVariance: Double = lengthCounts.foldLeft(0.0)(
(acc, el) => acc + el._2 * (el._1 - lengthMean) * (el._1 - lengthMean)
)
val lengthStdDev: Double = math.sqrt(lengthVariance / lengthSize)
}
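A quick worked check of the length statistics just defined:

val s = TextStats(Map("cat" -> 2L, "horse" -> 2L), Map(3 -> 2L, 5 -> 2L))
// lengthSize     = 2 + 2 = 4
// lengthMean     = (3*2 + 5*2) / 4.0 = 4.0
// lengthVariance = 2*(3-4)^2 + 2*(5-4)^2 = 4.0  (sum of squared deviations, not yet normalized)
// lengthStdDev   = sqrt(4.0 / 4) = 1.0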

private[op] object TextStats {
/**
* Helper function to add two maps subject to a max cardinality restriction on the number of unique values
*
* @param totalMap Current accumulated map
* @param mapToAdd Additional map to add the to accumulated one
* @param maxCardinality Maximum number of unique keys to keep track of (stop counting once this is hit)
* @tparam T Type parameter for the keys
* @return Newly accumulated map subject to the key cardinality constraints
*/
def additionHelper[T](totalMap: Map[T, Long], mapToAdd: Map[T, Long], maxCardinality: Int): Map[T, Long] = {
if (totalMap.size > maxCardinality) totalMap
else if (mapToAdd.size > maxCardinality) mapToAdd
else totalMap + mapToAdd
}

def monoid(maxCardinality: Int): Monoid[TextStats] = new Monoid[TextStats] {
override def plus(l: TextStats, r: TextStats): TextStats = {
if (l.valueCounts.size > maxCardinality) l
else if (r.valueCounts.size > maxCardinality) r
else TextStats(l.valueCounts + r.valueCounts)
val newValueCounts = additionHelper(l.valueCounts, r.valueCounts, maxCardinality)
val newLengthCounts = additionHelper(l.lengthCounts, r.lengthCounts, maxCardinality)
TextStats(newValueCounts, newLengthCounts)
}

override def zero: TextStats = TextStats.empty
}

def empty: TextStats = TextStats(Map.empty)
def empty: TextStats = TextStats(Map.empty, Map.empty)
}
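additionHelper caps the cost of aggregation: once either operand already holds more than maxCardinality keys, merging stops and that operand is kept as-is. A small sketch of the resulting behavior, using the TextStats monoid defined above:

val m = TextStats.monoid(maxCardinality = 2)

val a = TextStats(Map("x" -> 1L), Map(1 -> 1L))
val b = TextStats(Map("y" -> 2L), Map(1 -> 2L))
m.plus(a, b)
// => TextStats(Map("x" -> 1, "y" -> 2), Map(1 -> 3)): both histograms merged, counts summed

val big = TextStats(Map("a" -> 1L, "b" -> 1L, "c" -> 1L), Map(1 -> 3L))
m.plus(big, a)
// => valueCounts stay as big's (already 3 > 2 keys); the small lengthCounts still merge to Map(1 -> 4)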

/**
* Arguments for [[SmartTextVectorizerModel]]
*
* @param isCategorical is feature a categorical or not
* @param topValues top values to each feature
* @param shouldCleanText should clean text value
* @param shouldTrackNulls should track nulls
* @param hashingParams hashing function params
* @param vectorizationMethods method to use for each text feature (pivot, hash, or ignore)
* @param topValues            top values for each feature
* @param shouldCleanText      should clean text value
* @param shouldTrackNulls     should track nulls
* @param hashingParams        hashing function params
*/
case class SmartTextVectorizerModelArgs
(
isCategorical: Array[Boolean],
vectorizationMethods: Array[TextVectorizationMethod],
topValues: Array[Seq[String]],
shouldCleanText: Boolean,
shouldTrackNulls: Boolean,
hashingParams: HashingFunctionParams
) extends JsonLike {
def categoricalTopValues: Array[Seq[String]] =
topValues.zip(isCategorical).collect { case (top, true) => top }
def categoricalTopValues: Array[Seq[String]] = {
topValues.zip(vectorizationMethods.map(_ == TextVectorizationMethod.Pivot)).collect { case (top, true) => top }
}
}

final class SmartTextVectorizerModel[T <: Text] private[op]
@@ -222,19 +262,28 @@ final class SmartTextVectorizerModel[T <: Text] private[op]
shouldTrackNulls = args.shouldTrackNulls
)
(row: Seq[Text]) => {
val (rowCategorical, rowText) = SmartTextVectorizer.partition[Text](row.toArray, args.isCategorical)
val categoricalVector: OPVector = categoricalPivotFn(rowCategorical)
val textTokens: Seq[TextList] = rowText.map(tokenize(_).tokens)
val groups = row.toArray.zip(args.vectorizationMethods).groupBy(_._2)
val textToPivot = groups.getOrElse(TextVectorizationMethod.Pivot, Array.empty).map(_._1)
val textToIgnore = groups.getOrElse(TextVectorizationMethod.Ignore, Array.empty).map(_._1)
val textToHash = groups.getOrElse(TextVectorizationMethod.Hash, Array.empty).map(_._1)

val categoricalVector: OPVector = categoricalPivotFn(textToPivot)
val textTokens: Seq[TextList] = textToHash.map(tokenize(_).tokens)
val ignorableTextTokens: Seq[TextList] = textToIgnore.map(tokenize(_).tokens)
val textVector: OPVector = hash[TextList](textTokens, getTextTransientFeatures, args.hashingParams)
val textNullIndicatorsVector = if (args.shouldTrackNulls) getNullIndicatorsVector(textTokens) else OPVector.empty
val textLenVector = if ($(trackTextLen)) getLenVector(textTokens) else OPVector.empty
val textNullIndicatorsVector = if (args.shouldTrackNulls) {
getNullIndicatorsVector(textTokens ++ ignorableTextTokens)
} else OPVector.empty
val textLenVector = if ($(trackTextLen)) getLenVector(textTokens ++ ignorableTextTokens) else OPVector.empty

categoricalVector.combine(textVector, textLenVector, textNullIndicatorsVector)
}
}

private def getTextTransientFeatures: Array[TransientFeature] =
SmartTextVectorizer.partition[TransientFeature](getTransientFeatures(), args.isCategorical)._2
getTransientFeatures().zip(args.vectorizationMethods).collect {
case (tf, method) if method != TextVectorizationMethod.Pivot => tf
}

private def getNullIndicatorsVector(textTokens: Seq[TextList]): OPVector = {
val nullIndicators = textTokens.map { tokens =>
@@ -261,3 +310,15 @@ trait MaxCardinalityParams extends Params {
final def getMaxCardinality: Int = $(maxCardinality)
setDefault(maxCardinality -> SmartTextVectorizer.MaxCardinality)
}

trait MinLengthStdDevParams extends Params {
final val minLengthStdDev = new DoubleParam(
parent = this, name = "minLengthStdDev",
doc = "minimum standard deviation of the lengths of tokens in a text field for it to be hashed instead " +
"of ignored",
isValid = ParamValidators.inRange(lowerBound = 0, upperBound = 100)
)
final def setMinLengthStdDev(v: Double): this.type = set(minLengthStdDev, v)
final def getMinLengthStdDev: Double = $(minLengthStdDev)
setDefault(minLengthStdDev -> SmartTextVectorizer.MinTextLengthStdDev)
}
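MinLengthStdDevParams surfaces the new threshold on SmartTextVectorizer; the default of 0 means only text whose value lengths never vary at all can be ignored, unless the threshold is raised. A hedged configuration sketch (the feature description and the setInput/getOutput calls follow the usual TransmogrifAI stage API and are assumptions here):

// "description" is a hypothetical FeatureLike[Text]
val vectorized = new SmartTextVectorizer[Text]()
  .setMinLengthStdDev(1.0)  // ignore high-cardinality text whose length std dev is at most 1.0
  .setInput(description)
  .getOutput()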
