# Let's expand our cluster to three nodes

## Generate new config file for each of the brokers

In [None]:
!cp ${KAFKA_HOME}/config/server.properties ${KAFKA_HOME}/config/server-1.properties
!cp ${KAFKA_HOME}/config/server.properties ${KAFKA_HOME}/config/server-2.properties

In [None]:
!cat ${KAFKA_HOME}/config/server.properties 

In [None]:
!ls ${KAFKA_HOME}/config/

## Edit the files and set the following properties:
The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other's data.

``` bash
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dirs=/tmp/kafka-logs-2
```

In [None]:
!cp resources/* ${KAFKA_HOME}/config

## Starting the 2 new brokers
In two terminals:
``` bash
kafka-server-start.sh ${KAFKA_HOME}/config/server-1.properties
kafka-server-start.sh ${KAFKA_HOME}/config/server-2.properties
```

# Create a new topic
Replication-factor now is 3

In [None]:
!kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

# Get information about brokers and topics

In [None]:
!kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic

#### "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

#### "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.

#### "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

In [None]:
!kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test

# Publish some messages
In a terminal
```bash
kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
```

# Consuming messages

In [None]:
# You have to stop this cell before continue
!kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

# Killing the leader

In [None]:
!ps aux | grep server.properties

In [None]:
!kill -9 <leaderProcessID>

# Get information about brokers and topics

In [None]:
#Consulting a new broker, port:9093
!kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic my-replicated-topic

#### Leadership has switched to one of the followers and the node killed is no longer in the in-sync replica set.

# Messages are still available for consumption even though the leader that took the writes originally is down

In [None]:
!kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic