-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
FileMoveProvider.java
182 lines (167 loc) · 6.97 KB
/
FileMoveProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
* Copyright (c) 2002-2018 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j Enterprise Edition. The included source
* code can be redistributed and/or modified under the terms of the
* GNU AFFERO GENERAL PUBLIC LICENSE Version 3
* (http://www.fsf.org/licensing/licenses/agpl-3.0.html) with the
* Commons Clause, as found in the associated LICENSE.txt file.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* Neo4j object code can be licensed independently from the source
* under separate terms from the AGPL. Inquiries can be directed to:
* licensing@neo4j.com
*
* More information is also available at:
* https://neo4j.com/licensing/
*/
package org.neo4j.com.storecopy;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.store.StoreType;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.empty;
/**
 * Produces lazy streams of {@link FileMoveAction}s describing how every file found under a source location
 * should be moved.
 * <p>
 * Each location is listed through both the page cache's file system and the plain {@link FileSystemAbstraction},
 * and a {@link FileMoveActionInformer} decides, per file name, which of the two is used to inspect and move
 * the file.
 */
public class FileMoveProvider
{
    private final FileMoveActionInformer fileMoveActionInformer;
    private final PageCache pageCache;
    private final FileSystemAbstraction fs;

    /**
     * Creates a provider that uses {@code StoreType::canBeManagedByPageCache} to decide which files are
     * handled through the page cache.
     *
     * @param pageCache page cache whose cached file system is consulted for page-cache managed files
     * @param fs file system used for every other file
     */
    public FileMoveProvider( PageCache pageCache, FileSystemAbstraction fs )
    {
        this( pageCache, StoreType::canBeManagedByPageCache, fs );
    }

    /**
     * Creates a provider with an explicit routing decision.
     *
     * @param pageCache page cache whose cached file system is consulted for page-cache managed files
     * @param fileMoveActionInformer decides, from a file name, whether the file is managed by the page cache
     * @param fs file system used for every other file
     */
    public FileMoveProvider( PageCache pageCache, FileMoveActionInformer fileMoveActionInformer,
            FileSystemAbstraction fs )
    {
        this.pageCache = pageCache;
        this.fileMoveActionInformer = fileMoveActionInformer;
        this.fs = fs;
    }

    /**
     * Constructs a stream of move actions covering the entire contents of the given source location.
     * <p>
     * Given the layout
     * <pre>
     * +Parent
     * |+--directoryA
     * |...+--fileA
     * |...+--fileB
     * </pre>
     * the contents of <b>Parent/directoryA</b> can be moved to <b>Parent/directoryB</b> by first generating
     * the actions for the source and then applying each of them to the target directory. Illustrative sketch
     * (see {@link FileMoveAction} for the exact signature of {@code move}):
     * <pre>{@code
     * File directoryA = new File( "Parent/directoryA" );
     * Stream<FileMoveAction> fileMoveActions = new FileMoveProvider( pageCache, fs ).traverseForMoving( directoryA );
     *
     * File directoryB = new File( "Parent/directoryB" );
     * fileMoveActions.forEach( action -> action.move( directoryB ) );
     * }</pre>
     * The returned stream is lazy: nothing is listed until the stream is consumed.
     *
     * @param dir the source location of the move action
     * @return a stream of the entire contents of the source location that can be applied to a target location to
     * perform a move
     */
    public Stream<FileMoveAction> traverseForMoving( File dir )
    {
        return traverseForMoving( dir, dir );
    }

    /**
     * Lazily generates move actions for {@code dir} and everything below it.
     *
     * @param dir this path and all the child paths under it are subject to move
     * @param basePath the path handed on to the generated file-system move actions; according to the notes in
     * {@link #expandTraverseFiles(File, File)} it is what file names are later relativised against
     * @return a stream of individual move actions which can be iterated and applied whenever
     */
    private Stream<FileMoveAction> traverseForMoving( File dir, File basePath )
    {
        // flatMap is an intermediate operation and therefore lazy. The returned stream must only call out to
        // expandTraverseFiles when it is actually consumed, never at construction time.
        return Stream.of( dir ).flatMap( source -> expandTraverseFiles( source, basePath ) );
    }

    /**
     * Lists {@code dir} right away and returns the move actions for its files followed by the (lazily expanded)
     * move actions of its sub-directories.
     * <p>
     * Because the listing happens at call time, this method must only be invoked from a lazy stream stage.
     *
     * @param dir directory to expand, or a single file
     * @param basePath base path to use for the entries of {@code dir}; ignored when {@code dir} cannot be listed
     * @return move actions for the files directly in {@code dir}, then those of nested directories
     */
    private Stream<FileMoveAction> expandTraverseFiles( File dir, File basePath )
    {
        List<File> listed = listFiles( dir );

        // A null listing means 'dir' is not a directory but a single specific file (or does not exist). We then
        // emit exactly one action for it. The supplied base path would in that case be the file itself, which is
        // wrong, so the parent directory is used instead to allow the file name to be relativised correctly later.
        final boolean notListable = listed == null;
        final List<File> entries = notListable ? Collections.singletonList( dir ) : listed;
        final File relativeTo = notListable ? dir.getParentFile() : basePath;

        Stream<FileMoveAction> fileActions = entries.stream()
                .filter( this::isFile )
                .map( file -> copyFileCorrectly( file, relativeTo ) );
        Stream<FileMoveAction> nestedActions = entries.stream()
                .filter( this::isDirectory )
                .flatMap( subDir -> traverseForMoving( subDir, relativeTo ) );
        return Stream.concat( fileActions, nestedActions );
    }

    /**
     * @param file path to inspect
     * @return {@code true} when the file system responsible for {@code file} does not report it as a directory
     */
    private boolean isFile( File file )
    {
        return !isDirectory( file );
    }

    /**
     * @param file path to inspect
     * @return {@code true} when the file system responsible for {@code file} reports it as a directory
     */
    private boolean isDirectory( File file )
    {
        return managedByPageCache( file )
               ? pageCache.getCachedFileSystem().isDirectory( file )
               : fs.isDirectory( file );
    }

    /**
     * @param file path whose name is handed to the informer
     * @return whether the informer says a file with this name is managed by the page cache
     */
    private boolean managedByPageCache( File file )
    {
        return fileMoveActionInformer.shouldBeManagedByPageCache( file.getName() );
    }

    /**
     * Lists {@code dir} through both the page cache's file system and the regular file system and merges the
     * two listings, dropping duplicates.
     *
     * @param dir directory to list
     * @return the merged listing, or {@code null} when neither file system could list {@code dir}
     */
    private List<File> listFiles( File dir )
    {
        File[] fromPageCache = pageCache.getCachedFileSystem().listFiles( dir );
        File[] fromFileSystem = fs.listFiles( dir );
        boolean nothingListed = fromPageCache == null && fromFileSystem == null;
        if ( nothingListed )
        {
            // This probably means 'dir' is actually a file, or it does not exist.
            return null;
        }
        return Stream.concat( streamOrEmpty( fromPageCache ), streamOrEmpty( fromFileSystem ) )
                .distinct()
                .collect( toList() );
    }

    /**
     * @param files array to stream, possibly {@code null}
     * @return a stream over {@code files}, or an empty stream when {@code files} is {@code null}
     */
    private Stream<File> streamOrEmpty( File[] files )
    {
        return ofNullable( files ).map( Arrays::stream ).orElse( empty() );
    }

    /**
     * Some files are handled via page cache for CAPI flash, others are only used on the default file system.
     * This picks the move action that matches the system the file belongs to.
     *
     * @param fileToMove file to create a move action for
     * @param basePath base path passed along to file-system move actions; not used for page-cache managed files
     * @return the move action for {@code fileToMove}
     */
    private FileMoveAction copyFileCorrectly( File fileToMove, File basePath )
    {
        return managedByPageCache( fileToMove )
               ? FileMoveAction.copyViaPageCache( fileToMove, pageCache )
               : FileMoveAction.copyViaFileSystem( fileToMove, basePath );
    }
}