From a3d8b2847c77945d6c97aeedfb979030c874ae41 Mon Sep 17 00:00:00 2001 From: Ventura Del Monte Date: Wed, 29 Mar 2017 16:20:44 +0200 Subject: [PATCH] [SOLMA] Initial Commit --- .gitattributes | 3 + .github/CONTRIBUTING.md | 11 + .github/PULL_REQUEST_TEMPLATE.md | 17 + .gitignore | 36 ++ README.md | 1 + pom.xml | 333 ++++++++++++++++++ .../java/eu/proteus/annotations/Proteus.java | 33 ++ .../proteus/solma/fd/FrequentDirections.scala | 191 ++++++++++ .../solma/moments/MomentsEstimator.scala | 150 ++++++++ src/main/scala/eu/proteus/solma/package.scala | 44 +++ .../pipeline/ChainedStreamPredictor.scala | 70 ++++ .../pipeline/ChainedStreamTransformer.scala | 75 ++++ .../solma/pipeline/StreamEstimator.scala | 111 ++++++ .../solma/pipeline/StreamPredictor.scala | 101 ++++++ .../solma/pipeline/StreamTransformer.scala | 95 +++++ .../sampling/SimpleReservoirSampling.scala | 116 ++++++ .../proteus/solma/utils/FlinkSolmaUtils.scala | 68 ++++ .../solma/fd/FrequentDirectionsITSuite.scala | 178 ++++++++++ .../moments/MomentsEstimatorITSuite.scala | 127 +++++++ .../sampling/ReservoirSamplingITSuite.scala | 103 ++++++ .../proteus/solma/utils/FlinkTestBase.scala | 49 +++ tools/maven/checkstyle.xml | 159 +++++++++ tools/maven/scalastyle-config.xml | 146 ++++++++ tools/maven/suppressions.xml | 28 ++ 24 files changed, 2245 insertions(+) create mode 100644 .gitattributes create mode 100644 .github/CONTRIBUTING.md create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 .gitignore create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/eu/proteus/annotations/Proteus.java create mode 100644 src/main/scala/eu/proteus/solma/fd/FrequentDirections.scala create mode 100644 src/main/scala/eu/proteus/solma/moments/MomentsEstimator.scala create mode 100644 src/main/scala/eu/proteus/solma/package.scala create mode 100644 src/main/scala/eu/proteus/solma/pipeline/ChainedStreamPredictor.scala create mode 100644 
src/main/scala/eu/proteus/solma/pipeline/ChainedStreamTransformer.scala create mode 100644 src/main/scala/eu/proteus/solma/pipeline/StreamEstimator.scala create mode 100644 src/main/scala/eu/proteus/solma/pipeline/StreamPredictor.scala create mode 100644 src/main/scala/eu/proteus/solma/pipeline/StreamTransformer.scala create mode 100644 src/main/scala/eu/proteus/solma/sampling/SimpleReservoirSampling.scala create mode 100644 src/main/scala/eu/proteus/solma/utils/FlinkSolmaUtils.scala create mode 100644 src/test/scala/eu/proteus/solma/fd/FrequentDirectionsITSuite.scala create mode 100644 src/test/scala/eu/proteus/solma/moments/MomentsEstimatorITSuite.scala create mode 100644 src/test/scala/eu/proteus/solma/sampling/ReservoirSamplingITSuite.scala create mode 100644 src/test/scala/eu/proteus/solma/utils/FlinkTestBase.scala create mode 100644 tools/maven/checkstyle.xml create mode 100644 tools/maven/scalastyle-config.xml create mode 100644 tools/maven/suppressions.xml diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..ecc9cf2 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,3 @@ +*.bat text eol=crlf +flink-runtime-web/web-dashboard/web/* linguist-vendored -diff + diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md new file mode 100644 index 0000000..ac0b373 --- /dev/null +++ b/.github/CONTRIBUTING.md @@ -0,0 +1,11 @@ +# How to contribute to Apache Flink + +Thank you for your intention to contribute to the Apache Flink project. As an open-source community, we highly appreciate external contributions to our project. + +To make the process smooth for the project *committers* (those who review and accept changes) and *contributors* (those who propose new changes via pull requests), there are a few rules to follow. + +## Contribution Guidelines + +Please check out the [How to Contribute guide](http://flink.apache.org/how-to-contribute.html) to understand how contributions are made. 
+A detailed explanation can be found in our [Contribute Code Guide](http://flink.apache.org/contribute-code.html) which also contains a list of coding guidelines that you should follow. +For pull requests, there is a [check list](PULL_REQUEST_TEMPLATE.md) with criteria taken from the How to Contribute Guide and the Coding Guidelines. diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..5f4c2fe --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,17 @@ +Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. +If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). +In addition to going through the list, please provide a meaningful description of your changes. + +- [ ] General + - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") + - The pull request addresses only one issue + - Each commit in the PR has a meaningful commit message (including the JIRA id) + +- [ ] Documentation + - Documentation has been added for new functionality + - Old documentation affected by the pull request has been updated + - JavaDoc for public methods has been added + +- [ ] Tests & Build + - Functionality added by the pull request is covered by tests + - `mvn clean verify` has been executed successfully locally or a Travis build has passed diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c9e4e8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +.cache +scalastyle-output.xml +.classpath +.idea +.metadata +.settings +.project +.version.properties +filter.properties +logs.zip +target +tmp +*.class +*.iml +*.swp +*.jar +*.log +.DS_Store +build-target 
+flink-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/ +flink-runtime-web/web-dashboard/assets/fonts/ +flink-runtime-web/web-dashboard/node_modules/ +flink-runtime-web/web-dashboard/bower_components/ +atlassian-ide-plugin.xml +out/ +/docs/api +/docs/content +/docs/.bundle +/docs/.rubydeps +/docs/ruby2/.bundle +/docs/ruby2/.rubydeps +/docs/.jekyll-metadata +*.ipr +*.iws +tools/flink +tools/flink-* diff --git a/README.md b/README.md new file mode 100644 index 0000000..5bb29b1 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# proteus-solma diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..dcda547 --- /dev/null +++ b/pom.xml @@ -0,0 +1,333 @@ + + + + 4.0.0 + + + eu.proteus + proteus-solma_2.10 + proteus-solma + 0.1-SNAPSHOT + + jar + + + UTF-8 + UTF-8 + 1.7 + 2.0.1 + 1C + true + log4j-test.properties + 18.0 + 1.3-SNAPSHOT + 2.10.4 + 2.10 + 3.1.0 + 4.12 + 1.10.19 + 1.6.5 + 0.12 + log4j-test.properties + + + + + + + + + org.apache.flink + flink-streaming-scala_2.10 + ${flink.version} + + + + org.apache.flink + flink-scala_2.10 + ${flink.version} + + + + org.apache.flink + flink-ml_2.10 + ${flink.version} + + + + org.apache.flink + flink-streaming-contrib_2.10 + ${flink.version} + + + + org.scalanlp + breeze_${scala.binary.version} + ${breeze.version} + + + + com.github.fommil.netlib + all + 1.1.2 + pom + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-reflect + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.apache.commons + commons-math3 + 3.5 + + + + + + + org.scalatest + scalatest_${scala.binary.version} + 2.2.2 + test + + + + org.apache.flink + flink-clients_2.10 + ${flink.version} + test + + + + org.apache.flink + flink-test-utils_2.10 + ${flink.version} + test + + + + org.slf4j + slf4j-log4j12 + jar + test + + + + log4j + log4j + jar + test + + + + + + com.google.code.findbugs + jsr305 + + + + org.slf4j + slf4j-api + + + + + + + + + + 
com.google.code.findbugs + jsr305 + 1.3.9 + + + + org.slf4j + slf4j-api + 1.7.7 + + + + org.slf4j + slf4j-log4j12 + 1.7.7 + + + + log4j + log4j + 1.2.17 + + + + org.apache.commons + commons-lang3 + 3.3.2 + + + + org.reflections + reflections + 0.9.10 + test + + + + + + + + + windows + + + windows + + + + "(?<!(IT|Integration))(Test|Suite|Case)" + "(IT|Integration)(Test|Suite|Case)" + + + + default + + true + + + (?<!(IT|Integration))(Test|Suite|Case) + (IT|Integration)(Test|Suite|Case) + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + compile + testCompile + + + + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + W + + + + scala-test + + test + + + ${suffix.test} + -Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dlog.dir=${log.dir} -Dmvn.forkNumber=1 -XX:-UseGCOverheadLimit + + + + integration-test + integration-test + + test + + + ${suffix.it} + -Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dlog.dir=${log.dir} -Dmvn.forkNumber=1 -XX:-UseGCOverheadLimit + + + + + + + + org.scalastyle + scalastyle-maven-plugin + + ${project.basedir}/tools/maven/scalastyle-config.xml + + + + + maven-assembly-plugin + 2.4 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + default + package + + test-jar + + + + + + + + diff --git a/src/main/java/eu/proteus/annotations/Proteus.java b/src/main/java/eu/proteus/annotations/Proteus.java new file mode 100644 index 0000000..ee0138a --- /dev/null +++ b/src/main/java/eu/proteus/annotations/Proteus.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package eu.proteus.annotations; + +import org.apache.flink.annotation.Public; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * Interface to mark methods developed within PROTEUS EU H2020 project. + */ +@Documented +@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR }) +@Public +public @interface Proteus { +} diff --git a/src/main/scala/eu/proteus/solma/fd/FrequentDirections.scala b/src/main/scala/eu/proteus/solma/fd/FrequentDirections.scala new file mode 100644 index 0000000..e78d434 --- /dev/null +++ b/src/main/scala/eu/proteus/solma/fd/FrequentDirections.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.proteus.solma.fd + +import breeze.linalg +import eu.proteus.annotations.Proteus +import eu.proteus.solma.pipeline.{StreamFitOperation, StreamTransformer, TransformDataStreamOperation} +import eu.proteus.solma.utils.FlinkSolmaUtils +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.ml.common.{Parameter, ParameterMap} +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.streaming.api.scala._ +import eu.proteus.solma._ +import breeze.linalg.svd.{SVD => BreezeSVD} +import breeze.linalg.{DenseMatrix => BreezeDenseMatrix, Vector => BreezeVector} + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag + +@Proteus +class FrequentDirections extends StreamTransformer[FrequentDirections] { + + import FrequentDirections._ + + def setFeaturesNumber(count: Int): FrequentDirections = { + parameters.add(FeaturesNumber, count) + this + } + + def setSketchSize(size: Int): FrequentDirections = { + parameters.add(SketchSize, size) + this + } + + def enableAggregation(enabled: Boolean): FrequentDirections = { + parameters.add(AggregateSketches, enabled) + this + } + +} + +object FrequentDirections { + + // ====================================== Helpers ============================================= + + case class Sketch(queue: mutable.Queue[Int], matrix: BreezeDenseMatrix[Double]) + + // ====================================== Parameters ============================================= + + case object SketchSize extends Parameter[Int] { + override val defaultValue: Option[Int] = Some(1) + } + + case object AggregateSketches extends Parameter[Boolean] { + override val defaultValue: Option[Boolean] = Some(false) + } + + case object FeaturesNumber extends Parameter[Int] { + override val defaultValue: Option[Int] = Some(1) + } + +
// ==================================== Factory methods ========================================== + + def apply(): FrequentDirections = { + new FrequentDirections() + } + + // ==================================== Operations ========================================== + + + implicit def fitNoOp[T] = { + new StreamFitOperation[FrequentDirections, T]{ + override def fit( + instance: FrequentDirections, + fitParameters: ParameterMap, + input: DataStream[T]) + : Unit = {} + } + } + + // Last configured sketch size (ell) and number of features (d). They are kept only as + // fallback defaults for callers of updateSketch that pass no explicit dimensions: the + // streaming closures below capture local vals instead, because the fields of a Scala + // object are not serialized with a closure and would still be -1 on a remote worker. + var ell, d: Int = -1 + + implicit def treansformFrequentDirections[T <: Vector : TypeInformation : ClassTag] = { + new TransformDataStreamOperation[FrequentDirections, T, T] { + override def transformDataStream( + instance: FrequentDirections, + transformParameters: ParameterMap, + input: DataStream[T]) + : DataStream[T] = { + val resultingParameters = instance.parameters ++ transformParameters + val statefulStream = FlinkSolmaUtils.ensureKeyedStream[T](input) + val sketchSize = resultingParameters(SketchSize) + val featuresNumber = resultingParameters(FeaturesNumber) + ell = sketchSize + d = featuresNumber + assert(ell < d * 2, "the sketch size should be smaller than twice the number of features") + val sketchesStream = statefulStream.flatMapWithState((in, state: Option[Sketch]) => { + val (elem, _) = in + val out = new ListBuffer[BreezeVector[Double]]() + val sketch = updateSketch(elem.asBreeze, state, out, sketchSize, featuresNumber) + (out, Some(sketch)) + }) + if (resultingParameters(AggregateSketches)) { + sketchesStream.fold(None.asInstanceOf[Option[Sketch]])((acc, item) => { + Some(updateSketch(item, acc, null, sketchSize, featuresNumber)) + }).flatMap((acc, out) => { + val Sketch(zeroRows, matrix) = acc.get + val toOutputType = (x: BreezeVector[Double]) => x.copy.fromBreeze.asInstanceOf[T] + if (zeroRows.isEmpty) { + (0 until matrix.rows) foreach (i => out collect toOutputType(matrix(i, ::).t)) + } + }) + } else { + sketchesStream.map(x => { + x.fromBreeze.asInstanceOf[T] + }) + } + } + } + } + + // Inserts elem into the sketch carried by state, creating a sketchSize x featuresNumber + // sketch when state is empty. A full sketch is first copied row by row into out (when + // out is not null) and compressed before elem is written into the next free row. + private [solma] def updateSketch( + elem: BreezeVector[Double], + state: Option[Sketch], + out:
mutable.Buffer[BreezeVector[Double]] = null, + sketchSize: Int = ell, + featuresNumber: Int = d + ): Sketch = { + state match { + case Some(Sketch(zeroRows, matrix)) => { + if (zeroRows.isEmpty) { + // sketch is full + if (out != null) { + for (i <- 0 until matrix.rows) { + out += matrix(i, ::).t.copy + } + } + // perform compression + compressMatrix(matrix, zeroRows) + } + matrix(zeroRows.dequeue, ::) := elem.t + Sketch(zeroRows, matrix) + } + case None => { + val zeroRows = new mutable.Queue[Int]() + val matrix = BreezeDenseMatrix.zeros[Double](sketchSize, featuresNumber) + matrix(0, ::) := elem.t + for (i <- 1 until sketchSize) { + zeroRows += i + } + Sketch(zeroRows, matrix) + } + } + } + + def compressMatrix(matrix: BreezeDenseMatrix[Double], zeroRows: mutable.Queue[Int]): Unit = { + // take the dimensions from the sketch itself instead of the shared ell/d fields + val (rows, cols) = (matrix.rows, matrix.cols) + val BreezeSVD(u, sigma, vt) = linalg.svd.reduced(matrix) + val delta = math.pow(sigma(rows / 2), 2.0) + val sigmaMinusDeltaMatrix = BreezeDenseMatrix.zeros[Double](rows, cols) + sigma.foreachPair((idx, s) => { + val x = math.pow(s, 2.0) - delta + sigmaMinusDeltaMatrix(idx, idx) = if (x < 0.0) { + 0.0 + } else { + math.sqrt(x) + } + }) + matrix := sigmaMinusDeltaMatrix * vt + val nzr = linalg.sum(matrix, linalg.Axis._1) + nzr.foreachPair((idx, v) => { + if (math.round(v) == 0) { + zeroRows += idx + } + }) + } + +} diff --git a/src/main/scala/eu/proteus/solma/moments/MomentsEstimator.scala b/src/main/scala/eu/proteus/solma/moments/MomentsEstimator.scala new file mode 100644 index 0000000..84466ba --- /dev/null +++ b/src/main/scala/eu/proteus/solma/moments/MomentsEstimator.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eu.proteus.solma.moments + + +import eu.proteus.annotations.Proteus +import eu.proteus.solma.pipeline.{StreamFitOperation, StreamTransformer, TransformDataStreamOperation} +import eu.proteus.solma.utils.FlinkSolmaUtils +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.common.ParameterMap +import eu.proteus.solma._ +import org.apache.flink.streaming.api.scala._ +import breeze.linalg.{Vector => BreezeVector} + +import scala.collection.mutable + +@Proteus +class MomentsEstimator extends StreamTransformer[MomentsEstimator] { +} + +object MomentsEstimator { + + // ====================================== Parameters ============================================= + + + // ====================================== Extra ============================================= + + class Moments( + var counter: Long, + var currMean: BreezeVector[Double], + var M2: BreezeVector[Double]) { + + def this(x: BreezeVector[Double]) = this(1L, x.copy, BreezeVector.zeros[Double](x.size)) + + def process(x: BreezeVector[Double]): this.type = { + counter += 1L + val delta = x :- currMean + currMean :+= (delta :/ counter.toDouble) + val delta2 = x :- currMean + M2 :+= (delta :*= delta2) + this + } + + def mean: BreezeVector[Double] = { + currMean + } + + def variance: BreezeVector[Double] = { + M2 :/ (counter.toDouble - 1) + } + + def merge(that: Moments): this.type = { + val cnt = counter.toDouble + that.counter.toDouble + val delta = that.currMean :- currMean + val m_a = M2 // raw sums of squared deviations; variance :* (n - 1) is NaN when n == 1 + val
m_b = that.M2 + currMean :*= counter.toDouble + currMean :+= (that.currMean :* that.counter.toDouble) + currMean :/= cnt + delta :^= 2.0 + delta :*= (counter.toDouble * that.counter.toDouble / cnt) + M2 := (m_a :+ m_b) :+= delta + counter += that.counter + this + } + + override def clone(): Moments = { + new Moments(counter, currMean.copy, M2.copy) + } + + override def toString: String = { + "[counter=" + counter + ",mean=" + mean.toString + ",variance=" + variance.toString + "]" + } + } + + // ==================================== Factory methods ========================================== + + def apply(): MomentsEstimator = { + new MomentsEstimator() + } + + // ==================================== Operations ========================================== + + implicit def fitNoOp[T] = { + new StreamFitOperation[MomentsEstimator, T]{ + override def fit( + instance: MomentsEstimator, + fitParameters: ParameterMap, + input: DataStream[T]) + : Unit = {} + } + } + + implicit def transformMomentsEstimators[T <: Vector] = { + new TransformDataStreamOperation[MomentsEstimator, T, Moments]{ + override def transformDataStream( + instance: MomentsEstimator, + transformParameters: ParameterMap, + input: DataStream[T]) + : DataStream[Moments] = { + val resultingParameters = instance.parameters ++ transformParameters + val statefulStream = FlinkSolmaUtils.ensureKeyedStream[T](input) + statefulStream.mapWithState((in, state: Option[Moments]) => { + val (element, pid) = in + val x = element.asBreeze + val metrics = state match { + case Some(curr) => { + curr.process(x) + } + case None => { + new Moments(x) + } + } + ((pid, metrics), Some(metrics)) + }).fold(new mutable.HashMap[Int, Moments]())((acc: mutable.HashMap[Int, Moments], in) => { + val (pid, moments) = in + acc(pid) = moments + acc.remove(-1) + val it = acc.values.iterator + val ret = it.next.clone() + while (it.hasNext) { + ret.merge(it.next) + } + acc(-1) = ret + acc + } + ).map(data =>
data(-1)) + } + } + } + + +} diff --git a/src/main/scala/eu/proteus/solma/package.scala b/src/main/scala/eu/proteus/solma/package.scala new file mode 100644 index 0000000..805a5f1 --- /dev/null +++ b/src/main/scala/eu/proteus/solma/package.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package eu.proteus + +import org.apache.flink.api.common.functions.FoldFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.streaming.api.scala._ + +package object solma { + + implicit class RichDataStream[T](dataStream: DataStream[T]) { + + def fold[R: TypeInformation](zeroValue: R)(fun: (R, T) => R): DataStream[R] = { + implicit val typeInfo = TypeInformation.of(classOf[(Int, T)]) + + val folder = new FoldFunction[(Int, T), R] { + def fold(acc: R, v: (Int, T)) = { + fun(acc, v._2) + } + } + dataStream.map(x => (0, x)) + .keyBy(x => x._1) + .fold(zeroValue, folder) + } + + } +} diff --git a/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamPredictor.scala b/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamPredictor.scala new file mode 100644 index 0000000..630009e --- /dev/null +++ b/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamPredictor.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package eu.proteus.solma.pipeline + +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.streaming.api.scala._ + +case class ChainedStreamPredictor[ + T <: StreamTransformer[T], + P <: StreamPredictor[P]] + (transformer: T, predictor: P) + extends StreamPredictor[ChainedStreamPredictor[T, P]]{} + +object ChainedStreamPredictor { + implicit def chainedPredictOperation[ + T <: StreamTransformer[T], + P <: StreamPredictor[P], + Testing, + Intermediate, + Prediction]( + implicit transformOperation: TransformDataStreamOperation[T, Testing, Intermediate], + predictOperation: PredictDataStreamOperation[P, Intermediate, Prediction]) + : PredictDataStreamOperation[ChainedStreamPredictor[T, P], Testing, Prediction] = { + + new PredictDataStreamOperation[ChainedStreamPredictor[T, P], Testing, Prediction] { + override def predictDataStream( + instance: ChainedStreamPredictor[T, P], + predictParameters: ParameterMap, + input: DataStream[Testing]) + : DataStream[Prediction] = { + val testing = instance.transformer.transform(input, predictParameters) + instance.predictor.predict(testing, predictParameters) + } + } + } + + implicit def chainedFitOperation[L <: StreamTransformer[L], R <: StreamPredictor[R], I, T] + (implicit fitOperation: StreamFitOperation[L, I], + transformOperation: TransformDataStreamOperation[L, I, T], + predictorFitOperation: StreamFitOperation[R, T]) + : StreamFitOperation[ChainedStreamPredictor[L, R], I] = { + new StreamFitOperation[ChainedStreamPredictor[L, R], I] { + override def fit( + instance: ChainedStreamPredictor[L, R], + fitParameters: ParameterMap, + input: DataStream[I]) + : Unit = { + instance.transformer.fit(input, fitParameters) + val intermediateResult = instance.transformer.transform(input, fitParameters) + instance.predictor.fit(intermediateResult, fitParameters) + } + } + } +} diff --git a/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamTransformer.scala 
b/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamTransformer.scala new file mode 100644 index 0000000..fdf13ce --- /dev/null +++ b/src/main/scala/eu/proteus/solma/pipeline/ChainedStreamTransformer.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package eu.proteus.solma.pipeline + +import org.apache.flink.ml.common.ParameterMap +import org.apache.flink.streaming.api.scala._ + +case class ChainedStreamTransformer[ + L <: StreamTransformer[L], + R <: StreamTransformer[R] + ](left: L, right: R) + extends StreamTransformer[ChainedStreamTransformer[L, R]] { +} + +object ChainedStreamTransformer { + implicit def chainedStreamTransformOperation[ + L <: StreamTransformer[L], + R <: StreamTransformer[R], + I, + T, + O](implicit + transformOpLeft: TransformDataStreamOperation[L, I, T], + transformOpRight: TransformDataStreamOperation[R, T, O]) + : TransformDataStreamOperation[ChainedStreamTransformer[L,R], I, O] = { + new TransformDataStreamOperation[ChainedStreamTransformer[L, R], I, O] { + override def transformDataStream( + instance: ChainedStreamTransformer[L, R], + transformParameters: ParameterMap, + input: DataStream[I]): DataStream[O] = { + val intermediateResult = transformOpLeft.transformDataStream( + instance.left, + transformParameters, + input) + transformOpRight.transformDataStream( + instance.right, + transformParameters, + intermediateResult + ) + } + } + } + + implicit def chainedStreamFitOperation[ + L <: StreamTransformer[L], R <: StreamTransformer[R], I, T]( + implicit leftFitOperation: StreamFitOperation[L, I], + leftTransformOperation: TransformDataStreamOperation[L, I, T], + rightFitOperation: StreamFitOperation[R, T]) + : StreamFitOperation[ChainedStreamTransformer[L, R], I] = { + new StreamFitOperation[ChainedStreamTransformer[L, R], I] { + override def fit( + instance: ChainedStreamTransformer[L, R], + fitParameters: ParameterMap, + input: DataStream[I]): Unit = { + // Fit the left stage before it is used to transform the input, as + // ChainedStreamPredictor.chainedFitOperation does for its transformer. + instance.left.fit(input, fitParameters) + val intermediateResult = instance.left.transform(input, fitParameters) + instance.right.fit(intermediateResult, fitParameters) + } + } + } +} diff --git a/src/main/scala/eu/proteus/solma/pipeline/StreamEstimator.scala b/src/main/scala/eu/proteus/solma/pipeline/StreamEstimator.scala new file mode 100644 index
0000000..23a5f82
--- /dev/null
+++ b/src/main/scala/eu/proteus/solma/pipeline/StreamEstimator.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.pipeline
+
+import eu.proteus.solma.utils.FlinkSolmaUtils
+import org.apache.flink.ml.common.{ParameterMap, WithParameters}
+import org.apache.flink.streaming.api.scala._
+
+import scala.reflect.runtime.universe._
+
+/** Base trait of the streaming pipeline: something that can be fitted on a [[DataStream]].
+  *
+  * @tparam Self Concrete type of the implementing class (enforced through the self-type)
+  */
+trait StreamEstimator[Self] extends WithParameters with Serializable {
+  that: Self =>
+
+  /** Trains this estimator on the given stream.
+    *
+    * The FlinkML/Breeze types are first registered with the stream's execution environment,
+    * then the work is delegated to the implicit [[StreamFitOperation]].
+    *
+    * @param training Training data stream
+    * @param fitParameters Additional parameters handed to the [[StreamFitOperation]]
+    * @param fitOperation [[StreamFitOperation]] which encapsulates the algorithm logic
+    * @tparam Training Type of the training data
+    */
+  def fit[Training](
+      training: DataStream[Training],
+      fitParameters: ParameterMap = ParameterMap.Empty)(implicit
+      fitOperation: StreamFitOperation[Self, Training]): Unit = {
+    val env = training.executionEnvironment
+    FlinkSolmaUtils.registerFlinkMLTypes(env)
+    fitOperation.fit(that, fitParameters, training)
+  }
+}
+
+object StreamEstimator {
+
+  /** Fallback [[StreamFitOperation]] whose `fit` always throws a [[RuntimeException]]
+    * naming the estimator type and the training element type.
+    */
+  implicit def fallbackFitOperation[
+      Self: TypeTag,
+      Training: TypeTag]
+    : StreamFitOperation[Self, Training] = {
+    new StreamFitOperation[Self, Training]{
+      override def fit(
+          instance: Self,
+          fitParameters: ParameterMap,
+          input: DataStream[Training])
+        : Unit = {
+        throw new RuntimeException(
+          s"There is no StreamFitOperation defined for ${typeOf[Self]}" +
+            s" which trains on a DataStream[${typeOf[Training]}]")
+      }
+    }
+  }
+
+  /** Fallback [[TransformDataStreamOperation]] whose `transformDataStream` always throws a
+    * [[RuntimeException]] naming the transformer type and the input element type.
+    */
+  implicit def fallbackTransformOperation[
+      Self: TypeTag,
+      IN: TypeTag]
+    : TransformDataStreamOperation[Self, IN, Any] = {
+    new TransformDataStreamOperation[Self, IN, Any] {
+      override def transformDataStream(
+          instance: Self,
+          transformParameters: ParameterMap,
+          input: DataStream[IN])
+        : DataStream[Any] = {
+        throw new RuntimeException(
+          s"There is no StreamTransformOperation defined for ${typeOf[Self]}" +
+            s" which takes a DataStream[${typeOf[IN]}] as input.")
+      }
+    }
+  }
+
+  /** Fallback [[PredictDataStreamOperation]] whose `predictDataStream` always throws a
+    * [[RuntimeException]] naming the predictor type and the testing element type.
+    */
+  implicit def fallbackPredictOperation[
+      Self: TypeTag,
+      Testing: TypeTag]
+    : PredictDataStreamOperation[Self, Testing, Any] = {
+    new PredictDataStreamOperation[Self, Testing, Any] {
+      override def predictDataStream(
+          instance: Self,
+          predictParameters: ParameterMap,
+          input: DataStream[Testing])
+        : DataStream[Any] = {
+        throw new RuntimeException(
+          s"There is no StreamPredictOperation defined for ${typeOf[Self]}" +
+            s" which takes a DataStream[${typeOf[Testing]}] as input.")
+      }
+    }
+  }
+}
+
+/** Type class holding the fitting logic of an estimator of type `Self` for streams of
+  * `Training` elements.
+  */
+trait StreamFitOperation[Self, Training]{
+  def fit(instance: Self, fitParameters: ParameterMap, input: DataStream[Training]): Unit
+}
diff --git a/src/main/scala/eu/proteus/solma/pipeline/StreamPredictor.scala b/src/main/scala/eu/proteus/solma/pipeline/StreamPredictor.scala
new file mode 100644
index 0000000..efc1cb0
--- /dev/null
+++ b/src/main/scala/eu/proteus/solma/pipeline/StreamPredictor.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.pipeline
+
+import eu.proteus.solma.utils.FlinkSolmaUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.streaming.api.scala._
+
+/** A [[StreamEstimator]] that can additionally compute predictions on a [[DataStream]].
+  *
+  * @tparam Self Concrete type of the implementing class
+  */
+trait StreamPredictor[Self] extends StreamEstimator[Self] {
+  that: Self =>
+
+  /** Computes predictions for the elements of `testing`.
+    *
+    * The FlinkML/Breeze types are registered with the stream's execution environment and
+    * the work is delegated to the implicit [[PredictDataStreamOperation]].
+    *
+    * @param testing Stream of elements to predict on
+    * @param predictParameters Additional parameters handed to the predict operation
+    * @param predictor Operation which encapsulates the prediction logic
+    * @tparam Testing Type of the testing elements
+    * @tparam Prediction Type of the produced stream elements
+    * @return The stream produced by `predictor`
+    */
+  def predict[Testing, Prediction](
+      testing: DataStream[Testing],
+      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
+      predictor: PredictDataStreamOperation[Self, Testing, Prediction])
+    : DataStream[Prediction] = {
+    // Same accessor as StreamEstimator.fit and StreamTransformer.transform
+    FlinkSolmaUtils.registerFlinkMLTypes(testing.executionEnvironment)
+    predictor.predictDataStream(this, predictParameters, testing)
+  }
+
+}
+
+object StreamPredictor {
+
+  /** Lifts an element-wise [[StreamPredictOperation]] to a [[PredictDataStreamOperation]].
+    *
+    * The model is obtained once, when `predictDataStream` is called, from the instance
+    * parameters merged with the call-site parameters; every input element is then mapped to
+    * the pair `(element, prediction)`.
+    */
+  implicit def defaultPredictDataStreamOperation[
+      Instance <: StreamEstimator[Instance],
+      Model,
+      Testing,
+      PredictionValue](
+      implicit predictOperation: StreamPredictOperation[Instance, Model, Testing, PredictionValue],
+      testingTypeInformation: TypeInformation[Testing],
+      predictionValueTypeInformation: TypeInformation[PredictionValue])
+    : PredictDataStreamOperation[Instance, Testing, (Testing, PredictionValue)] = {
+    new PredictDataStreamOperation[Instance, Testing, (Testing, PredictionValue)] {
+      override def predictDataStream(
+          instance: Instance,
+          predictParameters: ParameterMap,
+          input: DataStream[Testing])
+        : DataStream[(Testing, PredictionValue)] = {
+        val resultingParameters = instance.parameters ++ predictParameters
+        val model = predictOperation.getModel(instance, resultingParameters)
+        implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)]
+        input.map(element => {
+          (element, predictOperation.predict(element, model))
+        })
+      }
+    }
+  }
+}
+
+trait PredictDataStreamOperation[Self, Testing, Prediction] extends Serializable{
+
+  /** Calculates the predictions for all elements in the [[DataStream]] input
+    *
+    * @param instance The Predictor instance that we will use to make the predictions
+    * @param predictParameters The parameters for the prediction
+    * @param input The DataStream containing the unlabeled examples
+    * @return A DataStream of predictions
+    */
+  def predictDataStream(
+      instance: Self,
+      predictParameters: ParameterMap,
+      input: DataStream[Testing])
+    : DataStream[Prediction]
+}
+
+trait StreamPredictOperation[Instance, Model, Testing, Prediction] extends Serializable{
+
+  /** Defines how to retrieve the model of the type for which this operation was defined
+    *
+    * @param instance The Predictor instance that we will use to make the predictions
+    * @param predictParameters The parameters for the prediction
+    * @return The model representation that is passed to [[predict]]
+    */
+  def getModel(instance: Instance, predictParameters: ParameterMap): Model
+
+  /** Calculates the prediction for a single element given the model of the [[StreamPredictor]].
+    *
+    * @param value The unlabeled example on which we make the prediction
+    * @param model The model representation of the prediction algorithm
+    * @return A label for the provided example of type [[Prediction]]
+    */
+  def predict(value: Testing, model: Model):
+    Prediction
+}
diff --git a/src/main/scala/eu/proteus/solma/pipeline/StreamTransformer.scala b/src/main/scala/eu/proteus/solma/pipeline/StreamTransformer.scala
new file mode 100644
index 0000000..cf97f4d
--- /dev/null
+++ b/src/main/scala/eu/proteus/solma/pipeline/StreamTransformer.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.pipeline
+
+import eu.proteus.solma.utils.FlinkSolmaUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.streaming.api.scala._
+
+import scala.reflect.ClassTag
+
+/** A [[StreamEstimator]] that can additionally transform a [[DataStream]].
+  *
+  * @tparam Self Concrete type of the implementing class
+  */
+trait StreamTransformer[Self] extends StreamEstimator[Self] {
+  that: Self =>
+
+  /** Transforms `input` with the implicit [[TransformDataStreamOperation]], after registering
+    * the FlinkML/Breeze types with the stream's execution environment.
+    *
+    * @param input Stream to transform
+    * @param transformParameters Additional parameters handed to the transform operation
+    * @param transformOperation Operation which encapsulates the transformation logic
+    * @tparam Input Element type of the input stream
+    * @tparam Output Element type of the resulting stream
+    * @return The stream produced by `transformOperation`
+    */
+  def transform[Input, Output](
+      input: DataStream[Input],
+      transformParameters: ParameterMap = ParameterMap.Empty)
+      (implicit transformOperation: TransformDataStreamOperation[Self, Input, Output])
+    : DataStream[Output] = {
+    val env = input.executionEnvironment
+    FlinkSolmaUtils.registerFlinkMLTypes(env)
+    transformOperation.transformDataStream(this, transformParameters, input)
+  }
+
+}
+
+object StreamTransformer {
+
+  /** Lifts an element-wise [[StreamTransformOperation]] to a [[TransformDataStreamOperation]].
+    *
+    * The model is fetched once, when `transformDataStream` is called, using the instance
+    * parameters merged with the call-site parameters; each element is then mapped through
+    * `transformOperation.transform`.
+    */
+  implicit def defaultTransformDataSetOperation[
+      Instance <: StreamEstimator[Instance],
+      Model,
+      Input,
+      Output](
+      implicit transformOperation: StreamTransformOperation[Instance, Model, Input, Output],
+      outputTypeInformation: TypeInformation[Output],
+      outputClassTag: ClassTag[Output])
+    : TransformDataStreamOperation[Instance, Input, Output] = {
+    new TransformDataStreamOperation[Instance, Input, Output] {
+      override def transformDataStream(
+          instance: Instance,
+          transformParameters: ParameterMap,
+          input: DataStream[Input])
+        : DataStream[Output] = {
+        val mergedParameters = instance.parameters ++ transformParameters
+        // Resolved outside the map function so that only the model is captured by the closure
+        val model = transformOperation.getModel(instance, mergedParameters)
+
+        input.map(transformOperation.transform(_, model))
+      }
+    }
+  }
+}
+
+/** Type class describing how an `Instance` turns a whole `DataStream[Input]` into a
+  * `DataStream[Output]`.
+  */
+trait TransformDataStreamOperation[Instance, Input, Output] extends Serializable {
+  def transformDataStream(
+      instance: Instance,
+      transformParameters: ParameterMap,
+      input: DataStream[Input])
+    : DataStream[Output]
+}
+
+trait StreamTransformOperation[Instance, Model, Input, Output] extends Serializable {
+
+  /** Retrieves the model of the [[StreamTransformer]] for which this operation has been defined.
+    *
+    * @param instance The transformer instance whose model is requested
+    * @param transformParemters The parameters for the transformation
+    * @return The model representation that is passed to [[transform]]
+    */
+  def getModel(instance: Instance, transformParemters: ParameterMap): Model
+
+  /** Transforms a single element with respect to the model associated with the respective
+    * [[StreamTransformer]]
+    *
+    * @param element The input element to transform
+    * @param model The model obtained from [[getModel]]
+    * @return The transformed element
+    */
+  def transform(element: Input, model: Model): Output
+}
diff --git a/src/main/scala/eu/proteus/solma/sampling/SimpleReservoirSampling.scala b/src/main/scala/eu/proteus/solma/sampling/SimpleReservoirSampling.scala
new file mode 100644
index 0000000..42f446d
--- /dev/null
+++ b/src/main/scala/eu/proteus/solma/sampling/SimpleReservoirSampling.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.sampling
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.ml.common.{Parameter, ParameterMap}
+import org.apache.flink.ml.math.Vector
+import SimpleReservoirSampling.ReservoirSize
+import eu.proteus.annotations.Proteus
+import eu.proteus.solma.pipeline.{StreamFitOperation, StreamTransformer, TransformDataStreamOperation}
+import eu.proteus.solma.utils.FlinkSolmaUtils
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.util.XORShiftRandom
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+/** Stream transformer that keeps a fixed-size reservoir per key and emits every element
+  * at the moment it is admitted into the reservoir.
+  */
+@Proteus
+class SimpleReservoirSampling extends StreamTransformer[SimpleReservoirSampling] {
+
+  /** Sets the number of slots of the reservoir (see [[SimpleReservoirSampling.ReservoirSize]]).
+    *
+    * @param size Reservoir capacity; must be positive
+    * @return This instance, to allow call chaining
+    */
+  def setReservoirSize(size: Int): SimpleReservoirSampling = {
+    parameters.add(ReservoirSize, size)
+    this
+  }
+
+}
+
+object SimpleReservoirSampling {
+
+
+  // ====================================== Parameters =============================================
+
+  /** Capacity of the reservoir; defaults to 10. */
+  case object ReservoirSize extends Parameter[Int] {
+    override val defaultValue: Option[Int] = Some(10)
+  }
+
+
+  // ==================================== Factory methods ==========================================
+
+  def apply(): SimpleReservoirSampling = {
+    new SimpleReservoirSampling()
+  }
+
+  // ==================================== Operations ==========================================
+
+
+  /** Fitting is a no-op: the sampler has no model to train. */
+  implicit def fitNoOp[T]: StreamFitOperation[SimpleReservoirSampling, T] = {
+    new StreamFitOperation[SimpleReservoirSampling, T]{
+      override def fit(
+          instance: SimpleReservoirSampling,
+          fitParameters: ParameterMap,
+          input: DataStream[T])
+        : Unit = {}
+    }
+  }
+
+  /** Reservoir sampling over a keyed stream.
+    *
+    * Per-key state is `(elements seen, filled slots, reservoir)`. The first `k` elements
+    * fill the reservoir; afterwards the element number `n + 1` replaces a random slot with
+    * probability `k / (n + 1)`. Only elements that enter the reservoir are emitted.
+    *
+    * @throws IllegalArgumentException if the resolved reservoir size is not positive
+    */
+  implicit def treansformSimpleReservoirSampling[T <: Vector : TypeInformation : ClassTag]
+    : TransformDataStreamOperation[SimpleReservoirSampling, T, T] = {
+    new TransformDataStreamOperation[SimpleReservoirSampling, T, T]{
+      override def transformDataStream(
+          instance: SimpleReservoirSampling,
+          transformParameters: ParameterMap,
+          input: DataStream[T])
+        : DataStream[T] = {
+        val resultingParameters = instance.parameters ++ transformParameters
+        val statefulStream = FlinkSolmaUtils.ensureKeyedStream[T](input)
+        val k = resultingParameters(ReservoirSize)
+        // Fail while building the job instead of inside the operator on the first element
+        require(k > 0, "ReservoirSize must be positive, got " + k)
+        val gen = new XORShiftRandom()
+        implicit val typeInfo = TypeInformation.of(classOf[(Long, Int, Array[T])])
+        statefulStream.flatMapWithState((in, state: Option[(Long, Int, Array[T])]) => {
+          val (element, _) = in
+          state match {
+            case Some(curr) => {
+              var (streamCounter, reservoirCounter, reservoir) = curr
+              val data = new mutable.ListBuffer[T]()
+              if (reservoirCounter < k) {
+                // Fill phase: reservoirCounter slots are taken, so it is the next free index
+                reservoir(reservoirCounter) = element
+                reservoirCounter += 1
+                data += element
+              } else {
+                // streamCounter elements were seen before this one, so draw from [0, n]
+                // NOTE(review): streamCounter.toInt overflows after Int.MaxValue elements per key
+                val j = gen.nextInt(streamCounter.toInt + 1)
+                if (j < k) {
+                  reservoir(j) = element
+                  data += element
+                }
+              }
+              (data, Some((streamCounter + 1, reservoirCounter, reservoir)))
+            }
+            case None => {
+              // First element of this key: allocate the reservoir and take slot 0
+              val reservoir = Array.ofDim[T](k)
+              reservoir(0) = element
+              (Seq(element), Some((1L, 1, reservoir)))
+            }
+          }
+        })
+
+      }
+    }
+  }
+
+}
diff --git a/src/main/scala/eu/proteus/solma/utils/FlinkSolmaUtils.scala b/src/main/scala/eu/proteus/solma/utils/FlinkSolmaUtils.scala
new file mode 100644
index 0000000..8544eea
--- /dev/null
+++ b/src/main/scala/eu/proteus/solma/utils/FlinkSolmaUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.utils
+
+import eu.proteus.annotations.Proteus
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.XORShiftRandom
+
+/** Helpers shared by the streaming pipeline operations. */
+@Proteus
+object FlinkSolmaUtils {
+
+  /** Registers the FlinkML and Breeze vector/matrix classes with the given environment.
+    *
+    * @param env Execution environment on which `registerType` is called for every class
+    */
+  def registerFlinkMLTypes(env: StreamExecutionEnvironment): Unit = {
+    // The order below is the registration order and is kept stable on purpose
+    val typesToRegister: Seq[Class[_]] = Seq(
+      // Vector types
+      classOf[org.apache.flink.ml.math.DenseVector],
+      classOf[org.apache.flink.ml.math.SparseVector],
+      // Matrix types
+      classOf[org.apache.flink.ml.math.DenseMatrix],
+      classOf[org.apache.flink.ml.math.SparseMatrix],
+      // Breeze Vector types
+      classOf[breeze.linalg.DenseVector[_]],
+      classOf[breeze.linalg.SparseVector[_]],
+      // Breeze specialized types
+      breeze.linalg.DenseVector.zeros[Double](0).getClass,
+      breeze.linalg.SparseVector.zeros[Double](0).getClass,
+      // Breeze Matrix types
+      classOf[breeze.linalg.DenseMatrix[Double]],
+      classOf[breeze.linalg.CSCMatrix[Double]],
+      // Breeze specialized types
+      breeze.linalg.DenseMatrix.zeros[Double](0, 0).getClass,
+      breeze.linalg.CSCMatrix.zeros[Double](0, 0).getClass
+    )
+    typesToRegister.foreach(clazz => env.registerType(clazz))
+  }
+
+  /** Returns a keyed view of `input`.
+    *
+    * A stream that already is a [[KeyedStream]] is returned as is; any other stream has each
+    * element paired with a random key in `[0, parallelism)` and is keyed by that number.
+    *
+    * @param input Stream to key
+    * @tparam T Element type of the stream
+    * @return A stream of `(element, key)` pairs keyed by the second component
+    */
+  def ensureKeyedStream[T](input: DataStream[T]): KeyedStream[(T, Int), Int] = {
+    input match {
+      // NOTE(review): the type arguments are erased, so every KeyedStream matches here
+      // regardless of its element and key types - confirm callers only pass compatible ones
+      case alreadyKeyed : KeyedStream[(T, Int), Int] => alreadyKeyed
+      case _ => {
+        val random = new XORShiftRandom()
+        val keyCount = input.executionEnvironment.getParallelism
+        implicit val pairTypeInfo = TypeInformation.of(classOf[(T, Int)])
+        val withRandomKey = input.map(x => (x, random.nextInt(keyCount)))
+        withRandomKey.keyBy(_._2)
+      }
+    }
+  }
+}
diff --git a/src/test/scala/eu/proteus/solma/fd/FrequentDirectionsITSuite.scala b/src/test/scala/eu/proteus/solma/fd/FrequentDirectionsITSuite.scala
new file mode 100644
index 0000000..d1a8852
--- /dev/null
+++ b/src/test/scala/eu/proteus/solma/fd/FrequentDirectionsITSuite.scala
@@ -0,0 +1,178 @@
+/*
+
* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.fd
+
+import eu.proteus.annotations.Proteus
+import eu.proteus.solma.utils.FlinkTestBase
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.streaming.api.scala._
+import org.scalatest.{FlatSpec, Matchers}
+import org.apache.flink.contrib.streaming.scala.utils._
+import breeze.linalg
+
+import scala.collection.mutable
+
+@Proteus
+class FrequentDirectionsITSuite
+    extends FlatSpec
+    with Matchers
+    with FlinkTestBase {
+
+  import FrequentDirections._
+  import FrequentDirectionsITSuite._
+
+  behavior of "Flink's Frequent Directions"
+
+  /** Computes the two sides of the inequality checked by both tests.
+    *
+    * `A` holds the input rows and `B` the last `ell` emitted rows. The left side is the sum
+    * of the entries of `A.t * A - B.t * B`; the right side is the sum of the squared entries
+    * of `A.t * A`, multiplied by `ell`.
+    *
+    * @param emitted Rows emitted by the transformer
+    * @param ell Sketch size
+    * @return `(left, right)`
+    */
+  private def errorAndBound(emitted: Iterator[Vector], ell: Int): (Double, Double) = {
+    val rows = new mutable.ArrayBuffer[Vector]()
+    rows ++= emitted
+
+    val A = linalg.DenseMatrix.zeros[Double](data.length, data.head.size)
+    val B = linalg.DenseMatrix.zeros[Double](ell, data.head.size)
+
+    data.zipWithIndex.foreach { case (row, i) =>
+      A(i, ::) := row.asBreeze.t
+    }
+
+    // Only the last `ell` emitted rows form the sketch
+    rows.takeRight(ell).zipWithIndex.foreach { case (row, i) =>
+      B(i, ::) := row.asBreeze.t
+    }
+
+    val gramA = A.t * A
+    val gramB = B.t * B
+
+    val left = (gramA - gramB).data.sum
+    val right = gramA.data.map(x => math.pow(x, 2.0)).sum * ell
+    (left, right)
+  }
+
+  it should "perform frequent directions without final merge" in {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setMaxParallelism(1)
+
+    val ell = 5
+
+    val stream = env.fromCollection(FrequentDirectionsITSuite.data)
+
+    val transformer = FrequentDirections()
+      .setSketchSize(ell)
+      .setFeaturesNumber(3)
+
+    val it = transformer.transform(stream).collect()
+
+    val (left, right) = errorAndBound(it, ell)
+
+    left should be <= right
+  }
+
+  it should "perform frequent directions with final merge" in {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    val ell = 5
+
+    val stream = env.fromCollection(FrequentDirectionsITSuite.data)
+
+    val transformer = FrequentDirections()
+      .setSketchSize(ell)
+      .setFeaturesNumber(3)
+      .enableAggregation(true)
+
+    val it = transformer.transform(stream).collect()
+
+    val (left, right) = errorAndBound(it, ell)
+
+    left should be <= right
+  }
+
+}
+
+object FrequentDirectionsITSuite {
+  /** Input rows shared by both tests: three features per vector, the last one always zero. */
+  val data: Seq[Vector] = List(
+    DenseVector(Array(2104.00, 3.00, 0.0)),
+    DenseVector(Array(1600.00, 3.00, 0.0)),
+    DenseVector(Array(2400.00, 3.00, 0.0)),
+    DenseVector(Array(1416.00, 2.00, 0.0)),
+    DenseVector(Array(3000.00, 4.00, 0.0)),
+    DenseVector(Array(1985.00, 4.00, 0.0)),
+    DenseVector(Array(1534.00, 3.00, 0.0)),
+    DenseVector(Array(1427.00, 3.00, 0.0)),
+    DenseVector(Array(1380.00, 3.00, 0.0)),
+    DenseVector(Array(1494.00, 3.00, 0.0)),
+    DenseVector(Array(1940.00, 4.00, 0.0)),
+    DenseVector(Array(2000.00, 3.00, 0.0)),
+    DenseVector(Array(1890.00, 3.00, 0.0)),
+    DenseVector(Array(4478.00, 5.00, 0.0)),
+    DenseVector(Array(1268.00, 3.00, 0.0)),
+    DenseVector(Array(2300.00, 4.00, 0.0)),
+    DenseVector(Array(1320.00, 2.00, 0.0)),
+    DenseVector(Array(1236.00, 3.00, 0.0)),
+    DenseVector(Array(2609.00, 4.00, 0.0)),
+    DenseVector(Array(3031.00, 4.00, 0.0)),
+    DenseVector(Array(1767.00, 3.00, 0.0)),
+    DenseVector(Array(1888.00, 2.00, 0.0)),
+    DenseVector(Array(1604.00, 3.00, 0.0)),
+    DenseVector(Array(1962.00, 4.00, 0.0)),
+    DenseVector(Array(3890.00, 3.00, 0.0)),
+    DenseVector(Array(1100.00, 3.00, 0.0)),
+    DenseVector(Array(1458.00, 3.00, 0.0)),
+    DenseVector(Array(2526.00, 3.00, 0.0)),
+    DenseVector(Array(2200.00, 3.00, 0.0)),
+    DenseVector(Array(2637.00, 3.00, 0.0)),
+    DenseVector(Array(1839.00, 2.00, 0.0)),
+    DenseVector(Array(1000.00, 1.00, 0.0)),
+    DenseVector(Array(2040.00, 4.00, 0.0)),
+    DenseVector(Array(3137.00, 3.00, 0.0)),
+    DenseVector(Array(1811.00, 4.00, 0.0)),
+    DenseVector(Array(1437.00, 3.00, 0.0)),
+    DenseVector(Array(1239.00, 3.00, 0.0)),
+    DenseVector(Array(2132.00, 4.00, 0.0)),
+    DenseVector(Array(4215.00, 4.00, 0.0)),
+    DenseVector(Array(2162.00, 4.00, 0.0)),
+    DenseVector(Array(1664.00, 2.00, 0.0)),
+    DenseVector(Array(2238.00, 3.00, 0.0)),
+    DenseVector(Array(2567.00, 4.00, 0.0)),
+    DenseVector(Array(1200.00, 3.00, 0.0)),
+    DenseVector(Array(852.00, 2.00, 0.0)),
+    DenseVector(Array(1852.00, 4.00, 0.0)),
+    DenseVector(Array(1203.00, 3.00, 0.0))
+  )
+}
diff --git a/src/test/scala/eu/proteus/solma/moments/MomentsEstimatorITSuite.scala b/src/test/scala/eu/proteus/solma/moments/MomentsEstimatorITSuite.scala
new file mode 100644
index 0000000..eb27619
--- /dev/null
+++ b/src/test/scala/eu/proteus/solma/moments/MomentsEstimatorITSuite.scala
@@ -0,0 +1,127 @@
+/*
+ *
Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.moments
+
+import breeze.linalg
+import breeze.linalg.*
+import eu.proteus.annotations.Proteus
+import eu.proteus.solma.utils.FlinkTestBase
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.contrib.streaming.scala.utils._
+import org.scalatest.{FlatSpec, Matchers}
+import breeze.stats.meanAndVariance
+
+@Proteus
+class MomentsEstimatorITSuite
+    extends FlatSpec
+    with Matchers
+    with FlinkTestBase {
+
+  behavior of "Flink's Moments Estimator"
+
+  import MomentsEstimator._
+  import MomentsEstimatorITSuite._
+
+  it should "estimate the mean and var of a stream" in {
+
+    // Reference values: column-wise mean and variance computed by Breeze on the full data set
+    val m = linalg.DenseMatrix.zeros[Double](data.length, data.head.size)
+    for (i <- data.indices) {
+      m(i, ::) := data(i).asBreeze.t
+    }
+    val result = meanAndVariance(m(::, *))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    val stream = env.fromCollection(data)
+
+    val estimator = MomentsEstimator()
+
+    val it = estimator.transform(stream).collect()
+
+    // Absolute tolerance; the previous value 1e06 (one million) made every check vacuous
+    val eps = 1e-6
+    while (it.hasNext) {
+      val elem = it.next
+      // Only an estimate that has seen the whole data set is comparable to the batch result.
+      // NOTE(review): if no emitted element ever reports this count the test checks nothing
+      if (elem.counter == data.length) {
+        elem.mean(0) should be (result(0).mean +- eps)
+        elem.mean(1) should be (result(1).mean +- eps)
+        elem.mean(2) should be (result(2).mean +- eps)
+        elem.variance(0) should be (result(0).variance +- eps)
+        elem.variance(1) should be (result(1).variance +- eps)
+        elem.variance(2) should be (result(2).variance +- eps)
+      }
+    }
+  }
+
+}
+
+object MomentsEstimatorITSuite {
+  /** Input rows of the test: three features per vector, the last one always zero. */
+  val data: Seq[Vector] = List(
+    DenseVector(Array(2104.00, 3.00, 0.0)),
+    DenseVector(Array(1600.00, 3.00, 0.0)),
+    DenseVector(Array(2400.00, 3.00, 0.0)),
+    DenseVector(Array(1416.00, 2.00, 0.0)),
+    DenseVector(Array(3000.00, 4.00, 0.0)),
+    DenseVector(Array(1985.00, 4.00, 0.0)),
+    DenseVector(Array(1534.00, 3.00, 0.0)),
+    DenseVector(Array(1427.00, 3.00, 0.0)),
+    DenseVector(Array(1380.00, 3.00, 0.0)),
+    DenseVector(Array(1494.00, 3.00, 0.0)),
+    DenseVector(Array(1940.00, 4.00, 0.0)),
+    DenseVector(Array(2000.00, 3.00, 0.0)),
+    DenseVector(Array(1890.00, 3.00, 0.0)),
+    DenseVector(Array(4478.00, 5.00, 0.0)),
+    DenseVector(Array(1268.00, 3.00, 0.0)),
+    DenseVector(Array(2300.00, 4.00, 0.0)),
+    DenseVector(Array(1320.00, 2.00, 0.0)),
+    DenseVector(Array(1236.00, 3.00, 0.0)),
+    DenseVector(Array(2609.00, 4.00, 0.0)),
+    DenseVector(Array(3031.00, 4.00, 0.0)),
+    DenseVector(Array(1767.00, 3.00, 0.0)),
+    DenseVector(Array(1888.00, 2.00, 0.0)),
+    DenseVector(Array(1604.00, 3.00, 0.0)),
+    DenseVector(Array(1962.00, 4.00, 0.0)),
+    DenseVector(Array(3890.00, 3.00, 0.0)),
+    DenseVector(Array(1100.00, 3.00, 0.0)),
+    DenseVector(Array(1458.00, 3.00, 0.0)),
+    DenseVector(Array(2526.00, 3.00, 0.0)),
+    DenseVector(Array(2200.00, 3.00, 0.0)),
+    DenseVector(Array(2637.00, 3.00, 0.0)),
+    DenseVector(Array(1839.00, 2.00, 0.0)),
+    DenseVector(Array(1000.00, 1.00, 0.0)),
+    DenseVector(Array(2040.00, 4.00, 0.0)),
+    DenseVector(Array(3137.00, 3.00, 0.0)),
+    DenseVector(Array(1811.00, 4.00, 0.0)),
+    DenseVector(Array(1437.00, 3.00, 0.0)),
+    DenseVector(Array(1239.00, 3.00, 0.0)),
+    DenseVector(Array(2132.00, 4.00, 0.0)),
+    DenseVector(Array(4215.00, 4.00, 0.0)),
+    DenseVector(Array(2162.00, 4.00, 0.0)),
+    DenseVector(Array(1664.00, 2.00, 0.0)),
+    DenseVector(Array(2238.00, 3.00, 0.0)),
+    DenseVector(Array(2567.00, 4.00, 0.0)),
+    DenseVector(Array(1200.00, 3.00, 0.0)),
+    DenseVector(Array(852.00, 2.00, 0.0)),
+    DenseVector(Array(1852.00, 4.00, 0.0)),
+    DenseVector(Array(1203.00, 3.00, 0.0))
+  )
+}
diff --git a/src/test/scala/eu/proteus/solma/sampling/ReservoirSamplingITSuite.scala b/src/test/scala/eu/proteus/solma/sampling/ReservoirSamplingITSuite.scala
new file mode 100644
index 0000000..df3bf34
--- /dev/null
+++ b/src/test/scala/eu/proteus/solma/sampling/ReservoirSamplingITSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.sampling
+
+import eu.proteus.annotations.Proteus
+import eu.proteus.solma.utils.FlinkTestBase
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.streaming.api.scala._
+import org.scalatest.{FlatSpec, Matchers}
+
+@Proteus
+class ReservoirSamplingITSuite
+    extends FlatSpec
+    with Matchers
+    with FlinkTestBase {
+
+  behavior of "Flink's Reservoir Sampling"
+
+
+  // Smoke test: runs the sampling job and prints the sampled elements, without assertions
+  it should "perform reservoir sampling" in {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    env.setMaxParallelism(4)
+
+    val sampler = SimpleReservoirSampling().setReservoirSize(5)
+    val input = env.fromCollection(ReservoirSamplingITSuite.data)
+
+    sampler.transform(input).print()
+
+    env.execute("reservoir sampling")
+  }
+
+}
+
+object ReservoirSamplingITSuite {
+  /** Input rows of the test: three features per vector, the last one always zero. */
+  val data: Seq[Vector] = List(
+    DenseVector(Array(2104.00, 3.00, 0.0)),
+    DenseVector(Array(1600.00, 3.00, 0.0)),
+    DenseVector(Array(2400.00, 3.00, 0.0)),
+    DenseVector(Array(1416.00, 2.00, 0.0)),
+    DenseVector(Array(3000.00, 4.00, 0.0)),
+    DenseVector(Array(1985.00, 4.00, 0.0)),
+    DenseVector(Array(1534.00, 3.00, 0.0)),
+    DenseVector(Array(1427.00, 3.00, 0.0)),
+    DenseVector(Array(1380.00, 3.00, 0.0)),
+    DenseVector(Array(1494.00, 3.00, 0.0)),
+    DenseVector(Array(1940.00, 4.00, 0.0)),
+    DenseVector(Array(2000.00, 3.00, 0.0)),
+    DenseVector(Array(1890.00, 3.00, 0.0)),
+    DenseVector(Array(4478.00, 5.00, 0.0)),
+    DenseVector(Array(1268.00, 3.00, 0.0)),
+    DenseVector(Array(2300.00, 4.00, 0.0)),
+    DenseVector(Array(1320.00, 2.00, 0.0)),
+    DenseVector(Array(1236.00, 3.00, 0.0)),
+    DenseVector(Array(2609.00, 4.00, 0.0)),
+    DenseVector(Array(3031.00, 4.00, 0.0)),
+    DenseVector(Array(1767.00, 3.00, 0.0)),
+    DenseVector(Array(1888.00, 2.00, 0.0)),
+    DenseVector(Array(1604.00, 3.00, 0.0)),
+    DenseVector(Array(1962.00, 4.00, 0.0)),
+    DenseVector(Array(3890.00, 3.00, 0.0)),
+    DenseVector(Array(1100.00, 3.00, 0.0)),
+    DenseVector(Array(1458.00, 3.00, 0.0)),
+    DenseVector(Array(2526.00, 3.00, 0.0)),
+    DenseVector(Array(2200.00, 3.00, 0.0)),
+    DenseVector(Array(2637.00, 3.00, 0.0)),
+    DenseVector(Array(1839.00, 2.00, 0.0)),
+    DenseVector(Array(1000.00, 1.00, 0.0)),
+    DenseVector(Array(2040.00, 4.00, 0.0)),
+    DenseVector(Array(3137.00, 3.00, 0.0)),
+    DenseVector(Array(1811.00, 4.00, 0.0)),
+    DenseVector(Array(1437.00, 3.00, 0.0)),
+    DenseVector(Array(1239.00, 3.00, 0.0)),
+    DenseVector(Array(2132.00, 4.00, 0.0)),
+    DenseVector(Array(4215.00, 4.00, 0.0)),
+    DenseVector(Array(2162.00, 4.00, 0.0)),
+    DenseVector(Array(1664.00, 2.00, 0.0)),
+    DenseVector(Array(2238.00, 3.00, 0.0)),
+    DenseVector(Array(2567.00, 4.00, 0.0)),
+    DenseVector(Array(1200.00, 3.00, 0.0)),
+    DenseVector(Array(852.00, 2.00, 0.0)),
+    DenseVector(Array(1852.00, 4.00, 0.0)),
+    DenseVector(Array(1203.00, 3.00, 0.0))
+  )
+}
diff --git a/src/test/scala/eu/proteus/solma/utils/FlinkTestBase.scala b/src/test/scala/eu/proteus/solma/utils/FlinkTestBase.scala
new file mode 100644
index 0000000..5b13a6a
--- /dev/null
+++ b/src/test/scala/eu/proteus/solma/utils/FlinkTestBase.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package eu.proteus.solma.utils
+
+import eu.proteus.annotations.Proteus
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.streaming.util.TestStreamEnvironment
+import org.apache.flink.test.util.TestBaseUtils
+import org.scalatest.{BeforeAndAfter, Suite}
+
+/** Mix-in for suites that need a local Flink mini cluster.
+  *
+  * Before every test a cluster is started and installed as the context of
+  * [[TestStreamEnvironment]]; after every test the context is unset and the cluster stopped.
+  */
+@Proteus
+trait FlinkTestBase extends BeforeAndAfter {
+  that: Suite =>
+
+  /** Cluster of the currently running test, `None` outside of a test. */
+  var cluster: Option[LocalFlinkMiniCluster] = None
+  /** Value passed to `startCluster` and `setAsContext` as the parallelism. */
+  val parallelism = 4
+
+  before {
+    val cl = TestBaseUtils.startCluster(1, parallelism, false, false, true)
+    TestStreamEnvironment.setAsContext(cl, parallelism)
+    cluster = Some(cl)
+  }
+
+  after {
+    cluster.foreach(c => {
+      TestStreamEnvironment.unsetAsContext()
+      TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT)
+    })
+    // Drop the reference so that a stopped cluster is never stopped a second time
+    cluster = None
+  }
+
+}
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
new file mode 100644
index 0000000..b8f1b1a
--- /dev/null
+++ b/tools/maven/checkstyle.xml
@@ -0,0 +1,159 @@
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
diff --git a/tools/maven/scalastyle-config.xml b/tools/maven/scalastyle-config.xml
new file mode 100644
index 0000000..0f7f6bb
--- /dev/null
+++ b/tools/maven/scalastyle-config.xml
@@ -0,0 +1,146 @@
+ + + + + + + + + + + + Scalastyle standard configuration + + + + + + + + + + + + + + + + + + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
new file mode 100644
index 0000000..2c29054
--- /dev/null
+++ b/tools/maven/suppressions.xml
@@ -0,0 +1,28 @@
+ + + + + + + +