Skip to content

Commit

Permalink
Merge 63bc0c6 into 7526dd4
Browse files Browse the repository at this point in the history
  • Loading branch information
domoritz committed Jun 10, 2014
2 parents 7526dd4 + 63bc0c6 commit caa3432
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 122 deletions.
12 changes: 0 additions & 12 deletions src/edu/washington/escience/myria/MyriaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,11 @@ public final class MyriaConstants {
*/
public static final int MAX_ACTIVE_QUERIES = 5;

/**
* The relation that stores profiling information while it is being written.
*/
public static final RelationKey PROFILING_RELATION_TMP = new RelationKey("public", "tmp", "Profiling");

/**
* The relation that stores profiling information.
*/
public static final RelationKey PROFILING_RELATION = new RelationKey("public", "logs", "Profiling");

/**
* The schema of the {@link #PROFILING_RELATION_TMP}.
*/
public static final Schema PROFILING_SCHEMA_TMP = Schema.ofFields(Type.LONG_TYPE, Type.INT_TYPE, Type.INT_TYPE,
Type.LONG_TYPE, Type.LONG_TYPE, Type.STRING_TYPE, Type.INT_TYPE, "queryId", "fragmentId", "opId", "nanoTime",
"numTuples", "eventType", "traceId");

/**
* The schema of the {@link #PROFILING_RELATION}.
*/
Expand Down
23 changes: 4 additions & 19 deletions src/edu/washington/escience/myria/operator/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -15,8 +14,8 @@
import edu.washington.escience.myria.parallel.LocalFragment;
import edu.washington.escience.myria.parallel.LocalFragmentResourceManager;
import edu.washington.escience.myria.parallel.LocalSubQuery;
import edu.washington.escience.myria.parallel.ProfilingLogger;
import edu.washington.escience.myria.parallel.WorkerSubQuery;
import edu.washington.escience.myria.profiling.ProfilingLogger;
import edu.washington.escience.myria.storage.TupleBatch;

/**
Expand Down Expand Up @@ -92,12 +91,6 @@ public ImmutableMap<String, Object> getExecEnvVars() {
*/
private Boolean profilingMode;

/**
* A counter that tracks how many times {@link #nextReady()} has been called, thus helping to join the call and return
* log entries that belong to the same call.
*/
private final AtomicLong traceId = new AtomicLong();

/**
* @return the profilingLogger
*/
Expand Down Expand Up @@ -325,10 +318,9 @@ public final TupleBatch nextReady() throws DbException {
return null;
}

long startTime = -1;
if (isProfilingMode()) {
// use trace id to track corresponding events
long trace = traceId.incrementAndGet();
profilingLogger.recordEvent(this, -1, "call", trace);
startTime = profilingLogger.getTime(this);
}

TupleBatch result = null;
Expand All @@ -347,7 +339,7 @@ public final TupleBatch nextReady() throws DbException {
if (result != null) {
numberOfTupleReturned = result.numTuples();
}
profilingLogger.recordEvent(this, numberOfTupleReturned, "return", traceId.get());
profilingLogger.recordEvent(this, numberOfTupleReturned, startTime);
}
if (result == null) {
checkEOSAndEOI();
Expand Down Expand Up @@ -421,13 +413,6 @@ public final void open(final Map<String, Object> execEnvVars) throws DbException
*/
public final void setEOI(final boolean eoi) {
this.eoi = eoi;
if (isProfilingMode()) {
try {
profilingLogger.recordEvent(this, -1, "eoi", traceId.get());
} catch (Exception e) {
LOGGER.error("Failed to write profiling data:", e);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import edu.washington.escience.myria.operator.network.Consumer;
import edu.washington.escience.myria.operator.network.Producer;
import edu.washington.escience.myria.parallel.ipc.StreamIOChannelID;
import edu.washington.escience.myria.profiling.ProfilingLogger;
import edu.washington.escience.myria.util.AtomicUtils;
import edu.washington.escience.myria.util.concurrent.ReentrantSpinLock;

Expand Down
3 changes: 1 addition & 2 deletions src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,9 @@ public void start() throws Exception {
messageProcessingExecutor.submit(new MessageProcessor());
LOGGER.info("Server started on {}", masterSocketInfo);

if (getSchema(MyriaConstants.PROFILING_RELATION_TMP) == null
if (getSchema(MyriaConstants.PROFILING_RELATION) == null
&& getDBMS().equals(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL)) {
final Set<Integer> workerIds = workers.keySet();
importDataset(MyriaConstants.PROFILING_RELATION_TMP, MyriaConstants.PROFILING_SCHEMA_TMP, workerIds);
importDataset(MyriaConstants.PROFILING_RELATION, MyriaConstants.PROFILING_SCHEMA, workerIds);
importDataset(MyriaConstants.SENT_RELATION, MyriaConstants.SENT_SCHEMA, workerIds);
}
Expand Down
1 change: 1 addition & 0 deletions src/edu/washington/escience/myria/parallel/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import edu.washington.escience.myria.parallel.ipc.FlowControlBagInputBuffer;
import edu.washington.escience.myria.parallel.ipc.IPCConnectionPool;
import edu.washington.escience.myria.parallel.ipc.InJVMLoopbackChannelSink;
import edu.washington.escience.myria.profiling.ProfilingLogger;
import edu.washington.escience.myria.proto.ControlProto.ControlMessage;
import edu.washington.escience.myria.proto.TransportProto.TransportMessage;
import edu.washington.escience.myria.util.IPCUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import edu.washington.escience.myria.parallel.ipc.IPCEvent;
import edu.washington.escience.myria.parallel.ipc.IPCEventListener;
import edu.washington.escience.myria.parallel.ipc.StreamOutputChannel;
import edu.washington.escience.myria.profiling.ProfilingLogger;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.util.DateTimeUtils;

Expand Down Expand Up @@ -110,7 +111,7 @@ public void operationComplete(final LocalFragmentFuture future) throws Exception
}
if (isProfilingMode()) {
try {
getWorker().getProfilingLogger().flush(getSubQueryId().getQueryId());
getWorker().getProfilingLogger().flush();
} catch (DbException e) {
LOGGER.error("Error flushing profiling logger", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package edu.washington.escience.myria.parallel;
package edu.washington.escience.myria.profiling;

import java.sql.BatchUpdateException;
import java.sql.Connection;
Expand All @@ -7,7 +7,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

Expand All @@ -19,13 +18,14 @@
import edu.washington.escience.myria.accessmethod.ConnectionInfo;
import edu.washington.escience.myria.accessmethod.JdbcAccessMethod;
import edu.washington.escience.myria.operator.Operator;
import edu.washington.escience.myria.parallel.WorkerSubQuery;

/**
* A logger for profiling data.
*/
public class ProfilingLogger {
/** The logger for this class. */
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalFragment.class);
private static final org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ProfilingLogger.class);

/** The connection to the database database. */
private final JdbcAccessMethod accessMethod;
Expand All @@ -39,12 +39,6 @@ public class ProfilingLogger {
/** A query statement for batching. */
private PreparedStatement statementSent;

/**
* A query statement for transforming the profiling relation from {@link MyriaConstants.PROFILING_SCHEMA_TMP} to
* {@link MyriaConstants.PROFILING_SCHEMA}.
*/
private PreparedStatement statementTransform;

/** Number of rows in batch in {@link #statementEvent}. */
private int batchSizeEvents = 0;

Expand All @@ -65,26 +59,17 @@ public ProfilingLogger(final ConnectionInfo connectionInfo) throws DbException {
/* open the database connection */
accessMethod = (JdbcAccessMethod) AccessMethod.of(connectionInfo.getDbms(), connectionInfo, false);

accessMethod.createUnloggedTableIfNotExists(MyriaConstants.PROFILING_RELATION_TMP,
MyriaConstants.PROFILING_SCHEMA_TMP);
accessMethod.createTableIfNotExists(MyriaConstants.PROFILING_RELATION, MyriaConstants.PROFILING_SCHEMA);
accessMethod.createUnloggedTableIfNotExists(MyriaConstants.PROFILING_RELATION, MyriaConstants.PROFILING_SCHEMA);
accessMethod.createTableIfNotExists(MyriaConstants.SENT_RELATION, MyriaConstants.SENT_SCHEMA);

createProfilingIndexes();
createTmpProfilingIndexes();
createSentIndex();

connection = accessMethod.getConnection();
try {
statementEvent =
connection.prepareStatement(accessMethod.insertStatementFromSchema(MyriaConstants.PROFILING_SCHEMA_TMP,
MyriaConstants.PROFILING_RELATION_TMP));
} catch (SQLException e) {
throw new DbException(e);
}

try {
statementTransform = connection.prepareStatement(getTransformProfilingDataStatement());
connection.prepareStatement(accessMethod.insertStatementFromSchema(MyriaConstants.PROFILING_SCHEMA,
MyriaConstants.PROFILING_RELATION));
} catch (SQLException e) {
throw new DbException(e);
}
Expand Down Expand Up @@ -115,21 +100,6 @@ protected void createSentIndex() throws DbException {
}
}

/**
* @throws DbException if index cannot be created
*/
protected void createTmpProfilingIndexes() throws DbException {
final Schema schema = MyriaConstants.PROFILING_SCHEMA_TMP;

List<IndexRef> filterIndex = ImmutableList.of(IndexRef.of(schema, "queryId"));

try {
accessMethod.createIndexIfNotExists(MyriaConstants.PROFILING_RELATION_TMP, schema, filterIndex);
} catch (DbException e) {
LOGGER.error("Couldn't create index for profiling logs:", e);
}
}

/**
* @throws DbException if index cannot be created
*/
Expand Down Expand Up @@ -163,7 +133,7 @@ protected void createProfilingIndexes() throws DbException {
* @param operator the operator
* @return the time to record
*/
private long getTime(final Operator operator) {
public long getTime(final Operator operator) {
final WorkerSubQuery workerSubQuery = (WorkerSubQuery) operator.getLocalSubQuery();
final long workerStartTimeMillis = workerSubQuery.getBeginMilliseconds();
final long threadStartTimeMillis = operator.getFragment().getBeginMilliseconds();
Expand All @@ -179,46 +149,26 @@ private long getTime(final Operator operator) {
return TimeUnit.MILLISECONDS.toNanos(startupTimeMillis) + activeTimeNanos;
}

/**
* Creates a statement for transforming the profiling relation from {@link MyriaConstants.PROFILING_SCHEMA_TMP} to
* {@link MyriaConstants.PROFILING_SCHEMA}.
*
* @return the insert into statement
*/
private String getTransformProfilingDataStatement() {
return Joiner.on(' ').join("INSERT INTO",
MyriaConstants.PROFILING_RELATION.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL),
"SELECT c.queryid, c.fragmentid, c.opid, c.nanotime as startTime, r.nanotime as endTime, r.numtuples", "FROM",
MyriaConstants.PROFILING_RELATION_TMP.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL), "c,",
MyriaConstants.PROFILING_RELATION_TMP.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL), "r", "WHERE",
"c.queryid = r.queryid AND c.opid = r.opid AND c.fragmentid = r.fragmentid AND c.traceid = r.traceid",
"AND r.eventtype = 'return' AND c.eventtype = 'call' AND c.queryid = ?", "ORDER BY c.nanotime ASC;",
"DELETE FROM", MyriaConstants.PROFILING_RELATION_TMP.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL),
"WHERE", "queryid = ?");
}

/**
* Appends a single event appearing in an operator to a batch that is flushed either after a certain number of events
* are in the batch or {@link #flushProfilingEventsBatch()} is called.
*
* @param operator the operator where this record was logged
* @param numTuples the number of tuples
* @param eventType the type of the event to be logged
* @param traceId an id to trace corresponding events
* @param startTime the start time of the event in ns
*
* @throws DbException if insertion in the database fails
*/
public synchronized void recordEvent(final Operator operator, final long numTuples, final String eventType,
final long traceId) throws DbException {
public synchronized void recordEvent(final Operator operator, final long numTuples, final long startTime)
throws DbException {

try {
statementEvent.setLong(1, operator.getQueryId());
statementEvent.setInt(2, operator.getFragmentId());
statementEvent.setInt(3, operator.getOpId());
statementEvent.setLong(4, getTime(operator));
statementEvent.setLong(5, numTuples);
statementEvent.setString(6, eventType);
statementEvent.setLong(7, traceId);
statementEvent.setLong(4, startTime);
statementEvent.setLong(5, getTime(operator));
statementEvent.setLong(6, numTuples);

statementEvent.addBatch();
batchSizeEvents++;
Expand All @@ -234,35 +184,11 @@ public synchronized void recordEvent(final Operator operator, final long numTupl
/**
* Flush the tuple batch buffer and transform the profiling data.
*
* @param queryId the query id
* @throws DbException if insertion in the database fails
*/
public synchronized void flush(final long queryId) throws DbException {
public synchronized void flush() throws DbException {
flushSentBatch();
flushProfilingEventsBatch();
transformProfilingRelation(queryId);
}

/**
* Executes the transformation.
*
* @param queryId the query id
* @throws DbException if any error occurs
*/
private void transformProfilingRelation(final long queryId) throws DbException {
try {
statementTransform.setLong(1, queryId); // for insert statement
statementTransform.setLong(2, queryId); // for delete statement
connection.setAutoCommit(false);
statementTransform.executeUpdate();
connection.commit();
connection.setAutoCommit(true);
} catch (SQLException e) {
if (e instanceof BatchUpdateException) {
LOGGER.error("Error transforming profiling relation: ", e.getNextException());
}
throw new DbException(e);
}
}

/**
Expand All @@ -271,6 +197,7 @@ private void transformProfilingRelation(final long queryId) throws DbException {
* @throws DbException if insertion in the database fails
*/
private void flushProfilingEventsBatch() throws DbException {
final long startTime = System.nanoTime();
try {
if (batchSizeEvents == 0) {
return;
Expand All @@ -284,6 +211,9 @@ private void flushProfilingEventsBatch() throws DbException {
}
throw new DbException(e);
}
LOGGER.info("Flushing the profiling events batch took {} milliseconds.", TimeUnit.NANOSECONDS.toMillis(System
.nanoTime()
- startTime));
}

/**
Expand All @@ -292,6 +222,7 @@ private void flushProfilingEventsBatch() throws DbException {
* @throws DbException if insertion in the database fails
*/
private void flushSentBatch() throws DbException {
final long startTime = System.nanoTime();
try {
if (batchSizeSent > 0) {
statementSent.executeBatch();
Expand All @@ -304,6 +235,8 @@ private void flushSentBatch() throws DbException {
}
throw new DbException(e);
}
LOGGER.info("Flushing the sent batch took {} milliseconds.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- startTime));
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/edu/washington/escience/myria/profiling/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Query profiling functionality for the Myria project.
*/
package edu.washington.escience.myria.profiling;

0 comments on commit caa3432

Please sign in to comment.