# Kafka Consumer
### Basic Setup
#### Imports

In [None]:
import json
# rich's print shadows the built-in print; print_json pretty-prints JSON
from rich import print, print_json
from kafka import KafkaConsumer  # kafka-python client

#### Config

In [None]:
KAFKA_HOST = "kafka"
KAFKA_PORT = 9092
KAFKA_VERSION = (0, 10, 2)
BOOTSTRAP_SERVER = f"{KAFKA_HOST}:{KAFKA_PORT}"

### Creating a Consumer
Now it's time to create a `KafkaConsumer`.

In [None]:
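consumer = KafkaConsumer(
    bootstrap_servers=[BOOTSTRAP_SERVER],
    security_protocol="PLAINTEXT",
    # Turn the raw bytes of each message value back into a Python dict
    value_deserializer=lambda v: json.loads(v.decode("ascii")),
    # Start from the oldest retained message when there is no committed offset
    auto_offset_reset="earliest",
)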

The consumer is now ready. It points to our Kafka service and uses a deserialization function that decodes the raw bytes of each message value as ASCII and parses them with `json.loads` into a Python dictionary. This is the inverse of the transformation applied during the production phase.
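A minimal sketch of that round trip, assuming the producer side serializes with `json.dumps(...).encode("ascii")`. The producer code is not shown here, so the `serialize` helper below is hypothetical; `deserialize` mirrors the `value_deserializer` lambda above.

```python
import json


# Hypothetical producer-side serializer, assumed to mirror the producer notebook.
def serialize(value: dict) -> bytes:
    # json.dumps escapes non-ASCII characters by default (ensure_ascii=True),
    # so encoding the resulting string as ASCII cannot fail.
    return json.dumps(value).encode("ascii")


# Same logic as the value_deserializer lambda passed to KafkaConsumer.
def deserialize(raw: bytes) -> dict:
    return json.loads(raw.decode("ascii"))


sample = {"id": "KORD", "site": "Chicago/O'Hare Intl", "temp": 3.9}
raw = serialize(sample)
print(type(raw).__name__)          # bytes
print(deserialize(raw) == sample)  # True
```

Decoding with `"utf-8"` instead would also work here, since every ASCII byte string is valid UTF-8.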

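By default, kafka-python's `KafkaConsumer` uses `auto_offset_reset='latest'`: when it has no committed offset for a partition (and ours, created without a `group_id`, never commits any), it starts at the end of the partition and reads only the messages produced after it attaches to the cluster. Earlier messages are skipped. Setting `auto_offset_reset='earliest'` changes this behaviour so that we read from the beginning of the topic, that is, from the oldest message still retained. We are now ready to subscribe to the `metar-json` topic and start reading from it with the following code: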
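The rule can be summarized with a simplified model. The `starting_offset` helper below is hypothetical and is not kafka-python's actual code: a valid committed offset wins if the consumer has one; otherwise the reset policy picks either the log-start or the log-end offset of the partition.

```python
from typing import Optional


def starting_offset(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str = "latest",
) -> int:
    """Simplified model of where a consumer begins reading one partition."""
    # A valid committed offset (from an earlier run of the same group) wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise the reset policy decides.
    if auto_offset_reset == "earliest":
        return log_start  # oldest record still retained
    if auto_offset_reset == "latest":
        return log_end  # only records produced from now on
    raise ValueError(f"unsupported policy: {auto_offset_reset!r}")


# One record (offset 0) already in the partition, so the log-end offset is 1.
print(starting_offset(None, 0, 1))              # 1 -> the existing message is skipped
print(starting_offset(None, 0, 1, "earliest"))  # 0 -> read from the beginning
```

Since our consumer has no `group_id`, `committed` is always `None` in its case and the reset policy alone decides.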

In [None]:
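# subscribe() takes a list of topic names
consumer.subscribe(topics=["metar-json"])
# Iterating blocks and yields ConsumerRecord objects as they arrive
for message in consumer:
    print(f"Partition: {message.partition} Offset: {message.offset}")
    print("Value:")
    # message.value is already a dict, thanks to value_deserializer
    print_json(data=message.value)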

The loop never ends: iterating over a `KafkaConsumer` blocks while waiting for new records. This is intended, since we want to consume messages as soon as they become available in the Kafka topic, and in the streaming world there is no "end time". Thanks to `auto_offset_reset='earliest'`, we should also see the first message already stored in the topic appear in the cell output.
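If you would rather have a cell finish on its own, one option is to stop after a fixed number of records. Below is a sketch with a hypothetical `take_messages` helper. It works on any iterator of records, including a `KafkaConsumer`, since iterating a consumer yields records with `partition`, `offset` and `value` attributes. So that the sketch runs without a broker, `FakeRecord` is an assumed stand-in for kafka-python's `ConsumerRecord`.

```python
from collections import namedtuple
from itertools import islice

# Stand-in for kafka-python's ConsumerRecord, so the sketch runs without a broker.
FakeRecord = namedtuple("FakeRecord", ["partition", "offset", "value"])


def take_messages(consumer, max_messages: int) -> list:
    """Consume at most max_messages records, then return instead of looping forever."""
    collected = []
    # islice stops pulling from the iterator once max_messages items were yielded.
    for message in islice(consumer, max_messages):
        collected.append((message.partition, message.offset, message.value))
    return collected


fake_stream = iter([FakeRecord(0, i, {"id": f"MSG{i}"}) for i in range(5)])
print(take_messages(fake_stream, 2))
# [(0, 0, {'id': 'MSG0'}), (0, 1, {'id': 'MSG1'})]
```

With the real consumer this would be `take_messages(consumer, 2)`. If fewer records arrive, the call still waits for more; kafka-python's `consumer_timeout_ms` parameter can make iteration stop after a period without new records.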

Now switch back to the `kafka-producer` notebook and execute its final cell to produce a new METAR message for `Chicago/O'Hare Intl`.

Once it runs, the message sent by the producer should appear almost immediately in the output of the `consumer.subscribe()` cell above, as shown below:

```
Partition: 0 Offset: 1
Value:
{
  "type": "Feature",
  "id": "734075940",
  "properties": {
    "data": "METAR",
    "id": "KORD",
    "site": "Chicago/O'Hare Intl",
    "prior": 0,
    "obsTime": "2021-12-24T11:35:00Z",
    "temp": 3.9,
    "dewp": 2.2,
    "wspd": 6,
    "wdir": 160,
    "ceil": 2,
    "cover": "OVC",
    "cldCvg1": "OVC",
    "cldBas1": "2",
    "visib": 1.25,
    "fltcat": "LIFR",
    "altim": 1005.8,
    "wx": "BR",
    "rawOb": "KORD 241135Z 16006KT 1 1/4SM R10L/5000VP6000FT BR OVC002 04/02 A2970 RMK AO2 T00390022"
  },
  "geometry": {
    "type": "Point",
    "coordinates": [
      -87.932,
      41.96
    ]
  }
}
```

If you're wondering what the `Partition: 0 Offset: 1` prefix means, look back at the consumer code: it prints the record's **partition** and **offset**. Here we are reading the second message (offsets start at 0) of partition 0 of the topic.
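Offsets are counted independently in each partition, and the position a consumer group commits is conventionally the offset of the *next* record to read, that is, the last processed offset plus one. Here is a small sketch with a hypothetical `next_offsets` helper that derives those positions from `(partition, offset)` pairs such as the one printed above:

```python
def next_offsets(records):
    """Map each partition to the offset of the next record to read (last seen + 1)."""
    positions = {}
    for partition, offset in records:
        # Offsets only grow within a partition, but max() keeps this robust to ordering.
        positions[partition] = max(positions.get(partition, 0), offset + 1)
    return positions


# The two records read so far: offsets 0 and 1, both from partition 0.
print(next_offsets([(0, 0), (0, 1)]))  # {0: 2}
```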