Skip to content

Commit

Permalink
Addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed May 8, 2024
1 parent da82bbe commit 21ff7d9
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.REPLICA1_NAME;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.REPLICA2_NAME;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.VALUE_SCHEMA;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.getMockD2ServiceDiscovery;
import static com.linkedin.venice.fastclient.meta.RequestBasedMetadataTestUtils.getMockMetaData;
import static com.linkedin.venice.utils.TestUtils.waitForNonDeterministicAssertion;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.exceptions.VeniceClientHttpException;
import com.linkedin.venice.client.schema.RouterBackedSchemaReader;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DataProviderUtils;
Expand All @@ -32,6 +41,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -344,4 +355,19 @@ public void testMetadataForwardCompat() throws IOException, InterruptedException
}
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testRequestBasedMetadataStartFailFast() throws IOException {
String storeName = "testStore";
ClientConfig clientConfig = RequestBasedMetadataTestUtils.getMockClientConfig(storeName, false, false);
D2TransportClient d2TransportClient = mock(D2TransportClient.class);
D2ServiceDiscovery d2ServiceDiscovery = getMockD2ServiceDiscovery(d2TransportClient, storeName);
VeniceClientException veniceClientException =
new VeniceClientException(new VeniceClientHttpException(HttpStatus.SC_FORBIDDEN));
doThrow(veniceClientException).when(d2TransportClient).get(anyString());
try (RequestBasedMetadata requestBasedMetadata = new RequestBasedMetadata(clientConfig, d2TransportClient)) {
requestBasedMetadata.setD2ServiceDiscovery(d2ServiceDiscovery);
Assert.assertThrows(ConfigurationException.class, requestBasedMetadata::start);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.venice.fastclient.utils.ClientTestUtils.REQUEST_TYPES_SMALL;
import static com.linkedin.venice.fastclient.utils.ClientTestUtils.STORE_METADATA_FETCH_MODES;
import static org.testng.Assert.assertFalse;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
Expand Down Expand Up @@ -76,7 +77,15 @@ protected void prepareData() throws Exception {
storeVersionName = topic;
return null;
});
veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setReadComputationEnabled(true));
veniceCluster
.useControllerClient(
client -> assertFalse(
client
.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)
.setReadComputationEnabled(true))
.isError()));
valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.linkedin.venice.fastclient.utils.ClientTestUtils.STORE_METADATA_FETCH_MODES;
import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_KB;
import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB;
import static org.testng.Assert.assertFalse;

import com.github.luben.zstd.ZstdDictTrainer;
import com.linkedin.venice.compression.CompressionStrategy;
Expand Down Expand Up @@ -91,7 +92,15 @@ protected void prepareData() throws Exception {
byte[] compressionDictionaryBytes = trainer.trainSamples();
return ByteBuffer.wrap(compressionDictionaryBytes);
});
veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setReadComputationEnabled(true));
veniceCluster
.useControllerClient(
client -> assertFalse(
client
.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)
.setReadComputationEnabled(true))
.isError()));
valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.linkedin.venice.fastclient;

import static org.testng.Assert.assertFalse;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.helix.HelixReadOnlySchemaRepository;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import java.util.AbstractMap;
Expand Down Expand Up @@ -32,6 +35,15 @@ protected void prepareData() throws Exception {
storeVersionName = topic;
return null;
});
veniceCluster
.useControllerClient(
client -> assertFalse(
client
.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)
.setReadComputationEnabled(true))
.isError()));
valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_KB;
import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB;
import static org.testng.Assert.assertFalse;

