In [1]:
# https://pypi.org/project/kafka-python/

* Topics:  

Every message fed into the system must belong to some topic. A topic is simply a stream of records. Each message is stored as a key-value pair (the key is optional), and each message within a partition of the topic is assigned a sequential ID called an offset. The output of processing one message can in turn be published as input for further processing.  

* Producers:  

Producers are the apps responsible for publishing data into the Kafka system. Each producer publishes data to the topic of its choice.

* Consumers:  

Messages published to topics are then used by consumer apps. A consumer subscribes to the topic of its choice and consumes its data.  

* Broker:  

Each Kafka server instance responsible for exchanging messages is called a broker. Kafka can run as a single standalone broker or as part of a multi-broker cluster.  
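To make the terms above concrete, here is a toy in-memory model of a single topic as an append-only log of key-value records, where each new record gets the next offset. It is purely illustrative: `ToyTopic` and `Record` are hypothetical names, not part of Kafka or kafka-python, and a real topic is split into partitions that each keep their own offsets.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Record:
    offset: int
    key: Optional[str]
    value: str


@dataclass
class ToyTopic:
    """Hypothetical single-partition topic: an append-only list of records."""
    name: str
    records: List[Record] = field(default_factory=list)

    def append(self, key: Optional[str], value: str) -> int:
        # Producer side: each new record receives the next sequential offset.
        offset = len(self.records)
        self.records.append(Record(offset, key, value))
        return offset

    def read_from(self, offset: int) -> List[Record]:
        # Consumer side: read every record at or after the given offset.
        return self.records[offset:]


topic = ToyTopic("some_kafka_topic")
topic.append("user-1", "login")
topic.append("user-2", "login")
topic.append("user-1", "logout")

for record in topic.read_from(1):
    print(record.offset, record.key, record.value)
# 1 user-2 login
# 2 user-1 logout
```

A consumer that remembers the last offset it processed can resume from there, which is the idea behind Kafka's committed offsets.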



## ZooKeeper installation on Windows  
1. Install a Java Runtime Environment (JRE).  
2. Download ZooKeeper from http://zookeeper.apache.org/releases.html#download and extract it (here, to C:\Tools\zookeeper-3.4.9).
3. In C:\Tools\zookeeper-3.4.9\conf, copy “zoo_sample.cfg” and rename the copy to “zoo.cfg”.
4. Create a data directory inside the ZooKeeper folder.
5. Open zoo.cfg in any text editor (e.g. Notepad or Notepad++), find `dataDir=/tmp/zookeeper` and change it to `dataDir=C:\\Tools\\zookeeper-3.4.9\\data` (adjust the ZooKeeper version to yours).
6. Add entries to the system environment variables:
   * Add a system variable `ZOOKEEPER_HOME = C:\Tools\zookeeper-3.4.9`.
   * Edit the system variable named “Path” and append `;%ZOOKEEPER_HOME%\bin;` at the end.
7. Open a command prompt and type `zkserver`.
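Once ZooKeeper is running, one way to confirm it is reachable is its `srvr` four-letter command, sent over a plain TCP socket to port 2181. This is a sketch that assumes four-letter commands are enabled on your server (newer ZooKeeper releases restrict them with `4lw.commands.whitelist`, which allows `srvr` by default); `zookeeper_srvr` is a hypothetical helper name.

```python
import socket
from typing import Optional


def zookeeper_srvr(host: str = "localhost", port: int = 2181,
                   timeout: float = 2.0) -> Optional[str]:
    """Send ZooKeeper's 'srvr' command; return its reply, or None if unreachable."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            sock.sendall(b"srvr")
            chunks = []
            while True:
                chunk = sock.recv(4096)
                if not chunk:  # the server closes the connection after replying
                    break
                chunks.append(chunk)
    except OSError:  # connection refused, timeout, unknown host, ...
        return None
    return b"".join(chunks).decode("utf-8", errors="replace")
```

Calling `print(zookeeper_srvr())` should print the server's version, mode and connection statistics, or `None` if nothing answers on localhost:2181.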

## Kafka installation on Windows
1. Go to the config folder of Apache Kafka and edit “server.properties” in any text editor.
2. Find `log.dirs=/tmp/kafka-logs` and change it to `log.dirs=C:\\Tools\\kafka_2.10-0.10.1.1\\kafka-logs` (adjust the version number to yours).
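Step 5 of the ZooKeeper setup and step 2 here both change a single `key=value` line in a Java-style properties file. If you prefer to script these edits, a minimal Python sketch could look like this. `set_property` and `windows_path_value` are hypothetical helpers, and the sketch assumes plain `key=value` lines, as in the stock `zoo_sample.cfg` and `server.properties`.

```python
from pathlib import Path


def set_property(path: Path, key: str, value: str) -> None:
    """Set `key=value` in a properties/cfg file (e.g. 'dataDir' or 'log.dirs').

    Replaces the first uncommented line for `key`, or appends one if absent.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    new_line = f"{key}={value}"
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped.startswith("#") and stripped.split("=", 1)[0].strip() == key:
            lines[i] = new_line
            break
    else:
        lines.append(new_line)
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def windows_path_value(windows_path: str) -> str:
    # Backslash is an escape character in properties files, so double it.
    return windows_path.replace("\\", "\\\\")


# Example usage (paths from the steps above; adjust to your install):
# set_property(Path(r"C:\Tools\zookeeper-3.4.9\conf\zoo.cfg"), "dataDir",
#              windows_path_value(r"C:\Tools\zookeeper-3.4.9\data"))
```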

## Running Kafka on Windows  
With ZooKeeper running, open a command prompt in your Apache Kafka directory and run the following command to start the broker:  
`.\bin\windows\kafka-server-start.bat .\config\server.properties`  

Topic creation:  
`.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic some_kafka_topic`  

Producer creation:  

`.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic some_kafka_topic`  
The `--broker-list` parameter lists the brokers to connect to, as `<node_address:port>` (here, `localhost:9092`).

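Consumer creation:  
`.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic some_kafka_topic`

`--bootstrap-server` must point at the Kafka broker (port 9092), not at ZooKeeper (port 2181), and the topic must match the one the producer writes to.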

Run each of the four components (ZooKeeper, broker, producer, and consumer) in its own terminal. If everything works, messages you type in the producer's terminal appear in the subscribed consumer's terminal.

## ZooKeeper and Kafka on Linux/macOS
Download Kafka from the Apache Software Foundation website.  
Kafka ships with a script that starts a ZooKeeper server:  
`bin/zookeeper-server-start.sh config/zookeeper.properties`  

and a script that starts the broker (server / node):  
`bin/kafka-server-start.sh config/server.properties`  

Then create a topic on the broker:  
`bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic some_kafka_topic`  

Running the same script with different flags lists all topics:  
`bin/kafka-topics.sh --list --zookeeper localhost:2181` 

On Kafka 2.2 and later, `--bootstrap-server localhost:9092` can be used in place of `--zookeeper localhost:2181`; Kafka 3.0 removed the `--zookeeper` option from this tool.
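The same topic operations can also be done from Python. This is a sketch that assumes kafka-python 2.x (which provides `KafkaAdminClient` and `NewTopic` in `kafka.admin`) and a broker on localhost:9092; `create_topic` and `list_topics` are hypothetical helper names, and the exception raised for an already existing topic may differ between kafka-python versions.

```python
def create_topic(name: str, bootstrap_servers: str = "localhost:9092",
                 partitions: int = 1, replication_factor: int = 1) -> bool:
    """Create topic `name`; return False if it already exists."""
    # Imported lazily so this file can be loaded without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(new_topics=[NewTopic(name=name,
                                                 num_partitions=partitions,
                                                 replication_factor=replication_factor)])
        return True
    except TopicAlreadyExistsError:
        return False
    finally:
        admin.close()


def list_topics(bootstrap_servers: str = "localhost:9092") -> set:
    """Return the set of topic names known to the cluster."""
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return consumer.topics()
    finally:
        consumer.close()
```

With a broker running, `create_topic("some_kafka_topic")` followed by `print(list_topics())` should show the new topic among the results.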

In [None]:
# # Install the library from either a terminal or the Anaconda Prompt:
# pip install kafka-python

In [8]:
import json
import time

from kafka import KafkaConsumer
from kafka import KafkaProducer

In [7]:
# We assume ZooKeeper is running on its default port (localhost:2181) and Kafka on localhost:9092.
# consumer = KafkaConsumer('some_kafka_topic')

In [None]:
# Serialize each value (e.g. a dict) to UTF-8 encoded JSON bytes before sending.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'))
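The producer's `value_serializer` turns each Python value into JSON bytes, and the consumer's `value_deserializer` reverses that conversion. The round trip can be checked without a broker; `serialize` and `deserialize` are hypothetical names for the same two conversions used in the notebook's lambdas.

```python
import json


def serialize(value) -> bytes:
    # Same conversion as the producer's value_serializer.
    return json.dumps(value).encode("utf-8")


def deserialize(data: bytes):
    # Same conversion as the consumer's value_deserializer.
    return json.loads(data.decode("utf-8"))


payload = {"number": 2}
wire_bytes = serialize(payload)
print(wire_bytes)               # b'{"number": 2}'
print(deserialize(wire_bytes))  # {'number': 2}
```

Kafka itself only stores bytes, so the producer and consumer must agree on this encoding.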

In [None]:
consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    auto_commit_interval_ms=1000,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')))

* The first argument is the topic, `numtest` in our case.
* `bootstrap_servers=['localhost:9092']`: the same broker address as our producer.
* `auto_offset_reset='earliest'`: one of the most important arguments. It decides where the consumer starts reading when its group has no committed offset for a partition (for example, the first time the group runs) or the committed offset is no longer valid. It can be set to `earliest` or `latest`. With `latest` (the kafka-python default), the consumer starts at the end of the log and sees only new messages; with `earliest`, it starts from the oldest message still retained. Once the group has committed an offset, the consumer resumes from that committed offset after a restart, which is exactly what we want here.
* `enable_auto_commit=True`: makes the consumer commit its read offset periodically in the background.
* `auto_commit_interval_ms=1000`: sets the interval between two automatic commits, in milliseconds. If messages arrive every five seconds, committing every second is more than frequent enough.
* `group_id='my-group'`: the consumer group to which the consumer belongs. Committed offsets are stored per consumer group, so a consumer must belong to a group for auto-commit to work.
* `value_deserializer` decodes each message's bytes as UTF-8 JSON back into a Python object, the inverse of the producer's `value_serializer`.
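The interplay between committed offsets and `auto_offset_reset` described in the list above can be summarized in a simplified model. `starting_offset` is a hypothetical function, not part of kafka-python, and it ignores details such as multiple partitions and group rebalancing.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    auto_offset_reset: str = "latest") -> int:
    """Simplified model of where a consumer group starts reading a partition.

    committed -- offset last committed by the group, or None if there is none
    log_start -- offset of the oldest message still retained
    log_end   -- offset that the next produced message will receive
    """
    # A valid committed offset wins: the group resumes where it left off.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise (no commit yet, or out of range) the reset policy decides.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")


print(starting_offset(None, 0, 10, "earliest"))  # 0: new group reads all history
print(starting_offset(None, 0, 10, "latest"))    # 10: new group sees only new messages
print(starting_offset(7, 0, 10, "latest"))       # 7: committed offset takes precedence
```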

In [None]:
# Now open a new terminal, log in over ssh to the same VM,
# open a Python shell,
# run the necessary imports and create the producer as above,
# and execute the following lines
data = {'number': 2}
producer.send('numtest', value=data)
producer.flush()  # send() is asynchronous; flush() blocks until buffered messages are sent
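The consumer settings above assume messages arrive every few seconds. A sketch of a producer loop that sends a numbered message every five seconds; `produce_numbers` is a hypothetical helper that takes the producer as a parameter, so it can be run with the notebook's `producer` or with any stand-in object that has `send` and `flush` methods.

```python
import time


def produce_numbers(producer, topic: str, count: int, delay_s: float = 5.0) -> None:
    """Send {'number': i} for i in range(count), pausing delay_s between sends."""
    for i in range(count):
        producer.send(topic, value={"number": i})
        time.sleep(delay_s)
    # send() only queues messages; flush() blocks until they have been sent.
    producer.flush()
```

For example, `produce_numbers(producer, 'numtest', count=10)` sends ten messages over roughly fifty seconds, which the consumer should print as they arrive.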

In [None]:
for message in consumer:
    value = message.value  # already deserialized into a dict by value_deserializer
    print(value)
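The loop above blocks forever waiting for new messages. To read a fixed number of messages and then stop, a sketch using `itertools.islice` could look like this; `take_values` is a hypothetical helper. kafka-python's `KafkaConsumer` also accepts `consumer_timeout_ms`, which ends iteration after that many milliseconds without a message.

```python
from itertools import islice


def take_values(consumer, limit: int) -> list:
    """Return the values of the next `limit` messages from `consumer`.

    `consumer` can be any iterator of records with a `.value` attribute,
    such as a KafkaConsumer. With a real consumer this blocks until enough
    messages arrive, unless consumer_timeout_ms ends the iteration earlier.
    """
    return [record.value for record in islice(consumer, limit)]
```

For example, `take_values(consumer, 5)` returns the next five deserialized dicts from `numtest`.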