
Getting a "java.lang.OutOfMemoryError: Java heap space" error after tasks have been running for a while #283

Description

@kurtis-ley

Hi,

I use FilePulse version 2.4.3 to read JSON files from a GCS bucket. These files are 15-20 MB in size and a new one appears every 2 minutes. Each JSON file is an array of documents that I need to split out into separate records. The connector processes the files with no problem; however, when the connector is sitting more idle, the tasks crash with a "java.lang.OutOfMemoryError: Java heap space" error.
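
Each file is a single top-level JSON array. The sample below is illustrative only: the fields.Data path comes from the filter config further down, but the other field names are made up and real files contain far more documents and data.

[
  {
    "journalId": "J-0001",
    "fields": {
      "Data": "...large nested payload...",
      "OtherField": "..."
    }
  },
  {
    "journalId": "J-0002",
    "fields": {
      "Data": "...",
      "OtherField": "..."
    }
  }
]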

I don't believe this is a lack-of-memory issue. The JVM is currently set to "-Xms12G" "-Xmx12G", and I'm running it on nodes with 32 GB. We previously had the JVM set to 18 GB and the same error occurred. I created a brand new connector to start from scratch and process all 12,000 of the JSON files in the GCS bucket, and they were processed with no issue. Eventually, that new connector also failed once the tasks sat a little more idle. The only way I can prevent the tasks from crashing is by rebuilding the Confluent Connect pods. I've lowered tasks.max to 1 and the same problem occurs. I don't believe it's an issue with the files themselves, because after I rebuild the pods the same files are processed without any problem.

The only other contextual clue I have is that I can watch a pod's memory climb from 2 GB to the 12 GB max over time, then the growth migrates to a new pod and the same pattern repeats. Eventually all of the pods are holding 12 GB of memory, even the ones no FilePulse tasks are running on. It sounds like a problem with rebalancing and cleaning up old memory.
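
If it helps with diagnosis, I can capture a heap dump while a pod's memory is climbing to see what is being retained. Any of the usual routes should work: jmap, adding -XX:+HeapDumpOnOutOfMemoryError to the JVM options, or triggering a dump programmatically. A minimal sketch of the programmatic route (assuming a HotSpot-based JVM; the class name and dump path are made up):

import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;

public class HeapDumpSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical dump path inside the Connect pod; the file must not already exist.
        String path = args.length > 0 ? args[0] : "/tmp/connect-heap.hprof";

        // Obtain the HotSpot diagnostic MBean and dump only live (reachable) objects.
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, true);

        System.out.println("Heap dump written to " + path);
    }
}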

Sometimes a stack trace is emitted and sometimes it isn't. Here's one of the stack traces I have received:

java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
	at java.base/java.lang.StringCoding.decodeUTF8_0(StringCoding.java:891)
	at java.base/java.lang.StringCoding.decodeUTF8(StringCoding.java:725)
	at java.base/java.lang.StringCoding.decode(StringCoding.java:257)
	at java.base/java.lang.String.<init>(String.java:507)
	at java.base/java.lang.String.<init>(String.java:561)
	at io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.extractJsonField(JSONFilter.java:127)
	at io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter.apply(JSONFilter.java:65)
	at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:43)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:160)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:132)
	at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:100)
	at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:176)
	at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:307)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:263)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
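
For what it's worth, the top frames show the OOM happening while the raw file bytes are decoded into a String inside JSONFilter.extractJsonField. The snippet below is not FilePulse's code, just a minimal sketch of that allocation pattern (the local file path is made up): with GcsBytesArrayInputReader each record value presumably carries the whole 15-20 MB file as a byte array, and the JSON filter then materializes a second full copy of it as a String before parsing.

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DecodeAllocationSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for one exported file; the connector reads the blob from GCS instead.
        Path file = Path.of(args.length > 0 ? args[0] : "JournalExport-sample.json");

        // The whole file is held in memory as a single byte[] (the record value).
        byte[] raw = Files.readAllBytes(file);

        // Decoding to a String is the Arrays.copyOf / StringCoding.decodeUTF8 path in
        // the trace above; the byte[] and the decoded String coexist on the heap, so
        // every in-flight record costs at least twice the file size.
        String json = new String(raw, StandardCharsets.UTF_8);

        System.out.printf("bytes=%d, chars=%d%n", raw.length, json.length());
    }
}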

Here's the config for the connector:

{
  "name": "filepulse-gcs-fourd-journals5",
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "tasks.max": "3",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "max.request.size": "8388608",
  "topic": "fourd.journals",
  "topic.creation.groups": "",
  "topic.creation.default.partitions": "6",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.max.message.bytes": "8388608",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.GcsFileSystemListing",
  "gcs.credentials.json": "$MIS_GCS_READER",
  "gcs.bucket.name": "$GCS_BUCKET",
  "gcs.blobs.filter.prefix": "",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "fs.listing.interval.ms": "30000",
  "fs.listing.task.delegation.enabled": "false",
  "max.scheduled.files": "1000",
  "ignore.committed.offsets": "false",
  "file.filter.regex.pattern": ".*JournalExport.*\\.json",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.GcsBytesArrayInputReader",
  "offset.policy.class": "io.streamthoughts.kafka.connect.filepulse.offset.DefaultSourceOffsetPolicy",
  "offset.attributes.string": "uri",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.topic": "__filepulse-status",
  "tasks.file.status.storage.bootstrap.servers": "$BOOTSTRAP_SERVER",
  "tasks.file.status.storage.topic.partitions": "10",
  "tasks.file.status.storage.topic.replication.factor": "3",
  "tasks.file.status.storage.producer.request.timeout.ms": "20000",
  "tasks.file.status.storage.consumer.request.timeout.ms": "20000",
  "tasks.file.status.storage.producer.security.protocol": "SASL_SSL",
  "tasks.file.status.storage.consumer.security.protocol": "SASL_SSL",
  "tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm": "https",
  "tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm": "https",
  "tasks.file.status.storage.producer.sasl.mechanism": "PLAIN",
  "tasks.file.status.storage.consumer.sasl.mechanism": "PLAIN",
  "tasks.file.status.storage.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$CLUSTER_KEY\" password=\"$CLUSTER_SECRET\";",
  "tasks.file.status.storage.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$CLUSTER_KEY\" password=\"$CLUSTER_SECRET\";",
  "task.partitioner.class": "io.streamthoughts.kafka.connect.filepulse.source.DefaultTaskPartitioner",
  "tasks.halt.on.error": "false",
  "tasks.empty.poll.wait.ms": "500",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "fs.cleanup.policy.triggered.on": "COMPLETED",
  "filters": "ParseJSONArray, ExplodeJSONArray, CopyMessageToRoot, DeleteMessage, CopyFieldsData, ConvertFieldsData",
  "filters.ParseJSONArray.type": "io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
  "filters.ParseJSONArray.charset": "UTF-8",
  "filters.ParseJSONArray.source": "message",
  "filters.ParseJSONArray.explode.array": "false",
  "filters.ParseJSONArray.merge": "false",
  "filters.ExplodeJSONArray.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
  "filters.ExplodeJSONArray.source": "message",
  "filters.CopyMessageToRoot.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.CopyMessageToRoot.field": "$value",
  "filters.CopyMessageToRoot.value": "{{ $value.message }}",
  "filters.CopyMessageToRoot.overwrite": "true",
  "filters.DeleteMessage.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
  "filters.DeleteMessage.fields": "message",
  "filters.CopyFieldsData.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.CopyFieldsData.if": "{{ and(exists($value, 'fields'), exists($value.fields, 'Data')) }}",
  "filters.CopyFieldsData.field": "$value.fields.ParentData",
  "filters.CopyFieldsData.value": "{{ $value.fields.Data }}",
  "filters.CopyFieldsData.overwrite": "true",
  "filters.ConvertFieldsData.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.ConvertFieldsData.if": "{{ and(exists($value, 'fields'), exists($value.fields, 'Data')) }}",
  "filters.ConvertFieldsData.field": "$value.fields.Data",
  "filters.ConvertFieldsData.value": "",
  "filters.ConvertFieldsData.overwrite": "true"
}
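
As I understand the filter chain, each array element ends up as its own record: the element is promoted to the value root, the intermediate message field is removed, and fields.Data is copied into fields.ParentData before being blanked out. Roughly, and with the same hypothetical field names as the sample file above, one input element:

{
  "journalId": "J-0001",
  "fields": {
    "Data": "...large nested payload...",
    "OtherField": "..."
  }
}

should come out as a record value like:

{
  "journalId": "J-0001",
  "fields": {
    "Data": "",
    "ParentData": "...large nested payload...",
    "OtherField": "..."
  }
}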

Labels: wontfix (This will not be worked on)
