Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pixels-cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</dependency>

<dependency>
<groupId>com.coreos</groupId>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<optional>true</optional>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
*/
package io.pixelsdb.pixels.cache;

import com.coreos.jetcd.data.KeyValue;
import io.pixelsdb.pixels.common.exception.FSException;
import io.etcd.jetcd.KeyValue;
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.Compact;
import io.pixelsdb.pixels.common.metadata.domain.Layout;
Expand All @@ -35,6 +34,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.*;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -236,7 +236,7 @@ public int updateAll(int version, Layout layout)
logger.debug("Found no allocated files. No updates are needed. " + key);
return 0;
}
String fileStr = keyValue.getValue().toStringUtf8();
String fileStr = keyValue.getValue().toString(StandardCharsets.UTF_8);
String[] files = fileStr.split(";");
return internalUpdateAll(version, layout, files);
}
Expand Down Expand Up @@ -283,11 +283,11 @@ public int updateIncremental (int version, Layout layout)
logger.debug("Found no allocated files. No updates are needed. " + key);
return 0;
}
String fileStr = keyValue.getValue().toStringUtf8();
String fileStr = keyValue.getValue().toString(StandardCharsets.UTF_8);
String[] files = fileStr.split(";");
return internalUpdateIncremental(version, layout, files);
}
catch (IOException | FSException e)
catch (IOException e)
{
e.printStackTrace();
return -1;
Expand Down Expand Up @@ -419,10 +419,9 @@ private int internalUpdateAll(int version, Layout layout, String[] files)
* @param files
* @return
* @throws IOException
* @throws FSException
*/
private int internalUpdateIncremental(int version, Layout layout, String[] files)
throws IOException, FSException
throws IOException
{
int status = 0;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ public byte[] read(long offset, int length)

public long getCurrentBlockId() throws IOException
{
return physicalReader.getCurrentBlockId();
return physicalReader.getBlockId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.presto;
package io.pixelsdb.pixels.cache;

import io.pixelsdb.pixels.common.utils.EtcdUtil;
import com.coreos.jetcd.data.KeyValue;
import io.etcd.jetcd.KeyValue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
Expand All @@ -50,16 +51,16 @@ public void getNodeFiles ()
if(keyValue != null)
{
// 1. get version
cacheVersion = keyValue.getValue().toStringUtf8();
cacheVersion = keyValue.getValue().toString(StandardCharsets.UTF_8);
System.out.println("cache_version: " + cacheVersion);
// 2. get files of each node
List<KeyValue> nodeFiles = etcdUtil.getKeyValuesByPrefix("location_" + cacheVersion);
if(nodeFiles.size() > 0)
{
for (KeyValue kv : nodeFiles)
{
String node = kv.getKey().toStringUtf8().split("_")[2];
String[] files = kv.getValue().toStringUtf8().split(";");
String node = kv.getKey().toString(StandardCharsets.UTF_8).split("_")[2];
String[] files = kv.getValue().toString(StandardCharsets.UTF_8).split(";");
for(String file : files)
{
System.out.println(file + ", " + node);
Expand Down
18 changes: 1 addition & 17 deletions pixels-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
Expand Down Expand Up @@ -91,19 +85,9 @@
</dependency>

<dependency>
<groupId>com.coreos</groupId>
<groupId>io.etcd</groupId>
<artifactId>jetcd-core</artifactId>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2021 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.lock;

import io.etcd.jetcd.KeyValue;
import io.pixelsdb.pixels.common.utils.EtcdUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import static io.pixelsdb.pixels.common.utils.Constants.LOCK_PATH_PREFIX;

/**
* Created at: 8/29/21
* Author: hank
*/
public class EtcdAutoIncrement
{
private static final Logger logger = LogManager.getLogger(EtcdAutoIncrement.class);

private EtcdAutoIncrement() { }

/**
* Initialize the id (set init value to '0') by the id key.
* This method is idempotent.
* @param idKey
*/
public static void InitId(String idKey)
{
EtcdUtil etcd = EtcdUtil.Instance();
EtcdReadWriteLock readWriteLock = new EtcdReadWriteLock(etcd.getClient(),
LOCK_PATH_PREFIX + idKey);
EtcdMutex writeLock = readWriteLock.writeLock();
try
{
writeLock.acquire();
KeyValue idKV = etcd.getKeyValue(idKey);
if (idKV == null)
{
etcd.putKeyValue(idKey, "0");
}
} catch (Exception e)
{
logger.error(e);
e.printStackTrace();
} finally
{
try
{
writeLock.release();
} catch (Exception e)
{
logger.error(e);
e.printStackTrace();
}
}
}

public static long GenerateId(String idKey)
{
long id = 0;
EtcdUtil etcd = EtcdUtil.Instance();
EtcdReadWriteLock readWriteLock = new EtcdReadWriteLock(etcd.getClient(),
LOCK_PATH_PREFIX + idKey);
EtcdMutex writeLock = readWriteLock.writeLock();
try
{
writeLock.acquire();
KeyValue idKV = etcd.getKeyValue(idKey);
if (idKV != null)
{
id = Long.parseLong(new String(idKV.getValue().getBytes()));
id++;
etcd.putKeyValue(idKey, id + "");
}
} catch (Exception e)
{
logger.error(e);
e.printStackTrace();
} finally
{
try
{
writeLock.release();
} catch (Exception e)
{
logger.error(e);
e.printStackTrace();
}
}
return id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package io.pixelsdb.pixels.common.lock;

import com.coreos.jetcd.Client;
import io.etcd.jetcd.Client;
import com.google.common.collect.Maps;
import org.apache.curator.utils.PathUtils;

Expand All @@ -30,6 +30,7 @@

/**
* @author: tao
* @author hank
* @date: Create in 2018-10-27 14:33
**/
public class EtcdMutex implements InterProcessLock
Expand Down Expand Up @@ -62,6 +63,17 @@ public EtcdMutex(Client client, String path)
internals = new LockInternals(client, path, lockName);
}

public EtcdMutex verbose(boolean verbose)
{
this.internals.verbose(verbose);
return this;
}

public EtcdMutex verbose()
{
return verbose(true);
}

/**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
Expand All @@ -71,17 +83,16 @@ public EtcdMutex(Client client, String path)
*/
public void acquire() throws Exception
{
if (!this.internalLock(-1L, (TimeUnit) null))
if (!this.internalLock(Long.MAX_VALUE, TimeUnit.SECONDS))
{
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}

}

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread();
EtcdMutex.LockData lockData = (EtcdMutex.LockData) this.threadData.get(currentThread);
EtcdMutex.LockData lockData = this.threadData.get(currentThread);
if (lockData != null)
{
lockData.lockCount.incrementAndGet();
Expand All @@ -90,7 +101,6 @@ private boolean internalLock(long time, TimeUnit unit) throws Exception
else
{
String lockPath = internals.attemptLock(time, unit);
System.out.println("[attemptLock]: end, " + lockPath);
if (lockPath != null)
{
EtcdMutex.LockData newLockData = new EtcdMutex.LockData(currentThread, lockPath);
Expand Down Expand Up @@ -141,7 +151,8 @@ public void release() throws Exception
LockData lockData = threadData.get(currentThread);
if (lockData == null)
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
// don't throw exception if the lock is not owned by this thread
return;
}

int newLockCount = lockData.lockCount.decrementAndGet();
Expand Down Expand Up @@ -176,5 +187,4 @@ private LockData(Thread owningThread, String lockPath)
this.lockPath = lockPath;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package io.pixelsdb.pixels.common.lock;

import com.coreos.jetcd.Client;
import io.etcd.jetcd.Client;

/**
* @author: tao
Expand Down
Loading