import com.github.luben.zstd.ZstdDictTrainer;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.helix.HelixReadOnlySchemaRepository;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -48,6 +50,15 @@ protected void prepareData() throws Exception {
byte[] compressionDictionaryBytes = trainer.trainSamples();
return ByteBuffer.wrap(compressionDictionaryBytes);
});
veniceCluster
.useControllerClient(
client -> assertFalse(
client
.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)
.setReadComputationEnabled(true))
.isError()));
valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.fastclient.stats.ClusterStats;
import com.linkedin.venice.fastclient.utils.ClientTestUtils;
Expand Down Expand Up @@ -64,7 +65,10 @@ public void setUp() throws Exception {
r2Client = ClientTestUtils.getR2Client();
d2Client = D2TestUtils.getAndStartHttpsD2Client(veniceCluster.getZk().getAddress());
storeName = veniceCluster.createStore(KEY_COUNT);

veniceCluster.useControllerClient(
client -> assertFalse(
client.updateStore(storeName, new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true))
.isError()));
keySerializer =
SerializerDeserializerFactory.getAvroGenericSerializer(Schema.parse(VeniceClusterWrapper.DEFAULT_KEY_SCHEMA));

Expand Down Expand Up @@ -127,6 +131,10 @@ public void testMetadataSchemaRetriever() {
@Test(timeOut = TIME_OUT)
public void testMetadataZstdDictionaryFetch() {
String zstdStoreName = veniceCluster.createStoreWithZstdDictionary(KEY_COUNT);
veniceCluster.useControllerClient(
client -> assertFalse(
client.updateStore(zstdStoreName, new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true))
.isError()));

ClientConfig.ClientConfigBuilder clientConfigBuilder = new ClientConfig.ClientConfigBuilder();
clientConfigBuilder.setStoreName(zstdStoreName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,15 @@ protected void prepareData() throws Exception {
VersionCreationResponse creationResponse = veniceCluster.getNewStoreVersion(KEY_SCHEMA_STR, VALUE_SCHEMA_STR);
storeVersionName = creationResponse.getKafkaTopic();
storeName = Version.parseStoreFromKafkaTopicName(storeVersionName);
veniceCluster.useControllerClient(
client -> client.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true).setReadComputationEnabled(true)));
veniceCluster
.useControllerClient(
client -> assertFalse(
client
.updateStore(
storeName,
new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)
.setReadComputationEnabled(true))
.isError()));
valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;

// TODO: Make serializers parameterized so we test them all.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.httpclient.HttpClientUtils;
import com.linkedin.venice.integration.utils.D2TestUtils;
Expand Down Expand Up @@ -220,6 +221,10 @@ public void testMetadataFetchRequest() throws ExecutionException, InterruptedExc
}

String storeName = cluster.createStore(1);
cluster.useControllerClient(
controllerClient -> Assert.assertFalse(
controllerClient.updateStore(storeName, new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true))
.isError()));
ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();

client.start();
Expand Down Expand Up @@ -305,8 +310,11 @@ public void testVeniceServerWithD2(boolean https) throws Exception {
URI requestUri =
URI.create("d2://" + d2ServiceName + "/" + QueryAction.METADATA.toString().toLowerCase() + "/" + storeName);
RestRequest request = new RestRequestBuilder(requestUri).setMethod("GET").build();
Assert.assertThrows(ExecutionException.class, () -> d2Client.restRequest(request).get());
cluster.useControllerClient(
controllerClient -> controllerClient
.updateStore(storeName, new UpdateStoreQueryParams().setStorageNodeReadQuotaEnabled(true)));
RestResponse response = d2Client.restRequest(request).get();

Assert.assertEquals(response.getStatus(), HttpStatus.SC_OK);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public MetadataResponse getMetadata(String storeName) {
Store store = storeRepository.getStoreOrThrow(storeName);
if (!store.isStorageNodeReadQuotaEnabled()) {
throw new UnsupportedOperationException(
String
.format("Fast client is not enabled for store: %s, please contact Venice team for support", storeName));
String.format(
"Fast client is not enabled for store: %s, please ensure storage node read quota is enabled for the given store",
storeName));
}
// Version metadata
int currentVersionNumber = store.getCurrentVersion();
Expand Down

0 comments on commit 21ff7d9

Please sign in to comment.