From 9cdccad5f0c550bfc59650a0d5fff288eccf0805 Mon Sep 17 00:00:00 2001 From: Mathias Herberts Date: Fri, 23 Feb 2024 11:47:24 +0100 Subject: [PATCH 1/2] Replaced call to acquire by a call to tryAcquire which has not changed its signature since old versions. Otherwise call to acquire may fail in Spark --- .../src/main/java/io/warp10/hadoop/Warp10RecordWriter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java b/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java index 18a0cea95..db78bc50e 100644 --- a/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java +++ b/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java @@ -1,5 +1,5 @@ // -// Copyright 2018-2023 SenX S.A.S. +// Copyright 2018-2024 SenX S.A.S. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import java.net.URL; import java.util.HashMap; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; import org.apache.hadoop.io.BytesWritable; @@ -136,7 +137,7 @@ public void write(Writable key, Writable value) throws IOException, InterruptedE while (decoder.next()) { if (null != this.limiter) { - this.limiter.acquire(1); + this.limiter.tryAcquire(1, Long.MAX_VALUE, TimeUnit.MICROSECONDS); } if (!first) { From 179c253b6a611229903e56c477238907793bbfa8 Mon Sep 17 00:00:00 2001 From: Mathias Herberts Date: Wed, 28 Feb 2024 14:37:21 +0100 Subject: [PATCH 2/2] Added throwing of IOException when rate is too low --- warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java b/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java index db78bc50e..976f8b5cc 100644 --- a/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java +++ b/warp10/src/main/java/io/warp10/hadoop/Warp10RecordWriter.java @@ -137,7 +137,9 @@ public void write(Writable key, Writable value) throws IOException, InterruptedE while (decoder.next()) { if (null != this.limiter) { - this.limiter.tryAcquire(1, Long.MAX_VALUE, TimeUnit.MICROSECONDS); + if (!this.limiter.tryAcquire(1, Long.MAX_VALUE, TimeUnit.MICROSECONDS)) { + throw new IOException("The configured write rate will not allow the data to be written to Warp 10, consider increasing the provided value."); + } } if (!first) {