Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary sequence estimator #84

Merged
merged 8 commits into from
Aug 28, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,46 @@ case object FeatureSparkTypes {
}
}

/**
 * Creates a Spark UDF for a binary-sequence function: a single feature of type I1 together
 * with a variable number of features of type I2, passed in as one struct column where the
 * first field is the I1 value and the remaining fields are the I2 values.
 *
 * @param f function from (I1, Seq[I2]) to O used to transform the input values
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 * @return a Spark UDF applying f to the struct of input columns
 */
def udf2N[I1 <: FeatureType : TypeTag, I2 <: FeatureType : TypeTag, O <: FeatureType : TypeTag]
(
  f: (I1, Seq[I2]) => O
): UserDefinedFunction = {
  val outputType = FeatureSparkTypes.sparkTypeOf[O]
  // Converters MUST be defined outside the result function since they involve reflection calls
  val convertI1 = FeatureTypeSparkConverter[I1]()
  val convertI2 = FeatureTypeSparkConverter[I2]()
  val func = (r: Row) => {
    // Field 0 is the single I1 input; fields 1..length-1 are the I2 sequence
    val arr = new ArrayBuffer[I2](r.length - 1)
    val i1: I1 = convertI1.fromSpark(r.get(0))
    var i = 1
    while (i < r.length) {
      arr += convertI2.fromSpark(r.get(i))
      i += 1
    }
    FeatureTypeSparkConverter.toSpark(f(i1, arr))
  }
  UserDefinedFunction(func, outputType, inputTypes = None)
}

/**
 * Creates a row-wise transform function for a single feature of type I1 together with a
 * variable number of features of type I2, operating on raw Spark values (e.g. for
 * key-value based transformations).
 *
 * @param f function from (I1, Seq[I2]) to O used to transform the input values
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 * @return function from the raw I1 value and the array of raw I2 values to the raw O value
 */
def transform2N[I1 <: FeatureType : TypeTag, I2 <: FeatureType : TypeTag, O <: FeatureType : TypeTag]
(
  f: (I1, Seq[I2]) => O
): (Any, Array[Any]) => Any = {
  // Converters MUST be defined outside the result function since they involve reflection calls
  val convertI1 = FeatureTypeSparkConverter[I1]()
  val convertI2 = FeatureTypeSparkConverter[I2]()
  (in1: Any, r: Array[Any]) => {
    val i1: I1 = convertI1.fromSpark(in1)
    val arr = new ArrayBuffer[I2](r.length)
    var i = 0
    while (i < r.length) {
      arr += convertI2.fromSpark(r(i))
      i += 1
    }
    FeatureTypeSparkConverter.toSpark(f(i1, arr))
  }
}

/**
* Create a sql [[Column]] instance of a feature
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,8 @@ trait HasInN {
final protected def inN: Array[TransientFeature] = getTransientFeatures()
}

/**
 * Trait for stages that take one fixed input feature plus a variable number of additional
 * input features: in1 (from [[HasIn1]]) is the head of the transient features and inN is
 * the remaining tail.
 */
trait HasIn1PlusN extends HasIn1 {
self: OpPipelineStageBase =>
// All transient features except the first (which is in1)
final protected def inN: Array[TransientFeature] = getTransientFeatures.tail
}

Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,43 @@ trait OpPipelineStageN[I <: FeatureType, O <: FeatureType] extends OpPipelineSta
)(tto)
}

/**
 * Pipeline stage of a single feature of type I1 together with multiple features of type I2
 * producing a single output feature of type O
 *
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
trait OpPipelineStage2N[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType] extends OpPipelineStage[O]
  with HasIn1PlusN {
  self: PipelineStage =>

  implicit val tto: TypeTag[O]
  implicit val ttov: TypeTag[O#Value]

  // The single fixed input feature followed by the variable-length tail of inputs
  final override type InputFeatures = (FeatureLike[I1], Array[FeatureLike[I2]])

  final override def checkInputLength(features: Array[_]): Boolean = features.length > 0

  final override def inputAsArray(in: InputFeatures): Array[OPFeature] = {
    Array(in._1) ++ in._2.asInstanceOf[Array[OPFeature]]
  }

  /**
   * Sets the single input feature and the variable-length sequence of input features
   */
  final def setInput(feature: FeatureLike[I1], features: FeatureLike[I2]*): this.type =
    super.setInput(feature, features.toArray)

  protected[op] override def outputFeatureUid: String = FeatureUID[O](uid)

  override def getOutput(): FeatureLike[O] = new Feature[O](
    uid = outputFeatureUid,
    name = getOutputFeatureName,
    originStage = this,
    isResponse = outputIsResponse,
    parents = getInputFeatures()
  )(tto)
}


