-
Notifications
You must be signed in to change notification settings - Fork 5
/
KafkaUnitAdmin.java
61 lines (42 loc) · 1.8 KB
/
KafkaUnitAdmin.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.big.data.kafka.unit;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import java.util.Properties;
/**
* Created by kunalgautam on 17.02.17.
*/
public class KafkaUnitAdmin {
public static final int tickTime = 5000;
public static final int sessionTimeout = 60000;
public static final int waitTime = 5000;
private ZkClient zkClient;
private ZkUtils zkUtils ;
private static final ZkSerializer zkSerializer = new ZkSerializer() {
public byte[] serialize(Object data) throws ZkMarshallingError {
return ZKStringSerializer.serialize(data);
}
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
}
};
public KafkaUnitAdmin(KafkaUnit unit) throws Exception {
zkClient = new ZkClient(unit.getConfig().getZkString(), sessionTimeout, waitTime, zkSerializer);
zkUtils = new ZkUtils(zkClient, new ZkConnection(unit.getConfig().getZkString()), false);
// ZooKeeper zooKeeper = new ZooKeeper(unit.getConfig().getZkString(), 10000, null);
// if(zooKeeper.getChildren("/brokers/ids", false).size() != 1){
// get clustersize
// System.exit(0);
// }
}
public void createTopic(String topicName,int noOfPartitions, int noOfReplication, Properties topicConfiguration ){
AdminUtils.createTopic(zkUtils,topicName, noOfPartitions, noOfReplication, topicConfiguration);
}
public void deleteTopic(String topicName) {
AdminUtils.deleteTopic(zkUtils,topicName);
}
}