
Issues in using the source connector with HDFS #5

Closed · jumbo007 opened this issue May 20, 2017 · 22 comments

@jumbo007

This is my use case: CSV files will be dumped on an HDFS path, and I have to produce their contents to a Kafka topic.
I downloaded the project into Eclipse and built it with Maven. Inside the target directory I got "kafka-connect-hdfs-0.10.2.0-package", which contains etc/kafka-connect-hdfs/kafka-connect-fs.properties. This is what I put in that file:
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=2
fs.uris=hdfs://abc.com:9000/test
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=","
file_reader.delimited.header=true

I downloaded confluent-3.2.1 and updated etc/kafka/connect-standalone.properties as below:
bootstrap.servers=1.2.3.4:9092,1.2.3.5:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

I then moved the target directory from my Windows machine to the $CONFLUENT_HOME path on the server. I exported the classpath exactly as described in your Getting started section:
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '-package' | tr '\n' ':')"

I used the command below to start the connector:
nohup $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/target/kafka-connect-hdfs-0.10.2.0-package/etc/kafka-connect-hdfs/kafka-connect-fs.properties &

Now I get this error:
[2017-05-20 11:26:50,155] ERROR Failed to create job for /home/tmp/confluent-3.2.1/target/kafka-connect-hdfs-0.10.2.0-package/etc/kafka-connect-hdfs/kafka-connect-fs.properties (org.apache.kafka.connect.cli.ConnectStandalone:88)
[2017-05-20 11:26:50,156] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.github.mmolimar.kafka.connect.fs.FsSourceConnector, available connectors are: io.confluent.connect.replicator.ReplicatorSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, io.confluent.connect.s3.S3SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, io.confluent.connect.storage.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSourceConnector, org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.sink.SinkConnector, io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, org.apache.kafka.connect.tools.MockConnector, org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.source.SourceConnector, io.confluent.connect.hdfs.HdfsSinkConnector, io.confluent.connect.hdfs.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.github.mmolimar.kafka.connect.fs.FsSourceConnector, available connectors are: io.confluent.connect.replicator.ReplicatorSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, io.confluent.connect.s3.S3SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, io.confluent.connect.storage.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSourceConnector, org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.sink.SinkConnector, io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, org.apache.kafka.connect.tools.MockConnector, org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.source.SourceConnector, io.confluent.connect.hdfs.HdfsSinkConnector, io.confluent.connect.hdfs.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector
at org.apache.kafka.connect.runtime.ConnectorFactory.getConnectorClass(ConnectorFactory.java:84)
at org.apache.kafka.connect.runtime.ConnectorFactory.newConnector(ConnectorFactory.java:38)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:336)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:235)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
[2017-05-20 11:26:50,159] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)
[2017-05-20 11:26:50,159] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-05-20 11:26:50,159] DEBUG stopping org.eclipse.jetty.server.Server@1aa7ecca (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,162] DEBUG Graceful shutdown org.eclipse.jetty.server.Server@1aa7ecca by (org.eclipse.jetty.server.Server:418)
[2017-05-20 11:26:50,162] DEBUG stopping ServerConnector@42a48628{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,162] DEBUG stopping org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@3c19aaa5 (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,163] DEBUG stopping org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=0 selected=0 (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,163] DEBUG Stopping org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=0 selected=0 (org.eclipse.jetty.io.SelectorManager:432)
[2017-05-20 11:26:50,164] DEBUG Queued change org.eclipse.jetty.io.SelectorManager$ManagedSelector$Stop@2a20ba2c (org.eclipse.jetty.io.SelectorManager:480)
[2017-05-20 11:26:50,167] DEBUG Selector loop woken up from select, 0/0 selected (org.eclipse.jetty.io.SelectorManager:602)
[2017-05-20 11:26:50,168] DEBUG Running change org.eclipse.jetty.io.SelectorManager$ManagedSelector$Stop@2a20ba2c (org.eclipse.jetty.io.SelectorManager:525)
[2017-05-20 11:26:50,168] DEBUG Stopped org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=-1 selected=-1 (org.eclipse.jetty.io.SelectorManager:437)
[2017-05-20 11:26:50,168] DEBUG STOPPED org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=-1 selected=-1 (org.eclipse.jetty.util.component.AbstractLifeCycle:204)

What am I doing wrong here? Is my deployment not OK?
I also tried copying the share/java/kafka-connect-hdfs jars from the target path into Confluent's actual share/java/kafka-connect-hdfs, but that did not help.
Maybe you can shed some light here on how to run this connector end to end. I urgently need to get this running.

@mmolimar
Owner

Hi there,

I think there is a misunderstanding here. Your classpath contains the kafka-connect-hdfs connector, NOT kafka-connect-fs.
You have to compile this connector and put it on the Kafka Connect classpath, and that's all.

Try it again and let me know if you have any issues :-)
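
For reference, a minimal sketch of what I mean, using the same commands from this thread (the paths are examples, adjust them to your layout):

# build kafka-connect-fs from a clean checkout of this repo
mvn clean package
# put the packaged jars on the Kafka Connect classpath
export CLASSPATH="$(find target/ -type f -name '*.jar' | grep '-package' | tr '\n' ':')"
# start a standalone worker with the kafka-connect-fs connector properties
$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  target/kafka-connect-fs-0.2-SNAPSHOT-package/etc/kafka-connect-fs/kafka-connect-fs.properties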

@jumbo007
Author

Hi,
Thanks for the update.
Actually, the first time around I changed the pom to kafka-connect-hdfs and version 0.10.2.0, which is why you see kafka-connect-hdfs-0.10.2.0 instead of kafka-connect-fs-0.2-SNAPSHOT.

Anyway, I have now picked up a fresh zip from git and did not touch the pom at all. I built it with "mvn clean package" and moved the target directory to $CONFLUENT_HOME. I exported the classpath as:
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '-package' | tr '\n' ':')"

When I echo $CLASSPATH, I do see all the jars present inside the target path this time.

Now I have set fs.uris to hdfs://abc.com:9000/test, where I have two CSV files.

I am using the properties file below:

name=FsSourceConnector4
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=2
fs.uris=hdfs://abc.com:9000/test
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.recursive=true
policy.regexp="^%a+%.csv$"
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=","

The regexp looks fine to me for filtering only .csv files.
The two CSVs contain:
10,ABC,XYZ
20,PQR,MNO
and
99,CAR,1000
100,BIKE,2
They do satisfy the delimited token as well.

But I get this:
[2017-05-21 04:34:47,828] INFO FsSourceTaskConfig values:
file_reader.class = class com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
fs.uris = [hdfs://abc.com:9000/test]
policy.class = class com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.recursive = true
policy.regexp = "^%a+%.csv$"
topic = mytopic1
(com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig:180)
[2017-05-21 04:34:47,829] INFO FsSourceTaskConfig values:
file_reader.class = class com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
fs.uris = [hdfs://abc.com:9000/test]
policy.class = class com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.recursive = true
policy.regexp = "^%a+%.csv$"
topic = mytopic1
(com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig:180)
[2017-05-21 04:34:47,868] INFO HdfsFileWatcherPolicy values:
policy.regexp = "^%a+%.csv$"
policy.class = com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.recursive = true
(com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy:253)
[2017-05-21 04:34:48,016] DEBUG setsid exited with exit code 0 (org.apache.hadoop.util.Shell:420)
[2017-05-21 04:34:48,070] DEBUG field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, valueName=Time, value=[Rate of successful kerberos logins and latency (milliseconds)]) (org.apache.hadoop.metrics2.lib.MutableMetricsFactory:42)
[2017-05-21 04:34:48,080] DEBUG field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, valueName=Time, value=[Rate of failed kerberos logins and latency (milliseconds)]) (org.apache.hadoop.metrics2.lib.MutableMetricsFactory:42)
[2017-05-21 04:34:48,081] DEBUG field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, sampleName=Ops, always=false, type=DEFAULT, valueName=Time, value=[GetGroups]) (org.apache.hadoop.metrics2.lib.MutableMetricsFactory:42)
[2017-05-21 04:34:48,082] DEBUG UgiMetrics, User and group related metrics (org.apache.hadoop.metrics2.impl.MetricsSystemImpl:232)
[2017-05-21 04:34:49,399] DEBUG Creating new Groups object (org.apache.hadoop.security.Groups:301)
[2017-05-21 04:34:49,402] DEBUG Trying to load the custom-built native-hadoop library... (org.apache.hadoop.util.NativeCodeLoader:46)
[2017-05-21 04:34:49,403] DEBUG Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path (org.apache.hadoop.util.NativeCodeLoader:55)
[2017-05-21 04:34:49,404] DEBUG java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.hadoop.util.NativeCodeLoader:56)
[2017-05-21 04:34:49,404] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[2017-05-21 04:34:49,405] DEBUG Falling back to shell based (org.apache.hadoop.util.PerformanceAdvisory:41)
[2017-05-21 04:34:49,405] DEBUG Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping (org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback:45)
[2017-05-21 04:34:49,492] DEBUG Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=300000; warningDeltaMs=5000 (org.apache.hadoop.security.Groups:112)
[2017-05-21 04:34:49,494] DEBUG hadoop login (org.apache.hadoop.security.UserGroupInformation:222)
[2017-05-21 04:34:49,495] DEBUG hadoop login commit (org.apache.hadoop.security.UserGroupInformation:157)
[2017-05-21 04:34:49,498] DEBUG using local user:UnixPrincipal: puser (org.apache.hadoop.security.UserGroupInformation:187)
[2017-05-21 04:34:49,498] DEBUG Using user: "UnixPrincipal: puser" with name puser (org.apache.hadoop.security.UserGroupInformation:193)
[2017-05-21 04:34:49,498] DEBUG User entry: "puser" (org.apache.hadoop.security.UserGroupInformation:203)
[2017-05-21 04:34:49,499] DEBUG UGI loginUser:puser (auth:SIMPLE) (org.apache.hadoop.security.UserGroupInformation:827)
[2017-05-21 04:34:49,701] DEBUG dfs.client.use.legacy.blockreader.local = false (org.apache.hadoop.hdfs.BlockReaderLocal:457)
[2017-05-21 04:34:49,701] DEBUG dfs.client.read.shortcircuit = false (org.apache.hadoop.hdfs.BlockReaderLocal:460)
[2017-05-21 04:34:49,701] DEBUG dfs.client.domain.socket.data.traffic = false (org.apache.hadoop.hdfs.BlockReaderLocal:463)
[2017-05-21 04:34:49,701] DEBUG dfs.domain.socket.path = (org.apache.hadoop.hdfs.BlockReaderLocal:466)
[2017-05-21 04:34:49,734] DEBUG multipleLinearRandomRetry = null (org.apache.hadoop.io.retry.RetryUtils:75)
[2017-05-21 04:34:49,749] DEBUG rpcKind=RPC_PROTOCOL_BUFFER, rpcRequestWrapperClass=class org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper, rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@3fa882bb (org.apache.hadoop.ipc.Server:234)
[2017-05-21 04:34:49,753] DEBUG getting client out of cache: org.apache.hadoop.ipc.Client@28542800 (org.apache.hadoop.ipc.Client:63)
[2017-05-21 04:34:50,036] DEBUG Both short-circuit local reads and UNIX domain socket are disabled. (org.apache.hadoop.util.PerformanceAdvisory:110)
[2017-05-21 04:34:50,045] DEBUG DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil:183)
[2017-05-21 04:34:50,050] DEBUG dfs.client.use.legacy.blockreader.local = false (org.apache.hadoop.hdfs.BlockReaderLocal:457)
[2017-05-21 04:34:50,051] DEBUG dfs.client.read.shortcircuit = false (org.apache.hadoop.hdfs.BlockReaderLocal:460)
[2017-05-21 04:34:50,051] DEBUG dfs.client.domain.socket.data.traffic = false (org.apache.hadoop.hdfs.BlockReaderLocal:463)
[2017-05-21 04:34:50,051] DEBUG dfs.domain.socket.path = (org.apache.hadoop.hdfs.BlockReaderLocal:466)
[2017-05-21 04:34:50,052] DEBUG multipleLinearRandomRetry = null (org.apache.hadoop.io.retry.RetryUtils:75)
[2017-05-21 04:34:50,052] DEBUG getting client out of cache: org.apache.hadoop.ipc.Client@28542800 (org.apache.hadoop.ipc.Client:63)
[2017-05-21 04:34:50,053] DEBUG DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection (org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil:183)
[2017-05-21 04:34:50,056] INFO Source task WorkerSourceTask{id=FsSourceConnector5-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:138)
[2017-05-21 04:34:50,056] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 04:34:50,081] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 04:34:50,081] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 04:34:50,082] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 04:34:50,082] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 04:34:50,082] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 04:34:50,082] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 04:34:50,082] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 04:34:50,082] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 04:34:50,082] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)

And then these last few lines keep repeating. Where is the problem now?

@jumbo007
Author

It gives the warning below partway through. Does this have any effect?

[2017-05-21 04:48:43,489] WARN could not create Dir using jarFile from url file:/home/puser/tmp/confluent-3.1.1/target/kafka-connect-fs-0.2-SNAPSHOT-package/share/java/kafka-connect-fs/fastutil-6.5.7.jar. skipping. (org.reflections.Reflections:104)
java.util.zip.ZipException: invalid CEN header (bad signature)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:220)
at java.util.zip.ZipFile.<init>(ZipFile.java:150)
at java.util.jar.JarFile.<init>(JarFile.java:166)
at java.util.jar.JarFile.<init>(JarFile.java:130)
at org.reflections.vfs.Vfs$DefaultUrlTypes$1.createDir(Vfs.java:212)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:99)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:91)
at org.reflections.Reflections.scan(Reflections.java:237)
at org.reflections.Reflections.scan(Reflections.java:204)
at org.reflections.Reflections.<init>(Reflections.java:129)
at org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:275)
at org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:384)
at java.lang.Thread.run(Thread.java:745)

@jumbo007
Author

I tried with $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties, but the nohup output did not change at all. On the Getting started page you give the path of kafka-connect-fs.properties as config/kafka-connect-fs.properties.

I don't see that path, and I don't see any config directory. My kafka-connect-fs.properties is at target/kafka-connect-fs-0.2-SNAPSHOT-package/etc/kafka-connect-fs/kafka-connect-fs.properties.

What am I missing?

@jumbo007
Author

My Kafka version: 0.10.2.1
The Confluent version I am using: 3.1.1

I downloaded the Confluent tar from https://www.confluent.io/download-center/

@mmolimar
Owner

There are two different things here.

HdfsFileWatcherPolicy only picks up files when a document is created or updated. If those files were already in HDFS, you won't get any records. To process existing files, I recommend using another policy such as SleepyPolicy or SimplePolicy. You can find more info about policies here.

On the other hand, it looks like fastutil-6.5.7.jar is corrupted. It probably doesn't matter for the Reflections API, but if you want to fix it, clone the repo and build the source with mvn clean package.
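
As an illustration, a possible connector config using SimplePolicy instead of HdfsFileWatcherPolicy (a sketch based on your own properties, not something I have run against your cluster):

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=hdfs://abc.com:9000/test
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=,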

@jumbo007
Author

Tried SimplePolicy. No records were produced on the Kafka topic.

[2017-05-21 09:39:45,767] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:39:45,767] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,767] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,767] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:39:45,768] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)

@jumbo007
Author

No difference when using SleepyPolicy with 1000 ms.

@jumbo007
Author

I created a path /test/tmp on Unix (not on HDFS) and changed fs.uris to file:///test/tmp.
But that too gave:
[2017-05-21 09:52:47,086] INFO Reflections took 15433 ms to scan 369 urls, producing 16349 keys and 116047 values (org.reflections.Reflections:229)
[2017-05-21 09:52:47,325] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:47,325] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:48,338] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:48,338] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:49,349] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:49,350] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:50,359] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:50,359] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:51,369] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:51,369] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:52,378] DEBUG About to send 0 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:159)
[2017-05-21 09:52:52,379] DEBUG Nothing to send to Kafka. Polling source for additional records (org.apache.kafka.connect.runtime.WorkerSourceTask:154)
[2017-05-21 09:52:52,990] DEBUG Committing offsets for WorkerSourceTask{id=FsSourceConnector9-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:106)
[2017-05-21 09:52:52,990] DEBUG WorkerSourceTask{id=FsSourceConnector9-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:267)
[2017-05-21 09:52:52,990] DEBUG WorkerSourceTask{id=FsSourceConnector9-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:284)
[2017-05-21 09:52:52,990] DEBUG Finished WorkerSourceTask{id=FsSourceConnector9-0} offset commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:310)

There has to be a way to run this. Where is the setup broken?

@jumbo007
Author

The com.github.mmolimar.kafka.connect.fs.FsSourceConnector class is present in these jars:

[puser@abc confluent-3.1.1]$ find . -name "*.jar" -exec grep -Hsli com.github.mmolimar.kafka.connect.fs.FsSourceConnector {} \;
./target/kafka-connect-fs-0.2-SNAPSHOT-package/share/java/kafka-connect-fs/kafka-connect-fs-0.2-SNAPSHOT.jar
./target/kafka-connect-fs-0.2-SNAPSHOT-development/share/java/kafka-connect-fs/kafka-connect-fs-0.2-SNAPSHOT.jar
./target/kafka-connect-fs-0.2-SNAPSHOT.jar

@jumbo007
Author

The private List toSend in org.apache.kafka.connect.runtime.WorkerSourceTask does not seem to receive any files to process (neither from the local FS nor from HDFS). It stays null, and hence no file ever gets into the list to be processed. There must be a missing link that I am overlooking.

Listing again the contents of the property files I am using to start the Connect worker.

$CONFLUENT_HOME/etc/kafka/connect-standalone.properties
bootstrap.servers=1.2.4.10:9092,1.2.4.18:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

$CONFLUENT_HOME/target/kafka-connect-fs-0.2-SNAPSHOT-package/etc/kafka-connect-fs/kafka-connect-fs.properties
name=FsSourceConnector9
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=2
#fs.uris=hdfs://abc.com:9000/test
fs.uris=file:///tmp/test
topic=mytopic2
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=1000
policy.recursive=true
policy.regexp=".+\.csv"
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=","

Command:- nohup $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/target/kafka-connect-fs-0.2-SNAPSHOT-package/etc/kafka-connect-fs/kafka-connect-fs.properties &

@mmolimar
Owner

mmolimar commented May 21, 2017

Maybe the regexp does not match the filenames. Try this one: .*
Use it without the surrounding "" quotes, and drop the quotes from the delimited token as well.
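
That is, something like this in the connector properties (a sketch; note there are no surrounding quotes anywhere):

policy.regexp=.*
file_reader.delimited.token=,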

@jumbo007
Author

Exactly. That occurred to me too, so I removed the regexp property altogether.
I got something on the consumer console this time.
The file contains: 99,CAR,1000
And it produced:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"column_1"}],"optional":false},"payload":{"column_1":"99,CAR,1000"}}

But many times. Now I have to get rid of this schema information etc. and figure out why the same message was produced so many times.

@mmolimar
Owner

The file_reader.delimited.token property should also be set without the "".
And if you're retrieving the same records over and over again, it will be because the offsets have not been committed yet. This is covered in the FAQ and here too.
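
If it helps, the commit interval comes from the worker config, e.g. in connect-standalone.properties (the values below are the ones from your own file; a smaller offset.flush.interval.ms commits offsets sooner):

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000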

@jumbo007
Author

Hi there,
I am able to produce something now, at least. Thanks for the help. There is still a lot to implement for my actual CSV files, but before I look into that, could you please answer these two questions about a production-level deployment of the FsSourceConnector?
1- Can this be run in distributed mode? I will have it deployed on two servers running together, processing different files and maintaining offsets. If one goes down, the other takes over and prevents data loss until the first is brought back up.
2- I tried putting the Confluent folder on a server that is not a Hadoop cluster member, but it gave IOException: HADOOP_HOME is not set. If I keep it on any of the Hadoop member servers, the issue vanishes. Does that mean this connector can only be deployed on a Hadoop cluster member server?

Thanks

@mmolimar
Owner

  1. For sure you can. You can do something like this to add the connector:

cat << EOF > kafka-connect-fs.json
{
  "name": "connect-fs",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "fs.uris": "hdfs://abc.com:9000/test",
    "topic": "mytopic",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy",
    "policy.recursive": "true",
    "policy.regexp": ".*",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader",
    "file_reader.delimited.token": ","
  }
}
EOF

And then:

curl -X POST -H "Content-Type: application/json" -H "Accept: application/json" -d @kafka-connect-fs.json http://kafka_connect_distributed:8083/connectors

  2. The connector can be deployed on a host outside the Hadoop cluster. Try setting that env var, or configure your connector with policy.fs.<fs_property>
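
For example, once the connector is registered you can check it through the standard Connect REST API, and set the env var before starting the worker (host, port and path below are placeholders):

# list registered connectors and check the status of this one
curl http://kafka_connect_distributed:8083/connectors
curl http://kafka_connect_distributed:8083/connectors/connect-fs/status
# for point 2, set HADOOP_HOME before starting the worker
export HADOOP_HOME=/path/to/hadoop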

@jumbo007
Author

That is very useful information, thanks a lot.
I am facing one issue while implementing this with my real production CSVs. When I try a sample CSV, say:
10,20,30,ABC

I see the value produced on the consumer console as:
{"value":"10,20,30,ABC"}. I don't want it produced like this. I tried with the DelimitedText reader and with the Text reader, but no luck.
I then changed connect-standalone.properties, setting the key and value converters to:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

But this gave console output of:
STRUCT{value=10,20,30,ABC}

All I want is the plain data produced to Kafka. If the file contains 10,20,30, it should give 10,20,30.
What is the configuration option for such a scenario?

@mmolimar
Owner

You have to use DelimitedTextFileReader with the corresponding token (file_reader.delimited.token) and the file_reader.delimited.header property.
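
That is, something like this in your connector properties (a sketch, values without quotes):

file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=,
# only if your CSV files include a header row
file_reader.delimited.header=true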

@jumbo007
Author

I tried all the combinations of key converter, value converter and schemas.enable with DelimitedTextFileReader. DelimitedTextFileReader gives worse output than TextFileReader.
The output is:
Struct{column_1=1,column_2=,column_3=ABC,column_4=XYZ,column_5=14,column_6=G900V,column_7=9082390421_05_06_2017_16_30_01_PART1.dlf,column_8=null,column_9=355300070040105,column_10=40,column_11=5668,column_12=,column_13=187564,column_14=40.38928259,column_15=-74.53900965,column_16=,column_17=,column_18=,column_19=,column_20=,column_21=,column_22=,column_23=,column_24=,column_25=,column_26=-1,column_27=,column_28=,column_29=,column_30=,column_31=,column_32=,column_33=,column_34=,column_35=,column_36=1494108696000,column_37=0,column_38=False,column_39=,column_40=,column_41=,column_42=,column_43=,column_44=,column_45=0,column_46=0,column_47=,column_48=,column_49=0,column_50=,column_51=,column_52=,column_53=,column_54=,column_55=,column_56=,column_57=,column_58=,column_59=,column_60=,column_61=,column_62=,column_63=,column_64=,column_65=,column_66=,column_67=,column_68=,column_69=,column_70=,column_71=,column_72=0,column_73=0,column_74=0,column_75=0,column_76=0,column_77=,column_78=,column_79=,column_80=,column_81=,column_82=,column_83=,column_84=,column_85=,column_86=,column_87=,column_88=,column_89=,column_90=,column_91=,column_92=,column_93=,column_94=,column_95=,column_96=,column_97=,column_98=,column_99=,column_100=,column_101=,column_102=,column_103=,column_104=,column_105=,column_106=,column_107=,column_108=,column_109=,column_110=,column_111=0,column_112=,column_113=,column_114=,column_115=,column_116=,column_117=,column_118=,column_119=,column_120=,column_121=0,column_122=,column_123=,column_124=,column_125=,column_126=,column_127=,column_128=,column_129=,column_130=,column_131=0}
Struct{column_1=1,column_2=,column_3=ABC,column_4=XYZ,column_5=14,column_6=G900V,column_7=9082390421_05_06_2017_16_30_01_PART1.dlf,column_8=null,column_9=355300070040105,column_10=40,column_11=5668,column_12=,column_13=187564,column_14=40.38928259,column_15=-74.53900965,column_16=,column_17=,column_18=,column_19=,column_20=,column_21=,column_22=,column_23=,column_24=,column_25=,column_26=-1,column_27=,column_28=,column_29=,column_30=,column_31=,column_32=,column_33=,column_34=,column_35=,column_36=1494108696000,column_37=0,column_38=False,column_39=,column_40=,column_41=,column_42=,column_43=,column_44=,column_45=0,column_46=0,column_47=,column_48=,column_49=0,column_50=,column_51=,column_52=,column_53=,column_54=,column_55=,column_56=,column_57=,column_58=,column_59=,column_60=,column_61=,column_62=,column_63=,column_64=,column_65=,column_66=,column_67=,column_68=,column_69=,column_70=,column_71=,column_72=0,column_73=0,column_74=0,column_75=0,column_76=0,column_77=,column_78=,column_79=,column_80=,column_81=,column_82=,column_83=,column_84=,column_85=,column_86=,column_87=,column_88=,column_89=,column_90=,column_91=,column_92=,column_93=,column_94=,column_95=,column_96=,column_97=,column_98=,column_99=,column_100=,column_101=,column_102=,column_103=,column_104=,column_105=,column_106=,column_107=,column_108=,column_109=,column_110=,column_111=0,column_112=,column_113=,column_114=,column_115=,column_116=,column_117=,column_118=,column_119=,column_120=,column_121=0,column_122=,column_123=,column_124=,column_125=,column_126=,column_127=,column_128=,column_129=,column_130=,column_131=0}

It does not produce anything with file_reader.delimited.header=false.

I sincerely request that you give it a try yourself.

Just a simple CSV file on HDFS. Say:

101,John,Smith,Computer
102,Jane,Doe,Physics

And produce it on a Kafka topic as is:
101,John,Smith,Computer
102,Jane,Doe,Physics

And share the etc/kafka/connect-standalone.properties and kafka-connect-fs.properties files that you used.

It will be very beneficial. :)

@mmolimar
Owner

mmolimar commented May 23, 2017

You're getting the columns you have in your CSV file, and the default column name is 'column_N'. So I think it's fine.
If you want to set the column names or produce only some of the columns, include a header with those columns and set the file_reader.delimited.header flag to true.
You can see this in the integration tests.
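
For instance (an illustrative sketch, the column names are invented), a CSV like:

id,name,price
99,CAR,1000

read with file_reader.delimited.header=true should come through with fields id, name and price instead of column_1, column_2, column_3.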

@jumbo007
Author

If I include column names in the CSV and set the header flag to true, then it will use the columns from the CSV instead of column_N.
But that is not at all what I need.
Why is there a Struct in the console consumer? Why does it come out as Name=ABC, etc.?
Those are my remaining questions. How can I simply produce the exact content of the HDFS file to the Kafka topic?

@mmolimar
Owner

Because they're Struct records with their own structure, so it's OK. You're just printing out the objects.
Actually, these questions relate more to Kafka Connect itself than to this connector.
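
For what it's worth, if you only want the payload without the schema envelope on the console, the worker-level JSON converter can be told to drop the schema (standard Kafka Connect behaviour, not specific to this connector):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With that, the console consumer should print something like {"column_1":"101","column_2":"John",...} rather than the full schema/payload wrapper.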

mmolimar closed this as completed Jun 2, 2017