- point-to-point model (queue/router)
- pub-sub model (Kafka covers both models through consumer groups; see the sketch after this list)
- Distributed Message Cache
- Distributed Event Bus
same as the AZ quantity
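A minimal kafka-python sketch of how consumer groups give both delivery models; the broker list, topic name, and group ids are placeholders and not from the original notes (in practice the two consumers would run in separate processes):

from kafka import KafkaConsumer

BROKERS = "<msk cluster broker list>"
TOPIC = "<topic name>"

# Point-to-point / queue semantics: consumers that share a group_id split the
# topic's partitions, so each record is delivered to only one of them.
queue_consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    group_id="worker-group",        # every worker instance uses the same group_id
    auto_offset_reset="earliest",
)

# Pub-sub semantics: a consumer in a *different* group receives every record
# again, independently of the worker group above.
pubsub_consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    group_id="audit-group",
    auto_offset_reset="earliest",
)

for record in queue_consumer:
    print(record.value)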
retention.ms: This configuration controls the maximum time we will retain a log before old log segments are discarded to free up space, when the "delete" retention policy is in use. It represents an SLA on how soon consumers must read their data. If set to -1, no time limit is applied.
- Type: long
- Default: 604800000 (7 days)
- Valid Values: [-1,...]
- Server Default Property: log.retention.ms
- Importance: medium
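As an illustration, the topic-level retention.ms can be overridden at runtime with the kafka-python admin client. This is a sketch; the broker list, topic name, and the 3-day value are placeholders:

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="<msk cluster broker list>")

# Keep data for 3 days (259200000 ms) instead of the 7-day default.
admin.alter_configs([
    ConfigResource(
        ConfigResourceType.TOPIC,
        "<topic name>",
        configs={"retention.ms": "259200000"},
    )
])
admin.close()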
- auto.create.topics.enable=true
- min.insync.replicas=2
- num.io.threads=8
- num.network.threads=5
- num.replica.fetchers=2
- replica.lag.time.max.ms=30000
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- socket.send.buffer.bytes=102400
- zookeeper.session.timeout.ms=18000
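On MSK these broker-level settings are usually applied as a custom cluster configuration. A hedged boto3 sketch (the configuration name is made up; the properties mirror the list above):

import boto3

SERVER_PROPERTIES = b"""
auto.create.topics.enable=true
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000
"""

client = boto3.client("kafka")

# Register the properties above as a reusable MSK configuration revision.
response = client.create_configuration(
    Name="example-msk-config",          # placeholder name
    ServerProperties=SERVER_PROPERTIES,
)
print(response["Arn"])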
pip install kafka-python
import json
import random
import uuid
from kafka import KafkaProducer

def get_random_record():
    # Placeholder record generator; replace with your real data source.
    return {"rowkey": str(uuid.uuid4()), "value": random.randint(0, 100)}

def send_data(_kafka_topic, _producer):
    while True:
        data = get_random_record()
        partition_key = str(data["rowkey"])
        print(data)
        # Key the message by rowkey so records with the same key map to the same partition.
        _producer.send(_kafka_topic, key=partition_key.encode('utf-8'),
                       value=json.dumps(data).encode('utf-8'))

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers="<msk cluster broker list>")
    KAFKA_TOPIC = "<topic name>"
    send_data(KAFKA_TOPIC, producer)
The default value is acks=1, which means that as long as the producer receives an ack from the leader broker of the topic's partition, it treats the write as a successful commit and continues with the next message. Setting acks=0 is not recommended, because it gives no guarantee that the commit happened. acks=all makes the producer wait for acks from all of the in-sync replicas of the partition; this gives the strongest message durability, but the extra round trips result in higher latency. You need to decide which matters more for your workload: latency or durability.
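With kafka-python, acks is set when the producer is created. A sketch with placeholder broker list and topic name:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="<msk cluster broker list>",
    acks="all",    # wait for all in-sync replicas; use 1 (default) or 0 for lower latency
    retries=3,     # retry transient send failures
)
producer.send("<topic name>", b"hello")
producer.flush()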
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("<app name>") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .getOrCreate()

# Structured Streaming source: read the MSK topic over TLS, starting from the
# latest offsets. maxOffsetsPerTrigger caps how many records each micro-batch
# pulls (spark.streaming.kafka.maxRatePerPartition only applies to the old
# DStream API, so it has no effect here).
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<msk cluster broker list>") \
    .option("kafka.security.protocol", "SSL") \
    .option("failOnDataLoss", "false") \
    .option("subscribe", "topic1") \
    .option("includeHeaders", "true") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", "50") \
    .load()
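The source above only defines the stream; a query has to be started against a sink for data to flow. A minimal console-sink sketch, assuming the message payload is UTF-8 text:

from pyspark.sql.functions import col

# Decode key/value bytes to strings and print each micro-batch to the console.
query = kafka_df \
    .select(col("key").cast("string"), col("value").cast("string")) \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()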
- vertical scaling: manually upgrade the broker instance type, or simply add storage
- horizontal scaling: manually add brokers to the cluster (see the sketch after this list)
  - MSK UpdateBrokerCount operation
  - Kafka cluster expansion ops (reassign existing partitions onto the new brokers)
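A hedged boto3 sketch of the UpdateBrokerCount call; the cluster ARN and target count are placeholders, and CurrentVersion comes from DescribeCluster:

import boto3

client = boto3.client("kafka")
cluster_arn = "<msk cluster arn>"

# Fetch the cluster's current version string, required for the update call.
current = client.describe_cluster(ClusterArn=cluster_arn)

client.update_broker_count(
    ClusterArn=cluster_arn,
    CurrentVersion=current["ClusterInfo"]["CurrentVersion"],
    TargetNumberOfBrokerNodes=6,   # must be a multiple of the number of AZs
)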
msk cluster broker list sample:
<broker1 endpoint>:9092,<broker2 endpoint>:9092,<broker3 endpoint>:9092
cli sample:
./kafka-topics.sh --bootstrap-server <msk cluster broker list> --list
./kafka-console-consumer.sh --bootstrap-server <msk cluster broker list> --topic <topic name> --from-beginning
./kafka-console-consumer.sh --bootstrap-server <msk cluster broker list> --topic <topic name>
./kafka-topics.sh --bootstrap-server <msk cluster broker list> --create --topic <topic name> --partitions 3 --replication-factor 2
This feature (tiered storage) offers the possibility of keeping all Kafka cluster data persistent on cost-optimized storage. It is available for Kafka version 2.8.2 and above.
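A sketch of creating a topic that uses it with the kafka-python admin client, assuming the standard tiered-storage topic configs remote.storage.enable and local.retention.ms apply; the broker list, topic name, and retention value are placeholders:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="<msk cluster broker list>")

# Tiered-storage topic: remote.storage.enable moves older segments to the
# remote tier; local.retention.ms bounds how long segments stay on the broker.
topic = NewTopic(
    name="<topic name>",
    num_partitions=3,
    replication_factor=2,
    topic_configs={
        "remote.storage.enable": "true",
        "local.retention.ms": "3600000",   # keep 1 hour locally (placeholder)
    },
)
admin.create_topics([topic])
admin.close()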