# WordCount Example

In this WordCount example, our goals are to:

&emsp; Take a first look at the Wayang API in a short program
- The Wayang API is similar to the Spark API
- It does not involve a steep learning curve

&emsp; Show the abstraction Wayang provides on top of big data platforms
- A single data analytics task can run on any supported platform
- Subtasks can potentially run on different platforms when the task benefits from it

---

## Preparing dependencies


This step imports the modules required to run the code. All of these packages come from the previous Maven installation.

The imported libraries are:

Module | Java versions | Scala versions | Description
:----- | -------------: | --------------: | :----------
wayang-core | 8, 11 | 2.11, 2.12 | provides core data structures and the optimizer (required)
wayang-basic | 8, 11 | 2.11, 2.12 | provides common operators and data types for your apps (recommended)
wayang-api-scala-java | 8, 11 | 2.11, 2.12 | provides an easy-to-use Scala and Java API to assemble wayang plans (recommended)
wayang-java | 8, 11 | 2.11, 2.12 | adapters for [Java Stream](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html) processing platforms
wayang-spark | 8, 11 | 2.11, 2.12 | adapters for [Apache Spark](https://spark.apache.org) processing platforms
wayang-flink | 8, 11 | 2.11, 2.12 | adapters for [Apache Flink](https://flink.apache.org) processing platforms
hadoop-common | 8, 11 | - | required because the environment variable **HADOOP_HOME** is not set
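Outside the notebook, the same modules could be declared in an sbt build. This is a minimal sketch, not an official build file. It assumes Scala 2.12 (the `2.12.15` patch version is an arbitrary choice) and reuses the coordinates and versions from this notebook's `$ivy` imports. In sbt, `%%` appends the Scala binary suffix (`_2.12`), which does the same job as the explicit `_2.12` suffixes and the `::` form used in the cells.

```scala
// build.sbt: a sketch mirroring the $ivy imports used in this notebook
scalaVersion := "2.12.15" // assumption: any Scala 2.12.x matching the _2.12 artifacts

val wayangVersion = "0.7.1"

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.module" %% "jackson-module-scala"  % "2.10.2",
  "com.fasterxml.jackson.core"   %  "jackson-databind"      % "2.10.2",
  "org.apache.wayang"            %  "wayang-core"           % wayangVersion,
  "org.apache.wayang"            %  "wayang-basic"          % wayangVersion,
  "org.apache.wayang"            %  "wayang-java"           % wayangVersion,
  "org.apache.wayang"            %% "wayang-api-scala-java" % wayangVersion,
  "org.apache.wayang"            %% "wayang-spark"          % wayangVersion, // only needed for the Spark run
  "org.apache.wayang"            %% "wayang-flink"          % wayangVersion, // only needed for the Flink run
  "org.apache.hadoop"            %  "hadoop-common"         % "2.8.5"
)
```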


```scala
/* Import dependencies */
import $ivy.`com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.2`
import $ivy.`com.fasterxml.jackson.core:jackson-databind:2.10.2`
import $ivy.`org.apache.wayang:wayang-core:0.7.1`
import $ivy.`org.apache.wayang:wayang-basic:0.7.1`
import $ivy.`org.apache.wayang:wayang-java:0.7.1`
import $ivy.`org.apache.wayang:wayang-api-scala-java_2.12:0.7.1`
import $ivy.`org.apache.hadoop:hadoop-common:2.8.5`
```

```scala
/* Include required classes */
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.util.fs.FileSystems
import org.apache.wayang.java.Java
import java.io.File
```

## WordCount Program: Platform-Agnostic

- `PlanBuilder` is a utility for building and executing Wayang plans
- A Wayang plan consists of a set of operators and the dependencies between them
- The logical plan you provide is independent of the underlying platform
> The Wayang engine receives the user-defined Wayang plan and runs an optimization process that transforms this plan of logical operators into an execution plan of physical operators, each executable on a specific platform
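The logical/physical split can be illustrated with a deliberately tiny model in plain Scala. This is a hypothetical sketch, not how Wayang's optimizer is implemented, and none of the names (`ToyPlanner`, `CostModel`, `toExecutionPlan`) come from Wayang. The plan is a sequence of platform-independent operators. "Optimizing" it means binding every operator to whichever registered platform has the lowest estimated cost for the given input size. The set of registered platforms loosely plays the role of the plugins passed to `withPlugin`. The cost numbers are made up. Wayang's real optimizer uses its own cost models and can split a plan across several platforms, which this sketch does not attempt.

```scala
// Hypothetical toy model of logical-to-physical plan translation (not the Wayang API).
object ToyPlanner {
  // Platform-independent logical operators, mirroring the WordCount plan.
  sealed trait LogicalOp
  case object ReadTextFile extends LogicalOp
  case object FlatMapOp    extends LogicalOp
  case object FilterOp     extends LogicalOp
  case object MapOp        extends LogicalOp
  case object ReduceByKey  extends LogicalOp
  case object Collect      extends LogicalOp

  // A physical operator is a logical operator bound to a concrete platform.
  final case class PhysicalOp(op: LogicalOp, platform: String)

  // Made-up linear cost model: fixed startup overhead plus a per-record cost.
  final case class CostModel(startup: Double, perRecord: Double) {
    def cost(records: Long): Double = startup + perRecord * records
  }

  // Assumed numbers: a local engine starts instantly but scales poorly,
  // while a distributed engine pays a startup cost but is cheaper per record.
  val models: Map[String, CostModel] = Map(
    "java"  -> CostModel(startup = 0.0, perRecord = 1.0),
    "spark" -> CostModel(startup = 5000.0, perRecord = 0.1)
  )

  val wordCountPlan: Seq[LogicalOp] =
    Seq(ReadTextFile, FlatMapOp, FilterOp, MapOp, ReduceByKey, Collect)

  // Bind every logical operator to the cheapest registered platform.
  def toExecutionPlan(plan: Seq[LogicalOp], registered: Set[String], records: Long): Seq[PhysicalOp] = {
    val candidates = registered.filter(models.contains).toSeq
    require(candidates.nonEmpty, "no registered platform can run this plan")
    val platform = candidates.minBy(p => models(p).cost(records))
    plan.map(op => PhysicalOp(op, platform))
  }
}
```

Consider `ToyPlanner.toExecutionPlan(ToyPlanner.wordCountPlan, Set("java", "spark"), 100L)`. It binds every operator to `"java"`, because the costs are 100 versus 5010. With 1,000,000 records the same call picks `"spark"` (1,000,000 versus 105,000). In neither case does the plan itself change.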

```scala
def wordcount(context: WayangContext) = {
    val planBuilder = new PlanBuilder(context)
    
    val inputFile = new File("book.txt").toURI().toString()
    
    planBuilder
      .withJobName(s"WordCount ($inputFile)")
      .readTextFile(inputFile)
      .flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)
      .map(word => (word.toLowerCase, 1))
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))
      .collect()
}
```
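The chain in `wordcount` builds a Wayang plan, and `collect()` runs it and returns the counts. To see what the plan computes, here is the same pipeline over an in-memory `Seq[String]` using only the Scala standard library. No Wayang is involved. `LocalWordCount` and its `reduceByKey` helper are hypothetical names for illustration. The helper imitates the two-argument form used in the plan: the first function extracts a key from each element, and the second merges two whole elements that share that key. Spark's `reduceByKey` is different, because it merges only the values of `(key, value)` pairs.

```scala
// Plain-Scala sketch of what the WordCount plan computes (no Wayang involved).
object LocalWordCount {
  // Hypothetical helper mirroring the plan's reduceByKey(keyUdf, reduceUdf):
  // group whole elements by key, then fold each group with the reduce function.
  def reduceByKey[T, K](data: Seq[T])(key: T => K, reduce: (T, T) => T): Seq[T] =
    data.groupBy(key).values.map(_.reduce(reduce)).toSeq

  def wordcount(lines: Seq[String]): Seq[(String, Int)] = {
    val pairs = lines
      .flatMap(_.split("\\W+"))           // split on runs of non-word characters
      .filter(_.nonEmpty)                 // drop empty tokens
      .map(word => (word.toLowerCase, 1)) // one (word, 1) pair per occurrence
    reduceByKey(pairs)(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))
  }
}
```

The result order is unspecified, just as with `collect()` on a distributed platform. Converting the output to a `Map` makes it easy to compare.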

## Executing WordCount on Java Streams via the WayangContext

The `WayangContext` is the entry point for working with Wayang. It lets you explicitly declare the execution platform on which the code will run.

- The `Java.basicPlugin` object enables Wayang to use the operators of the Java Streams platform

> `WayangContext` also manages job creation and the execution of jobs as Wayang plans

```scala
val context = new WayangContext().withPlugin(Java.basicPlugin)
```

```scala
val result = wordcount(context)
```

## Executing WordCount on Spark

Next, we reassign the target platform of the Wayang context.

- First, add the Wayang Spark module dependency

```scala
import $ivy.`org.apache.wayang:wayang-spark_2.12:0.7.1`
```

- Then, import the `Spark` class

```scala
import org.apache.wayang.spark.Spark
```

- Finally, configure the Wayang context to work with the Spark platform
> There is no need to change the WordCount code at all!

```scala
val context = new WayangContext().withPlugin(Spark.basicPlugin)
```

```scala
val result = wordcount(context)
```

```
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/jovyan/.cache/coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/jovyan/.cache/coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
24/02/05 15:50:53 INFO SparkContext: Running Spark version 3.1.2
24/02/05 15:50:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/05 15:50:54 INFO ResourceUtils: No custom resources configured for spark.driver.
24/02/05 15:50:54 INFO SparkContext: 

fs.s3.awsAccessKeyId
fs.s3.awsSecretAccessKey


24/02/05 15:50:55 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.9 KiB, free 2.1 GiB)
24/02/05 15:50:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.1 KiB, free 2.1 GiB)
24/02/05 15:50:55 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 80d73ccee89a:39579 (size: 25.1 KiB, free: 2.1 GiB)
24/02/05 15:50:55 INFO SparkContext: Created broadcast 0 from textFile at SparkTextFileSource.java:70
24/02/05 15:50:56 INFO SparkContext: Starting job: hasNext at Iterator.java:132
24/02/05 15:50:56 INFO FileInputFormat: Total input files to process : 1
24/02/05 15:50:56 INFO DAGScheduler: Registering RDD 5 (mapToPair at SparkReduceByOperator.java:95) as input to shuffle 0
24/02/05 15:50:56 INFO DAGScheduler: Got job 0 (hasNext at Iterator.java:132) with 1 output partitions
24/02/05 15:50:56 INFO DAGScheduler: Final stage: ResultStage 1 (hasNext at Iterator.java:132)
24/02/05 15:50:56 INFO DAGScheduler: Parents o
```

## Executing WordCount on Flink

Add the Flink module dependency, import the `Flink` class, and set the Flink plugin:

```scala
import $ivy.`org.apache.wayang::wayang-flink:0.7.1`
import org.apache.wayang.flink.Flink
val context = new WayangContext().withPlugin(Flink.basicPlugin)
```

```scala
val result = wordcount(context)
```

```
24/02/05 15:51:10 INFO TypeExtractor: class scala.Tuple2 is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
24/02/05 15:51:10 INFO ExecutionEnvironment: The job has 1 registered types and 0 default Kryo serializers
```
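Across the three runs, `wordcount` never changed. Only the plugin handed to the context did. The same design idea can be sketched in plain Scala with a small `Engine` trait that stands in for a platform plugin. User code is written once against the trait, and swapping the implementation changes how the job runs but not what it computes. `Engine`, `Engines.SequentialEngine`, and `Engines.ChunkedEngine` are hypothetical names for illustration only. The chunked engine splits the input into groups of lines to mimic partitioned execution inside a single JVM. It does not reflect how Spark or Flink work internally.

```scala
// Hypothetical stand-in for a platform plugin (not the Wayang API).
trait Engine {
  def name: String
  // Count words in the given lines using this engine's execution strategy.
  def countWords(lines: Seq[String]): Map[String, Int]
}

object Engines {
  // Same tokenization as the WordCount plan: split, drop empties, lowercase.
  private def tokenize(line: String): Seq[String] =
    line.split("\\W+").toSeq.filter(_.nonEmpty).map(_.toLowerCase)

  // Merge two partial count maps by summing the counts per word.
  private def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  // Processes all lines in a single pass.
  object SequentialEngine extends Engine {
    val name = "sequential"
    def countWords(lines: Seq[String]): Map[String, Int] =
      lines.flatMap(tokenize).groupBy(identity).map { case (w, ws) => (w, ws.size) }
  }

  // Counts each chunk ("partition") separately, then merges the partial results.
  final class ChunkedEngine(chunkSize: Int) extends Engine {
    require(chunkSize > 0, "chunkSize must be positive")
    val name = s"chunked($chunkSize)"
    def countWords(lines: Seq[String]): Map[String, Int] =
      lines.grouped(chunkSize)
        .map(SequentialEngine.countWords)
        .foldLeft(Map.empty[String, Int])(merge)
  }

  // User code written once against the abstraction, like `wordcount(context)`.
  def wordcount(engine: Engine, lines: Seq[String]): Map[String, Int] =
    engine.countWords(lines)
}
```

For example, `Engines.wordcount(Engines.SequentialEngine, lines)` and `Engines.wordcount(new Engines.ChunkedEngine(2), lines)` return the same map for any `lines`. Only the execution strategy behind the trait differs.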


## Native WordCount Implementations on Each Platform

Without Wayang, one would have to implement WordCount three times, once for each of the three platforms!

### Java Stream Implementation

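```java
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JavaWordCount {
  public static void main(String[] args) {
    Path textFilePath = Paths.get(args[0]);
    // Files.lines keeps the file open, so close the stream with try-with-resources
    try (Stream<String> fileLines = Files.lines(textFilePath, Charset.defaultCharset())) {
      Map<String, Integer> wordCounts = fileLines
          .flatMap(line -> Arrays.stream(line.split("\\W+"))) // split lines into words
          .filter(t -> t.length() != 0)                        // drop empty tokens
          .collect(
              Collectors.toConcurrentMap(
                  w -> w,      // key: the word itself
                  w -> 1,      // value: one occurrence
                  Integer::sum // merge: add up occurrences of the same word
              )
          );
      wordCounts.forEach((word, count) -> System.out.println(word + " " + count));
    } catch (IOException ioException) {
      ioException.printStackTrace();
    }
  }
}
```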

### Apache Flink Implementation

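```scala
import org.apache.flink.api.scala._

object FlinkWordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.readTextFile(args(0))
      .flatMap { _.split("\\W+") } // split lines into words
      .filter { _.nonEmpty }       // drop empty tokens
      .map { (_, 1) }              // pair each word with a count of 1
      .groupBy(0)                  // group by the word (tuple field 0)
      .sum(1)                      // sum the counts (tuple field 1)
      .writeAsCsv(args(1), "\n", " ")

    env.execute("FlinkWordCount")
  }
}
```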

### Apache Spark Implementation

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))
      .flatMap(line => line.split("\\W+"))
      .filter(word => word.length != 0)
      .map(word => (word, 1)) // implicit conversion to PairRDDFunctions enables reduceByKey
      .reduceByKey { case (x, y) => x + y }
      .saveAsTextFile(args(1))
    sc.stop()
  }
}
```