Skip to content

Commit

Permalink
Add physical* and network* metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 authored and dain committed Jan 31, 2019
1 parent 10546ea commit bacf59f
Show file tree
Hide file tree
Showing 30 changed files with 683 additions and 16 deletions.
Expand Up @@ -41,6 +41,12 @@ public class BasicStageStats
new DataSize(0, BYTE),
0,

new DataSize(0, BYTE),
0,

new DataSize(0, BYTE),
0,

0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand All @@ -58,6 +64,10 @@ public class BasicStageStats
private final int queuedDrivers;
private final int runningDrivers;
private final int completedDrivers;
private final DataSize physicalInputDataSize;
private final long physicalInputPositions;
private final DataSize internalNetworkInputDataSize;
private final long internalNetworkInputPositions;
private final DataSize rawInputDataSize;
private final long rawInputPositions;
private final long cumulativeUserMemory;
Expand All @@ -77,6 +87,12 @@ public BasicStageStats(
int runningDrivers,
int completedDrivers,

DataSize physicalInputDataSize,
long physicalInputPositions,

DataSize internalNetworkInputDataSize,
long internalNetworkInputPositions,

DataSize rawInputDataSize,
long rawInputPositions,

Expand All @@ -97,6 +113,10 @@ public BasicStageStats(
this.queuedDrivers = queuedDrivers;
this.runningDrivers = runningDrivers;
this.completedDrivers = completedDrivers;
this.physicalInputDataSize = requireNonNull(physicalInputDataSize, "physicalInputDataSize is null");
this.physicalInputPositions = physicalInputPositions;
this.internalNetworkInputDataSize = requireNonNull(internalNetworkInputDataSize, "internalNetworkInputDataSize is null");
this.internalNetworkInputPositions = internalNetworkInputPositions;
this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null");
this.rawInputPositions = rawInputPositions;
this.cumulativeUserMemory = cumulativeUserMemory;
Expand Down Expand Up @@ -134,6 +154,26 @@ public int getCompletedDrivers()
return completedDrivers;
}

public DataSize getPhysicalInputDataSize()
{
return physicalInputDataSize;
}

public long getPhysicalInputPositions()
{
return physicalInputPositions;
}

public DataSize getInternalNetworkInputDataSize()
{
return internalNetworkInputDataSize;
}

public long getInternalNetworkInputPositions()
{
return internalNetworkInputPositions;
}

public DataSize getRawInputDataSize()
{
return rawInputDataSize;
Expand Down Expand Up @@ -198,6 +238,12 @@ public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats>
long totalScheduledTimeMillis = 0;
long totalCpuTime = 0;

long physicalInputDataSize = 0;
long physicalInputPositions = 0;

long internalNetworkInputDataSize = 0;
long internalNetworkInputPositions = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand All @@ -224,6 +270,12 @@ public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats>
fullyBlocked &= stageStats.isFullyBlocked();
blockedReasons.addAll(stageStats.getBlockedReasons());

physicalInputDataSize += stageStats.getPhysicalInputDataSize().toBytes();
physicalInputPositions += stageStats.getPhysicalInputPositions();

internalNetworkInputDataSize += stageStats.getInternalNetworkInputDataSize().toBytes();
internalNetworkInputPositions += stageStats.getInternalNetworkInputPositions();

rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();
}
Expand All @@ -241,6 +293,12 @@ public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats>
runningDrivers,
completedDrivers,

succinctBytes(physicalInputDataSize),
physicalInputPositions,

succinctBytes(internalNetworkInputDataSize),
internalNetworkInputPositions,

succinctBytes(rawInputDataSize),
rawInputPositions,

Expand Down
Expand Up @@ -300,6 +300,12 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
stageStats.getRunningDrivers(),
stageStats.getCompletedDrivers(),

stageStats.getPhysicalInputDataSize(),
stageStats.getPhysicalInputPositions(),

stageStats.getInternalNetworkInputDataSize(),
stageStats.getInternalNetworkInputPositions(),

stageStats.getRawInputDataSize(),
stageStats.getRawInputPositions(),

Expand Down Expand Up @@ -401,6 +407,12 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
long totalCpuTime = 0;
long totalBlockedTime = 0;

long physicalInputDataSize = 0;
long physicalInputPositions = 0;

long internalNetworkInputDataSize = 0;
long internalNetworkInputPositions = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand Down Expand Up @@ -444,6 +456,12 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)

