Kafka Connect connector for reading CSV files into Kafka.

Overview

This Kafka Connect connector watches a directory and reads files as they are written to the input directory. Each record in an input file is converted based on the user-supplied schema.

The CSVRecordProcessor supports reading CSV or TSV files. It can convert a CSV on the fly to the strongly typed Kafka Connect data types. It currently supports all of the schema types and logical types that are supported in Kafka 0.10.x. If you couple this with the Avro converter and Schema Registry by Confluent, you will be able to process CSV files to strongly typed Avro data in real time.

Schema Configuration

This connector requires you to specify the schema of the files to be read. This is controlled by the key.schema and value.schema configuration settings. Each setting is a schema in JSON form stored as a string. The field schemas are used to parse the data in each row, which lets the user strongly define the type of each field.
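To illustrate what "a schema in JSON form stored as a string" can look like in practice, here is a minimal sketch that places a one-line JSON key schema into a java.util.Properties object, writes it out in properties-file syntax, and reads it back unchanged. The class name SchemaPropertyExample and the sample schema are illustrative assumptions; the sketch uses only the JDK and does not call the connector itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

public class SchemaPropertyExample {
    // Sample key schema collapsed onto one line (illustrative; same shape as the key schema example in this README).
    static final String KEY_SCHEMA =
        "{\"name\":\"com.example.users.UserKey\",\"type\":\"STRUCT\",\"isOptional\":false,"
        + "\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false}}}";

    // Writes the setting in .properties syntax, then parses that text back into a Properties object.
    static String roundTrip(String json) {
        try {
            Properties out = new Properties();
            out.setProperty("key.schema", json);
            StringWriter writer = new StringWriter();
            out.store(writer, null); // store() applies whatever escaping the format needs

            Properties in = new Properties();
            in.load(new StringReader(writer.toString()));
            return in.getProperty("key.schema");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The JSON string survives the trip through properties-file syntax.
        System.out.println(KEY_SCHEMA.equals(roundTrip(KEY_SCHEMA))); // true
    }
}
```

Keeping the schema on a single line avoids having to deal with line continuations in the properties file.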

Generating Schemas

CSV

mvn clean package
export CLASSPATH="$(find target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-spooldir -type f -name '*.jar' | tr '\n' ':')"
kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t csv -f src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data -c config/CSVExample.properties -i id

JSON

mvn clean package
export CLASSPATH="$(find target/kafka-connect-target/usr/share/kafka-connect/kafka-connect-spooldir -type f -name '*.jar' | tr '\n' ':')"
kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t json -f src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/json/FieldsMatch.data -c config/JsonExample.properties -i id

Key schema example

{
    "name" : "com.example.users.UserKey",
    "type" : "STRUCT",
    "isOptional" : false,
    "fieldSchemas" : {
        "id" : {
          "type" : "INT64",
          "isOptional" : false
        }
    }
}

Value schema example

{
  "name" : "com.example.users.User",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "id" : {
      "type" : "INT64",
      "isOptional" : false
    },
    "first_name" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "last_name" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "email" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "gender" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "ip_address" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "last_login" : {
      "name" : "org.apache.kafka.connect.data.Timestamp",
      "type" : "INT64",
      "version" : 1,
      "isOptional" : true
    },
    "account_balance" : {
      "name" : "org.apache.kafka.connect.data.Decimal",
      "type" : "BYTES",
      "version" : 1,
      "parameters" : {
        "scale" : "2"
      },
      "isOptional" : true
    },
    "country" : {
      "type" : "STRING",
      "isOptional" : true
    },
    "favorite_color" : {
      "type" : "STRING",
      "isOptional" : true
    }
  }
}

Logical Type Examples

Timestamp

{
  "name" : "org.apache.kafka.connect.data.Timestamp",
  "type" : "INT64",
  "version" : 1,
  "isOptional" : true
}

Decimal

{
  "name" : "org.apache.kafka.connect.data.Decimal",
  "type" : "BYTES",
  "version" : 1,
  "parameters" : {
    "scale" : "2"
  },
  "isOptional" : true
}

Date

{
  "name" : "org.apache.kafka.connect.data.Date",
  "type" : "INT32",
  "version" : 1,
  "isOptional" : true
}

Time

{
  "name" : "org.apache.kafka.connect.data.Time",
  "type" : "INT32",
  "version" : 1,
  "isOptional" : true
}
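Each logical type above is a named annotation on a primitive type. In Kafka Connect, a Timestamp is carried as milliseconds since the Unix epoch (INT64), a Date as days since the epoch (INT32), a Time as milliseconds since midnight (INT32), and a Decimal as the bytes of its unscaled value, with the scale held in the schema parameters. The sketch below reproduces those encodings with JDK classes only. The class and method names are illustrative assumptions, not part of the connector.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;

public class LogicalTypeEncoding {
    // Timestamp: milliseconds since the Unix epoch, stored as INT64.
    static long encodeTimestamp(Instant instant) {
        return instant.toEpochMilli();
    }

    // Date: whole days since the Unix epoch, stored as INT32.
    static int encodeDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    // Time: milliseconds since midnight, stored as INT32.
    static int encodeTime(LocalTime time) {
        return Math.toIntExact(time.toNanoOfDay() / 1_000_000L);
    }

    // Decimal: two's-complement bytes of the unscaled value.
    // setScale without a rounding mode throws if the value does not fit the scale exactly.
    static byte[] encodeDecimal(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().toByteArray();
    }

    // Rebuilds the decimal from its bytes and the scale taken from the schema parameters.
    static BigDecimal decodeDecimal(byte[] bytes, int scale) {
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        System.out.println(encodeTimestamp(Instant.parse("1970-01-02T00:00:00Z"))); // 86400000
        System.out.println(encodeDate(LocalDate.of(1970, 1, 11)));                  // 10
        System.out.println(encodeTime(LocalTime.of(0, 0, 1)));                      // 1000
        byte[] balance = encodeDecimal(new BigDecimal("12.34"), 2);
        System.out.println(decodeDecimal(balance, 2));                              // 12.34
    }
}
```

With "scale" set to "2", as in the account_balance field, the value 12.34 travels as the integer 1234 and is only meaningful together with the scale parameter.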

Configuration

SpoolDirCsvSourceConnector

The SpoolDirCsvSourceConnector monitors the directory specified in input.path for files and reads them as CSV, converting each record to the strongly typed equivalent specified in key.schema and value.schema.

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector

# Set these required values
finished.path=
input.file.pattern=
error.path=
topic=
input.path=

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| error.path | The directory in which to place files that have error(s). This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| finished.path | The directory in which to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). | string | | | high |
| input.path | The directory from which files to be processed are read. This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| topic | The Kafka topic to write the data to. | string | | | high |
| halt.on.error | Should the task halt when it encounters an error or continue to the next file. | boolean | true | | high |
| key.schema | The schema for the key written to Kafka. | string | "" | | high |
| value.schema | The schema for the value written to Kafka. | string | "" | | high |
| csv.first.row.as.header | Flag to indicate if the first row of data contains the header of the file. If true the position of the columns will be determined by the first row of the CSV. The column position will be inferred from the position of the schema supplied in value.schema. If set to true the number of columns must be greater than or equal to the number of fields in the schema. | boolean | false | | medium |
| schema.generation.enabled | Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set. | boolean | false | | medium |
| schema.generation.key.fields | The field(s) to use to build a key schema. This is only used during schema generation. | list | [] | | medium |
| schema.generation.key.name | The name of the generated key schema. | string | com.github.jcustenborder.kafka.connect.model.Key | | medium |
| schema.generation.value.name | The name of the generated value schema. | string | com.github.jcustenborder.kafka.connect.model.Value | | medium |
| timestamp.field | The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a Timestamp. | string | "" | | medium |
| timestamp.mode | Determines how the connector will set the timestamp for the ConnectRecord. If set to FIELD then the timestamp will be read from a field in the value. This field cannot be optional and must be a Timestamp. Specify the field in timestamp.field. If set to FILE_TIME then the last modified time of the file will be used. If set to PROCESS_TIME the time the record is read will be used. | string | PROCESS_TIME | ValidEnum{enum=TimestampMode, allowed=[FIELD, FILE_TIME, PROCESS_TIME]} | medium |
| batch.size | The number of records that should be returned with each batch. | int | 1000 | | low |
| csv.case.sensitive.field.names | Flag to determine if the field names in the header row should be treated as case sensitive. | boolean | false | | low |
| csv.escape.char | Escape character. | int | 92 | | low |
| csv.file.charset | Character set to read the file with. | string | UTF-8 | Big5,Big5-HKSCS,CESU-8,EUC-JP,EUC-KR,GB18030,GB2312,GBK,IBM-Thai,IBM00858,IBM01140,IBM01141,IBM01142,IBM01143,IBM01144,IBM01145,IBM01146,IBM01147,IBM01148,IBM01149,IBM037,IBM1026,IBM1047,IBM273,IBM277,IBM278,IBM280,IBM284,IBM285,IBM290,IBM297,IBM420,IBM424,IBM437,IBM500,IBM775,IBM850,IBM852,IBM855,IBM857,IBM860,IBM861,IBM862,IBM863,IBM864,IBM865,IBM866,IBM868,IBM869,IBM870,IBM871,IBM918,ISO-2022-CN,ISO-2022-JP,ISO-2022-JP-2,ISO-2022-KR,ISO-8859-1,ISO-8859-13,ISO-8859-15,ISO-8859-2,ISO-8859-3,ISO-8859-4,ISO-8859-5,ISO-8859-6,ISO-8859-7,ISO-8859-8,ISO-8859-9,JIS_X0201,JIS_X0212-1990,KOI8-R,KOI8-U,Shift_JIS,TIS-620,US-ASCII,UTF-16,UTF-16BE,UTF-16LE,UTF-32,UTF-32BE,UTF-32LE,UTF-8,windows-1250,windows-1251,windows-1252,windows-1253,windows-1254,windows-1255,windows-1256,windows-1257,windows-1258,windows-31j,x-Big5-HKSCS-2001,x-Big5-Solaris,x-COMPOUND_TEXT,x-euc-jp-linux,x-EUC-TW,x-eucJP-Open,x-IBM1006,x-IBM1025,x-IBM1046,x-IBM1097,x-IBM1098,x-IBM1112,x-IBM1122,x-IBM1123,x-IBM1124,x-IBM1364,x-IBM1381,x-IBM1383,x-IBM300,x-IBM33722,x-IBM737,x-IBM833,x-IBM834,x-IBM856,x-IBM874,x-IBM875,x-IBM921,x-IBM922,x-IBM930,x-IBM933,x-IBM935,x-IBM937,x-IBM939,x-IBM942,x-IBM942C,x-IBM943,x-IBM943C,x-IBM948,x-IBM949,x-IBM949C,x-IBM950,x-IBM964,x-IBM970,x-ISCII91,x-ISO-2022-CN-CNS,x-ISO-2022-CN-GB,x-iso-8859-11,x-JIS0208,x-JISAutoDetect,x-Johab,x-MacArabic,x-MacCentralEurope,x-MacCroatian,x-MacCyrillic,x-MacDingbat,x-MacGreek,x-MacHebrew,x-MacIceland,x-MacRoman,x-MacRomania,x-MacSymbol,x-MacThai,x-MacTurkish,x-MacUkraine,x-MS932_0213,x-MS950-HKSCS,x-MS950-HKSCS-XP,x-mswin-936,x-PCK,x-SJIS_0213,x-UTF-16LE-BOM,X-UTF-32BE-BOM,X-UTF-32LE-BOM,x-windows-50220,x-windows-50221,x-windows-874,x-windows-949,x-windows-950,x-windows-iso2022jp | low |
| csv.ignore.leading.whitespace | Sets the ignore leading whitespace setting - if true, white space in front of a quote in a field is ignored. | boolean | true | | low |
| csv.ignore.quotations | Sets the ignore quotations mode - if true, quotations are ignored. | boolean | false | | low |
| csv.keep.carriage.return | Flag to determine if the carriage return at the end of the line should be maintained. | boolean | false | | low |
| csv.null.field.indicator | Indicator to determine how the CSV Reader can determine if a field is null. Valid values are EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER. For more information see http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html. | string | NEITHER | ValidEnum{enum=CSVReaderNullFieldIndicator, allowed=[EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER]} | low |
| csv.quote.char | The character that is used to quote a field. This typically happens when the csv.separator.char character is within the data. | int | 34 | | low |
| csv.separator.char | The character that separates each field. Typically in a CSV this is a , character. A TSV would use \t. | int | 44 | | low |
| csv.skip.lines | Number of lines to skip in the beginning of the file. | int | 0 | | low |
| csv.strict.quotes | Sets the strict quotes setting - if true, characters outside the quotes are ignored. | boolean | false | | low |
| csv.verify.reader | Flag to determine if the reader should be verified. | boolean | true | | low |
| empty.poll.wait.ms | The amount of time to wait if a poll returns an empty list of records. | long | 1000 | [1,...,9223372036854775807] | low |
| file.minimum.age.ms | The amount of time in milliseconds after the file was last written to before the file can be processed. | long | 0 | [0,...,9223372036854775807] | low |
| parser.timestamp.date.formats | The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be the first in the list. Take a look at the Java documentation for more info. https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html | list | [yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss] | | low |
| parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low |
| processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This setting is appended to the end of the file name. | string | .PROCESSING | ValidPattern{pattern=^.*..+$} | low |
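The input.file.pattern setting is described as the equivalent of Matcher.matches(), meaning the expression has to cover the whole file name rather than part of it. The sketch below shows that behaviour with the JDK regex classes. The class name and both patterns are hypothetical values chosen for illustration.

```java
import java.util.regex.Pattern;

public class InputFilePatternExample {
    // Hypothetical value for input.file.pattern: any file name ending in ".csv".
    static final Pattern CSV_FILES = Pattern.compile("^.*\\.csv$");

    // Full-match semantics: the pattern must consume the entire file name.
    static boolean accepts(Pattern pattern, String fileName) {
        return pattern.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        System.out.println(accepts(CSV_FILES, "users.csv"));            // true
        System.out.println(accepts(CSV_FILES, "users.csv.PROCESSING")); // false

        // A pattern that only describes the suffix is found inside the name,
        // but it does not match the entire name.
        Pattern suffixOnly = Pattern.compile("\\.csv");
        System.out.println(suffixOnly.matcher("users.csv").find());     // true
        System.out.println(accepts(suffixOnly, "users.csv"));           // false
    }
}
```

A pattern that works with a substring search may therefore still need a leading .* before it selects any files.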

SpoolDirJsonSourceConnector

This connector is used to stream JSON files from a directory while converting the data based on the schema supplied in the configuration.

name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector

# Set these required values
finished.path=
input.file.pattern=
topic=
error.path=
input.path=

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| error.path | The directory in which to place files that have error(s). This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| finished.path | The directory in which to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). | string | | | high |
| input.path | The directory from which files to be processed are read. This directory must exist and be writable by the user running Kafka Connect. | string | | SpoolDirSourceConnectorConfig$WritableDirectoryValidator | high |
| topic | The Kafka topic to write the data to. | string | | | high |
| halt.on.error | Should the task halt when it encounters an error or continue to the next file. | boolean | true | | high |
| key.schema | The schema for the key written to Kafka. | string | "" | | high |
| value.schema | The schema for the value written to Kafka. | string | "" | | high |
| schema.generation.enabled | Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set. | boolean | false | | medium |
| schema.generation.key.fields | The field(s) to use to build a key schema. This is only used during schema generation. | list | [] | | medium |
| schema.generation.key.name | The name of the generated key schema. | string | com.github.jcustenborder.kafka.connect.model.Key | | medium |
| schema.generation.value.name | The name of the generated value schema. | string | com.github.jcustenborder.kafka.connect.model.Value | | medium |
| timestamp.field | The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a Timestamp. | string | "" | | medium |
| timestamp.mode | Determines how the connector will set the timestamp for the ConnectRecord. If set to FIELD then the timestamp will be read from a field in the value. This field cannot be optional and must be a Timestamp. Specify the field in timestamp.field. If set to FILE_TIME then the last modified time of the file will be used. If set to PROCESS_TIME the time the record is read will be used. | string | PROCESS_TIME | ValidEnum{enum=TimestampMode, allowed=[FIELD, FILE_TIME, PROCESS_TIME]} | medium |
| batch.size | The number of records that should be returned with each batch. | int | 1000 | | low |
| empty.poll.wait.ms | The amount of time to wait if a poll returns an empty list of records. | long | 1000 | [1,...,9223372036854775807] | low |
| file.minimum.age.ms | The amount of time in milliseconds after the file was last written to before the file can be processed. | long | 0 | [0,...,9223372036854775807] | low |
| parser.timestamp.date.formats | The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be the first in the list. Take a look at the Java documentation for more info. https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html | list | [yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss] | | low |
| parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low |
| processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This setting is appended to the end of the file name. | string | .PROCESSING | ValidPattern{pattern=^.*..+$} | low |
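The parser.timestamp.date.formats description says the formats are tried in order, and parser.timestamp.timezone fixes the zone used for parsing. A minimal sketch of that idea with SimpleDateFormat follows, using the default values listed in the table. The class and method names are illustrative, and whether the connector implements the lookup exactly this way is an assumption.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

public class TimestampFormatsExample {
    // Defaults listed for parser.timestamp.date.formats and parser.timestamp.timezone.
    static final List<String> FORMATS =
        Arrays.asList("yyyy-MM-dd'T'HH:mm:ss", "yyyy-MM-dd' 'HH:mm:ss");
    static final TimeZone ZONE = TimeZone.getTimeZone("UTC");

    // Tries each format in list order and returns the first successful parse.
    static Date parse(String text) {
        for (String format : FORMATS) {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per attempt.
            SimpleDateFormat parser = new SimpleDateFormat(format);
            parser.setTimeZone(ZONE);
            parser.setLenient(false);
            try {
                return parser.parse(text);
            } catch (ParseException e) {
                // This format did not fit; try the next one in the list.
            }
        }
        throw new IllegalArgumentException("No configured format matches: " + text);
    }

    public static void main(String[] args) {
        // Both spellings resolve to the same instant because the zone is fixed to UTC.
        System.out.println(parse("2016-07-01T12:30:00").getTime());
        System.out.println(parse("2016-07-01 12:30:00").getTime());
    }
}
```

Because the first matching format wins, listing the most specific format first keeps a looser format from claiming input it only partly describes.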

Building on your workstation

git clone git@github.com:jcustenborder/kafka-connect-spooldir.git
cd kafka-connect-spooldir
mvn clean package

Debugging

Running with debugging

./bin/debug.sh

Running with debugging - suspending

export SUSPEND='y'
./bin/debug.sh