Skip to content

Commit

Permalink
Adding file channel close as a temporary solution to max open files l…
Browse files Browse the repository at this point in the history
…imit '#31'
  • Loading branch information
ambud committed Jul 31, 2017
1 parent b5e1d9d commit dc4d448
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
Expand Up @@ -73,7 +73,7 @@ public void createMeasurement(@PathParam(DatabaseOpsApi.DB_NAME) String dbName,
}
}

@Path("/series/{seriesName}")
@Path("/series")
@PUT
@Consumes({ MediaType.APPLICATION_JSON })
public void createSeries(@PathParam(DatabaseOpsApi.DB_NAME) String dbName,
Expand All @@ -92,7 +92,7 @@ public void createSeries(@PathParam(DatabaseOpsApi.DB_NAME) String dbName,
}
}

@Path("/series/{seriesName}/retention/{retentionPolicy}")
@Path("/series/retention/{retentionPolicy}")
@PUT
public void updateRetentionPolicy(@PathParam(DatabaseOpsApi.DB_NAME) String dbName,
@PathParam(MEASUREMENT) String measurementName, @PathParam("retentionPolicy") int retentionPolicy) {
Expand Down
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.Set;
import java.util.Stack;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ws.rs.NotFoundException;

Expand All @@ -45,6 +47,8 @@
*/
public class GrafanaUtils {

private static final Logger logger = Logger.getLogger(GrafanaUtils.class.getName());

private GrafanaUtils() {
}

Expand Down Expand Up @@ -127,6 +131,10 @@ public static AggregationFunction extractGrafanaAggregation(JsonObject jsonEleme
Object[] args = new Object[ary.size()];
for (JsonElement element : ary) {
JsonObject arg = element.getAsJsonObject();
// ignore invalid aggregation function
if (!arg.has("index") || !arg.has("value")) {
return null;
}
int index = arg.get("index").getAsInt();
switch (arg.get("type").getAsString().toLowerCase()) {
case "string":
Expand All @@ -146,8 +154,7 @@ public static AggregationFunction extractGrafanaAggregation(JsonObject jsonEleme
instance.init(args);
return instance;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.log(Level.FINE, "Failed to extract aggregate function:" + jsonElement, e);
}
}
return null;
Expand Down
Expand Up @@ -55,7 +55,6 @@ public class ByzantineWriter implements Writer {
private String seriesId;
private boolean onDisk;
private String outputFile;
private RandomAccessFile file;
private boolean closed;
private Map<String, String> conf;

Expand All @@ -72,6 +71,7 @@ public void configure(Map<String, String> conf) throws IOException {
this.conf = conf;
onDisk = Boolean.parseBoolean(conf.getOrDefault(StorageEngine.PERSISTENCE_DISK, "false"));
if (onDisk) {
RandomAccessFile file;
String outputDirectory = conf.getOrDefault("data.dir", "/tmp/sidewinder/data");
outputFile = outputDirectory + "/" + seriesId;
logger.fine("\n\n" + outputFile + "\t" + outputFile + "\n\n");
Expand All @@ -89,6 +89,7 @@ public void configure(Map<String, String> conf) throws IOException {
buf = file.getChannel().map(MapMode.READ_WRITE, 0, DEFAULT_BUFFER_INIT_SIZE);
buf.putInt(0);
}
file.close();
} else {
buf = ByteBuffer.allocateDirect(DEFAULT_BUFFER_INIT_SIZE);
buf.putInt(0);
Expand Down Expand Up @@ -163,18 +164,23 @@ private void compressAndWriteValue(ByteBuffer tBuf, long value) {

private void checkAndExpandBuffer() throws IOException {
if (buf.remaining() < 20) {
if (onDisk) {
expandBufferTo((int) (file.length() * 1.2));
} else {
expandBufferTo((int) (buf.capacity() * 1.2));
int cap = 0;
if (!onDisk) {
cap = (int) (buf.capacity() * 1.2);
if (cap < 0) {
throw new IOException("Buffer too large >2GB");
}
}
expandBufferTo(cap);
}
}

private void expandBufferTo(int newBufferSize) throws IOException {
ByteBuffer temp = null;
if (onDisk) {
temp = file.getChannel().map(MapMode.READ_WRITE, 0, newBufferSize);
RandomAccessFile file = new RandomAccessFile(outputFile, "rwd");
temp = file.getChannel().map(MapMode.READ_WRITE, 0, (int) (file.length() * 1.2));
file.close();
} else {
temp = ByteBuffer.allocateDirect(newBufferSize);
}
Expand Down Expand Up @@ -352,9 +358,7 @@ public void close() throws IOException {
if (onDisk && !closed) {
logger.info("Data file being freed to contain file IO for:" + seriesId + "\t" + outputFile);
((MappedByteBuffer) buf).force();
file.close();
buf = null;
file = null;
closed = true;
}
write.unlock();
Expand All @@ -367,10 +371,10 @@ public void setConf(Map<String, String> conf) {

@Override
public void delete() throws IOException {
if(!closed) {
if (!closed) {
close();
}
if(onDisk) {
if (onDisk) {
logger.info("Data file being deleted:" + seriesId + "\t" + outputFile);
new File(outputFile).delete();
}
Expand Down

0 comments on commit dc4d448

Please sign in to comment.