/
KafkaStreaming.scala
288 lines (226 loc) · 11.8 KB
/
KafkaStreaming.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import java.time.Duration
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.protocol
import org.apache.kafka.common.serialization._
import org.apache.kafka.common._
import org.apache.spark.streaming.kafka010.KafkaUtils._
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import SparkBigData._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.streaming.dstream.InputDStream
import java.util.Properties
import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.producer.{ProducerRecord, _}
import org.apache.spark.streaming.StreamingContext
import org.apache.kafka.clients.producer.ProducerConfig._
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
/*
cet objet regroupe l'ensemble des méthodes et fonctions nécessaires :
1 - pour établir une connexion avec un cluster Kafka (localhost ou cluster)
2 - pour consommer des données provenant d'un ou plusieurs topic Kafka
3 - pour stocker les données dans Kafka
Vous apprenez ici à développer des applications Kafka robustes avec Spark et Scala
*/
object KafkaStreaming {
var KafkaParam : Map[String, Object] = Map(null, null) // mauvaise approche, car variable immutable
var consommateurKafka : InputDStream[ConsumerRecord[String, String]] = null //mauvaise approche, car variable immutable
private var trace_kafka : Logger = LogManager.getLogger("Log_Console")
/**
* cette fonction récupère les paramètres de connexion à un cluster Kafka
* @param kafkaBootStrapServers : adresses IP (avec port) des agents du cluster Kafka
* @param KafkaConsumerGroupId : c'est l'ID du consumer group
* @param KafkaConsumerReadOrder : l'ordre de lecture du Log
* @param KafkaZookeeper : l'adresse IP (avec port) de l'ensemble ZooKeeper
* @param KerberosName : le nom du service Kerberos
* @return : la fonction renvoie une table clé-valeur des paramètres de connexion à un cluster Kafka spécifique
*/
def getKafkaSparkConsumerParams ( kafkaBootStrapServers : String, KafkaConsumerGroupId : String, KafkaConsumerReadOrder : String,
KafkaZookeeper : String, KerberosName : String) : Map[String, Object] = {
KafkaParam = Map(
"bootstrap.servers" -> kafkaBootStrapServers,
"group.id" -> KafkaConsumerGroupId,
"zookeeper.hosts" -> KafkaZookeeper,
"auto.offset.reset" -> KafkaConsumerReadOrder,
"enable.auto.commit" -> (false: java.lang.Boolean),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"sasl.kerberos.service.name" -> KerberosName,
"security.protocol" -> SecurityProtocol.PLAINTEXT
)
return KafkaParam
}
def getKafkaProducerParams (KafkaBootStrapServers : String) : Properties = {
val props : Properties = new Properties()
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")
props.put("bootstrap.servers ", KafkaBootStrapServers)
props.put("security.protocol", "SASL_PLAINTEXT")
return props
}
def getKafkaProducerParams_exactly_once (KafkaBootStrapServers : String) : Properties = {
val props : Properties = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaBootStrapServers)
props.put("security.protocol", "SASL_PLAINTEXT")
//propriétés pour rendre le producer Exactly-Once
props.put(ProducerConfig.ACKS_CONFIG, "all")
// pour la cohérence éventuelle. Doit être inférieur ou égal au facteur de réplication du topic dans lequel vous allez publier
props.put("min.insync.replicas", "2")
//rendre le producer idempotent
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
props.put(ProducerConfig.RETRIES_CONFIG, "3")
props.put("max.in.flight.requests.per.connection", "3")
return props
}
def getKafkaConsumerParams (kafkaBootStrapServers : String, KafkaConsumerGroupId : String) : Properties = {
val props : Properties = new Properties()
props.put("bootstrap.servers", kafkaBootStrapServers)
props.put("auto.offset.reset", "latest")
props.put("group.id",KafkaConsumerGroupId )
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
return props
}
def getClientConsumerKafka (kafkaBootStrapServers : String, KafkaConsumerGroupId : String, topic_list : String) : KafkaConsumer[String, String] = {
trace_kafka.info("instanciation d'un consommateur Kafka...")
val consumer = new KafkaConsumer[String, String](getKafkaConsumerParams(kafkaBootStrapServers , KafkaConsumerGroupId))
try {
consumer.subscribe(Collections.singletonList(topic_list)) //on pouvait aussi faire ceci : List(topic_list).asJava
while(true) {
val messages : ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(30))
if(!messages.isEmpty) {
trace_kafka.info("Nombre de messages collectés dans la fenêtre :" + messages.count())
for(message <- messages.asScala) {
println("Topic: " + message.topic() +
",Key: " + message.key() +
",Value: " + message.value() +
", Offset: " + message.offset() +
", Partition: " + message.partition())
}
try {
consumer.commitAsync() // ou bien consumer.commitSync()
} catch {
case ex : CommitFailedException =>
trace_kafka.error("erreur dans le commit des offset. Kafka n'a pas reçu le jeton de reconnaissance confirmant que nous avons bien reçu les données")
}
// méthode de lecture 2
/* val messageIterateur = messages.iterator()
while (messageIterateur.hasNext == true) {
val msg = messageIterateur.next()
println(msg.key() + msg.value() + msg.partition())
} */
}
}
} catch {
case excpt : Exception =>
trace_kafka.error("erreur dans le consumer" + excpt.printStackTrace())
} finally {
consumer.close()
}
return consumer
}
/**
* création d'un Kafka Producer qui va être déployé en production
* @param KafkaBootStrapServers : agents kafka sur lesquels publier le message
* @param topic_name : topic dans lequel publier le message
* @param message : message à publier dans le topic @topic_name
* @return : renvoie un Producer Kafka
*/
def getProducerKafka (KafkaBootStrapServers : String, topic_name : String, message : String) : KafkaProducer[String, String] = {
trace_kafka.info(s"instanciation d'une instance du producer Kafka aux serveurs : ${KafkaBootStrapServers}")
lazy val producer_Kafka = new KafkaProducer[String, String](getKafkaProducerParams(KafkaBootStrapServers))
trace_kafka.info(s"message à publier dans le topic ${topic_name}, ${message}")
val cle : String = "1"
val record_publish = new ProducerRecord[String, String](topic_name, cle, message)
try {
trace_kafka.info("publication du message encours...")
producer_Kafka.send(record_publish)
trace_kafka.info("message publié avec succès ! :)")
} catch {
case ex : Exception =>
trace_kafka.error(s"erreur dans la publication du message dans Kafka ${ex.printStackTrace()}")
trace_kafka.info("La liste des paramètres pour la connexion du Producer Kafka sont :" + getKafkaProducerParams(KafkaBootStrapServers))
} finally {
println("n'oubliez pas de clôturer le Producer à la fin de son utilisation")
}
return producer_Kafka
}
def ProducerKafka_exactly_once (KafkaBootStrapServers : String, topic_name : String) : KafkaProducer[String, String] = {
trace_kafka.info(s"instanciation d'une instance du producer Kafka aux serveurs : ${KafkaBootStrapServers}")
val producer_Kafka = new KafkaProducer[String, String](getKafkaProducerParams_exactly_once(KafkaBootStrapServers))
val record_publish = getJSON(KafkaBootStrapServers : String, topic_name : String)
try {
trace_kafka.info("publication du message encours...")
producer_Kafka.send(record_publish, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception == null) {
//le message a été enregistré dans Kafka sans problème.
trace_kafka.info("offset du message : " + metadata.offset().toString)
trace_kafka.info("topic du message : " + metadata.topic().toString())
trace_kafka.info("partition du message : " + metadata.partition().toString())
trace_kafka.info("heure d'enregistrement du message : " + metadata.timestamp())
}
}
} )
trace_kafka.info("message publié avec succès ! :)")
} catch {
case ex : Exception =>
trace_kafka.error(s"erreur dans la publication du message dans Kafka ${ex.printStackTrace()}")
trace_kafka.info("La liste des paramètres pour la connexion du Producer Kafka sont :" + getKafkaProducerParams_exactly_once(KafkaBootStrapServers))
} finally {
println("n'oubliez pas de clôturer le Producer à la fin de son utilisation")
}
return producer_Kafka
}
def getJSON(KafkaBootStrapServers : String, topic_name : String) : ProducerRecord[String, String] = {
val objet_json = JsonNodeFactory.instance.objectNode()
val price : Int = 45
objet_json.put("orderid", "")
objet_json.put("customerid", "")
objet_json.put("campaignid", "")
objet_json.put("orderdate", "")
objet_json.put("city", "")
objet_json.put("state", "")
objet_json.put("zipcode", "")
objet_json.put("paymenttype", "CB")
objet_json.put("totalprice", price)
objet_json.put("numorderlines", 200)
objet_json.put("numunit",10)
return new ProducerRecord[String, String](topic_name,objet_json.toString)
}
/**
*
* @param kafkaBootStrapServers : adresse IP des agents Kafka
* @param KafkaConsumerGroupId : ID du consummer Group
* @param KafkaConsumerReadOrder : ordre de lecture des données du Log
* @param KafkaZookeeper : ensemble Zookeeper
* @param KerberosName : service kerberos
* @param KafkaTopics : le nom des topics
* @return
*/
def getConsommateurKafka( kafkaBootStrapServers : String, KafkaConsumerGroupId : String, KafkaConsumerReadOrder : String,
KafkaZookeeper : String, KerberosName : String,
KafkaTopics : Array[String], StreamContext : StreamingContext) : InputDStream[ConsumerRecord[String, String]] = {
try {
KafkaParam = getKafkaSparkConsumerParams(kafkaBootStrapServers, KafkaConsumerGroupId , KafkaConsumerReadOrder ,KafkaZookeeper, KerberosName )
consommateurKafka = KafkaUtils.createDirectStream[String, String](
StreamContext,
PreferConsistent,
Subscribe[String, String](KafkaTopics, KafkaParam )
)
} catch {
case ex : Exception =>
trace_kafka.error(s"erreur dans l'initialisation du consumer Kafka ${ex.printStackTrace()}")
trace_kafka.info(s"La liste des paramètres pour la connexion du consommateur Kafka sont : ${KafkaParam}")
}
return consommateurKafka
}
}