Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pixels-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@
<optional>true</optional>
</dependency>

<!--
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>netty-nio-client</artifactId>
<optional>true</optional>
</dependency>
-->

<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
*/
package io.pixelsdb.pixels.common.physical;

import io.pixelsdb.pixels.common.physical.impl.PhysicalHDFSReader;
import io.pixelsdb.pixels.common.physical.impl.PhysicalLocalReader;
import io.pixelsdb.pixels.common.physical.impl.PhysicalS3Reader;
import io.pixelsdb.pixels.common.physical.io.PhysicalHDFSReader;
import io.pixelsdb.pixels.common.physical.io.PhysicalLocalReader;
import io.pixelsdb.pixels.common.physical.io.PhysicalS3Reader;

import java.io.IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
*/
package io.pixelsdb.pixels.common.physical;

import io.pixelsdb.pixels.common.physical.impl.PhysicalHDFSWriter;
import io.pixelsdb.pixels.common.physical.impl.PhysicalLocalWriter;
import io.pixelsdb.pixels.common.physical.impl.PhysicalS3Writer;
import io.pixelsdb.pixels.common.physical.io.PhysicalHDFSWriter;
import io.pixelsdb.pixels.common.physical.io.PhysicalLocalWriter;
import io.pixelsdb.pixels.common.physical.io.PhysicalS3Writer;

import java.io.IOException;

Expand All @@ -38,7 +38,7 @@ private PhysicalWriterUtil()
}

