Skip to content

Commit

Permalink
fix: exception on concurrent download of ParseFile from multiple th…
Browse files Browse the repository at this point in the history
…reads (#1180)
  • Loading branch information
rommansabbir committed Aug 25, 2022
1 parent 351858c commit 44b1914
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 58 deletions.
135 changes: 77 additions & 58 deletions parse/src/main/java/com/parse/ParseFileController.java
Expand Up @@ -12,6 +12,8 @@
import com.parse.http.ParseHttpRequest;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.json.JSONObject;

Expand All @@ -21,6 +23,7 @@ class ParseFileController {
private final Object lock = new Object();
private final ParseHttpClient restClient;
private final File cachePath;
private final List<String> currentlyDownloadedFilesNames = new ArrayList<>();

private ParseHttpClient fileClient;

Expand Down Expand Up @@ -168,64 +171,80 @@ public Task<File> fetchAsync(
if (cancellationToken != null && cancellationToken.isCancelled()) {
return Task.cancelled();
}
final File cacheFile = getCacheFile(state);
return Task.call(cacheFile::exists, ParseExecutors.io())
.continueWithTask(
task -> {
boolean result = task.getResult();
if (result) {
return Task.forResult(cacheFile);
}
if (cancellationToken != null && cancellationToken.isCancelled()) {
return Task.cancelled();
return Task.call(
() -> {
final File cacheFile = getCacheFile(state);

synchronized (lock) {
if (currentlyDownloadedFilesNames.contains(state.name())) {
while (currentlyDownloadedFilesNames.contains(state.name())) {
lock.wait();
}

// Generate the temp file path for caching ParseFile content based on
// ParseFile's url
// The reason we do not write to the cacheFile directly is because there
// is no way we can
// verify if a cacheFile is complete or not. If download is interrupted
// in the middle, next
// time when we download the ParseFile, since cacheFile has already
// existed, we will return
// this incomplete cacheFile
final File tempFile = getTempFile(state);

// network
final ParseFileRequest request =
new ParseFileRequest(
ParseHttpRequest.Method.GET, state.url(), tempFile);

// We do not need to delete the temp file since we always try to
// overwrite it
return request.executeAsync(
fileClient(),
null,
downloadProgressCallback,
cancellationToken)
.continueWithTask(
task1 -> {
// If the top-level task was cancelled, don't
// actually set the data -- just move on.
if (cancellationToken != null
&& cancellationToken.isCancelled()) {
throw new CancellationException();
}
if (task1.isFaulted()) {
ParseFileUtils.deleteQuietly(tempFile);
return task1.cast();
}

// Since we give the cacheFile pointer to
// developers, it is not safe to guarantee
// cacheFile always does not exist here, so it is
// better to delete it manually,
// otherwise moveFile may throw an exception.
ParseFileUtils.deleteQuietly(cacheFile);
ParseFileUtils.moveFile(tempFile, cacheFile);
return Task.forResult(cacheFile);
},
ParseExecutors.io());
});
}

if (cacheFile.exists()) {
return cacheFile;
} else {
currentlyDownloadedFilesNames.add(state.name());
}
}

try {
if (cancellationToken != null && cancellationToken.isCancelled()) {
throw new CancellationException();
}

// Generate the temp file path for caching ParseFile content based on
// ParseFile's url
// The reason we do not write to the cacheFile directly is because there
// is no way we can
// verify if a cacheFile is complete or not. If download is interrupted
// in the middle, next
// time when we download the ParseFile, since cacheFile has already
// existed, we will return
// this incomplete cacheFile
final File tempFile = getTempFile(state);

// network
final ParseFileRequest request =
new ParseFileRequest(
ParseHttpRequest.Method.GET, state.url(), tempFile);

// We do not need to delete the temp file since we always try to
// overwrite it
Task<Void> downloadTask =
request.executeAsync(
fileClient(),
null,
downloadProgressCallback,
cancellationToken);
ParseTaskUtils.wait(downloadTask);

// If the top-level task was cancelled, don't
// actually set the data -- just move on.
if (cancellationToken != null && cancellationToken.isCancelled()) {
throw new CancellationException();
}
if (downloadTask.isFaulted()) {
ParseFileUtils.deleteQuietly(tempFile);
throw new RuntimeException(downloadTask.getError());
}

// Since we give the cacheFile pointer to
// developers, it is not safe to guarantee
// cacheFile always does not exist here, so it is
// better to delete it manually,
// otherwise moveFile may throw an exception.
ParseFileUtils.deleteQuietly(cacheFile);
ParseFileUtils.moveFile(tempFile, cacheFile);
return cacheFile;
} finally {
synchronized (lock) {
currentlyDownloadedFilesNames.remove(state.name());
lock.notifyAll();
}
}
},
ParseExecutors.io());
}
}
64 changes: 64 additions & 0 deletions parse/src/test/java/com/parse/ParseFileControllerTest.java
Expand Up @@ -28,6 +28,8 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -341,5 +343,67 @@ public void testFetchAsyncFailure() throws Exception {
assertFalse(controller.getTempFile(state).exists());
}

@Test
public void testFetchAsyncConcurrentCallsSuccess() throws Exception {
byte[] data = "hello".getBytes();
ParseHttpResponse mockResponse =
new ParseHttpResponse.Builder()
.setStatusCode(200)
.setTotalSize((long) data.length)
.setContent(new ByteArrayInputStream(data))
.build();

ParseHttpClient fileClient = mock(ParseHttpClient.class);
when(fileClient.execute(any(ParseHttpRequest.class))).thenReturn(mockResponse);
// Make sure cache dir does not exist
File root = new File(temporaryFolder.getRoot(), "cache");
assertFalse(root.exists());
ParseFileController controller = new ParseFileController(null, root).fileClient(fileClient);
ParseFile.State state = new ParseFile.State.Builder().name("file_name").url("url").build();
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicReference<File> file1Ref = new AtomicReference<>();
AtomicReference<File> file2Ref = new AtomicReference<>();

new Thread(
() -> {
try {
file1Ref.set(
ParseTaskUtils.wait(
controller.fetchAsync(state, null, null, null)));
} catch (ParseException e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
})
.start();

new Thread(
() -> {
try {
file2Ref.set(
ParseTaskUtils.wait(
controller.fetchAsync(state, null, null, null)));
} catch (ParseException e) {
throw new RuntimeException(e);
} finally {
countDownLatch.countDown();
}
})
.start();

countDownLatch.await();

File result1 = file1Ref.get();
File result2 = file2Ref.get();

assertTrue(result1.exists());
assertEquals("hello", ParseFileUtils.readFileToString(result1, "UTF-8"));
assertTrue(result2.exists());
assertEquals("hello", ParseFileUtils.readFileToString(result2, "UTF-8"));
verify(fileClient, times(1)).execute(any(ParseHttpRequest.class));
assertFalse(controller.getTempFile(state).exists());
}

// endregion
}

0 comments on commit 44b1914

Please sign in to comment.