Skip to content

Commit

Permalink
NIFI-8200 Modified PutAzureDataLakeStorage to delete temp file if exc…
Browse files Browse the repository at this point in the history
…eption was thrown in uploadContent()

This closes apache#4815

Signed-off-by: Joey Frazee <jfrazee@apache.org>
  • Loading branch information
timeabarna authored and krisztina-zsihovszki committed Jun 27, 2022
1 parent 9670f80 commit a45909b
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (length > 0) {
try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
uploadContent(fileClient, bufferedIn, length);
} catch (Exception e) {
removeTempFile(fileClient);
throw e;
}
}

Expand Down Expand Up @@ -162,6 +165,14 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

/**
 * Best-effort removal of the file addressed by the given client.
 * <p>
 * A failure of the delete call is logged at error level and is not rethrown.
 *
 * @param fileClient client pointing at the file that should be deleted
 */
private void removeTempFile(final DataLakeFileClient fileClient) {
    try {
        fileClient.delete();
    } catch (final Exception deleteFailure) {
        // Swallowed on purpose after logging: the caller rethrows its own upload exception
        // right after this cleanup, and that one must not be masked by a cleanup error.
        getLogger().error("Error while removing temp file on Azure Data Lake Storage", deleteFailure);
    }
}

@VisibleForTesting
static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
long chunkStart = 0;
Expand All @@ -181,4 +192,4 @@ static void uploadContent(DataLakeFileClient fileClient, InputStream in, long le

fileClient.flush(length);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutAzureDataLakeStorage</title>
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>

<body>

<p>
This processor is responsible for uploading files to Azure Data Lake Storage Gen2.
</p>

<h3>File uploading and cleanup process</h3>

<h4>New file</h4>

<ol>
<li>An empty file is created.</li>
<li>Content is appended to file.</li>
<li>In case of an append failure, the file is deleted.</li>
<li>In case of a file deletion failure, the empty file remains on the server.</li>
</ol>

<h4>Existing file</h4>

<ul>
<li>Processors with "fail" conflict resolution strategy will direct the FlowFile to the "Failure" relationship.</li>
<li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to the "Success" relationship.</li>
<li>Processors with "replace" conflict resolution strategy:

<ol>
<li>An empty file overwrites the existing file; the original file is lost.</li>
<li>Content is appended to file.</li>
<li>In case of an append failure, the file is deleted.</li>
<li>In case of a file deletion failure, the empty file remains on the server.</li>
</ol>
</li>
</ul>

</body>
</html>

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.Ignore;
import org.junit.Test;

import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -43,6 +44,11 @@
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {

Expand Down Expand Up @@ -241,6 +247,17 @@ public void testPutFileWithELButFileNameIsNotSpecified() {
assertFailure();
}

/**
 * Checks that an exception thrown by {@code DataLakeFileClient.append()} propagates out of
 * {@code PutAzureDataLakeStorage.uploadContent()} and that the append call was really attempted.
 * <p>
 * NOTE(review): this test used to end with {@code verify(fileClient).delete()} placed after the
 * throwing call, so that verification was never executed. The temp file deletion is issued by
 * the catch block in {@code onTrigger()} (via {@code removeTempFile()}), which a direct call to
 * the static {@code uploadContent()} bypasses; covering the cleanup itself needs a test that
 * drives the processor through {@code onTrigger()} - TODO confirm and add.
 */
@Test(expected = NullPointerException.class)
public void testPutFileButFailedToAppend() {
    final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
    final InputStream stream = mock(InputStream.class);
    doThrow(NullPointerException.class).when(fileClient).append(any(InputStream.class), anyLong(), anyLong());

    try {
        PutAzureDataLakeStorage.uploadContent(fileClient, stream, FILE_DATA.length);
    } finally {
        // Executed even though uploadContent() throws. If this verification fails, its error
        // replaces the expected NullPointerException and the test is reported as failed.
        verify(fileClient).append(any(InputStream.class), anyLong(), anyLong());
    }
}

private Map<String, String> createAttributesMap() {
Map<String, String> attributes = new HashMap<>();

Expand Down

0 comments on commit a45909b

Please sign in to comment.