# SGD Opportunistic Example


---

## Preparing dependencies

The `coursierapi` is needed at import time because the Maven repository in the local folder must be registered. This step will not be required after the release, but it remains necessary if you want to modify the code and use the modified version.

The code below adds the location of the new `MavenRepository`, found at **/maven/local/repository**; this is the same path used when starting the container.

This step imports the modules required to execute the code. All these packages come from the previous Maven installation.

The imported libraries are:

Module | Java versions | Scala versions | Description
:----- | -------------: | --------------: | :----------
wayang-core | 8, 11 | 2.11, 2.12 | provides core data structures and the optimizer (required)
wayang-basic | 8, 11 | 2.11, 2.12 | provides common operators and data types for your apps (recommended)
wayang-api-scala-java | 8, 11 | 2.11, 2.12 | provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)
wayang-java | 8, 11 | 2.11, 2.12 | adapters for [Java Stream](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html) processing platforms
wayang-spark | 8, 11 | 2.11, 2.12 | adapters for [Apache Spark](https://spark.apache.org) processing platforms
wayang-flink | 8, 11 | 2.11, 2.12 | adapters for [Apache Flink](https://flink.apache.org) processing platforms
hadoop-common | 8, 11 | - | required because the **HADOOP_HOME** environment variable is not set
log4j-core | 8, 11 | - | logging library used to control the log output

In [1]:
```scala
import $ivy.`com.thoughtworks.paranamer:paranamer:2.8`

import $ivy.`org.apache.wayang::wayang-api-scala-java:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang:wayang-core:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang:wayang-basic:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang:wayang-java:0.6.1-SNAPSHOT`
import $ivy.`org.apache.wayang::wayang-spark:0.6.1-SNAPSHOT`
import $ivy.`org.apache.hadoop:hadoop-common:2.8.5`
import $ivy.`org.apache.logging.log4j:log4j-core:2.14.0`

import org.apache.wayang.api._
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.function.FunctionDescriptor
import org.apache.wayang.core.plugin.Plugin
import org.apache.wayang.core.util.{Tuple => WayangTuple, WayangCollections}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import java.io.File
import java.util.ArrayList
import java.util.Arrays
import java.util.{Collection => JavaCollection}
import java.util.List
import scala.math.{exp, abs, max}
import scala.collection.JavaConversions._

// Logging: set the root log level to INFO
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

Configurator.setRootLevel(Level.INFO)
```


---

The following cells define all the classes used by the SGD plan.
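For reference, the computation these classes carry out can be summarized as follows. This is our own notation read off the code, not an official Wayang formulation: $y$ is the label in column 0, $x_1, \dots, x_d$ are the features, $\eta$ is `stepSize`, $\lambda$ is `regulizer`, $S_t$ is the sample drawn in iteration $t$ with $n = |S_t|$, $\varepsilon$ is `accuracy`, and $T_{\max}$ is `max_iterations`.

$$
\begin{aligned}
\sigma(z) &= \frac{1}{1 + e^{-z}} \\
g_j(\mathbf{x}) &= \Big(\sigma\big(\textstyle\sum_{k=1}^{d} w_k^{(t)} x_k\big) - y\Big)\, x_j, \qquad j = 1, \dots, d \\
\alpha_t &= \frac{\eta}{t + 1} \\
w_j^{(t+1)} &= (1 - \alpha_t \lambda)\, w_j^{(t)} - \alpha_t \, \frac{1}{n} \sum_{\mathbf{x} \in S_t} g_j(\mathbf{x}) \\
\text{test:}\quad & \sum_{j} \big|w_j^{(t+1)} - w_j^{(t)}\big| < \varepsilon \max\Big(\sum_{j} \big|w_j^{(t+1)}\big|,\ 1\Big) \ \ \text{or} \ \ t > T_{\max}
\end{aligned}
$$

`ComputeLogisticGradient` produces $g_j$, `Sum` adds them up together with $n$, `WeightsUpdate` applies the step, and `ComputeNorm` plus `LoopCondition` evaluate the test.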

In [2]:
```scala
// Parses one CSV line into [label, feature_1, ..., feature_n]:
// column 0 holds the label, the next `features` columns the feature values.
class Transform(var features: Int) 
    extends FunctionDescriptor.SerializableFunction[String, Array[Double]] {

  override def apply(line: String): Array[Double] = {
    val pointStr: Array[String] = line.split(",")
    val point: Array[Double] = Array.ofDim[Double](features + 1)
    for (i <- 0 until pointStr.length) {
      point(i) = pointStr(i).toDouble
    }
    point
  }

}
```

defined class Transform
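To see what `Transform` produces without running a Wayang job, here is a plain-Scala sketch of the same parsing step applied to a single line. It assumes, as the class does, that column 0 is the label and the next `features` columns are feature values; the helper `parsePoint` and the sample line are hypothetical.

```scala
// Hypothetical helper mirroring Transform.apply: column 0 is the label,
// columns 1..features are the feature values.
def parsePoint(line: String, features: Int): Array[Double] = {
  val fields = line.split(",")
  val point = Array.ofDim[Double](features + 1)
  for (i <- fields.indices) point(i) = fields(i).toDouble
  point
}

// A made-up line holding a label and three features.
val parsed = parsePoint("1.0,0.5,-1.25,2.0", features = 3)
```

As in the original class, a line with more than `features + 1` columns would raise an `ArrayIndexOutOfBoundsException`, so `features` must match the data set.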

In [3]:
```scala
// Computes the logistic-loss gradient of one point for the broadcast weights.
// point(0) is the label and weights(j) pairs with feature point(j + 1).
class ComputeLogisticGradient
    extends FunctionDescriptor.ExtendedSerializableFunction[Array[Double], Array[Double]] {

  var weights: Array[Double] = _

  override def apply(point: Array[Double]): Array[Double] = {
    val gradient: Array[Double] = Array.ofDim[Double](point.length)
    var dot: Double = 0
    for (j <- 0 until weights.length) dot += weights(j) * point(j + 1)
    for (j <- 0 until weights.length)
      gradient(j + 1) = ((1 / (1 + exp(-1 * dot))) - point(0)) * point(j + 1)
    // Slot 0 counts this point; after the reduce it holds the number of
    // sampled points, which WeightsUpdate uses to average the gradient.
    gradient(0) = 1
    gradient
  }

  // Reads the current weights from the "weights" broadcast.
  override def open(executionContext: ExecutionContext): Unit = {
    this.weights = executionContext
      .getBroadcast("weights")
      .iterator()
      .next()
      .asInstanceOf[Array[Double]]
  }

}
```

defined class ComputeLogisticGradient
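The gradient arithmetic can be checked in isolation. The sketch below is a hypothetical stand-in for `ComputeLogisticGradient.apply` that takes the weights as a parameter instead of reading them from the broadcast; `sigmoid` and `logisticGradient` are names introduced here for illustration.

```scala
import scala.math.exp

// Sigmoid as used in ComputeLogisticGradient: 1 / (1 + e^(-z)).
def sigmoid(z: Double): Double = 1.0 / (1.0 + exp(-z))

// Hypothetical stand-in for ComputeLogisticGradient.apply with explicit weights.
// point(0) is the label (0 or 1); point(1..) are the features.
// Slot 0 of the result is set to 1 so that a later sum counts the samples.
def logisticGradient(weights: Array[Double], point: Array[Double]): Array[Double] = {
  val dot = weights.indices.map(j => weights(j) * point(j + 1)).sum
  val err = sigmoid(dot) - point(0)
  val gradient = Array.ofDim[Double](point.length)
  for (j <- weights.indices) gradient(j + 1) = err * point(j + 1)
  gradient(0) = 1
  gradient
}

// With zero weights the prediction is sigmoid(0) = 0.5, so for label 1
// each feature is scaled by (0.5 - 1) = -0.5.
val g = logisticGradient(Array(0.0, 0.0), Array(1.0, 2.0, -4.0))
```

Here `g` is `[1.0, -1.0, 2.0]`: the count slot followed by the two scaled features.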

In [4]:
```scala
class Sum
    extends FunctionDescriptor.SerializableBinaryOperator[Array[Double]] {

  override def apply(o: Array[Double], o2: Array[Double]): Array[Double] = {
    val g1: Array[Double] = o
    val g2: Array[Double] = o2
    // If the samples came from one partition only, one side may be null:
    // return the other side unchanged.
    if (g2 == null) g1
    else if (g1 == null) g2
    else {
      val sum: Array[Double] = Array.ofDim[Double](g1.length)
      // Slot 0 accumulates the sample count; the other slots the gradient.
      sum(0) = g1(0) + g2(0)
      for (i <- 1 until g1.length) sum(i) = g1(i) + g2(i)
      sum
    }
  }

}
```

defined class Sum
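Because `Sum` adds arrays element-wise and keeps the sample count in slot 0, reducing the per-point gradients yields both the summed gradient and the number of sampled points in one pass. This plain-Scala sketch mimics that with an ordinary `reduce`; `sumGradients` is a hypothetical mirror of `Sum.apply`, and the gradient values are made up.

```scala
// Hypothetical mirror of Sum.apply: element-wise addition, where slot 0
// accumulates the sample count and the remaining slots the gradient.
// A null side (nothing to combine) is passed through unchanged.
def sumGradients(g1: Array[Double], g2: Array[Double]): Array[Double] =
  if (g2 == null) g1
  else if (g1 == null) g2
  else g1.zip(g2).map { case (a, b) => a + b }

// Two per-point gradients, each tagged with count 1 in slot 0.
val partials = Seq(Array(1.0, -1.0, 2.0), Array(1.0, 0.5, -0.5))
val total = partials.reduce((a, b) => sumGradients(a, b))
```

`total` is `[2.0, -0.5, 1.5]`: two samples, and the summed gradient. Since the operation is associative and commutative, the platform is free to combine partial sums from different partitions in any order.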

In [5]:
```scala
// Applies one regularized gradient step:
// w(j) <- (1 - alpha * regulizer) * w(j) - alpha * (1 / count) * g(j),
// with alpha = stepSize / (current_iteration + 1).
class WeightsUpdate
    extends FunctionDescriptor.ExtendedSerializableFunction[Array[Double], Array[Double]] {

  var weights: Array[Double] = _

  var current_iteration: Int = _

  var stepSize: Double = 1

  var regulizer: Double = 0

  def this(stepSize: Double, regulizer: Double) = {
    this()
    this.stepSize = stepSize
    this.regulizer = regulizer
  }

  override def apply(input: Array[Double]): Array[Double] = {
    // input(0) is the sample count; input(1..) is the summed gradient.
    val count: Double = input(0)
    val alpha: Double = (stepSize / (current_iteration + 1))
    val newWeights: Array[Double] = Array.ofDim[Double](weights.length)
    for (j <- 0 until weights.length) {
      newWeights(j) = (1 - alpha * regulizer) * weights(j) - alpha * (1.0 / count) * input(j + 1)
    }
    newWeights
  }

  // Reads the current weights and the iteration number from the execution context.
  override def open(executionContext: ExecutionContext): Unit = {
    this.weights = executionContext
      .getBroadcast("weights")
      .iterator()
      .next()
      .asInstanceOf[Array[Double]]
    this.current_iteration = executionContext.getCurrentIteration
  }

}
```

defined class WeightsUpdate
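The decaying step size is easiest to see with concrete numbers. The sketch below is a hypothetical mirror of `WeightsUpdate.apply` with the broadcast weights and the iteration passed explicitly; `updateWeights` and its default arguments (matching the class defaults `stepSize = 1`, `regulizer = 0`) are our assumptions for illustration.

```scala
// Hypothetical mirror of WeightsUpdate.apply with its inputs made explicit.
// summed(0) is the sample count; summed(1..) is the summed gradient.
def updateWeights(weights: Array[Double], summed: Array[Double], iteration: Int,
                  stepSize: Double = 1.0, regularizer: Double = 0.0): Array[Double] = {
  val count = summed(0)
  // Step size decays as stepSize / (iteration + 1).
  val alpha = stepSize / (iteration + 1)
  Array.tabulate(weights.length) { j =>
    (1 - alpha * regularizer) * weights(j) - alpha * (1.0 / count) * summed(j + 1)
  }
}

// Two sampled points (count 2) with summed gradient (4, -2).
val firstStep = updateWeights(Array(1.0, 0.0), Array(2.0, 4.0, -2.0), iteration = 0)
val secondStep = updateWeights(Array(1.0, 0.0), Array(2.0, 4.0, -2.0), iteration = 1)
```

With the same averaged gradient `(2, -1)`, iteration 0 moves the weights by the full step (`[-1.0, 1.0]`) while iteration 1 moves them by half (`[0.0, 0.5]`).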

In [6]:
```scala
// Returns (L1 norm of the weight change, L1 norm of the new weights),
// comparing the new weights with the broadcast previous ones.
class ComputeNorm
    extends FunctionDescriptor.ExtendedSerializableFunction[Array[Double], (Double, Double)] {

  var previousWeights: Array[Double] = _

  override def apply(weights: Array[Double]): (Double, Double) = {
    var normDiff: Double = 0.0
    var normWeights: Double = 0.0
    for (j <- 0 until weights.length) {
      normDiff += abs(weights(j) - previousWeights(j))
      normWeights += abs(weights(j))
    }

    (normDiff, normWeights)
  }

  override def open(executionContext: ExecutionContext): Unit = {
    this.previousWeights = executionContext
      .getBroadcast("weights")
      .iterator()
      .next()
      .asInstanceOf[Array[Double]]
  }

}
```

defined class ComputeNorm
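A quick check of the two L1 norms that `ComputeNorm` emits, in plain Scala; `norms` is a hypothetical helper with the previous weights passed explicitly rather than read from the broadcast.

```scala
import scala.math.abs

// Hypothetical mirror of ComputeNorm.apply: returns the L1 norm of the change
// in the weights and the L1 norm of the new weights.
def norms(weights: Array[Double], previous: Array[Double]): (Double, Double) = {
  val normDiff = weights.indices.map(j => abs(weights(j) - previous(j))).sum
  val normWeights = weights.map(w => abs(w)).sum
  (normDiff, normWeights)
}

val (diff, size) = norms(Array(1.0, -2.0), Array(0.5, -2.0))
```

Only the first weight moved, by 0.5, so `diff` is `0.5`, and `size` is `|1| + |-2| = 3.0`.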

In [7]:
```scala
// Returns true once the L1 change of the weights drops below
// accuracy * max(L1 norm of the weights, 1), or once the iteration
// counter exceeds max_iterations.
class LoopCondition(var accuracy: Double, var max_iterations: Int)
    extends FunctionDescriptor.ExtendedSerializablePredicate[JavaCollection[(Double, Double)]] {

  private var current_iteration: Int = _

  override def test(collection: JavaCollection[(Double, Double)]): Boolean = {
    val input: (Double, Double) = WayangCollections.getSingle(collection)
    println("Running iteration: " + current_iteration)
    (input._1 < accuracy * max(input._2, 1.0) || current_iteration > max_iterations)
  }

  override def open(executionContext: ExecutionContext): Unit = {
    this.current_iteration = executionContext.getCurrentIteration
  }

}
```

defined class LoopCondition
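The predicate combines a relative convergence test with an iteration cap. The sketch below reproduces only the boolean logic in plain Scala, with the norms and the iteration passed explicitly; `loopTest` is a hypothetical name, and how Wayang's `doWhileJava` acts on the returned value is not shown here.

```scala
import scala.math.max

// Hypothetical mirror of LoopCondition.test with its inputs made explicit:
// true when the L1 change is below accuracy * max(L1 norm, 1)
// or when the iteration counter exceeds maxIterations.
def loopTest(normDiff: Double, normWeights: Double, accuracy: Double,
             iteration: Int, maxIterations: Int): Boolean =
  normDiff < accuracy * max(normWeights, 1.0) || iteration > maxIterations

val smallChange = loopTest(0.01, 3.0, accuracy = 0.1, iteration = 2, maxIterations = 10)
val largeChange = loopTest(0.5, 3.0, accuracy = 0.1, iteration = 2, maxIterations = 10)
val tooMany = loopTest(0.5, 3.0, accuracy = 0.1, iteration = 11, maxIterations = 10)
```

Using `max(normWeights, 1.0)` keeps the threshold from collapsing to zero while the weights are still near zero, as they are right after initialization.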

In [8]:
```scala
/**
  * This class executes a stochastic gradient descent optimization on Apache Wayang.
  */
class SGDImpl(plugins: Array[Plugin]) {

  def apply(confFile: Configuration, datasetUrl: String, datasetSize: Int, features: Int, maxIterations: Int, accuracy: Double, sampleSize: Int): Array[Double] = {
    // Initialize the builder.
    val context = new WayangContext(confFile)
    for (plugin <- this.plugins) {
      context.withPlugin(plugin)
    }
    val planBuilder = new PlanBuilder(context)

    // Create initial weights.
    val weights: List[Array[Double]] = Arrays.asList(Array.ofDim[Double](features))

    val weightsBuilder: DataQuanta[Array[Double]] =
      planBuilder.loadCollection(weights).withName("init weights")

    // Load and transform the data.
    val transformBuilder: DataQuanta[Array[Double]] = planBuilder
      .readTextFile(datasetUrl).withName("source")
      .mapJava(new Transform(features)).withName("transform")

    // Do the SGD
    val loop: DataQuanta[Array[Double]] = weightsBuilder.doWhileJava(
      new LoopCondition(accuracy, maxIterations),
      (w) => {
        val newWeightsDataset: DataQuanta[Array[Double]] =
          transformBuilder
            .sample(sampleSize, datasetSize).withBroadcast(w, "weights")
            .mapJava(new ComputeLogisticGradient()).withBroadcast(w, "weights").withName("compute")
            .reduceJava(new Sum()).withName("reduce")
            .mapJava(new WeightsUpdate()).withBroadcast(w, "weights").withName("update")

        val convergenceDataset: DataQuanta[(Double, Double)] =
          newWeightsDataset.mapJava(new ComputeNorm()).withBroadcast(w, "weights")

        new WayangTuple(newWeightsDataset, convergenceDataset)
      },
      maxIterations
    )

    WayangCollections.getSingleOrNull(loop.collect())
  }

}
```

defined class SGDImpl
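To follow the data flow of `SGDImpl` without a cluster, the sketch below runs the same sample → gradient → sum → update → convergence-check cycle on a single machine in plain Scala. It is not Wayang: `localSgd`, its parameters, the sampling with replacement via `scala.util.Random`, and the toy data set are all assumptions made for illustration, and the iteration bookkeeping only approximates what the Wayang loop does.

```scala
import scala.math.{abs, exp, max}
import scala.util.Random

// Hypothetical single-machine version of the loop SGDImpl assembles with Wayang.
// Each point is [label, feature_1, ..., feature_d].
def localSgd(data: IndexedSeq[Array[Double]], features: Int, maxIterations: Int,
             accuracy: Double, sampleSize: Int, stepSize: Double = 1.0,
             regularizer: Double = 0.0, seed: Long = 42L): Array[Double] = {
  val rnd = new Random(seed)
  var weights = Array.ofDim[Double](features)
  var iteration = 0
  var done = false
  while (!done) {
    // Sample points (with replacement, for simplicity) and sum their
    // logistic gradients; slot 0 counts the sampled points.
    val summed = Array.ofDim[Double](features + 1)
    for (_ <- 0 until sampleSize) {
      val point = data(rnd.nextInt(data.length))
      val dot = (0 until features).map(j => weights(j) * point(j + 1)).sum
      val err = 1.0 / (1.0 + exp(-dot)) - point(0)
      summed(0) += 1
      for (j <- 0 until features) summed(j + 1) += err * point(j + 1)
    }
    // Regularized step with step size decaying as stepSize / (iteration + 1).
    val alpha = stepSize / (iteration + 1)
    val newWeights = Array.tabulate(features) { j =>
      (1 - alpha * regularizer) * weights(j) - alpha * (1.0 / summed(0)) * summed(j + 1)
    }
    // Same test as LoopCondition, applied to the L1 norms from ComputeNorm.
    val normDiff = (0 until features).map(j => abs(newWeights(j) - weights(j))).sum
    val normWeights = newWeights.map(w => abs(w)).sum
    weights = newWeights
    iteration += 1
    done = normDiff < accuracy * max(normWeights, 1.0) || iteration > maxIterations
  }
  weights
}

// Tiny separable data set: label 1 for positive x, label 0 for negative x.
val toyData = IndexedSeq(
  Array(1.0, 1.0), Array(1.0, 2.0), Array(0.0, -1.0), Array(0.0, -2.0))
val learned = localSgd(toyData, features = 1, maxIterations = 100,
  accuracy = 0.001, sampleSize = 2)
```

On this toy data every point's gradient is negative whenever the weight is non-negative, so the single learned weight ends up positive, matching the labelling rule. In the Wayang version the same steps are distributed: `sample`, `mapJava` and `reduceJava` run on the chosen platform, and the weights travel between steps through the `"weights"` broadcast.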

In [None]:
```scala
val inputFile = new File("files/HIGGS.csv").toURI().toString()
val confFile = new Configuration(new File("files/wayang_sgd.properties").toURI().toString())

// HIGGS: 11,000,000 rows with 28 features each
new SGDImpl(Array(Spark.basicPlugin, Java.basicPlugin)).apply(
  confFile, inputFile,
  datasetSize = 11000000, features = 28, maxIterations = 1000, accuracy = 1, sampleSize = 1)
```