# Distributed Systems
## 2021/22

Lab 10

Nuno Preguiça, Sérgio Duarte, Dina Borrego, João Vilalonga

# Goals

By the end of this lab, you should be able to:

+ Understand what Apache Kafka is;
+ Know how to perform publish/subscribe communication using Kafka;
+ Know how to exploit Kafka to execute operations in total order.

# Apache KAFKA

[Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform.

+ Exposes a topic-based [publish/subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) API. 
+ Provides **group communication** with ordering and reliability guarantees.


# Apache KAFKA Architecture

<img src="https://preguica.github.io/sd2122/praticas2122/aula10/kafka-architecture.png" width="50%"></img>

# Apache KAFKA - Reliability

Events are persisted in a **persistent log**, kept in **stable storage** (disk).

+ The log survives broker crashes;
+ The log is pruned lazily (e.g., events can remain in the log for days);
+ The log can be replayed from the beginning<br> to receive events published while the subscriber was offline;
+ Each topic has its own, independent, event log.

# Apache KAFKA - Ordering

For performance reasons, for each topic, the event log can be **partitioned**.

For each topic, events are **totally ordered** within a **given** partition.

The event record **offset** defines the ordering of an event within a given partition.

For topics that use more than one partition, events are only **partially ordered**: they are ordered within each partition, but not across partitions.
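To make the ordering rules concrete, here is a toy, in-memory model of a partitioned topic. It is not the Kafka API, only a sketch assuming each partition is an append-only list whose index plays the role of the record offset. Offsets restart at 0 in every partition, so two events in different partitions can share an offset and have no relative order.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a partitioned topic log (NOT the Kafka API).
// Each partition is an append-only list; a record's offset is its index.
class PartitionedTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionedTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++)
            partitions.add(new ArrayList<>());
    }

    // Appends an event to a partition and returns its offset in that partition.
    long append(int partition, String value) {
        List<String> log = partitions.get(partition);
        log.add(value);
        return log.size() - 1;
    }

    // Replays a partition starting at the given offset, in offset order.
    List<String> readFrom(int partition, long offset) {
        List<String> log = partitions.get(partition);
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }
}
```

Replaying partition 0 from offset 0 always yields its events in append order, which is the per-partition total order; nothing in the model relates partition 0 to partition 1.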

## Apache KAFKA - Maven Dependencies

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>
```

## Apache KAFKA - Create Publisher

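```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

static final String TOPIC = "mytopic";
static final String KAFKA_BROKERS = "localhost:9092";
static final String EVENT_DATA_STR = "some event data";

var props = new Properties();
// Broker(s) used to bootstrap the connection to the Kafka cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);

// Event keys and values are Strings, encoded into bytes by StringSerializer
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

var publisher = new KafkaProducer<String, String>(props);
```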

[KafkaProducer](https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html) represents the event publisher.

To create the publisher we need some configuration properties.

+ A comma-separated list of broker endpoints (host:port);
+ The serializer classes used to encode event data into raw bytes.

There are more [configuration properties](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) that provide fine-grained control over the behavior of the publisher.
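As an illustration of those extra settings, the sketch below builds the producer properties using the literal configuration names behind the `ProducerConfig` constants. The choice of `acks=all` and `enable.idempotence=true` is an assumption made for the example (it favors reliability over latency), not something this lab requires; the helper name `reliableProducerProps` is hypothetical.

```java
import java.util.Properties;

class ProducerProps {
    // Hypothetical helper: the same settings as above, written with the
    // literal property names, plus two optional reliability settings.
    static Properties reliableProducerProps(String brokers) {
        var props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                // wait for all in-sync replicas to acknowledge
        props.put("enable.idempotence", "true"); // avoid duplicate records on producer retries
        return props;
    }
}
```

The resulting `Properties` object can be passed to `new KafkaProducer<String, String>(props)` exactly like the one built with the constants.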

## Apache KAFKA - Publishing Events
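```java
var publisher = new KafkaProducer<String, String>(props);

// send() is asynchronous: it returns a Future immediately
var promise = publisher.send(new ProducerRecord<>(TOPIC, EVENT_DATA_STR));

// get() blocks until the send completes and returns the record metadata
var metadata = promise.get();

long offset = metadata.offset();
```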

Events are published as [producer records](https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html).

We need to provide:

+ The event **topic**;
+ The event **value**;
+ Optionally, the event **key** and/or **partition**.
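Supplying a key matters for ordering: with the default partitioner (and an unchanged number of partitions), records with the same key go to the same partition, so they are totally ordered relative to each other. The sketch below imitates that idea; note that Kafka hashes the serialized key with murmur2, while this dependency-free stand-in uses `Arrays.hashCode`, so the partition numbers it computes will differ from Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitioning {
    // Simplified keyed partitioning: same key -> same partition.
    // Kafka uses a murmur2 hash here; Arrays.hashCode is only a stand-in.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit first
    }
}
```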

The send operation is asynchronous: it returns immediately with a [future](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Future.html) that will hold
the event record [metadata](https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/RecordMetadata.html) once the send completes.

To block until the *send* is completed, we call *get()* on the *metadata future*. 

The event record metadata contains some useful information about the event, including its **offset**.
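The future-based pattern can be tried without a broker. The toy publisher below (not `KafkaProducer`; its name is hypothetical) returns a `Future<Long>` at once and assigns offsets on a single background thread, so `get()` blocks exactly until the "send" has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy asynchronous publisher (NOT the Kafka API): send() returns immediately;
// the offset is known only when the background task has run.
class AsyncLogPublisher implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private final List<String> log = new ArrayList<>(); // touched only by the io thread

    Future<Long> send(String value) {
        return io.submit(() -> {
            log.add(value);
            return (long) (log.size() - 1); // offset of the appended value
        });
    }

    @Override
    public void close() {
        io.shutdown();
    }
}
```

Because a single thread appends in submission order, two sends from the same thread get increasing offsets, mirroring per-partition ordering.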

## Apache KAFKA - Create Subscriber

```java
static final String MY_GROUP_ID = "my_group_id";
static final String REPLAY_FROM_BEGINNING = "earliest";
static final String KAFKA_BROKERS = "localhost:9092, kafka:9092";

var props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

props.put(ConsumerConfig.GROUP_ID_CONFIG, MY_GROUP_ID );
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, REPLAY_FROM_BEGINNING);

var consumer = new KafkaConsumer<String, String>(props);
```

[KafkaConsumer](https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html) represents the event subscriber.

To create the subscriber we need some configuration properties.

+ A comma-separated list of broker endpoints (host:port);
+ The deserializer classes used to decode raw bytes into event keys and values;
+ The group id of the consumer (if multiple consumers share the same group id, the topic partitions, and hence the consumed events, are split among them);
+ The offset reset policy, used when the group has no committed offset yet: *earliest* replays the log from the beginning; *latest* delivers only events published from then on.

There are more [configuration properties](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) that provide fine control over the behavior of the subscriber.
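How a group splits the work can be pictured with a simplified round-robin assignment, sketched below: each partition is given to exactly one consumer of the group. Kafka ships several assignment strategies and its default is not necessarily round-robin, so treat `GroupAssignment` as a hypothetical illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupAssignment {
    // Simplified round-robin split of a topic's partitions among the
    // consumers of one group: each partition goes to exactly one consumer.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String c : consumers)
            result.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++)
            result.get(consumers.get(p % consumers.size())).add(p);
        return result;
    }
}
```

One consequence also holds in Kafka: if a group has more consumers than the topic has partitions, the extra consumers receive no events.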

## Apache KAFKA - Subscription and notification

```java
static final String TOPIC = "mytopic";

var consumer = new KafkaConsumer<String, String>(props);

consumer.subscribe(List.of(TOPIC));

consumer.poll(Duration.ofSeconds(10)).forEach( System.out::println );
```

The consumer can subscribe to multiple topics, supplied as a list.

Events are notified as [consumer records](https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html).

The event record contains the event data, **key** and **value**, as well as *metadata*,<br> namely the **topic** it belongs to and its offset within the **topic partition**.

Consuming events is a blocking operation: **poll** waits up to the given timeout for events to become available,<br> and returns the batch of events received, ordered according to these semantics:
 + unordered among topics;
 + partially ordered among partitions of the same topic;
 + totally ordered within the same topic partition.
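These semantics can be stated as a check over a polled batch: within each (topic, partition) pair, offsets must strictly increase, while records of different partitions may interleave freely. The sketch below uses a minimal hypothetical record type `Rec` rather than Kafka's `ConsumerRecord`, and requires Java 16+ for `record`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OrderCheck {
    // Minimal record shape for this sketch (not Kafka's ConsumerRecord).
    record Rec(String topic, int partition, long offset) {}

    // True if offsets strictly increase within every (topic, partition);
    // no constraint is imposed across partitions or topics.
    static boolean respectsPartitionOrder(List<Rec> batch) {
        Map<String, Long> last = new HashMap<>();
        for (Rec r : batch) {
            String tp = r.topic() + "-" + r.partition();
            Long prev = last.get(tp);
            if (prev != null && r.offset() <= prev)
                return false;
            last.put(tp, r.offset());
        }
        return true;
    }
}
```

Offsets are checked for strict increase rather than for consecutiveness, since a batch may legitimately skip offsets.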

## Apache Zookeeper

[Zookeeper](https://zookeeper.apache.org/) is a centralized service for highly reliable distributed coordination.

It can provide distributed applications with:

+ Naming;
+ Distributed synchronization;
+ **Distributed consensus/elections**;
+ Configuration information storage.

## Apache Zookeeper

Zookeeper provides a hierarchical tuple space that stores information in a reliable and **strongly
consistent** way.

Some relevant aspects of zookeeper:

+ Tuples have a *unique name* and, optionally, an associated *value*.
    + A tuple is called a **znode** in Zookeeper nomenclature.
+ Tuples are organized hierarchically, like the directory structure of a file system.
    + For example, a tuple named `/zoo` can have a child tuple named `one`, whose complete name is `/zoo/one`.
+ It is possible to **monitor changes** to a znode (its data or its list of children),<br> and be **notified** by Zookeeper whenever a change
happens.

## Zookeeper tuple types

There are several kinds of znodes:

+ **Persistent** znodes survive the crash of the client that created them;
+ **Ephemeral** znodes are deleted automatically when the session of the client that created them ends (e.g., the client crashes and its session times out);
+ **Persistent_Sequential** and **Ephemeral_Sequential** znodes have names generated by Zookeeper, which appends a strictly increasing sequence number to the name supplied by the client.
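The naming scheme can be imitated in a few lines. Zookeeper keeps the counter in the parent znode and formats it as 10 zero-padded digits (`%010d`); the toy class below plays the role of a single parent, and the path `/election/node_` is a hypothetical example. The zero padding makes lexicographic order of names agree with numeric order of sequence numbers.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy imitation (NOT the Zookeeper API) of sequential znode naming:
// a per-parent counter, 10 zero-padded digits, appended to the requested path.
class SequentialNames {
    private final AtomicInteger counter = new AtomicInteger();

    String next(String requestedPath) {
        return requestedPath + String.format("%010d", counter.getAndIncrement());
    }
}
```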

## Zookeeper - Maven dependencies

```xml
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
```

## Zookeeper - Client session

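```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

static final int TIMEOUT = 5000; // session timeout, in milliseconds

var connectedSignal = new CountDownLatch(1);

// host: the Zookeeper connection string (host:port), defined elsewhere
client = new ZooKeeper(host, TIMEOUT, (e) -> {
    // Release the waiting thread once the session is connected
    if (e.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
        connectedSignal.countDown();
    }
});

connectedSignal.await(); // blocks until countDown() is called
// client is now connected...
```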

Connecting to the Zookeeper server is asynchronous and goes through multiple states.

A countdown latch can be used as a barrier, so that the thread only proceeds once the connection event has been notified.
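The barrier pattern can be exercised without a Zookeeper server. In the sketch below, a background thread stands in for the connection watcher and calls `countDown()` after a delay; the main thread uses `await(timeout, unit)`, which returns `false` if the event does not arrive in time. Using a bounded wait instead of the plain `await()` above is a design choice of this sketch: it avoids blocking forever when the server is unreachable. The class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchBarrier {
    // Simulates an asynchronous "connected" event arriving after delayMillis
    // and waits for it for at most timeoutMillis. True if it arrived in time.
    static boolean waitForConnection(long delayMillis, long timeoutMillis) throws InterruptedException {
        var connected = new CountDownLatch(1);
        Thread notifier = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // stand-in for the connection handshake
                connected.countDown();     // what the watcher does on SyncConnected
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        notifier.setDaemon(true); // do not keep the JVM alive if we stop waiting
        notifier.start();
        return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```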

## Zookeeper - Tuple creation

```java
public String createNode(String path, byte[] data, CreateMode mode) {
    try {
        return client.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
    } catch (KeeperException.NodeExistsException x) {
        return path; // the znode already exists: return its name
    } catch (Exception x) {
        return null; // any other failure
    }
}
```

Example usages:

```java
createNode("/path", new byte[0], CreateMode.PERSISTENT);

var newPath = createNode("/path", new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL);
```

To create a tuple using a client in the *connected* state, we supply:

+ the path (full name) of the tuple;
+ the data associated with the tuple;
+ the type of node.

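For sequential znodes (**PERSISTENT_SEQUENTIAL** and **EPHEMERAL_SEQUENTIAL**), the final name is generated by Zookeeper, which appends a sequence number to the supplied path; the generated name is returned
by the creation operation.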

## Zookeeper - List Tuple Children

```java
public List<String> getChildren(String path) {
    try {
        return client.getChildren(path, false); // false: do not set a watch
    } catch (Exception x) {
        x.printStackTrace();
    }
    return Collections.emptyList();
}
```
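`getChildren` returns only the child names (without the parent path), and in no guaranteed order. A common use of `EPHEMERAL_SEQUENTIAL` children is leader election, where the child with the smallest sequence number is the leader. The sketch below, with the hypothetical class `LeaderPick`, parses the 10-digit suffix and sorts explicitly; it assumes every child is a sequential znode and that the list is non-empty.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class LeaderPick {
    // Sequential znode names end in a 10-digit counter; parse it back out.
    static long sequenceOf(String childName) {
        return Long.parseLong(childName.substring(childName.length() - 10));
    }

    // Children come back unordered, so sort by sequence number;
    // the smallest one is taken as the leader (assumes a non-empty list).
    static String leader(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LeaderPick::sequenceOf));
        return sorted.get(0);
    }
}
```

Since the creator's session going away deletes an ephemeral znode, a crashed leader's child disappears and the next smallest sequence number takes over.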

## Zookeeper - List Tuple Children with a Watcher

```java
public List<String> getChildren(String path, Watcher watcher) {
    try {
        // The watcher is notified when the list of children of 'path' changes
        return client.getChildren(path, watcher);
    } catch (Exception x) {
        x.printStackTrace();
    }
    return Collections.emptyList();
}
```

Example usage:

```java
getChildren("/path", (e) -> {
    // something under '/path' changed...
});
```
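Standard Zookeeper watches are one-time triggers: after a watcher has been notified, it must be set again (for instance, by calling `getChildren(path, watcher)` once more inside the callback) to learn about later changes. Changes that happen between the notification and the re-registration are not reported individually, so the callback should re-read the children list. The toy classes below (not the Zookeeper API; both names are hypothetical) model the one-shot behavior and a watcher that re-arms itself.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT the Zookeeper API): a registered callback fires at most once.
class OneShotWatches {
    private List<Runnable> watchers = new ArrayList<>();

    void register(Runnable w) {
        watchers.add(w);
    }

    // Simulates a change under the watched path.
    void change() {
        List<Runnable> toFire = watchers;
        watchers = new ArrayList<>(); // watches are consumed by the trigger
        toFire.forEach(Runnable::run);
    }
}

// A watcher that re-registers itself on every notification.
class ReRegisteringWatcher implements Runnable {
    private final OneShotWatches watches;
    int notifications = 0;

    ReRegisteringWatcher(OneShotWatches watches) {
        this.watches = watches;
        watches.register(this);
    }

    @Override
    public void run() {
        notifications++;
        watches.register(this); // re-arm, like calling getChildren(path, this) again
    }
}
```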

# EXERCISES


### Kafka : clients in localhost

+ Download and study the sample projects provided.

+ Start Kafka with the provided script.

 `sh start-kafka.sh localhost`


+ Run the KafkaSender and KafkaReceiver examples in Eclipse (or your IDE).


### Kafka : clients as docker containers

+ Edit KafkaSender and KafkaReceiver and set the KAFKA_BROKERS constant to "kafka:9092".

+ Start Kafka with the provided script.

 `sh start-kafka.sh kafka`


+ Build the docker image

 `mvn clean compile assembly:single docker:build`


+ Run KafkaReceiver

 `docker run -t --network=sdnet sd2122-aula10-kafka java -cp /home/sd/sd.jar sd2122.aula10.kafka.examples.KafkaReceiver`


+ Run KafkaSender

 `docker run -t --network=sdnet sd2122-aula10-kafka java -cp /home/sd/sd.jar sd2122.aula10.kafka.examples.KafkaSender   msg`

### TP2 Kafka Replication Helper Code

+ The provided *SyncPoint* class is a generic version of the code presented in the lectures.

+ See the *TotalOrderExecutor* class for how to use the *SyncPoint* class.
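For orientation only, here is a hypothetical sketch of the idea behind such a synchronization point; it is not the provided `SyncPoint` class, and its names and methods are assumptions. Every replica consumes the same single-partition topic and executes operations in offset order; the replica that published an operation takes the offset from the send metadata and waits until its own consumer thread has executed that offset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT the provided SyncPoint class.
class SyncPointSketch<T> {
    private final Map<Long, T> results = new HashMap<>();
    private long executedUpTo = -1;

    // Called by the consumer thread after executing the operation at 'offset'
    // (offsets of one partition are delivered in increasing order).
    synchronized void setResult(long offset, T result) {
        results.put(offset, result);
        executedUpTo = offset;
        notifyAll();
    }

    // Called by the thread that published the operation and got back 'offset'.
    synchronized T waitForResult(long offset) throws InterruptedException {
        while (executedUpTo < offset)
            wait();
        return results.remove(offset);
    }
}
```

In this sketch, results for offsets that no local thread waits for (for example, operations published by other replicas) stay in the map; a fuller version would prune them.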