### Great resource for Kafka
https://kafka.apache.org/documentation/

### Add Maven dependency

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>me.nikhil</groupId>
	<artifactId>kafka-examples</artifactId>
	<version>1.0-SNAPSHOT</version>
	<name>kafkaexamples</name>

	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.11.0.0</version>
		</dependency>
	</dependencies>
</project>
```

### Producer

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();

		// Kafka bootstrap server, plus serializers for record keys and values
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("key.serializer", StringSerializer.class.getName());
		properties.setProperty("value.serializer", StringSerializer.class.getName());

		// Delivery settings: leader-only acks, up to 3 retries, wait up to 1 ms to batch
		properties.setProperty("acks", "1");
		properties.setProperty("retries", "3");
		properties.setProperty("linger.ms", "1");

		Producer<String, String> producer = new KafkaProducer<String, String>(properties);

		for (int key = 0; key < 10; key++) {
			ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
					"firsttopic", Integer.toString(key), "message with key " + key);
			// send() is asynchronous: the record is buffered and sent in the background
			producer.send(producerRecord);
		}
		// close() waits for buffered records to be sent before releasing resources
		producer.close();
	}
}
```

### Consumer

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerDemo {

	public static void main(String[] args) {

		Properties properties = new Properties();

		// Kafka bootstrap server, plus deserializers for record keys and values
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("key.deserializer", StringDeserializer.class.getName());
		properties.setProperty("value.deserializer", StringDeserializer.class.getName());

		// Consumer group, automatic offset commits every second, start from the earliest offset
		properties.setProperty("group.id", "test");
		properties.setProperty("enable.auto.commit", "true");
		properties.setProperty("auto.commit.interval.ms", "1000");
		properties.setProperty("auto.offset.reset", "earliest");

		KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
		kafkaConsumer.subscribe(Arrays.asList("firsttopic"));

		try {
			// Poll forever; each call waits up to 100 ms for new records
			while (true) {
				ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
				for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
					System.out.println("Partition: " + consumerRecord.partition() +
									   ", Offset: " + consumerRecord.offset() +
									   ", Key: " + consumerRecord.key() +
									   ", Value: " + consumerRecord.value());
				}
			}
		} finally {
			// Release the consumer's resources if the loop exits with an exception
			kafkaConsumer.close();
		}
	}
}
```

### Properties - Configurations for the producer and consumer

https://kafka.apache.org/documentation/#producerconfigs  
https://kafka.apache.org/documentation/#newconsumerconfigs

The ones that we looked at are:

#### Producer

- **bootstrap.servers**  
    A list of host/port pairs used to establish the initial connection to the Kafka cluster, in the form `host1:port1,host2:port2,...`. The client uses all servers in the cluster regardless of which ones are listed here; this list only determines the initial hosts used to discover the full set of servers. Because cluster membership may change dynamically, the list need not contain every server, though you may want more than one in case a server is down.
- **key.serializer**  
    Serializer class for keys; it must implement the `Serializer` interface.
- **value.serializer**  
    Serializer class for values; it must implement the `Serializer` interface.
- **acks**  
    The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed:
    - `acks=0`: The producer does not wait for any acknowledgment from the server. The record is immediately added to the socket buffer and considered sent. There is no guarantee that the server has received the record, and the `retries` setting has no effect (the client generally won't know of any failures). The offset returned for each record is always -1.
    - `acks=1`: The leader writes the record to its local log and responds without waiting for acknowledgement from the followers. If the leader fails immediately after acknowledging the record but before the followers have replicated it, the record is lost.
    - `acks=all`: The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record is not lost as long as at least one in-sync replica remains alive. It is the strongest available guarantee and is equivalent to `acks=-1`.
- **retries**  
    Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. This retry is no different from the client resending the record itself after receiving the error. Allowing retries without setting `max.in.flight.requests.per.connection` to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried while the second succeeds, the records in the second batch may appear first.
- **linger.ms**  
    - tl;dr: `send()` only places a record in a buffer; the producer transmits buffered records in the background. `linger.ms` is the maximum time the producer waits for more records to join a batch before sending it. To push buffered records out immediately, call `producer.flush()`.
    - The producer groups together any records that arrive between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. In some circumstances, however, the client may want to reduce the number of requests even under moderate load. This setting does so by adding a small artificial delay: rather than sending a record immediately, the producer waits for up to the given delay so that other records can be batched with it. This is analogous to Nagle's algorithm in TCP. The setting is an upper bound on the batching delay: once `batch.size` worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting; with fewer bytes accumulated, the producer lingers for the specified time waiting for more records. The default is 0 (no delay). Setting `linger.ms=5`, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.
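
The producer settings above can be combined to trade a little throughput for durability and ordering. The following is a minimal sketch, not a recommended production configuration: the `DurableProducerConfig` class and the second broker address are hypothetical, and it only builds the `Properties` object, so it runs without a broker. The resulting object could be passed to the `KafkaProducer` constructor in the same way as in the producer example.

```java
import java.util.Properties;

/** Hypothetical helper: producer settings that favour durability and ordering. */
final class DurableProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Comma-separated host:port pairs, used only to discover the rest of the cluster
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for the full set of in-sync replicas to acknowledge each record
        props.setProperty("acks", "all");
        // Retry sends that fail with a transient error
        props.setProperty("retries", "3");
        // One in-flight request at a time, so a retried batch cannot be overtaken
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Wait up to 5 ms for more records before sending a batch
        props.setProperty("linger.ms", "5");
        return props;
    }

    public static void main(String[] args) {
        // The second broker address is an assumption for illustration
        Properties props = build("127.0.0.1:9092,127.0.0.1:9093");
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

Limiting in-flight requests to one preserves ordering under retries, at the cost of lower throughput per connection.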
    

#### Consumer

- **bootstrap.servers**  
    Same as for the producer.
- **key.deserializer**  
    Deserializer class for key that implements the Deserializer interface.
- **value.deserializer**  
    Deserializer class for value that implements the Deserializer interface.
- **group.id**  
    A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy.
- **enable.auto.commit**  
    If true the consumer's offset will be periodically committed in the background.
- **auto.commit.interval.ms**
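    - The frequency, in milliseconds, at which consumer offsets are auto-committed to Kafka when `enable.auto.commit` is set to `true`.
    - Alternatively, set `enable.auto.commit` to `false` and commit offsets yourself by calling `consumer.commitSync()` after processing the records.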
- **auto.offset.reset**  
    What to do when there is no initial offset in Kafka or the current offset no longer exists on the server (e.g. because that data has been deleted):
    - `earliest`: automatically reset the offset to the earliest offset
    - `latest`: automatically reset the offset to the latest offset
    - `none`: throw an exception to the consumer if no previous offset is found for the consumer's group
    - anything else: throw an exception to the consumer
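
The `auto.offset.reset` rules can be illustrated with a small model. This is a simplified sketch of the behaviour described above, not Kafka's implementation: the `OffsetResetModel` class, its parameters, and the exception type are all hypothetical, and the real client reports these conditions with its own exception types.

```java
/** Simplified model of auto.offset.reset; not Kafka's actual implementation. */
final class OffsetResetModel {

    /**
     * @param committed last committed offset for the group, or null if none exists
     * @param earliest  first offset still available in the partition
     * @param latest    offset that the next produced record would receive
     * @param policy    value of auto.offset.reset
     */
    static long startingOffset(Long committed, long earliest, long latest, String policy) {
        // A committed offset that still lies within the log is used as-is
        if (committed != null && committed >= earliest && committed <= latest) {
            return committed;
        }
        // No usable offset: the policy decides where to start
        switch (policy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // "none" and any other value surface as an exception
                throw new IllegalStateException(
                        "No valid offset and auto.offset.reset=" + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(null, 10L, 50L, "earliest")); // prints 10
        System.out.println(startingOffset(null, 10L, 50L, "latest"));   // prints 50
        System.out.println(startingOffset(42L, 10L, 50L, "latest"));    // prints 42
        System.out.println(startingOffset(3L, 10L, 50L, "earliest"));   // prints 10
    }
}
```

The last call models a committed offset whose data has already been deleted, so the policy applies even though an offset was committed.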

I also built executable fat jars with Maven. In doing so, I learnt three things:
- Creating a fat jar using Maven
- Reading from a properties file in Java in a Maven project
- Reading from the command line using the Apache Commons CLI library
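
For the second point, one common approach is to load the file from the classpath, since Maven packages the contents of `src/main/resources` into the jar. The sketch below assumes that layout; the `ClasspathProperties` class and the `consumer.properties` file name in the usage note are hypothetical and are not taken from this project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/** Hypothetical helper that reads a .properties file from the classpath. */
final class ClasspathProperties {

    static Properties load(String resourceName) throws IOException {
        Properties props = new Properties();
        // Files under src/main/resources end up at the root of the classpath
        try (InputStream in = ClasspathProperties.class.getClassLoader()
                .getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + resourceName);
            }
            props.load(in);
        }
        return props;
    }
}
```

A call such as `ClasspathProperties.load("consumer.properties")` would then return settings that can be passed to the consumer constructor.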

The jar is set up to run the consumer as the main class:
```bash
$ java -jar kafka-examples-1.0-SNAPSHOT-jar-with-dependencies.jar --bootstrap.servers 127.0.0.1:9092 --topics firsttopic
```
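
How the jar maps these arguments onto consumer settings is not shown here. As a rough illustration, `--name value` pairs can be layered over default properties as in the sketch below. It is a hand-rolled stand-in written in plain Java, not the Apache Commons CLI code used in the project, and the `CliOverrides` class and the default value are assumptions.

```java
import java.util.Properties;

/** Hand-rolled stand-in for Commons CLI; handles only "--name value" pairs. */
final class CliOverrides {

    static Properties apply(Properties defaults, String[] args) {
        Properties merged = new Properties();
        merged.putAll(defaults);
        for (int i = 0; i < args.length; i += 2) {
            // Every option must start with "--" and be followed by a value
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected --name value, got: " + args[i]);
            }
            merged.setProperty(args[i].substring(2), args[i + 1]);
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092"); // assumed default
        Properties merged = apply(defaults, new String[] {
                "--bootstrap.servers", "127.0.0.1:9092", "--topics", "firsttopic"});
        System.out.println(merged.getProperty("bootstrap.servers")); // prints 127.0.0.1:9092
        System.out.println(merged.getProperty("topics"));            // prints firsttopic
    }
}
```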

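To run the producer instead, put the jar on the classpath with `-cp` and name the class explicitly (this assumes the class is declared in the `me.nikhil.kafkatester` package):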
```bash
$ java -cp kafka-examples-1.0-SNAPSHOT-jar-with-dependencies.jar me.nikhil.kafkatester.KafkaProducerDemo
```