/
sendMessages.js
105 lines (86 loc) · 3.49 KB
/
sendMessages.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
const createRetry = require('../retry')
const flatten = require('../utils/flatten')
const { KafkaJSMetadataNotLoaded } = require('../errors')
const groupMessagesPerPartition = require('./groupMessagesPerPartition')
const createTopicData = require('./createTopicData')
const responseSerializer = require('./responseSerializer')

const { keys } = Object

// Maximum number of retry attempts for a produce batch; each failed attempt
// only re-sends to brokers that have not yet responded successfully
const TOTAL_INDIVIDUAL_ATTEMPTS = 5
/**
 * Predicate: does the given Kafka protocol error indicate that the locally
 * cached cluster metadata is out of date (unknown topic/partition, or
 * partition leadership has moved)? Used to decide whether to refresh
 * metadata before retrying a produce request.
 */
const staleMetadata = e => {
  const staleErrorTypes = [
    'UNKNOWN_TOPIC_OR_PARTITION',
    'LEADER_NOT_AVAILABLE',
    'NOT_LEADER_FOR_PARTITION',
  ]
  return staleErrorTypes.indexOf(e.type) !== -1
}
/**
 * Builds the producer's send function: delivers a batch of topic messages to
 * the brokers leading the target partitions, retrying individual broker
 * failures up to TOTAL_INDIVIDUAL_ATTEMPTS times.
 *
 * @param {Object} deps
 * @param {Object} deps.logger
 * @param {Object} deps.cluster - cluster abstraction (metadata, broker lookup)
 * @param {Function} deps.partitioner - assigns each message to a partition
 * @returns {Function} async ({ acks, timeout, compression, topicMessages }) =>
 *   flattened array of serialized per-partition produce responses
 */
module.exports = ({ logger, cluster, partitioner }) => {
  const retrier = createRetry({ retries: TOTAL_INDIVIDUAL_ATTEMPTS })

  return async ({ acks, timeout, compression, topicMessages }) => {
    // broker -> serialized produce response (null while still pending).
    // Deliberately shared across retry attempts: brokers that already
    // responded successfully are skipped on subsequent attempts.
    const responsePerBroker = new Map()

    // Register every topic as a metadata target before producing.
    // NOTE(review): awaited sequentially — presumably addTargetTopic can
    // trigger a metadata refresh; confirm before parallelizing
    for (let { topic } of topicMessages) {
      await cluster.addTargetTopic(topic)
    }

    // Builds one in-flight produce request (promise) per broker that still
    // lacks a response: groups each topic's messages by partition, then
    // partitions by their leader broker.
    const createProducerRequests = async responsePerBroker => {
      // topic -> { partitionsPerLeader, messagesPerPartition }
      const topicMetadata = new Map()

      for (let { topic, messages } of topicMessages) {
        const partitionMetadata = cluster.findTopicPartitionMetadata(topic)

        // Without partition metadata we cannot choose partitions or leaders;
        // the thrown error is caught in makeRequests below, which refreshes
        // metadata before the retrier re-runs this attempt
        if (keys(partitionMetadata).length === 0) {
          logger.error('Producing to topic without metadata', {
            topic,
            targetTopics: Array.from(cluster.targetTopics),
          })

          throw new KafkaJSMetadataNotLoaded('Producing to topic without metadata')
        }

        const messagesPerPartition = groupMessagesPerPartition({
          topic,
          partitionMetadata,
          messages,
          partitioner,
        })

        const partitions = keys(messagesPerPartition)
        const partitionsPerLeader = cluster.findLeaderForPartitions(topic, partitions)
        const leaders = keys(partitionsPerLeader)

        topicMetadata.set(topic, { partitionsPerLeader, messagesPerPartition })

        // Seed a null entry for every leader broker so it shows up in the
        // "brokers without response" set below
        for (let nodeId of leaders) {
          const broker = await cluster.findBroker({ nodeId })
          if (!responsePerBroker.has(broker)) {
            responsePerBroker.set(broker, null)
          }
        }
      }

      const brokers = Array.from(responsePerBroker.keys())
      // Only brokers whose entry is still null need a (re-)send
      const brokersWithoutResponse = brokers.filter(broker => !responsePerBroker.get(broker))

      return brokersWithoutResponse.map(async broker => {
        const entries = Array.from(topicMetadata.entries())
        // Keep only the topics with at least one partition led by this broker
        const topicDataForBroker = entries
          .filter(([_, { partitionsPerLeader }]) => !!partitionsPerLeader[broker.nodeId])
          .map(([topic, { partitionsPerLeader, messagesPerPartition }]) => ({
            topic,
            partitions: partitionsPerLeader[broker.nodeId],
            messagesPerPartition,
          }))

        const topicData = createTopicData(topicDataForBroker)

        try {
          const response = await broker.produce({ acks, timeout, compression, topicData })
          responsePerBroker.set(broker, responseSerializer(response))
        } catch (e) {
          // Remove the entry so this broker is contacted again on the next
          // retry attempt
          responsePerBroker.delete(broker)
          throw e
        }
      })
    }

    // One retry attempt: fire all pending broker requests in parallel and,
    // on success, flatten the accumulated responses of every broker
    const makeRequests = async (bail, retryCount, retryTime) => {
      try {
        const requests = await createProducerRequests(responsePerBroker)
        await Promise.all(requests)
        const responses = Array.from(responsePerBroker.values())
        return flatten(responses)
      } catch (e) {
        // Stale or missing metadata: refresh before the retrier re-invokes
        // makeRequests, then rethrow so the retrier counts the failure
        if (staleMetadata(e) || e.name === 'KafkaJSMetadataNotLoaded') {
          await cluster.refreshMetadata()
        }
        throw e
      }
    }

    // Unwrap the retrier's wrapper so callers see the underlying error
    return retrier(makeRequests).catch(e => {
      throw e.originalError || e
    })
  }
}