# Naming the components on the current agent MysqlAgent.sources = mysql_source MysqlAgent.channels = MemChannel MysqlAgent.sinks = kafka_sinks # Describing/Configuring the source MysqlAgent.sources.mysql_source.type = org.keedio.flume.source.SQLSource # URL to connect to database (currently only mysql is supported) MysqlAgent.sources.mysql_source.hibernate.connection.url = jdbc:mysql://[removed]/[removed] # Database connection properties MysqlAgent.sources.mysql_source.hibernate.connection.user = [removed] MysqlAgent.sources.mysql_source.hibernate.connection.password = [removed] MysqlAgent.sources.mysql_source.database = [removed] MysqlAgent.sources.mysql_source.custom.query = SELECT idx, INET_NTOA(IP_SRC_ADDR) source, L4_SRC_PORT source_port, INET_NTOA(IP_DST_ADDR) destination, L4_DST_PORT destination_port, PROTOCOL, BYTES, PACKETS, FROM_UNIXTIME(FIRST_SWITCHED) first_switched, FROM_UNIXTIME(LAST_SWITCHED) last_switched, (LAST_SWITCHED - FIRST_SWITCHED) switched_time_in_seconds FROM flowsv4_3 WHERE FIRST_SWITCHED <= LAST_SWITCHED AND idx>$@$ ORDER BY idx ASC MysqlAgent.sources.mysql_source.hibernate.dialect = org.keedio.flume.source.MySQLCustomDialect #anything but null value (this is a bug in 1.3.7) MysqlAgent.sources.mysql_source.status.column = true # Increment column properties MysqlAgent.sources.mysql_source.incremental.column.name = idx # Increment value is from you want to start taking data from tables (0 will import entire table) MysqlAgent.sources.mysql_source.start.from = 13500000 MysqlAgent.sources.mysql_source.batch.size = 1000 MysqlAgent.sources.mysql_source.max.rows = 10000 # Query delay, each configured milisecond the query will be sent MysqlAgent.sources.mysql_source.run.query.delay=5000 # Status file is used to save last readed row MysqlAgent.sources.mysql_source.status.file.path = /var/log/flume MysqlAgent.sources.mysql_source.status.file.name = mysql_source.status # Describing/Configuring the Kafka sink MysqlAgent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink MysqlAgent.sinks.kafka_sinks.topic = [removed] MysqlAgent.sinks.kafka_sinks.brokerList = [removed] MysqlAgent.sinks.kafka_sinks.requiredAcks = 0 MysqlAgent.sinks.kafka_sinks.batchSize = 1 # Describing/Configuring the channel MysqlAgent.channels.MemChannel.type = memory MysqlAgent.channels.MemChannel.capacity = 10000 MysqlAgent.channels.MemChannel.transactionCapacity = 2000 # Binding the source and sink to the channel MysqlAgent.sources.mysql_source.channels = MemChannel MysqlAgent.sinks.kafka_sinks.channel = MemChannel