Skip to content

Commit

Permalink
Issue #2874 new pages are not allowed to be created by load method.
Browse files Browse the repository at this point in the history
  • Loading branch information
laa committed Feb 17, 2015
1 parent 53dfc8d commit 3ad538e
Show file tree
Hide file tree
Showing 22 changed files with 368 additions and 172 deletions.
Expand Up @@ -55,6 +55,10 @@ public void sendShutdown() {
shutdownFlag = true;
}

public boolean isShutdownFlag() {
return shutdownFlag;
}

@Override
public void run() {
startup();
Expand Down
78 changes: 46 additions & 32 deletions ...ain/java/com/orientechnologies/orient/core/index/hashindex/local/OHashTableDirectory.java 100644 → 100755
@@ -1,22 +1,22 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/

package com.orientechnologies.orient.core.index.hashindex.local;

Expand All @@ -41,26 +41,26 @@
* @since 5/14/14
*/
public class OHashTableDirectory extends ODurableComponent {
public static final int ITEM_SIZE = OLongSerializer.LONG_SIZE;
public static final int ITEM_SIZE = OLongSerializer.LONG_SIZE;

public static final int LEVEL_SIZE = OLocalHashTable.MAX_LEVEL_SIZE;
public static final int LEVEL_SIZE = OLocalHashTable.MAX_LEVEL_SIZE;

public static final int BINARY_LEVEL_SIZE = LEVEL_SIZE * ITEM_SIZE + 3 * OByteSerializer.BYTE_SIZE;
public static final int BINARY_LEVEL_SIZE = LEVEL_SIZE * ITEM_SIZE + 3 * OByteSerializer.BYTE_SIZE;

private final String defaultExtension;
private final String name;
private final ODiskCache diskCache;
private final String defaultExtension;
private final String name;
private final ODiskCache diskCache;

private long fileId;
private long fileId;

private OCacheEntry firstEntry;
private List<OCacheEntry> entries;
private OCacheEntry firstEntry;
private List<OCacheEntry> entries;

private final boolean durableInNonTxMode;
private final boolean durableInNonTxMode;
private final OAbstractPaginatedStorage storage;

private final ODurablePage.TrackMode txTrackMode = ODurablePage.TrackMode.valueOf(OGlobalConfiguration.INDEX_TX_MODE
.getValueAsString().toUpperCase());
private final ODurablePage.TrackMode txTrackMode = ODurablePage.TrackMode.valueOf(OGlobalConfiguration.INDEX_TX_MODE
.getValueAsString().toUpperCase());

public OHashTableDirectory(String defaultExtension, String name, boolean durableInNonTxMode, OAbstractPaginatedStorage storage) {
this.defaultExtension = defaultExtension;
Expand Down Expand Up @@ -91,8 +91,15 @@ public void create() throws IOException {
private void init() throws IOException {
startAtomicOperation();
try {
boolean isNewPage = false;
firstEntry = diskCache.load(fileId, 0, true);

if (firstEntry == null) {
firstEntry = diskCache.allocateNewPage(fileId);
isNewPage = true;
assert firstEntry.getPageIndex() == 0;
}

diskCache.pinPage(firstEntry);

firstEntry.acquireExclusiveLock();
Expand All @@ -103,7 +110,7 @@ private void init() throws IOException {
firstPage.setTombstone(-1);

firstEntry.markDirty();
logPageChanges(firstPage, firstEntry.getFileId(), firstEntry.getPageIndex(), true);
logPageChanges(firstPage, firstEntry.getFileId(), firstEntry.getPageIndex(), isNewPage);
} finally {
firstEntry.releaseExclusiveLock();
diskCache.release(firstEntry);
Expand All @@ -126,6 +133,9 @@ public void open() throws IOException {
try {
fileId = diskCache.openFile(name + defaultExtension);
firstEntry = diskCache.load(fileId, 0, true);

assert firstEntry != null;

diskCache.pinPage(firstEntry);
diskCache.release(firstEntry);

Expand All @@ -135,6 +145,8 @@ public void open() throws IOException {

for (int i = 1; i < filledUpTo; i++) {
final OCacheEntry entry = diskCache.load(fileId, i, true);
assert entry != null;

diskCache.pinPage(entry);
diskCache.release(entry);

Expand Down Expand Up @@ -213,7 +225,9 @@ public int addNewNode(byte maxLeftChildDepth, byte maxRightChildDepth, byte node

boolean newPage = false;
while (entries.size() <= pageIndex) {
OCacheEntry cacheEntry = diskCache.load(fileId, entries.size() + 1, true);
OCacheEntry cacheEntry = diskCache.allocateNewPage(fileId);
assert cacheEntry.getPageIndex() == entries.size() + 1;

diskCache.pinPage(cacheEntry);
diskCache.release(cacheEntry);

Expand Down
@@ -1,22 +1,22 @@
/*
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
*
* * Copyright 2014 Orient Technologies LTD (info(at)orientechnologies.com)
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* *
* * For more information: http://www.orientechnologies.com
*
*/
package com.orientechnologies.orient.core.index.hashindex.local;

import com.orientechnologies.common.comparator.ODefaultComparator;
Expand Down Expand Up @@ -505,6 +505,9 @@ public V remove(K key) {
V removed = null;

OCacheEntry cacheEntry = diskCache.load(nullBucketFileId, 0, false);
if (cacheEntry == null)
cacheEntry = diskCache.allocateNewPage(nullBucketFileId);

cacheEntry.acquireExclusiveLock();
try {
final ONullBucket<V> nullBucket = new ONullBucket<V>(cacheEntry, getTrackMode(), valueSerializer, false);
Expand Down Expand Up @@ -712,6 +715,9 @@ public void deleteWithoutLoad(String name, OAbstractPaginatedStorage storageLoca

fileStateId = diskCache.openFile(name + metadataConfigurationFileExtension);
hashStateEntry = diskCache.load(fileStateId, 0, true);

assert hashStateEntry != null;

try {
OHashIndexFileLevelMetadataPage metadataPage = new OHashIndexFileLevelMetadataPage(hashStateEntry,
ODurablePage.TrackMode.NONE, false);
Expand Down Expand Up @@ -2082,7 +2088,12 @@ private OCacheEntry loadPageEntry(long pageIndex, int fileLevel) throws IOExcept
} finally {
diskCache.release(hashStateEntry);
}
return diskCache.load(fileId, pageIndex, false);

OCacheEntry entry = diskCache.load(fileId, pageIndex, false);
if (entry == null)
entry = diskCache.allocateNewPage(fileId);

return entry;
}

private BucketPath getBucket(final long hashCode) throws IOException {
Expand Down
Expand Up @@ -267,7 +267,9 @@ public void loadPinnedPage(final OCacheEntry cacheEntry) throws IOException {

@Override
public OCacheEntry load(final long fileId, final long pageIndex, final boolean checkPinnedPages) throws IOException {
final UpdateCacheResult cacheResult = doLoad(fileId, pageIndex, checkPinnedPages);
final UpdateCacheResult cacheResult = doLoad(fileId, pageIndex, checkPinnedPages, false);
if (cacheResult == null)
return null;

try {
if (cacheResult.removeColdPages)
Expand All @@ -282,7 +284,7 @@ public OCacheEntry load(final long fileId, final long pageIndex, final boolean c
return cacheResult.cacheEntry;
}

private UpdateCacheResult doLoad(long fileId, long pageIndex, boolean checkPinnedPages) throws IOException {
private UpdateCacheResult doLoad(long fileId, long pageIndex, boolean checkPinnedPages, boolean addNewPages) throws IOException {
boolean removeColdPages = false;
OCacheEntry cacheEntry = null;

Expand All @@ -299,7 +301,10 @@ private UpdateCacheResult doLoad(long fileId, long pageIndex, boolean checkPinne
cacheEntry = pinnedPages.get(new PinnedPage(fileId, pageIndex));

if (cacheEntry == null) {
UpdateCacheResult cacheResult = updateCache(fileId, pageIndex);
UpdateCacheResult cacheResult = updateCache(fileId, pageIndex, addNewPages);
if (cacheResult == null)
return null;

cacheEntry = cacheResult.cacheEntry;
removeColdPages = cacheResult.removeColdPages;
}
Expand Down Expand Up @@ -328,7 +333,7 @@ public OCacheEntry allocateNewPage(final long fileId) throws IOException {
fileLock = fileLockManager.acquireExclusiveLock(fileId);
try {
final long filledUpTo = getFilledUpTo(fileId);
cacheResult = doLoad(fileId, filledUpTo, false);
cacheResult = doLoad(fileId, filledUpTo, false, true);
} finally {
fileLockManager.releaseLock(fileLock);
}
Expand Down Expand Up @@ -637,7 +642,7 @@ public void removeLowDiskSpaceListener(OLowDiskSpaceListener listener) {
writeCache.removeLowDiskSpaceListener(listener);
}

private UpdateCacheResult updateCache(final long fileId, final long pageIndex) throws IOException {
private UpdateCacheResult updateCache(final long fileId, final long pageIndex, final boolean addNewPages) throws IOException {
final OProfilerMBean profiler = storageName != null ? Orient.instance().getProfiler() : null;
final long startTime = storageName != null ? System.currentTimeMillis() : 0;

Expand All @@ -657,8 +662,9 @@ private UpdateCacheResult updateCache(final long fileId, final long pageIndex) t

cacheEntry = a1out.remove(fileId, pageIndex);
if (cacheEntry != null) {
OCachePointer dataPointer = writeCache.load(fileId, pageIndex, false);

OCachePointer dataPointer = writeCache.load(fileId, pageIndex);
assert dataPointer != null;
assert cacheEntry.dataPointer == null;
assert !cacheEntry.isDirty;

Expand All @@ -673,7 +679,9 @@ private UpdateCacheResult updateCache(final long fileId, final long pageIndex) t
if (cacheEntry != null)
return new UpdateCacheResult(false, cacheEntry);

OCachePointer dataPointer = writeCache.load(fileId, pageIndex);
OCachePointer dataPointer = writeCache.load(fileId, pageIndex, addNewPages);
if (dataPointer == null)
return null;

cacheEntry = new OCacheEntry(fileId, pageIndex, dataPointer, false);
a1in.putToMRU(cacheEntry);
Expand Down
Expand Up @@ -78,7 +78,7 @@ public class OWOWCache {

private final long diskSizeCheckInterval = OGlobalConfiguration.DISC_CACHE_FREE_SPACE_CHECK_INTERVAL
.getValueAsInteger() * 1000;
private final List<WeakReference<OLowDiskSpaceListener>> listeners = new CopyOnWriteArrayList<WeakReference<OLowDiskSpaceListener>>();
private final List<WeakReference<OLowDiskSpaceListener>> listeners = new CopyOnWriteArrayList<WeakReference<OLowDiskSpaceListener>>();

private final AtomicLong lastDiskSpaceCheck = new AtomicLong(System.currentTimeMillis());
private final String storagePath;
Expand Down Expand Up @@ -400,7 +400,7 @@ public Future store(final long fileId, final long pageIndex, final OCachePointer
}
}

public OCachePointer load(long fileId, long pageIndex) throws IOException {
public OCachePointer load(long fileId, long pageIndex, boolean addNewPages) throws IOException {
filesLock.acquireReadLock();
try {
final GroupKey groupKey = new GroupKey(fileId, pageIndex >>> 4);
Expand All @@ -410,17 +410,22 @@ public OCachePointer load(long fileId, long pageIndex) throws IOException {

OCachePointer pagePointer;
if (writeGroup == null) {
pagePointer = cacheFileContent(fileId, pageIndex);
pagePointer.incrementReferrer();
pagePointer = cacheFileContent(fileId, pageIndex, addNewPages);
if (pagePointer == null)
return null;

pagePointer.incrementReferrer();
return pagePointer;
}

final int entryIndex = (int) (pageIndex & 15);
pagePointer = writeGroup.pages[entryIndex];

if (pagePointer == null)
pagePointer = cacheFileContent(fileId, pageIndex);
pagePointer = cacheFileContent(fileId, pageIndex, addNewPages);

if (pagePointer == null)
return null;

pagePointer.incrementReferrer();
return pagePointer;
Expand Down Expand Up @@ -883,7 +888,7 @@ private void removeCachedPages(long fileId) {
}
}

private OCachePointer cacheFileContent(long fileId, long pageIndex) throws IOException {
private OCachePointer cacheFileContent(long fileId, long pageIndex, boolean addNewPages) throws IOException {
final long startPosition = pageIndex * pageSize;
final long endPosition = startPosition + pageSize;

Expand All @@ -905,15 +910,16 @@ private OCachePointer cacheFileContent(long fileId, long pageIndex) throws IOExc
final ODirectMemoryPointer pointer = new ODirectMemoryPointer(content);

dataPointer = new OCachePointer(pointer, lastLsn);
} else {
} else if (addNewPages) {
final int space = (int) (endPosition - fileClassic.getFilledUpTo());
fileClassic.allocateSpace(space);

addAllocatedSpace(space);

final ODirectMemoryPointer pointer = new ODirectMemoryPointer(content);
dataPointer = new OCachePointer(pointer, lastLsn);
}
} else
return null;

return dataPointer;
}
Expand Down Expand Up @@ -1301,7 +1307,7 @@ public Void call() throws Exception {
}
}

private static class FlushThreadFactory implements ThreadFactory {
private static class FlushThreadFactory implements ThreadFactory {
private final String storageName;

private FlushThreadFactory(String storageName) {
Expand Down

0 comments on commit 3ad538e

Please sign in to comment.