Bug fixes and optimisations
1. Critical fix to the min-max scaling pipes in DynaML: feature scaling was
mistakenly done with a GaussianScaler object instead of a MinMaxScaler.
2. Wrapped stream and collection transformations in the scalaxy-streams `optimize` macro.
3. Added a script for experimenting with kernel GP models.
mandar2812 committed Dec 16, 2016
1 parent 527349b commit 3671f35
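For context on item (1), here is a minimal, self-contained sketch in plain Scala (no DynaML types, all names illustrative) contrasting the two transforms. Assuming GaussianScaler applies the usual (x - mean) / sigma transform elementwise, constructing it from (min, max) statistics, as the old min-max pipes did, computes (x - min) / max rather than the intended (x - min) / (max - min).

    object ScalingSketch extends App {
      val xs = Seq(2.0, 4.0, 6.0, 8.0)

      // Gaussian standardization: subtract the mean, divide by the standard deviation.
      val mean = xs.sum / xs.length
      val std  = math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.length - 1))
      val standardized = xs.map(x => (x - mean) / std)

      // Min-max scaling: map values into [0, 1] using the minimum and the range.
      val (lo, hi) = (xs.min, xs.max)
      val minMaxScaled = xs.map(x => (x - lo) / (hi - lo))

      // What the buggy pipe effectively computed: (x - min) / max.
      val buggy = xs.map(x => (x - lo) / hi)

      println(standardized)
      println(minMaxScaled)
      println(buggy)
    }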
Showing 7 changed files with 78 additions and 55 deletions.
14 changes: 3 additions & 11 deletions build.sbt
@@ -12,7 +12,7 @@ packageDescription := "DynaML is a scala library/repl for implementing and worki
"which can be extended easily to implement advanced models for small and large scale applications.\n\n"+
"But the library can also be used as an educational/research tool for data analysis."

val mainVersion = "v1.4.1-beta.7"
val mainVersion = "v1.4.1-beta.8"

val dataDirectory = settingKey[File]("The directory holding the data files for running example scripts")

@@ -33,17 +33,9 @@ lazy val commonSettings = Seq(
linearAlgebraDependencies ++ chartsDependencies ++
tinkerpopDependency ++ notebookInterfaceDependency ++
openMLDependency ++ rejinDependency ++
rPackages ++ cppCompatDependencies)/*,
rPackages ++ cppCompatDependencies),

scalacOptions ++= Seq("-Xplugin-require:scalaxy-streams", "-optimise", "-Yclosure-elim", "-Yinline"),
scalacOptions in Test ~= (_ filterNot (_ == "-Xplugin-require:scalaxy-streams")),
scalacOptions in Test += "-Xplugin-disable:scalaxy-streams",
autoCompilerPlugins := true,
addCompilerPlugin("com.nativelibs4java" % "scalaxy-streams_2.11" % "0.3.4")*/
scalacOptions ++= Seq("-optimise", "-Yclosure-elim", "-Yinline")
)

lazy val pipes = (project in file("dynaml-pipes")).settings(baseSettings:_*)
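For reference on the scalaxy-streams usage introduced further down, here is a minimal sbt sketch of declaring the optimize macro's dependency, with the coordinates taken from the commented-out block above. This is an illustrative sketch, not the project's actual configuration; the "provided" scope is an assumption (the macro is only needed at compile time).

    // Hypothetical build.sbt fragment; coordinates as in the block above.
    libraryDependencies += "com.nativelibs4java" % "scalaxy-streams_2.11" % "0.3.4" % "provided"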
16 changes: 16 additions & 0 deletions dynaml-core/scripts/kernelModels.scala
@@ -0,0 +1,16 @@
implicit val ev = VectorField(6)
implicit val sp = genericReplicationEncoder[DenseVector[Double]](2)

val sp1 = breezeDVSplitEncoder(2)
val kernel = new LaplacianKernel(1.5)

val other_kernel = new RBFKernel(4.5)
val other_kernel1 = new CauchyKernel(1.0)

val otherSumK = kernel + other_kernel
val sumK2 = new DecomposableCovariance(otherSumK, other_kernel1)(sp1)

AbottPowerPlant(sumK2, new DiracKernel(0.09),
opt = Map("globalOpt" -> "GS", "grid" -> "1", "step" -> "0.004"),
num_training = 3000, num_test = 1000, deltaT = 2, column = 7)
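The script above builds an additive Laplacian + RBF kernel, combines it with a Cauchy kernel through DecomposableCovariance (using the splitting encoder sp1), and runs a GP regression experiment on the Abott power plant data set with grid-search ("GS") hyper-parameter tuning. Below is a DynaML-free sketch of the evaluation pattern behind DecomposableCovariance: each component kernel scores its own block of the split inputs and the per-block values are reduced, here by summation. All names are illustrative; the actual implementation is the change to DecomposableCovariance.evaluate further down in this commit.

    object DecomposableSketch extends App {
      type Block = Array[Double]

      // Two toy component kernels acting on blocks.
      val rbf: (Block, Block) => Double = (u, v) =>
        math.exp(-0.5 * u.zip(v).map { case (a, b) => (a - b) * (a - b) }.sum)
      val laplacian: (Block, Block) => Double = (u, v) =>
        math.exp(-u.zip(v).map { case (a, b) => math.abs(a - b) }.sum)

      val kernels = Seq(rbf, laplacian)

      // Split a vector into blocks of length 2, mirroring breezeDVSplitEncoder(2).
      val split = (x: Array[Double]) => x.grouped(2).toArray

      def evaluate(x: Array[Double], y: Array[Double]): Double = {
        val (xs, ys) = (split(x), split(y))
        // Pair each block with its kernel, evaluate, and reduce by summation.
        xs.zip(ys).zip(kernels).map { case ((u, v), k) => k(u, v) }.sum
      }

      println(evaluate(Array(1.0, 2.0, 3.0, 4.0), Array(1.5, 2.5, 2.0, 5.0)))
    }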

@@ -30,7 +30,7 @@ import io.github.mandar2812.dynaml.wavelets.{GroupedHaarWaveletFilter, HaarWavel
import org.apache.log4j.Logger
import org.renjin.script.RenjinScriptEngine
import org.renjin.sexp._

import scalaxy.streams.optimize
import scala.reflect.ClassTag

/**
@@ -254,7 +254,7 @@ object DynaMLPipe {
* (Stream(training data), Stream(test data))
* */
@deprecated("*Standardization pipes are deprecated as of v1.4,"+
" use pipes that output scaler objects instead")
" use pipes that output io.github.mandar2812.dynaml.pipes.Scaler objects instead")
val trainTestGaussianStandardization =
DataPipe((trainTest: (Stream[(DenseVector[Double], Double)],
Stream[(DenseVector[Double], Double)])) => {
@@ -286,7 +286,7 @@ object DynaMLPipe {
* (Stream(training data), Stream(test data))
* */
@deprecated("*Standardization pipes are deprecated as of v1.4,"+
" use pipes that output scaler objects instead")
" use pipes that output io.github.mandar2812.dynaml.pipes.Scaler objects instead")
val featuresGaussianStandardization =
DataPipe((trainTest: (Stream[(DenseVector[Double], Double)],
Stream[(DenseVector[Double], Double)])) => {
@@ -315,7 +315,7 @@ object DynaMLPipe {
* (Stream(training data), Stream(test data))
* */
@deprecated("*Standardization pipes are deprecated as of v1.4,"+
" use pipes that output scaler objects instead")
" use pipes that output io.github.mandar2812.dynaml.pipes.Scaler objects instead")
val trainTestGaussianStandardizationMO =
DataPipe((trainTest: (Stream[(DenseVector[Double], DenseVector[Double])],
Stream[(DenseVector[Double], DenseVector[Double])])) => {
@@ -361,9 +361,9 @@ object DynaMLPipe {
math.sqrt(v/(trainTest.length.toDouble - 1.0)))


val featuresScaler = new GaussianScaler(mean(0 until num_features), stdDev(0 until num_features))
val featuresScaler = GaussianScaler(mean(0 until num_features), stdDev(0 until num_features))

val targetsScaler = new GaussianScaler(
val targetsScaler = GaussianScaler(
mean(num_features until num_features + num_targets),
stdDev(num_features until num_features + num_targets))

@@ -386,11 +386,11 @@ object DynaMLPipe {
val (m, sigma) = utils.getStatsMult(trainTest.map(tup =>
DenseVector(tup._1.toArray ++ tup._2.toArray)).toList)

val featuresScaler = new MVGaussianScaler(
val featuresScaler = MVGaussianScaler(
m(0 until num_features),
sigma(0 until num_features, 0 until num_features))

val targetsScaler = new MVGaussianScaler(
val targetsScaler = MVGaussianScaler(
m(num_features until num_features + num_targets),
sigma(num_features until num_features + num_targets, num_features until num_features + num_targets))

@@ -419,9 +419,9 @@ object DynaMLPipe {
math.sqrt(v/(trainTest._1.length.toDouble - 1.0)))


val featuresScaler = new GaussianScaler(mean(0 until num_features), stdDev(0 until num_features))
val featuresScaler = GaussianScaler(mean(0 until num_features), stdDev(0 until num_features))

val targetsScaler = new GaussianScaler(
val targetsScaler = GaussianScaler(
mean(num_features until num_features + num_targets),
stdDev(num_features until num_features + num_targets))

@@ -445,11 +445,11 @@ object DynaMLPipe {
val (m, sigma) = utils.getStatsMult(trainTest._1.map(tup =>
DenseVector(tup._1.toArray ++ tup._2.toArray)).toList)

val featuresScaler = new MVGaussianScaler(
val featuresScaler = MVGaussianScaler(
m(0 until num_features),
sigma(0 until num_features, 0 until num_features))

val targetsScaler = new MVGaussianScaler(
val targetsScaler = MVGaussianScaler(
m(num_features until num_features + num_targets),
sigma(num_features until num_features + num_targets, num_features until num_features + num_targets))

@@ -475,9 +475,9 @@ object DynaMLPipe {
val (min, max) = utils.getMinMax(trainTest.map(tup =>
DenseVector(tup._1.toArray ++ tup._2.toArray)).toList)

val featuresScaler = new GaussianScaler(min(0 until num_features), max(0 until num_features))
val featuresScaler = MinMaxScaler(min(0 until num_features), max(0 until num_features))

val targetsScaler = new MinMaxScaler(
val targetsScaler = MinMaxScaler(
min(num_features until num_features + num_targets),
max(num_features until num_features + num_targets))

@@ -501,9 +501,9 @@ object DynaMLPipe {
val (min, max) = utils.getMinMax(trainTest._1.map(tup =>
DenseVector(tup._1.toArray ++ tup._2.toArray)).toList)

val featuresScaler = new GaussianScaler(min(0 until num_features), max(0 until num_features))
val featuresScaler = MinMaxScaler(min(0 until num_features), max(0 until num_features))

val targetsScaler = new MinMaxScaler(
val targetsScaler = MinMaxScaler(
min(num_features until num_features + num_targets),
max(num_features until num_features + num_targets))

@@ -572,9 +572,13 @@ object DynaMLPipe {
* put them back together.
* */
val breezeDVSplitEncoder = (n: Int) => Encoder((v: DenseVector[Double]) => {
v.toArray.grouped(n).map(DenseVector(_)).toArray
optimize {
v.toArray.grouped(n).map(DenseVector(_)).toArray
}
}, (vs: Array[DenseVector[Double]]) => {
DenseVector(vs.map(_.toArray).reduceLeft((a,b) => a++b))
optimize {
DenseVector(vs.map(_.toArray).reduceLeft((a,b) => a++b))
}
})

/**
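The optimize blocks added above (and in the files that follow) come from scalaxy-streams: optimize is a compile-time macro that rewrites the enclosed collection pipeline into equivalent imperative code without changing its result. A minimal standalone sketch of its use (the function and values are illustrative):

    import scalaxy.streams.optimize

    object OptimizeSketch extends App {
      // The functional pipeline below is rewritten into an imperative,
      // while-loop style equivalent at compile time; its output is unchanged.
      def sumOfEvenSquares(xs: Array[Int]): Int = optimize {
        xs.filter(_ % 2 == 0).map(x => x * x).sum
      }

      println(sumOfEvenSquares(Array(1, 2, 3, 4, 5, 6)))
    }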
@@ -1,13 +1,12 @@
package io.github.mandar2812.dynaml.kernels

import scalaxy.streams.optimize
import scala.reflect.ClassTag
import breeze.linalg.DenseMatrix
import io.github.mandar2812.dynaml.DynaMLPipe
import io.github.mandar2812.dynaml.algebra.PartitionedPSDMatrix
import io.github.mandar2812.dynaml.pipes._

import scala.reflect.ClassTag


/**
* Scalar Kernel defines algebraic behavior for kernels of the form
* K: Index x Index -> Double, i.e. kernel functions whose output
@@ -129,10 +128,14 @@ class DecomposableCovariance[S](kernels: LocalScalarKernel[S]*)(

override def evaluate(x: S, y: S): Double = {
val (xs, ys) = (encoding*encoding)((x,y))
reducer(xs.zip(ys).zip(kernels).map(coupleAndKern => {
val (u,v) = coupleAndKern._1
coupleAndKern._2.evaluate(u,v)
}))
reducer(
optimize {
xs.zip(ys).zip(kernels).map(coupleAndKern => {
val (u,v) = coupleAndKern._1
coupleAndKern._2.evaluate(u,v)
})
}
)
}

override def gradient(x: S, y: S): Map[String, Double] = reducer match {
@@ -31,7 +31,7 @@ import scala.annotation.tailrec
import scala.util.matching.Regex
import sys.process._
import java.net.URL

import scalaxy.streams.optimize
import spire.algebra.Field

/**
@@ -210,8 +210,8 @@ package object utils {

def strReplace(fileName: String)
(findStringRegex: String, replaceString: String)
: Stream[String] = textFileToStream(fileName)
.map(replace(findStringRegex)(replaceString))
: Stream[String] = optimize {textFileToStream(fileName)
.map(replace(findStringRegex)(replaceString))}

def writeToFile(destination: String)(lines: Stream[String]): Unit = {
val writer = new BufferedWriter(new FileWriter(new File(destination)))
@@ -222,23 +222,25 @@ }
}

def transformData(transform: (String) => String)(lines: Stream[String]): Stream[String] =
lines.map(transform)
optimize { lines.map(transform) }

def extractColumns(lines: Stream[String], sep: String,
columns: List[Int], naStrings:Map[Int, String]): Stream[String] = {
val tFunc = (line: String) => {
val fields = line.split(sep)

val newFields:List[String] = columns.map(col => {
if (!naStrings.contains(col) || fields(col) != naStrings(col)) fields(col)
else "<NA>"
})
optimize {
val newFields:List[String] = columns.map(col => {
if (!naStrings.contains(col) || fields(col) != naStrings(col)) fields(col)
else "<NA>"
})

val newLine = newFields.foldLeft("")(
(str1, str2) => str1+sep+str2
)
val newLine = newFields.foldLeft("")(
(str1, str2) => str1+sep+str2
)

newLine.tail
newLine.tail
}
}

transformData(tFunc)(lines)
@@ -301,9 +303,11 @@ package object utils {
def isSymmetricMatrix[V](mat: Matrix[V]): Unit = {
isSquareMatrix(mat)

for (i <- 0 until mat.rows; j <- 0 until i)
if (mat(i,j) != mat(j,i))
throw new MatrixNotSymmetricException
optimize {
for (i <- 0 until mat.rows; j <- 0 until i)
if (mat(i,j) != mat(j,i))
throw new MatrixNotSymmetricException
}
}

}
@@ -1,5 +1,7 @@
package io.github.mandar2812.dynaml.pipes

import scalaxy.streams._

/**
* @author mandar2812 17/6/16.
*
@@ -8,7 +10,7 @@ package io.github.mandar2812.dynaml.pipes
*/
trait Scaler[S] extends DataPipe[S, S]{
def apply[T[S] <: Traversable[S]](data: T[S]) =
data.map(run _).asInstanceOf[T[S]]
optimize { data.map(run).asInstanceOf[T[S]] }

def *[T](that: Scaler[T]) = {
val firstRun = this.run _
@@ -17,6 +17,8 @@ specific language governing permissions and limitations
under the License.
* */
package io.github.mandar2812.dynaml.pipes
import scalaxy.streams.optimize


/**
* @author mandar2812 on 17/11/15.
@@ -44,15 +46,15 @@ trait StreamDataPipe[I, J, K] extends DataPipe[Stream[I], K]{
}

trait StreamMapPipe[I, J] extends StreamDataPipe[I, J, Stream[J]] {
override def run(data: Stream[I]): Stream[J] = data.map(pipe)
override def run(data: Stream[I]): Stream[J] = optimize { data.map(pipe) }
}

trait StreamFilterPipe[I] extends StreamDataPipe[I, Boolean, Stream[I]] {
override def run(data: Stream[I]): Stream[I] = data.filter(pipe)
override def run(data: Stream[I]): Stream[I] = optimize { data.filter(pipe) }
}

trait StreamPartitionPipe[I] extends StreamDataPipe[I, Boolean, (Stream[I], Stream[I])] {
override def run(data: Stream[I]): (Stream[I], Stream[I]) = data.partition(pipe)
override def run(data: Stream[I]): (Stream[I], Stream[I]) = optimize { data.partition(pipe) }
}

trait StreamSideEffectPipe[I] extends StreamDataPipe[I, Unit, Unit] {