/**
* Trait to mix into transformers that indicates their transform functions can be combined into a single stage
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.stages.base.sequence

import com.salesforce.op.features.FeatureSparkTypes
import com.salesforce.op.features.types.{FeatureType, FeatureTypeSparkConverter}
import com.salesforce.op.stages._
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import org.apache.spark.util.ClosureUtils

import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

/**
 * Takes an input feature of type I1 and a sequence of input features of type I2 and performs
 * a fit operation in order to define a transformation for those (or similar) features. This
 * abstract class should be extended when settable parameters are needed within
 * the fit function
 *
 * @param operationName unique name of the operation this stage performs
 * @param uid uid for instance
 * @param tti1 type tag for the single input
 * @param tti2 type tag for the sequence input
 * @param tto type tag for output
 * @param tti1v type tag for the single input value
 * @param tti2v type tag for the sequence input value
 * @param ttov type tag for output value
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
abstract class BinarySequenceEstimator[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType]
(
  val operationName: String,
  val uid: String
)(
  implicit val tti1: TypeTag[I1],
  val tti2: TypeTag[I2],
  val tto: TypeTag[O],
  val tti1v: TypeTag[I1#Value],
  val tti2v: TypeTag[I2#Value],
  val ttov: TypeTag[O#Value]
) extends Estimator[BinarySequenceModel[I1, I2, O]] with OpPipelineStage2N[I1, I2, O] {

  // Encoders & converters
  implicit val i1Encoder: Encoder[I1#Value] = FeatureSparkTypes.featureTypeEncoder[I1]
  implicit val seqIEncoder: Encoder[Seq[I2#Value]] = Encoders.kryo[Seq[I2#Value]]
  implicit val tupleEncoder = Encoders.tuple[I1#Value, Seq[I2#Value]](i1Encoder, seqIEncoder)
  val convertI1 = FeatureTypeSparkConverter[I1]()
  val seqIConvert = FeatureTypeSparkConverter[I2]()

  /**
   * Function that fits the binary sequence model
   */
  def fitFn(dataset: Dataset[(I1#Value, Seq[I2#Value])]): BinarySequenceModel[I1, I2, O]

  /**
   * Check if the stage is serializable
   *
   * @return Failure if not serializable
   */
  final override def checkSerializable: Try[Unit] = ClosureUtils.checkSerializable(fitFn _)

  /**
   * Spark operation on dataset to produce Dataset
   * for constructor fit function and then turn output function into a Model
   *
   * @param dataset input data for this stage
   * @return a fitted model that will perform the transformation specified by the function defined in constructor fit
   */
  override def fit(dataset: Dataset[_]): BinarySequenceModel[I1, I2, O] = {
    // size > 1 ensures both the single input (in1) and at least one sequence input (inN) are set
    assert(getTransientFeatures().size > 1, "Inputs cannot be empty")
    setInputSchema(dataset.schema).transformSchema(dataset.schema)

    val seqColumns = inN.map(feature => col(feature.name))
    val columns = Array(col(in1.name)) ++ seqColumns

    val df = dataset.select(columns: _*)
    // The first column is the single I1 input; the remaining columns are the I2 sequence
    val ds = df.map { r =>
      val rowSeq = r.toSeq
      (convertI1.fromSpark(rowSeq.head).value, rowSeq.tail.map(seqIConvert.fromSpark(_).value))
    }
    val model = fitFn(ds)

    model
      .setParent(this)
      .setInput(in1.asFeatureLike[I1], inN.map(_.asFeatureLike[I2]): _*)
      .setMetadata(getMetadata())
      .setOutputFeatureName(getOutputFeatureName)
  }

}


/**
 * Extend this class and return it from your [[BinarySequenceEstimator]] fit function.
 * Takes a single input feature of type I1 and a sequence of input features of type I2
 * and produces a single new output feature using the specified function.
 * Performs row wise transformation specified in transformFn.
 *
 * @param operationName unique name of the operation this stage performs
 * @param uid uid for instance
 * @param tti1 type tag for the single input
 * @param tti2 type tag for the sequence input
 * @param tto type tag for output
 * @param ttov type tag for output value
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
abstract class BinarySequenceModel[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType]
(
val operationName: String,
val uid: String
)(
implicit val tti1: TypeTag[I1],
val tti2: TypeTag[I2],
val tto: TypeTag[O],
val ttov: TypeTag[O#Value]
) extends Model[BinarySequenceModel[I1, I2, O]] with OpTransformer2N[I1, I2, O]
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.stages.base.sequence

import com.salesforce.op.UID
import com.salesforce.op.features.FeatureSparkTypes
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.stages.{OpPipelineStage2N, OpTransformer}
import org.apache.spark.ml.Transformer
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.util.ClosureUtils
import org.apache.spark.sql.functions._

import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

/**
 * Transformer trait that takes a single input feature of type I1 and a variable number of
 * input features of type I2 and produces a single output feature of type O by applying
 * transformFn row-wise.
 *
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
trait OpTransformer2N[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType]
  extends Transformer with OpPipelineStage2N[I1, I2, O] with OpTransformer {

  implicit val tti1: TypeTag[I1]
  implicit val tti2: TypeTag[I2]

  /**
   * Function used to convert input to output
   */
  def transformFn: (I1, Seq[I2]) => O

  /**
   * Check if the stage is serializable
   *
   * @return Failure if not serializable
   */
  final override def checkSerializable: Try[Unit] = ClosureUtils.checkSerializable(transformFn)

  /**
   * Spark operation on dataset to produce new output feature column using defined function
   *
   * @param dataset input data for this stage
   * @return a new dataset containing a column for the transformed feature
   */
  override def transform(dataset: Dataset[_]): DataFrame = {
    // size > 1 ensures both the single input (in1) and at least one sequence input (inN) are set
    assert(getTransientFeatures().size > 1, "Inputs cannot be empty")
    val newSchema = setInputSchema(dataset.schema).transformSchema(dataset.schema)
    val functionUDF = FeatureSparkTypes.udf2N[I1, I2, O](transformFn)
    val meta = newSchema(getOutputFeatureName).metadata
    val columns = getTransientFeatures().map(in => dataset.col(in.name))
    dataset.select(col("*"), functionUDF(struct(columns: _*)).as(getOutputFeatureName, meta))
  }

  // lazy to defer the reflection cost of building converters until key-value transform is used
  private lazy val transformNFn = FeatureSparkTypes.transform2N[I1, I2, O](transformFn)

  /**
   * Creates a function that transforms a key-value map of input values into the output value
   */
  override def transformKeyValue: KeyValue => Any = {
    val inName1 = in1.name
    val inNames = inN.map(_.name)
    (kv: KeyValue) => transformNFn(kv(inName1), inNames.map(name => kv(name)))
  }

}

/**
 * Transformer that takes a single feature of type I1 and a sequence of features of type I2 and produces
 * a single new output feature using the specified function. Performs row wise transformation specified
 * in transformFn. This abstract class should be extended when settable parameters are needed within
 * the transform function.
 *
 * @param operationName unique name of the operation this stage performs
 * @param uid uid for instance
 * @param tti1 type tag for single input
 * @param tti2 type tag for sequence input
 * @param tto type tag for output
 * @param ttov type tag for output value
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
abstract class BinarySequenceTransformer[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType]
(
val operationName: String,
val uid: String
)(
implicit val tti1: TypeTag[I1],
val tti2: TypeTag[I2],
val tto: TypeTag[O],
val ttov: TypeTag[O#Value]
) extends OpTransformer2N[I1, I2, O]

/**
 * Transformer that takes a single feature of type I1 and a sequence of features of type I2 and produces
 * a single new output feature using the specified function. Performs row wise transformation specified
 * in transformFn. This class should be extended when no parameters are needed within the transform function.
 *
 * @param operationName unique name of the operation this stage performs
 * @param transformFn function used to convert input to output
 * @param uid uid for instance
 * @param tti1 type tag for single input
 * @param tti2 type tag for sequence input
 * @param tto type tag for output
 * @param ttov type tag for output value
 * @tparam I1 input single feature type
 * @tparam I2 input sequence feature type
 * @tparam O output feature type
 */
final class BinarySequenceLambdaTransformer[I1 <: FeatureType, I2 <: FeatureType, O <: FeatureType]
(
operationName: String,
val transformFn: (I1, Seq[I2]) => O,
uid: String = UID[BinarySequenceLambdaTransformer[I1, I2, O]]
)(
implicit tti1: TypeTag[I1],
tti2: TypeTag[I2],
tto: TypeTag[O],
ttov: TypeTag[O#Value]
) extends BinarySequenceTransformer[I1, I2, O](operationName = operationName, uid = uid)
Loading