-
Notifications
You must be signed in to change notification settings - Fork 0
/
AverageAirportDelayForEachAirportConsumer.java
75 lines (55 loc) · 2.9 KB
/
AverageAirportDelayForEachAirportConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import java.util.Collections;
/**
 * Consumes records from the {@code average-airport-delay-for-each-airport} Kafka topic and
 * writes each one to the {@code average-airport-delay-for-each-airport-streaming} DynamoDB table.
 *
 * <p>Each record value is expected to be a two-element, comma-separated tuple such as
 * {@code ('XYZ', 12.5)}: brackets, parentheses and quotes are stripped, the first field is
 * stored under the {@code carrier} attribute and the second is parsed as a float and stored
 * as the {@code average_delay} primary key.
 *
 * <p>NOTE(review): the attribute is called "carrier" although the topic name refers to
 * airports - confirm against the producer which identifier is actually sent.
 */
public class AverageAirportDelayForEachAirportConsumer {

    private static final Logger LOGGER = Logger.getLogger(AverageAirportDelayForEachAirportConsumer.class);

    /** Kafka topic this consumer subscribes to. */
    private static final String TOPIC = "average-airport-delay-for-each-airport";

    /** DynamoDB table that receives one item per well-formed record. */
    private static final String TABLE_NAME = "average-airport-delay-for-each-airport-streaming";

    private final DynamoDB dynamoDB;

    /**
     * Subscribes to the topic and then polls forever, forwarding every received record to
     * DynamoDB and committing offsets asynchronously after each batch.
     *
     * <p>This constructor never returns normally: the poll loop has no exit condition.
     *
     * @param kafkaConsumerClient wrapper exposing the Kafka consumer via its {@code consumer} field
     * @param dynamoDBClient      wrapper exposing the DynamoDB handle via its {@code dynamoDB} field
     */
    public AverageAirportDelayForEachAirportConsumer(KafkaConsumerClient kafkaConsumerClient, DynamoDBClient dynamoDBClient) {
        KafkaConsumer<Long, String> kafkaConsumer = kafkaConsumerClient.consumer;
        dynamoDB = dynamoDBClient.dynamoDB;

        kafkaConsumer.subscribe(Collections.singletonList(TOPIC));
        LOGGER.info("Listening to records on topic: " + TOPIC);

        while (true) {
            ConsumerRecords<Long, String> consumerRecords = kafkaConsumer.poll(1000);
            consumerRecords.forEach(this::sendTopicRecordToDynamoDB);
            // Offsets are committed even if individual records were skipped or failed to
            // persist; per-record failures are logged rather than retried.
            kafkaConsumer.commitAsync();
        }
    }

    /**
     * Logs the record, parses its value and puts the resulting item into DynamoDB.
     *
     * <p>Records with a null value, a field count other than two, or a non-numeric delay are
     * logged and skipped so that a single malformed record cannot terminate the poll loop.
     *
     * @param consumerRecord the Kafka record to persist
     */
    private void sendTopicRecordToDynamoDB(ConsumerRecord<Long, String> consumerRecord) {
        LOGGER.info("Record key: " + consumerRecord.key());
        LOGGER.info("Record value: " + consumerRecord.value());
        LOGGER.info("Record partition: " + consumerRecord.partition());
        LOGGER.info("Record offset: " + consumerRecord.offset());

        final String rawValue = consumerRecord.value();
        if (rawValue == null) {
            // Previously this dereferenced null and threw out of the poll loop.
            LOGGER.warn("Skipping record with null value at partition " + consumerRecord.partition()
                    + ", offset " + consumerRecord.offset());
            return;
        }

        final String[] carrierAndDelayAverage = splitTupleFields(rawValue);
        if (carrierAndDelayAverage.length != 2) {
            LOGGER.warn("Skipping record with unexpected field count " + carrierAndDelayAverage.length
                    + ": " + rawValue);
            return;
        }

        final String carrier = carrierAndDelayAverage[0];
        final Float averageDelay;
        try {
            averageDelay = Float.parseFloat(carrierAndDelayAverage[1]);
        } catch (NumberFormatException e) {
            // A non-numeric delay used to propagate and kill the consumer before the offset
            // was committed, so the same record would fail again on restart.
            LOGGER.warn("Skipping record with non-numeric average delay: " + rawValue, e);
            return;
        }

        LOGGER.info("Sending carrier: " + carrier + " with average delay " + averageDelay);
        Table table = dynamoDB.getTable(TABLE_NAME);
        try {
            final Item item = new Item()
                    .withPrimaryKey("average_delay", averageDelay)
                    .with("carrier", carrier);
            final PutItemOutcome putItemOutcome = table.putItem(item);
            LOGGER.info("Item has been put into database successfully" + putItemOutcome.getPutItemResult());
        } catch (Exception e) {
            // Best effort: log the failure with its cause through the logger (instead of
            // printing the stack trace to stderr) and continue with the next record.
            LOGGER.error("Failed to put item into table", e);
        }
    }

    /** Strips brackets, parentheses and quote characters from the value and splits it on commas. */
    private static String[] splitTupleFields(String rawValue) {
        return rawValue
                .replace("(", "")
                .replace("[", "")
                .replace("]", "")
                .replace(")", "")
                .replace("\"", "")
                .replace("'", "")
                .split(",");
    }
}