Skip to content

Commit

Permalink
Review changes, use streaming and JSON array
Browse files Browse the repository at this point in the history
  • Loading branch information
EricBorczuk committed Jun 23, 2021
1 parent dd5b208 commit 2304be2
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 117 deletions.
@@ -1,14 +1,19 @@
package io.stargate.web.docsapi.models;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class MultiDocsResponse {
@JsonProperty("documentIds")
List<String> documentIds;

@JsonProperty("profile")
ExecutionProfile profile;

@ApiModelProperty(value = "The ids of the documents created, in order of creation")
public List<String> getDocumentIds() {
return documentIds;
Expand All @@ -19,8 +24,16 @@ public MultiDocsResponse setDocumentIds(List<String> documentIds) {
return this;
}

@ApiModelProperty("Profiling information related to the execution of the request (optional)")
public ExecutionProfile getProfile() {
return profile;
}

@JsonCreator
public MultiDocsResponse(@JsonProperty("documentIds") final List<String> documentIds) {
public MultiDocsResponse(
@JsonProperty("documentIds") final List<String> documentIds,
@JsonProperty("profile") ExecutionProfile profile) {
this.documentIds = documentIds;
this.profile = profile;
}
}
Expand Up @@ -208,9 +208,7 @@ public Response writeManyDocs(
String namespace,
@ApiParam(value = "the name of the collection", required = true) @PathParam("collection-id")
String collection,
@ApiParam(
value = "A JSON Lines payload where each line is a document to write",
required = true)
@ApiParam(value = "A JSON array where each element is a document to write", required = true)
@NonNull
InputStream payload,
@ApiParam(
Expand All @@ -219,12 +217,14 @@ public Response writeManyDocs(
required = false)
@QueryParam("id-path")
String idPath,
@QueryParam("profile") Boolean profile,
@Context HttpServletRequest request) {
// This route does nearly the same thing as PUT, except that it assigns an ID for the requester
// And returns it as a Location header/in JSON body
logger.debug("Batch Write: Collection = {}", collection);
return handle(
() -> {
ExecutionContext context = ExecutionContext.create(profile);
List<String> idsCreated =
documentService.writeManyDocs(
authToken,
Expand All @@ -233,12 +233,14 @@ public Response writeManyDocs(
payload,
Optional.ofNullable(idPath),
dbFactory,
context,
getAllHeaders(request));

return Response.created(
URI.create(
String.format("/v2/namespaces/%s/collections/%s", namespace, collection)))
.entity(mapper.writeValueAsString(new MultiDocsResponse(idsCreated)))
.entity(
mapper.writeValueAsString(new MultiDocsResponse(idsCreated, context.toProfile())))
.build();
});
}
Expand Down
@@ -1,6 +1,8 @@
package io.stargate.web.docsapi.service;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand Down Expand Up @@ -391,8 +393,9 @@ private ImmutablePair<List<Object[]>, List<String>> shredForm(
return ImmutablePair.of(bindVariableList, firstLevelKeys);
}

private String convertToJsonPtr(String path) {
return "/" + path.replaceAll(PERIOD_PATTERN.pattern(), "/").replaceAll("\\[(\\d+)\\]", "$1");
private Optional<String> convertToJsonPtr(Optional<String> path) {
return path.map(
p -> "/" + p.replaceAll(PERIOD_PATTERN.pattern(), "/").replaceAll("\\[(\\d+)\\]", "$1"));
}

private DocumentDB maybeCreateTableAndIndexes(
Expand Down Expand Up @@ -420,6 +423,7 @@ public List<String> writeManyDocs(
InputStream payload,
Optional<String> idPath,
Db dbFactory,
ExecutionContext context,
Map<String, String> headers)
throws IOException, UnauthorizedException {

Expand All @@ -428,81 +432,68 @@ public List<String> writeManyDocs(

db = maybeCreateTableAndIndexes(dbFactory, db, keyspace, collection, headers, authToken);
List<String> idsWritten = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(payload, "UTF-8"))) {
Iterator<String> iter = reader.lines().iterator();
ExecutionContext context = ExecutionContext.NOOP_CONTEXT;
String docsPath = convertToJsonPtr(idPath.get());
int chunkIndex = 0;
final int CHUNK_SIZE = 250;
while (iter.hasNext()) {
chunkIndex++;
Map<String, String> docsInChunk = new LinkedHashMap<>();
while (docsInChunk.size() < CHUNK_SIZE && iter.hasNext()) {
String doc = iter.next();
JsonNode json = mapper.readTree(doc);

String docId = UUID.randomUUID().toString();
if (idPath.isPresent()) {
if (!json.at(docsPath).isTextual()) {
throw new ErrorCodeRuntimeException(
ErrorCode.DOCS_API_WRITE_BATCH_INVALID_ID_PATH,
String.format(
"Json Document %s requires a String value at the path %s, found %s."
+ " Batch %d failed, %d writes were successful. Repeated requests are "
+ "idempotent if the same `idPath` is defined.",
doc,
idPath.get(),
json.at(docsPath).toString(),
chunkIndex,
(chunkIndex - 1) * CHUNK_SIZE));
}
docId = json.requiredAt(docsPath).asText();
}
docsInChunk.put(docId, doc);
}
try (JsonParser jsonParser = mapper.getFactory().createParser(payload)) {
Optional<String> docsPath = convertToJsonPtr(idPath);

// Write the chunk of (at most) 250 documents by firing a single batch with the row inserts
List<Object[]> bindVariableList = new ArrayList<>();
DocumentDB finalDb = db;
List<String> ids =
docsInChunk.entrySet().stream()
.map(
data -> {
bindVariableList.addAll(
shredJson(
surfer,
finalDb,
Collections.emptyList(),
data.getKey(),
data.getValue(),
false)
.left);
return data.getKey();
})
.collect(Collectors.toList());
Map<String, String> docs = new LinkedHashMap<>();
if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
throw new IllegalArgumentException("Payload must be an array.");
}

long now = timeSource.currentTimeMicros();
try {
db.deleteManyThenInsertBatch(
keyspace,
collection,
ids,
bindVariableList,
Collections.emptyList(),
now,
context.nested("ASYNC INSERT"));
} catch (Exception e) {
throw new ErrorCodeRuntimeException(
ErrorCode.DOCS_API_WRITE_BATCH_FAILED,
String.format(
"Batch %d failed to write, %d document writes were successful. Repeated requests are "
+ "idempotent if the same `idPath` is defined. If not, you should remove "
+ "the first %d created elements from your initial request to avoid duplication.",
chunkIndex, (chunkIndex - 1) * CHUNK_SIZE, (chunkIndex - 1) * CHUNK_SIZE));
while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
JsonNode json = mapper.readTree(jsonParser);
String docId;
if (idPath.isPresent()) {
if (!json.at(docsPath.get()).isTextual()) {
throw new ErrorCodeRuntimeException(
ErrorCode.DOCS_API_WRITE_BATCH_INVALID_ID_PATH,
String.format(
"Json Document %s requires a String value at the path %s, found %s."
+ " Batch write failed.",
json, idPath.get(), json.at(docsPath.get()).toString()));
}
docId = json.requiredAt(docsPath.get()).asText();
} else {
docId = UUID.randomUUID().toString();
}
docs.put(docId, json.toString());
}

idsWritten.addAll(ids);
// Write the chunk of (at most) 250 documents by firing a single batch with the row inserts
List<Object[]> bindVariableList = new ArrayList<>();
DocumentDB finalDb = db;
List<String> ids =
docs.entrySet().stream()
.map(
data -> {
bindVariableList.addAll(
shredJson(
surfer,
finalDb,
Collections.emptyList(),
data.getKey(),
data.getValue(),
false)
.left);
return data.getKey();
})
.collect(Collectors.toList());

long now = timeSource.currentTimeMicros();
try {
db.deleteManyThenInsertBatch(
keyspace,
collection,
ids,
bindVariableList,
Collections.emptyList(),
now,
context.nested("ASYNC INSERT"));
} catch (Exception e) {
throw new ErrorCodeRuntimeException(ErrorCode.DOCS_API_WRITE_BATCH_FAILED);
}

idsWritten.addAll(ids);
}
return idsWritten;
}
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void postMultiDoc_success() throws JsonProcessingException {

Response r =
documentResourceV2.writeManyDocs(
headers, ui, authToken, keyspace, collection, payload, null, httpServletRequest);
headers, ui, authToken, keyspace, collection, payload, null, false, httpServletRequest);

assertThat(r.getStatus()).isEqualTo(201);
mapper.readTree((String) r.getEntity()).requiredAt("/documentIds");
Expand Down
Expand Up @@ -53,6 +53,7 @@
import io.stargate.web.docsapi.dao.DocumentDB;
import io.stargate.web.docsapi.models.DocumentResponseWrapper;
import io.stargate.web.docsapi.models.ImmutableExecutionProfile;
import io.stargate.web.docsapi.models.MultiDocsResponse;
import io.stargate.web.docsapi.models.QueryInfo;
import io.stargate.web.docsapi.resources.DocumentResourceV2;
import io.stargate.web.resources.Db;
Expand Down Expand Up @@ -731,20 +732,27 @@ void testGetDocPathUnauthorized() throws UnauthorizedException {
@Test
void testWriteManyDocs() throws UnauthorizedException, IOException {
ByteArrayInputStream in =
new ByteArrayInputStream("{\"a\":\"b\"}".getBytes(StandardCharsets.UTF_8));
new ByteArrayInputStream("[{\"a\":\"b\"}]".getBytes(StandardCharsets.UTF_8));
now.set(200);
withQuery(table, "DELETE FROM %s USING TIMESTAMP ? WHERE key = ?", 199L, "b")
.returningNothing();
withQuery(table, insert, fillParams(70, "b", "a", SEPARATOR, "a", "b", null, null, 200L))
.returningNothing();
service.writeManyDocs(
authToken, keyspace.name(), table.name(), in, Optional.of("a"), db, Collections.emptyMap());
authToken,
keyspace.name(),
table.name(),
in,
Optional.of("a"),
db,
ExecutionContext.NOOP_CONTEXT,
Collections.emptyMap());
}

@Test
void testWriteManyDocs_invalidIdPath() {
ByteArrayInputStream in =
new ByteArrayInputStream("{\"a\":\"b\"}".getBytes(StandardCharsets.UTF_8));
new ByteArrayInputStream("[{\"a\":\"b\"}]".getBytes(StandardCharsets.UTF_8));
assertThatThrownBy(
() ->
service.writeManyDocs(
Expand All @@ -754,9 +762,10 @@ void testWriteManyDocs_invalidIdPath() {
in,
Optional.of("no.good"),
db,
ExecutionContext.NOOP_CONTEXT,
Collections.emptyMap()))
.hasMessage(
"Json Document {\"a\":\"b\"} requires a String value at the path no.good, found . Batch 1 failed, 0 writes were successful. Repeated requests are idempotent if the same `idPath` is defined.");
"Json Document {\"a\":\"b\"} requires a String value at the path no.good, found . Batch write failed.");
}

@Nested
Expand Down Expand Up @@ -899,7 +908,7 @@ void searchWithDocId() throws JsonProcessingException {
}

@Test
void putDoc() throws UnauthorizedException, JsonProcessingException {
void putDoc() throws JsonProcessingException {
when(headers.getHeaderString(eq(HttpHeaders.CONTENT_TYPE))).thenReturn("application/json");

String delete = "DELETE FROM test_docs.collection1 USING TIMESTAMP ? WHERE key = ?";
Expand Down Expand Up @@ -931,6 +940,43 @@ void putDoc() throws UnauthorizedException, JsonProcessingException {
.build());
}

@Test
void putManyDocs() throws JsonProcessingException {
String delete = "DELETE FROM test_docs.collection1 USING TIMESTAMP ? WHERE key = ?";
withQuery(table, delete, 199L, "123").returningNothing();
withQuery(table, delete, 199L, "234").returningNothing();
withQuery(table, insert, fillParams(70, "123", "a", SEPARATOR, "a", "123", null, null, 200L))
.returningNothing();
withQuery(table, insert, fillParams(70, "234", "a", SEPARATOR, "a", "234", null, null, 200L))
.returningNothing();

now.set(200);
Response r =
resource.writeManyDocs(
headers,
uriInfo,
authToken,
keyspace.name(),
table.name(),
new ByteArrayInputStream("[{\"a\":\"123\"},{\"a\":\"234\"}]".getBytes()),
"a",
true,
request);
@SuppressWarnings("unchecked")
MultiDocsResponse mdr = mapper.readValue((String) r.getEntity(), MultiDocsResponse.class);
assertThat(mdr.getProfile())
.isEqualTo(
ImmutableExecutionProfile.builder()
.description("root")
.addNested(
ImmutableExecutionProfile.builder()
.description("ASYNC INSERT")
// row count for DELETE is not known
.addQueries(QueryInfo.of(insert, 2, 2), QueryInfo.of(delete, 2, 0))
.build())
.build());
}

@Test
void patchDoc() throws JsonProcessingException {
when(headers.getHeaderString(eq(HttpHeaders.CONTENT_TYPE))).thenReturn("application/json");
Expand Down

0 comments on commit 2304be2

Please sign in to comment.