public static PhysicalWriter newPhysicalWriter(Storage storage, String path, long blockSize,
short replication, boolean addBlockPadding) throws IOException
short replication, boolean addBlockPadding) throws IOException
{
checkArgument(storage != null, "storage should not be null");
checkArgument(path != null, "path should not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.scheduler;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
package io.pixelsdb.pixels.common.physical;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.scheduler;
package io.pixelsdb.pixels.common.physical;

import io.pixelsdb.pixels.common.physical.scheduler.NoopScheduler;
import io.pixelsdb.pixels.common.physical.scheduler.RateLimitedScheduler;
import io.pixelsdb.pixels.common.physical.scheduler.SortMergeScheduler;
import io.pixelsdb.pixels.common.utils.ConfigFactory;

/**
Expand Down Expand Up @@ -47,13 +50,13 @@ private SchedulerFactory()
{
// Add more schedulers here.
case "noop":
scheduler = new NoopScheduler();
scheduler = NoopScheduler.Instance();
break;
case "sortmerge":
scheduler = new SortMergeScheduler();
scheduler = SortMergeScheduler.Instance();
break;
case "ratelimited":
scheduler = new RateLimitedScheduler();
scheduler = RateLimitedScheduler.Instance();
break;
default:
throw new UnsupportedOperationException("The read request scheduler '" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
*/
package io.pixelsdb.pixels.common.physical;

import io.pixelsdb.pixels.common.physical.impl.HDFS;
import io.pixelsdb.pixels.common.physical.impl.LocalFS;
import io.pixelsdb.pixels.common.physical.impl.S3;
import io.pixelsdb.pixels.common.physical.storage.HDFS;
import io.pixelsdb.pixels.common.physical.storage.LocalFS;
import io.pixelsdb.pixels.common.physical.storage.S3;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.HDFS;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalWriter;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.HDFS;
import io.pixelsdb.pixels.common.utils.Constants;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.logging.log4j.LogManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.LocalFS;

import java.io.IOException;
import java.io.RandomAccessFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalWriter;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.LocalFS;
import io.pixelsdb.pixels.common.utils.Constants;

import java.io.DataOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.S3;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -49,6 +50,8 @@ public class PhysicalS3Reader implements PhysicalReader
private static boolean enableAsync = false;
private static boolean useAsyncClient = false;
private static final ExecutorService clientService;
private final static int LEN_1M = 1024*1024;
private final static int LEN_10M = 1024*1024*10;

static
{
Expand All @@ -64,6 +67,8 @@ public class PhysicalS3Reader implements PhysicalReader
private long length;
private S3Client client;
private S3AsyncClient asyncClient;
private S3AsyncClient asyncClient1M;
private S3AsyncClient asyncClient10M;

public PhysicalS3Reader(Storage storage, String path) throws IOException
{
Expand All @@ -87,6 +92,15 @@ public PhysicalS3Reader(Storage storage, String path) throws IOException
this.position = new AtomicLong(0);
this.client = this.s3.getClient();
this.asyncClient = this.s3.getAsyncClient();
if (S3.isRequestDiversionEnabled())
{
this.asyncClient1M = this.s3.getAsyncClient1M();
this.asyncClient10M = this.s3.getAsyncClient10M();
}
else
{
this.asyncClient1M = this.asyncClient10M = null;
}
enableAsync = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("s3.enable.async"));
useAsyncClient = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("s3.use.async.client"));
if (!useAsyncClient)
Expand Down Expand Up @@ -178,7 +192,23 @@ public CompletableFuture<ByteBuffer> readAsync(long offset, int len) throws IOEx
CompletableFuture<ResponseBytes<GetObjectResponse>> future;
if (useAsyncClient)
{
future = asyncClient.getObject(request, AsyncResponseTransformer.toBytes());
if (S3.isRequestDiversionEnabled())
{
if (len < LEN_1M)
{
future = asyncClient.getObject(request, AsyncResponseTransformer.toBytes());
} else if (len < LEN_10M)
{
future = asyncClient1M.getObject(request, AsyncResponseTransformer.toBytes());
} else
{
future = asyncClient10M.getObject(request, AsyncResponseTransformer.toBytes());
}
}
else
{
future = asyncClient.getObject(request, AsyncResponseTransformer.toBytes());
}
}
else
{
Expand All @@ -192,6 +222,11 @@ public CompletableFuture<ByteBuffer> readAsync(long offset, int len) throws IOEx

try
{
/**
* Issue #128:
* We tried to use thenApplySync using the clientService executor,
* it does not help improving the query performance.
*/
return future.thenApply(resp ->
{
if (resp != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import io.pixelsdb.pixels.common.physical.PhysicalWriter;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.storage.S3;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.sync.ResponseTransformer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.impl;
package io.pixelsdb.pixels.common.physical.io;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.pixelsdb.pixels.common.physical.scheduler;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Scheduler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -40,8 +41,18 @@
public class NoopScheduler implements Scheduler
{
private static Logger logger = LogManager.getLogger(NoopScheduler.class);
private static NoopScheduler instance;

NoopScheduler() {}
public static Scheduler Instance()
{
if (instance == null)
{
instance = new NoopScheduler();
}
return instance;
}

protected NoopScheduler() {}

@Override
public void executeBatch(PhysicalReader reader, RequestBatch batch) throws IOException
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,27 @@
/*
* Copyright 2021 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.physical.scheduler;

import com.google.common.util.concurrent.RateLimiter;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Scheduler;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
Expand All @@ -23,13 +43,23 @@
public class RateLimitedScheduler extends SortMergeScheduler
{
private static Logger logger = LogManager.getLogger(RateLimitedScheduler.class);
private static RateLimitedScheduler instance;

public static Scheduler Instance()
{
if (instance == null)
{
instance = new RateLimitedScheduler();
}
return instance;
}

private RateLimiter mbpsRateLimiter;
private RateLimiter rpsRateLimiter;
private RetryPolicy retryPolicy;
private final boolean enableRetry;

RateLimitedScheduler()
protected RateLimitedScheduler()
{
ConfigFactory.Instance().registerUpdateCallback("read.request.rate.limit.mbps", value ->
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.pixelsdb.pixels.common.physical.scheduler;

import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.Scheduler;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -44,12 +45,22 @@
public class SortMergeScheduler implements Scheduler
{
private static Logger logger = LogManager.getLogger(SortMergeScheduler.class);
private static SortMergeScheduler instance;
private static int MaxGap;

public static Scheduler Instance()
{
if (instance == null)
{
instance = new SortMergeScheduler();
}
return instance;
}

private RetryPolicy retryPolicy;
private final boolean enableRetry;

SortMergeScheduler()
protected SortMergeScheduler()
{
this.enableRetry = Boolean.parseBoolean(ConfigFactory.Instance().getProperty("read.request.enable.retry"));
if (this.enableRetry)
Expand Down Expand Up @@ -167,8 +178,8 @@ protected class MergedRequest
private List<Integer> lengths;
private List<CompletableFuture<ByteBuffer>> futures;
// fields used by the retry policy.
protected long startTimeMs = -1;
protected long completeTimeMs = -1;
protected volatile long startTimeMs = -1;
protected volatile long completeTimeMs = -1;
private int retried = 0;

public MergedRequest(RequestFuture first)
Expand Down
Loading