Skip to content

Commit

Permalink
First version (#1)
Browse files Browse the repository at this point in the history
* Add Source
  • Loading branch information
Charlie QUILLARD authored and damiencarol committed May 22, 2017
1 parent 5804b63 commit 982da45
Show file tree
Hide file tree
Showing 9 changed files with 445 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
language: java
jdk:
- oraclejdk8

sudo: false
script: mvn clean verify
24 changes: 24 additions & 0 deletions config/checkstyle.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Puppy Crawl//DTD Check Configuration 1.3//EN"
"http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<module name="Checker">
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="TreeWalker">
<module name="UpperEll"/>
<module name="MethodLength"/>
<module name="LineLength">
<property name="max" value="120"/>
</module>
<module name="Indentation">
<property name="basicOffset" value="2"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="2"/>
</module>
</module>
</module>
6 changes: 6 additions & 0 deletions config/nats.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
name=nats-source
connector.class=com.oyst.kafka.connect.nats.NatsSourceConnector
tasks.max=1
topic=NATS-POST
nats.subject=POST
nats.host=nats://localhost:4222
159 changes: 159 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.oyst.kafka.connect</groupId>
<artifactId>nats</artifactId>
<version>0.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<timestamp>${maven.build.timestamp}</timestamp>
<maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format>
</properties>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>0.10.2.0</version>
</dependency>
<!-- scope test -->
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-easymock</artifactId>
<version>1.6.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.logimethods</groupId>
<artifactId>nats-connector-gatling_2.11</artifactId>
<version>0.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.3</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<finalName>kafka-connect-nats</finalName>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
<configuration>
<failOnViolation>true</failOnViolation>
<consoleOutput>true</consoleOutput>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<configLocation>${project.basedir}/config/checkstyle.xml</configLocation>
<sourceDirectories>
<directory>${project.build.sourceDirectory}</directory>
</sourceDirectories>
<testSourceDirectories>
<directory>${project.build.testSourceDirectory}</directory>
</testSourceDirectories>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>7.3</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

</project>
61 changes: 61 additions & 0 deletions src/main/java/com/oyst/kafka/connect/nats/NatsSourceConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.oyst.kafka.connect.nats;

import com.oyst.kafka.connect.nats.source.NatsSourceConnectorConfig;
import com.oyst.kafka.connect.nats.source.NatsSourceTask;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NatsSourceConnector extends SourceConnector {
private static final Logger LOG = LoggerFactory.getLogger(NatsSourceConnector.class);
private Map<String, String> mConfigProperties;

/**
* Start the NATS connector
* @param props Connector's properties
*/
@Override
public void start(Map<String, String> props) {
LOG.info("Start the NATS Source Connector with the next properties : {}", props);
this.mConfigProperties = props;

}

@Override
public void stop() {
LOG.info("Stop the Nats Source Connector");
}

@Override
public String version() {
return AppInfoParser.getVersion();
}

@Override
public Class<? extends Task> taskClass() {
return NatsSourceTask.class;
}

@Override
public ConfigDef config() {
return NatsSourceConnectorConfig.config;
}

@Override
public List<Map<String, String>> taskConfigs(int capacity) {
List<Map<String, String>> taskConfigs = new ArrayList<>(capacity);
Map<String, String> taskProps = new HashMap<>(mConfigProperties);
for (int i = 0; i < capacity; i++){
taskConfigs.add(taskProps);
}
return taskConfigs;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.oyst.kafka.connect.nats.source;

import com.oyst.kafka.connect.nats.NatsSourceConnector;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;


public class NatsSourceConnectorConfig extends AbstractConfig {
private static final Logger LOG = LoggerFactory.getLogger(NatsSourceConnector.class);
/**
* Create default mConfig.
* @return default mConfig
*/
public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(NatsSourceConnectorConstants.KAFKA_TOPIC, Type.STRING, "nats",
Importance.LOW, NatsSourceConnectorConstants.KAFKA_TOPIC_DOC)
.define(NatsSourceConnectorConstants.NATS_SUBJECT, Type.STRING, null,
Importance.MEDIUM, NatsSourceConnectorConstants.NATS_SUBJECT_DOC)
.define(NatsSourceConnectorConstants.NATS_HOST, Type.STRING, "nats://localhost:4222",
Importance.HIGH, NatsSourceConnectorConstants.NATS_HOST_DOC);
}

public static final ConfigDef config = baseConfigDef();

/**
* Transform process properties.
*
* @param properties associative array with properties to be process
*/
public NatsSourceConnectorConfig(Map<String, String> properties) {
super(config, properties);
LOG.info("Initialize transform process properties");
}

public static void main(String[] args) {
System.out.println(config.toRst());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.oyst.kafka.connect.nats.source;

public class NatsSourceConnectorConstants {
// Constants
public static final String KAFKA_TOPIC = "topic";
public static final String NATS_SUBJECT = "nats.subject";
public static final String NATS_HOST = "nats.host";

// Constants Doc
public static final String KAFKA_TOPIC_DOC = "Kafka topic to put received data";
public static final String NATS_SUBJECT_DOC = "The NATS SUBJECT";
public static final String NATS_HOST_DOC = "THE NATS HOST";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.oyst.kafka.connect.nats.source;

import io.nats.client.Connection;
import io.nats.client.Nats;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NatsSourceTask extends SourceTask {
private static final Logger LOG = LoggerFactory.getLogger(NatsSourceTask.class);
private Connection nc;
private String ktopic;
private BlockingQueue<SourceRecord> mQueue = new LinkedBlockingQueue<>();

@Override
public void start(Map<String, String> map) {
LOG.info("Start the Nats Source Task");
String nsubject = map.get(NatsSourceConnectorConstants.NATS_SUBJECT);
String nhost = map.get(NatsSourceConnectorConstants.NATS_HOST);
this.ktopic = map.get(NatsSourceConnectorConstants.KAFKA_TOPIC);
try {
this.nc = Nats.connect(nhost);
LOG.info("Connected to the next NATS URL : " + this.nc.getConnectedUrl());
} catch (IOException e){
e.printStackTrace();
}

this.nc.subscribe(nsubject, message -> {
LOG.info("Sending the next message : {}", message);
SourceRecord sc = new SourceRecord(null,null,
ktopic ,Schema.STRING_SCHEMA, message.getSubject(),
Schema.BYTES_SCHEMA, message.getData());
mQueue.add(sc);
});
}

@Override
public void stop() {
LOG.info("Stop the Nats Source Task");
this.nc.close();
}

@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new ArrayList<>();

mQueue.drainTo(records);
return records;
}

public String version() {
return AppInfoParser.getVersion();
}
}
Loading

0 comments on commit 982da45

Please sign in to comment.