PlanFragment plan = stageInfo.getPlan();
if (plan != null && plan.getPartitionedSourceNodes().stream().anyMatch(TableScanNode.class::isInstance)) {
physicalInputDataSize += stageStats.getPhysicalInputDataSize().toBytes();
physicalInputPositions += stageStats.getPhysicalInputPositions();

internalNetworkInputDataSize += stageStats.getInternalNetworkInputDataSize().toBytes();
internalNetworkInputPositions += stageStats.getInternalNetworkInputPositions();

rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();

Expand Down Expand Up @@ -507,6 +525,10 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
fullyBlocked,
blockedReasons,

succinctBytes(physicalInputDataSize),
physicalInputPositions,
succinctBytes(internalNetworkInputDataSize),
internalNetworkInputPositions,
succinctBytes(rawInputDataSize),
rawInputPositions,
succinctBytes(processedInputDataSize),
Expand Down Expand Up @@ -982,6 +1004,10 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getTotalBlockedTime(),
queryStats.isFullyBlocked(),
queryStats.getBlockedReasons(),
queryStats.getPhysicalInputDataSize(),
queryStats.getPhysicalInputPositions(),
queryStats.getInternalNetworkInputDataSize(),
queryStats.getInternalNetworkInputPositions(),
queryStats.getRawInputDataSize(),
queryStats.getRawInputPositions(),
queryStats.getProcessedInputDataSize(),
Expand Down
48 changes: 48 additions & 0 deletions presto-main/src/main/java/io/prestosql/execution/QueryStats.java
Expand Up @@ -79,6 +79,12 @@ public class QueryStats
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;

private final DataSize physicalInputDataSize;
private final long physicalInputPositions;

private final DataSize internalNetworkInputDataSize;
private final long internalNetworkInputPositions;

private final DataSize rawInputDataSize;
private final long rawInputPositions;

Expand Down Expand Up @@ -134,6 +140,12 @@ public QueryStats(
@JsonProperty("fullyBlocked") boolean fullyBlocked,
@JsonProperty("blockedReasons") Set<BlockedReason> blockedReasons,

@JsonProperty("physicalInputDataSize") DataSize physicalInputDataSize,
@JsonProperty("physicalInputPositions") long physicalInputPositions,

@JsonProperty("internalNetworkInputDataSize") DataSize internalNetworkInputDataSize,
@JsonProperty("internalNetworkInputPositions") long internalNetworkInputPositions,

@JsonProperty("rawInputDataSize") DataSize rawInputDataSize,
@JsonProperty("rawInputPositions") long rawInputPositions,

Expand Down Expand Up @@ -194,6 +206,14 @@ public QueryStats(
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));

this.physicalInputDataSize = requireNonNull(physicalInputDataSize, "physicalInputDataSize is null");
checkArgument(physicalInputPositions >= 0, "physicalInputPositions is negative");
this.physicalInputPositions = physicalInputPositions;

this.internalNetworkInputDataSize = requireNonNull(internalNetworkInputDataSize, "internalNetworkInputDataSize is null");
checkArgument(internalNetworkInputPositions >= 0, "internalNetworkInputPositions is negative");
this.internalNetworkInputPositions = internalNetworkInputPositions;

this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null");
checkArgument(rawInputPositions >= 0, "rawInputPositions is negative");
this.rawInputPositions = rawInputPositions;
Expand Down Expand Up @@ -256,6 +276,10 @@ public static QueryStats immediateFailureQueryStats()
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
0,
new DataSize(0, BYTE),
ImmutableList.of(),
ImmutableList.of());
}
Expand Down Expand Up @@ -453,6 +477,30 @@ public Set<BlockedReason> getBlockedReasons()
return blockedReasons;
}

@JsonProperty
public DataSize getPhysicalInputDataSize()
{
return physicalInputDataSize;
}

@JsonProperty
public long getPhysicalInputPositions()
{
return physicalInputPositions;
}

@JsonProperty
public DataSize getInternalNetworkInputDataSize()
{
return internalNetworkInputDataSize;
}

@JsonProperty
public long getInternalNetworkInputPositions()
{
return internalNetworkInputPositions;
}

@JsonProperty
public DataSize getRawInputDataSize()
{
Expand Down
Expand Up @@ -269,6 +269,12 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos
long totalScheduledTime = 0;
long totalCpuTime = 0;

long physicalInputDataSize = 0;
long physicalInputPositions = 0;

long internalNetworkInputDataSize = 0;
long internalNetworkInputPositions = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand Down Expand Up @@ -299,6 +305,12 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos
}

if (fragment.getPartitionedSourceNodes().stream().anyMatch(TableScanNode.class::isInstance)) {
physicalInputDataSize += taskStats.getPhysicalInputDataSize().toBytes();
physicalInputPositions += taskStats.getPhysicalInputPositions();

internalNetworkInputDataSize += taskStats.getInternalNetworkInputDataSize().toBytes();
internalNetworkInputPositions += taskStats.getInternalNetworkInputPositions();

rawInputDataSize += taskStats.getRawInputDataSize().toBytes();
rawInputPositions += taskStats.getRawInputPositions();
}
Expand All @@ -317,6 +329,12 @@ public BasicStageStats getBasicStageStats(Supplier<Iterable<TaskInfo>> taskInfos
runningDrivers,
completedDrivers,

succinctBytes(physicalInputDataSize),
physicalInputPositions,

succinctBytes(internalNetworkInputDataSize),
internalNetworkInputPositions,

succinctBytes(rawInputDataSize),
rawInputPositions,

Expand Down Expand Up @@ -367,6 +385,12 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long totalCpuTime = 0;
long totalBlockedTime = 0;

long physicalInputDataSize = 0;
long physicalInputPositions = 0;

long internalNetworkInputDataSize = 0;
long internalNetworkInputPositions = 0;

long rawInputDataSize = 0;
long rawInputPositions = 0;

Expand Down Expand Up @@ -421,6 +445,12 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
blockedReasons.addAll(taskStats.getBlockedReasons());
}

physicalInputDataSize += taskStats.getPhysicalInputDataSize().toBytes();
physicalInputPositions += taskStats.getPhysicalInputPositions();

internalNetworkInputDataSize += taskStats.getInternalNetworkInputDataSize().toBytes();
internalNetworkInputPositions += taskStats.getInternalNetworkInputPositions();

rawInputDataSize += taskStats.getRawInputDataSize().toBytes();
rawInputPositions += taskStats.getRawInputPositions();

Expand Down Expand Up @@ -473,8 +503,15 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
fullyBlocked && runningTasks > 0,
blockedReasons,

succinctBytes(physicalInputDataSize),
physicalInputPositions,

succinctBytes(internalNetworkInputDataSize),
internalNetworkInputPositions,

succinctBytes(rawInputDataSize),
rawInputPositions,

succinctBytes(processedInputDataSize),
processedInputPositions,
succinctBytes(bufferedDataSize),
Expand Down

0 comments on commit bacf59f

Please sign in to comment.