# Spark Streaming with trace DSL

This notebook uses Spark Streaming and the trace DSL to extract features from trace data
and publish metrics to Prometheus.

In [1]:
%%loadFromPOM
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>2.4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.12</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>io.prometheus</groupId>
      <artifactId>simpleclient_httpserver</artifactId>
      <version>0.7.0</version>
    </dependency>

## Install the library to the local Maven repository
This step is only needed if the trace DSL source code has been modified.
Open a terminal in Jupyter and run the following command:
```
cd work && ./mvnw clean install -DskipTests
```

In [3]:
%maven io.jaegertracing:jaeger-tracedsl:0.1.0-SNAPSHOT

System.out.println(Keys.class);
System.out.println(org.apache.spark.SparkConf.class);

class Keys
class org.apache.spark.SparkConf


## Define connection to Kafka

In [15]:
String kafkaServers = "192.168.42.6:32632";
String kafkaTopic = "jaeger-spans";
int prometheusPort = 9001;

In [21]:
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.tinkerpop.gremlin.structure.Graph;
import scala.Tuple2;
import io.jaegertracing.analytics.gremlin.*;
import io.jaegertracing.analytics.query.*;
import io.jaegertracing.analytics.model.*;

HTTPServer server = new HTTPServer(prometheusPort);

SparkConf sparkConf = new SparkConf().setAppName("Trace DSL").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));

Set<String> topics = Collections.singleton(kafkaTopic);
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaServers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", ProtoSpanDeserializer.class);
// Hack: a unique group id has no committed offsets, so reading always starts from the beginning
kafkaParams.put("group.id", "trace-aggregation-" + System.currentTimeMillis());
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
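// Note: "startingOffsets"/"endingOffsets" are Structured Streaming source options; the Kafka
// consumer behind createDirectStream does not recognize them ("auto.offset.reset" above applies).
kafkaParams.put("startingOffsets", "earliest");
kafkaParams.put("endingOffsets", "latest");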

JavaInputDStream<ConsumerRecord<String, Span>> messages =
    KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topics, kafkaParams));

JavaPairDStream<String, Span> traceIdSpanTuple = messages.mapToPair(record -> {
  return new Tuple2<>(record.value().traceId, record.value());
});

JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {
  Iterable<Span> spans = traceIdSpans._2();
  Trace trace = new Trace();
  trace.traceId = traceIdSpans._1();
  trace.spans = StreamSupport.stream(spans.spliterator(), false)
      .collect(Collectors.toList());
  return trace;
});

tracesStream.foreachRDD((traceRDD, time) -> {
  traceRDD.foreach(trace -> {
    Graph graph = GraphCreator.create(trace);
    TraceDepth.calculate(graph);
  });
});

ssc.start();
ssc.awaitTermination();

19/11/05 16:40:18 INFO SparkContext: Running Spark version 2.4.4
19/11/05 16:40:18 INFO SparkContext: Submitted application: Trace DSL
19/11/05 16:40:18 INFO SecurityManager: Changing view acls to: jovyan
19/11/05 16:40:18 INFO SecurityManager: Changing modify acls to: jovyan
19/11/05 16:40:18 INFO SecurityManager: Changing view acls groups to: 
19/11/05 16:40:18 INFO SecurityManager: Changing modify acls groups to: 
19/11/05 16:40:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jovyan); groups with view permissions: Set(); users  with modify permissions: Set(jovyan); groups with modify permissions: Set()
19/11/05 16:40:18 INFO Utils: Successfully started service 'sparkDriver' on port 39681.
19/11/05 16:40:18 INFO SparkEnv: Registering MapOutputTracker
19/11/05 16:40:18 INFO SparkEnv: Registering BlockManagerMaster
19/11/05 16:40:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMappe

EvaluationInterruptedException: Evaluator was interrupted while executing: 'ssc.start();'
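
The `mapToPair` and `groupByKey` steps in the cell above collect the spans of each micro-batch by trace ID. As a rough illustration of that grouping outside Spark, the plain-Java sketch below does the same on an in-memory list. The `Span` and `Trace` classes here are simplified, hypothetical stand-ins for the trace DSL model classes (only the `traceId` and `spans` fields appear in the notebook; `operationName` is an assumption), and `toTraces` is an illustrative helper, not part of the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupSpansSketch {
  // Hypothetical stand-in for the trace DSL Span model; only the fields used here.
  static class Span {
    final String traceId;
    final String operationName; // assumed field, for readability of the example

    Span(String traceId, String operationName) {
      this.traceId = traceId;
      this.operationName = operationName;
    }
  }

  // Hypothetical stand-in for the trace DSL Trace model.
  static class Trace {
    String traceId;
    List<Span> spans = new ArrayList<>();
  }

  // Groups the spans of one batch by trace id, similar in spirit to
  // mapToPair(traceId, span) followed by groupByKey() on a single micro-batch.
  static List<Trace> toTraces(List<Span> batch) {
    Map<String, List<Span>> byTraceId = batch.stream()
        .collect(Collectors.groupingBy(span -> span.traceId));
    return byTraceId.entrySet().stream()
        .map(entry -> {
          Trace trace = new Trace();
          trace.traceId = entry.getKey();
          trace.spans = entry.getValue();
          return trace;
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Span> batch = Arrays.asList(
        new Span("t1", "GET /"),
        new Span("t2", "GET /users"),
        new Span("t1", "db.query"));
    // The order of the printed traces is not guaranteed.
    for (Trace trace : toTraces(batch)) {
      System.out.println(trace.traceId + " -> " + trace.spans.size() + " spans");
    }
  }
}
```

Note that a DStream `groupByKey` works per batch (5 seconds in the cell above), so spans of one trace that arrive in different batches would likely end up in separate `Trace` objects.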

## Stop

In [20]:
if (server != null) server.stop();
if (ssc != null) { ssc.stop(); ssc.close(); }


19/11/05 16:40:12 WARN StreamingContext: StreamingContext has not been started yet
19/11/05 16:40:12 INFO SparkUI: Stopped Spark web UI at http://c8bb256338ca:4040
19/11/05 16:40:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/11/05 16:40:12 INFO MemoryStore: MemoryStore cleared
19/11/05 16:40:12 INFO BlockManager: BlockManager stopped
19/11/05 16:40:12 INFO BlockManagerMaster: BlockManagerMaster stopped
19/11/05 16:40:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/11/05 16:40:12 INFO SparkContext: Successfully stopped SparkContext
19/11/05 16:40:12 WARN StreamingContext: StreamingContext has already been stopped
19/11/05 16:40:12 INFO SparkContext: SparkContext already stopped.


## Get Prometheus metrics

Open a browser on the host running this notebook to see the exported Prometheus metrics, e.g. http://localhost:9001.
Or configure Prometheus to scrape metrics from the host where this notebook is running.
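
As an alternative to the browser, the metrics page can also be read from Java. The sketch below is a minimal example under stated assumptions: it assumes the `HTTPServer` from the cell above is still listening on port 9001, and `MetricsPeek`, `fetch`, and `sampleLines` are hypothetical names introduced only for illustration. It drops the `#` comment lines (such as `# HELP` and `# TYPE`) of the Prometheus text format and prints the remaining sample lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class MetricsPeek {
  // Keeps only sample lines: trims each line, then drops blank lines and '#' comments.
  static List<String> sampleLines(List<String> lines) {
    return lines.stream()
        .map(String::trim)
        .filter(line -> !line.isEmpty() && !line.startsWith("#"))
        .collect(Collectors.toList());
  }

  // Reads the response body of the given URL as a list of lines.
  static List<String> fetch(String url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setConnectTimeout(2000);
    conn.setReadTimeout(2000);
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      return reader.lines().collect(Collectors.toList());
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) {
    // Assumption: the notebook's Prometheus HTTPServer is running on this host and port.
    String url = "http://localhost:9001";
    try {
      sampleLines(fetch(url)).forEach(System.out::println);
    } catch (IOException e) {
      System.err.println("Could not read metrics from " + url + ": " + e.getMessage());
    }
  }
}
```

If the server has already been stopped (see the Stop section), the sketch only prints an error message instead of metrics.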