Skip to content

Commit

Permalink
Rebased onto master, still need to look at tests
Browse files Browse the repository at this point in the history
  • Loading branch information
garyluoex committed Apr 13, 2017
1 parent df64969 commit f5e3d97
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.yahoo.bard.webservice.data.metric.MetricColumn;
import com.yahoo.bard.webservice.table.Column;
import com.yahoo.bard.webservice.table.PhysicalTable;
import com.yahoo.bard.webservice.table.resolver.DataSourceConstraint;
import com.yahoo.bard.webservice.table.resolver.PhysicalDataSourceConstraint;
import com.yahoo.bard.webservice.util.IntervalUtils;
import com.yahoo.bard.webservice.util.SimplifiedIntervalList;
import com.yahoo.bard.webservice.util.Utils;
Expand All @@ -19,7 +19,6 @@

import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -65,51 +64,37 @@ public class MetricUnionAvailability implements Availability {
private static final Logger LOG = LoggerFactory.getLogger(MetricUnionAvailability.class);

private final Set<TableName> dataSourceNames;
private final Set<MetricColumn> metricColumns;
private final Map<Availability, Set<MetricColumn>> availabilitiesToAvailableColumns;
private final Set<String> metricNames;
private final Map<Availability, Set<String>> availabilitiesToMetricNames;

/**
* Constructor.
*
* @param physicalTables A set of <tt>PhysicalTable</tt>s whose Dimension schemas are the same and
* the Metric Schemas are fully different(i.e. no overlap) on every table
* the Metric columns are unique(i.e. no overlap) on every table
* @param columns The set of all configured columns, including dimension columns, that metric union availability
* will respond with
*/
public MetricUnionAvailability(@NotNull Set<PhysicalTable> physicalTables, @NotNull Set<Column> columns) {
metricColumns = Utils.getSubsetByType(columns, MetricColumn.class);
metricNames = Utils.getSubsetByType(columns, MetricColumn.class).stream()
.map(MetricColumn::getName)
.collect(Collectors.toSet());

// get a map from availability to its available metric columns intersected with configured metric columns
// i.e. metricColumns
availabilitiesToAvailableColumns = physicalTables.stream()
// map each availability to the configured metric names that it has available
availabilitiesToMetricNames = physicalTables.stream()
.map(PhysicalTable::getAvailability)
.collect(
Collectors.toMap(
Function.identity(),
availability ->
Sets.intersection(
Utils.getSubsetByType(
availability.getAllAvailableIntervals().keySet(),
MetricColumn.class
),
metricColumns
availability.getAllAvailableIntervals().keySet(),
metricNames
)
)
);

// validate metric uniqueness such that
// each table's underlying datasource schema don't have repeated metric column
Map<MetricColumn, Set<Availability>> duplicates = getDuplicateValues(availabilitiesToAvailableColumns);
if (!duplicates.isEmpty()) {
String message = String.format(
"While constructing MetricUnionAvailability, Metric columns are not unique - %s",
duplicates
);
LOG.error(message);
throw new IllegalArgumentException(message);
}

dataSourceNames = availabilitiesToAvailableColumns.keySet().stream()
dataSourceNames = availabilitiesToMetricNames.keySet().stream()
.map(Availability::getDataSourceNames)
.flatMap(Set::stream)
.collect(
Expand All @@ -118,6 +103,18 @@ public MetricUnionAvailability(@NotNull Set<PhysicalTable> physicalTables, @NotN
Collections::unmodifiableSet
)
);

// validate metric uniqueness, i.e. that no metric column is repeated
// across the tables' underlying data source schemas
if (!isMetricUnique(availabilitiesToMetricNames)) {
String message = String.format(
"Metric columns must be unique across the metric union data sources, but duplicate was found " +
"across the following data sources: %s",
getDataSourceNames().stream().map(TableName::asName).collect(Collectors.joining(", "))
);
LOG.error(message);
throw new RuntimeException(message);
}
}

@Override
Expand All @@ -134,9 +131,9 @@ public Set<TableName> getDataSourceNames() {
* @return a map of column to all of its available intervals in union
*/
@Override
public Map<Column, List<Interval>> getAllAvailableIntervals() {
public Map<String, List<Interval>> getAllAvailableIntervals() {
// get all availabilities take available interval maps from all availabilities and merge the maps together
return availabilitiesToAvailableColumns.keySet().stream()
return availabilitiesToMetricNames.keySet().stream()
.map(Availability::getAllAvailableIntervals)
.map(Map::entrySet)
.flatMap(Set::stream)
Expand All @@ -150,7 +147,7 @@ public Map<Column, List<Interval>> getAllAvailableIntervals() {
}

@Override
public SimplifiedIntervalList getAvailableIntervals(DataSourceConstraint constraints) {
public SimplifiedIntervalList getAvailableIntervals(PhysicalDataSourceConstraint constraints) {
return new SimplifiedIntervalList(
constructSubConstraint(constraints).entrySet().stream()
.map(entry -> entry.getKey().getAvailableIntervals(entry.getValue()))
Expand All @@ -165,59 +162,26 @@ public String toString() {
dataSourceNames.stream()
.map(TableName::asName)
.collect(Collectors.joining(", ")),
metricColumns.stream()
.map(Column::getName)
metricNames.stream()
.collect(Collectors.joining(", "))
);
}

/**
* Returns duplicate values of <tt>MetricColumn</tt>s in a map of <tt>Availability</tt>
* to a set of <tt>MetricColumn</tt>'s contained in that <tt>Availability</tt>.
* <p>
* The return value is a map of duplicate <tt>MetricColumn</tt> to all <tt>Availabilities</tt>' that contains
* this <tt>MetricColumn</tt>
* <p>
* For example, when a map of {availability1: [metricCol1, metricCol2], availability2: [metricCol1]} is passed to
* this method, the method returns {metricCol1: [availability1, availability2]}
* Validates whether the metric columns are unique across the underlying data sources.
*
* @param availabilityToAvailableColumns A map from <tt>Availability</tt> to set of <tt>MetricColumn</tt>
* @param availabilityToMetricNames A map from <tt>Availability</tt> to the set of metric names
* contained in that <tt>Availability</tt>
*
* @return duplicate values of <tt>MetricColumn</tt>s
* @return true if metric is unique across data sources, false otherwise
*/
private static Map<MetricColumn, Set<Availability>> getDuplicateValues(
Map<Availability, Set<MetricColumn>> availabilityToAvailableColumns
) {
Map<MetricColumn, Set<Availability>> metricColumnSetMap = new HashMap<>();

for (Map.Entry<Availability, Set<MetricColumn>> entry : availabilityToAvailableColumns.entrySet()) {
Availability availability = entry.getKey();
for (MetricColumn metricColumn : entry.getValue()) {
metricColumnSetMap.put(
metricColumn,
Sets.union(
metricColumnSetMap.getOrDefault(metricColumn, new HashSet<>()),
Sets.newHashSet(availability)
)
);
}
}
private static boolean isMetricUnique(Map<Availability, Set<String>> availabilityToMetricNames) {
Set<String> uniqueMetrics = new HashSet<>();

// For example, when a map of {availability1: [metricCol1, metricCol2], availability2: [metricCol1]} is
// passed to this method, at this point,
// "metricColumnSetMap" = {metricCol1: [availability1, availability2], {metricCol2: [availability1]}}
// The duplicate metric columns is "metricCol1" which can be selected by knowing that the value of "metricCol1"
// has a collection whose size is greater than 1; with that, we return
// {metricCol1: [availability1, availability2]}
return metricColumnSetMap.entrySet().stream()
.filter(entry -> entry.getValue().size() > 1)
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
)
);
return availabilityToMetricNames.entrySet().stream()
.map(Map.Entry::getValue)
.flatMap(Set::stream)
.allMatch(uniqueMetrics::add);
}

/**
Expand All @@ -233,20 +197,17 @@ private static Map<MetricColumn, Set<Availability>> getDuplicateValues(
*
* @return A map from <tt>Availability</tt> to <tt>PhysicalDataSourceConstraint</tt> with non-empty metric names
*/
private Map<Availability, DataSourceConstraint> constructSubConstraint(DataSourceConstraint constraint) {
return availabilitiesToAvailableColumns.entrySet().stream()
private Map<Availability, PhysicalDataSourceConstraint> constructSubConstraint(
PhysicalDataSourceConstraint constraint
) {
return availabilitiesToMetricNames.entrySet().stream()
.map(entry ->
new AbstractMap.SimpleEntry<>(
entry.getKey(),
constraint.withMetricIntersection(entry.getValue())
)
)
.filter(entry -> !entry.getValue().getMetricNames().isEmpty())
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue
)
);
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public SimplifiedIntervalList getAvailableIntervals(PhysicalDataSourceConstraint

@Override
public String toString() {
return String.format("PermissiveAvailability with table name = %s and Configured columns = [%s]",
getName().asName(),
getColumnNames()
return String.format("PermissiveAvailability with table name = %s",
getName().asName()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package com.yahoo.bard.webservice.table.resolver;

import com.yahoo.bard.webservice.data.dimension.Dimension;
import com.yahoo.bard.webservice.data.metric.MetricColumn;
import com.yahoo.bard.webservice.druid.model.query.DruidAggregationQuery;
import com.yahoo.bard.webservice.web.ApiFilter;
import com.yahoo.bard.webservice.web.DataApiRequest;
Expand Down Expand Up @@ -87,6 +86,22 @@ protected DataSourceConstraint(
this.apiFilters = apiFilters;
}

/**
* Copy Constructor.
* <p>
* Initializes every field of the new constraint with the value returned by the corresponding getter of the given
* constraint. The values are assigned exactly as returned; this constructor makes no copies of its own.
*
* @param dataSourceConstraint The data source constraint to copy from
*/
protected DataSourceConstraint(DataSourceConstraint dataSourceConstraint) {
this.requestDimensions = dataSourceConstraint.getRequestDimensions();
this.filterDimensions = dataSourceConstraint.getFilterDimensions();
this.metricDimensions = dataSourceConstraint.getMetricDimensions();
this.metricNames = dataSourceConstraint.getMetricNames();
this.apiFilters = dataSourceConstraint.getApiFilters();
this.allDimensions = dataSourceConstraint.getAllDimensions();
this.allDimensionNames = dataSourceConstraint.getAllDimensionNames();
this.allColumnNames = dataSourceConstraint.getAllColumnNames();
}

public Set<Dimension> getRequestDimensions() {
return requestDimensions;
}
Expand Down Expand Up @@ -126,19 +141,18 @@ public Map<Dimension, Set<ApiFilter>> getApiFilters() {
* The new set of metric names will be an intersection between old metric names and
* a user provided set of metric names
*
* @param metricColumns The set of metric columns that are to be intersected with metric names in
* @param metricNames The set of metric names that are to be intersected with metric names in
* <tt>this DataSourceConstraint</tt>
*
* @return the new <tt>DataSourceConstraint</tt> instance with a new subset of metric names
*/
public DataSourceConstraint withMetricIntersection(Set<MetricColumn> metricColumns) {
public DataSourceConstraint withMetricIntersection(Set<String> metricNames) {
return new DataSourceConstraint(
requestDimensions,
filterDimensions,
metricDimensions,
metricColumns.stream()
.map(MetricColumn::getName)
.filter(metricNames::contains)
metricNames.stream()
.filter(this.metricNames::contains)
.collect(Collectors.toSet()),
allDimensions,
allDimensionNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.table.resolver;

import com.yahoo.bard.webservice.data.dimension.Dimension;
import com.yahoo.bard.webservice.table.PhysicalTableSchema;
import com.yahoo.bard.webservice.web.ApiFilter;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -33,7 +37,59 @@ public PhysicalDataSourceConstraint(
this.allColumnPhysicalNames = dataSourceConstraint.getAllColumnNames().stream()
.filter(schemaColumnNames::contains)
.map(physicalTableSchema::getPhysicalColumnName)
.collect(Collectors.toSet());
.collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
}

/**
* Constructor, use with care, beware of caching behavior.
* <p>
* Passes the given constraint to the superclass constructor and stores the given physical column names as-is;
* unlike the schema-based constructor, nothing is derived or filtered here.
* NOTE(review): the "caching behavior" warning is not explained by the visible code; presumably it refers to the
* physical names being supplied precomputed instead of being resolved from a table schema - confirm.
*
* @param dataSourceConstraint Data source constraint containing all the column names as logical names
* @param allColumnPhysicalNames The physical names of the columns
*/
private PhysicalDataSourceConstraint(
@NotNull DataSourceConstraint dataSourceConstraint,
@NotNull Set<String> allColumnPhysicalNames
) {
super(dataSourceConstraint);
this.allColumnPhysicalNames = allColumnPhysicalNames;

}

/**
* Constructor.
* <p>
* Forwards the first eight arguments unchanged to the superclass constructor and stores the physical column
* names exactly as given.
*
* @param requestDimensions Dimensions contained in request
* @param filterDimensions Filtered dimensions
* @param metricDimensions Metric related dimensions
* @param metricNames Names of metrics
* @param allDimensions Set of all dimension objects
* @param allDimensionNames Set of all dimension names
* @param allColumnNames Set of all column names
* @param apiFilters Map of dimension to its set of API filters
* @param allColumnPhysicalNames Set of all column physical names
*/
protected PhysicalDataSourceConstraint(
Set<Dimension> requestDimensions,
Set<Dimension> filterDimensions,
Set<Dimension> metricDimensions,
Set<String> metricNames,
Set<Dimension> allDimensions,
Set<String> allDimensionNames,
Set<String> allColumnNames,
Map<Dimension, Set<ApiFilter>> apiFilters,
Set<String> allColumnPhysicalNames
) {
// All logical fields are handled by the superclass
super(
requestDimensions,
filterDimensions,
metricDimensions,
metricNames,
allDimensions,
allDimensionNames,
allColumnNames,
apiFilters
);
// Stored as given; no filtering or wrapping is applied in this constructor
this.allColumnPhysicalNames = allColumnPhysicalNames;
}

/**
Expand All @@ -44,4 +100,28 @@ public PhysicalDataSourceConstraint(
public Set<String> getAllColumnPhysicalNames() {
// Hands back the stored set itself; no copy is made here
return allColumnPhysicalNames;
}

/**
* Create a new <tt>PhysicalDataSourceConstraint</tt> instance with a new subset of metric names.
* <p>
* The new set of metric names will be an intersection between old metric names and
* a user provided set of metric names. The names of the metrics that fall outside of that intersection are also
* removed from the physical column names of the new instance, which are held in an unmodifiable set.
*
* @param metricNames The set of metric names that are to be intersected with metric names in
* <tt>this DataSourceConstraint</tt>
*
* @return the new <tt>PhysicalDataSourceConstraint</tt> instance with a new subset of metric names
*/
@Override
public PhysicalDataSourceConstraint withMetricIntersection(Set<String> metricNames) {
// Metrics of this constraint that are absent from the provided set, i.e. the ones being dropped
Set<String> nonIntersectingMetric = getMetricNames().stream()
.filter(metricName -> !metricNames.contains(metricName))
.collect(Collectors.toSet());

// NOTE(review): the dropped names come from getMetricNames() (logical names) but are matched against
// physical column names; this presumably relies on metric logical and physical names being equal - confirm.
// The result is wrapped as unmodifiable so it matches the set built by the schema-based constructor.
Set<String> resultColumnNames = this.allColumnPhysicalNames.stream()
.filter(name -> !nonIntersectingMetric.contains(name))
.collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));

return new PhysicalDataSourceConstraint(super.withMetricIntersection(metricNames), resultColumnNames);
}
}
Loading

0 comments on commit f5e3d97

Please sign in to comment.