Skip to content

Commit

Permalink
Integrate new composite table and availability core structure into Fili
Browse files Browse the repository at this point in the history
  • Loading branch information
garyluoex committed Feb 23, 2017
1 parent 2dba84e commit 24db2d4
Show file tree
Hide file tree
Showing 42 changed files with 449 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,12 @@ protected void configure() {
//Initialize the metrics filter helper
FieldConverterSupplier.metricsFilterSetBuilder = initializeMetricsFilterSetBuilder();

// Build the datasource metadata service containing the segment information
DataSourceMetadataService dataSourceMetadataService = buildDataSourceMetadataService();
bind(dataSourceMetadataService).to(DataSourceMetadataService.class);

// Build the configuration loader and load configuration
ConfigurationLoader loader = buildConfigurationLoader();
ConfigurationLoader loader = buildConfigurationLoader(dataSourceMetadataService);
loader.load();

// Bind the configuration dictionaries
Expand Down Expand Up @@ -273,10 +277,6 @@ protected void configure() {
bind(PartialDataHandler.class).to(PartialDataHandler.class);
bind(getVolatileIntervalsService()).to(VolatileIntervalsService.class);

DataSourceMetadataService dataSourceMetadataService = buildDataSourceMetadataService();

bind(dataSourceMetadataService).to(DataSourceMetadataService.class);

QuerySigningService<?> querySigningService = buildQuerySigningService(
loader.getPhysicalTableDictionary(),
dataSourceMetadataService
Expand Down Expand Up @@ -833,13 +833,15 @@ protected final List<FeatureFlag> collectFeatureFlags(List<Class<? extends Featu
/**
* Build an application specific configuration loader initialized with pluggable loaders.
*
* @param metadataService  datasource metadata service containing the segments used for building tables
*
* @return A configuration loader instance
*/
protected final ConfigurationLoader buildConfigurationLoader() {
protected final ConfigurationLoader buildConfigurationLoader(DataSourceMetadataService metadataService) {
DimensionLoader dimensionLoader = getDimensionLoader();
TableLoader tableLoader = getTableLoader();
MetricLoader metricLoader = getMetricLoader();
return buildConfigurationLoader(dimensionLoader, metricLoader, tableLoader);
return buildConfigurationLoader(dimensionLoader, metricLoader, tableLoader, metadataService);
}

/**
Expand All @@ -848,15 +850,17 @@ protected final ConfigurationLoader buildConfigurationLoader() {
* @param dimensionLoader A dimension loader
* @param metricLoader A metric loader
* @param tableLoader A table loader
* @param metadataService datasource metadata service containing segments for building table
*
* @return A configurationLoader instance
*/
protected ConfigurationLoader buildConfigurationLoader(
DimensionLoader dimensionLoader,
MetricLoader metricLoader,
TableLoader tableLoader
TableLoader tableLoader,
DataSourceMetadataService metadataService
) {
return new ConfigurationLoader(dimensionLoader, metricLoader, tableLoader);
return new ConfigurationLoader(dimensionLoader, metricLoader, tableLoader, metadataService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public SimplifiedIntervalList findMissingTimeGrainIntervals(
public SimplifiedIntervalList getAvailability(PhysicalTable physicalTable, Set<String> columnNames) {
return PERMISSIVE_COLUMN_AVAILABILITY.isOn() ?
getUnionSubintervalsForColumns(columnNames, physicalTable) :
getIntersectSubintervalsForColumns(columnNames, physicalTable);
getIntersectSubintervalsForColumns(columnNames, physicalTable);
}

/**
Expand All @@ -139,9 +139,9 @@ public SimplifiedIntervalList getIntersectSubintervalsForColumns(
) {
return columnNames.isEmpty() ?
new SimplifiedIntervalList() :
new SimplifiedIntervalList(columnNames.stream()
.map(physicalTable::getIntervalsByColumnName)
.reduce(null, IntervalUtils::getOverlappingSubintervals));
new SimplifiedIntervalList(columnNames.stream()
.map(physicalTable::getIntervalsByColumnName)
.reduce(null, IntervalUtils::getOverlappingSubintervals));
}

/**
Expand All @@ -159,9 +159,9 @@ public SimplifiedIntervalList getUnionSubintervalsForColumns(
) {
return columnNames.isEmpty() ?
new SimplifiedIntervalList() :
columnNames.stream()
.map(physicalTable::getIntervalsByColumnName)
.flatMap(Set::stream)
.collect(SimplifiedIntervalList.getCollector());
columnNames.stream()
.map(physicalTable::getIntervalsByColumnName)
.flatMap(Set::stream)
.collect(SimplifiedIntervalList.getCollector());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.yahoo.bard.webservice.data.config.table.TableLoader;
import com.yahoo.bard.webservice.data.dimension.DimensionDictionary;
import com.yahoo.bard.webservice.data.metric.MetricDictionary;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.table.LogicalTableDictionary;
import com.yahoo.bard.webservice.table.PhysicalTableDictionary;

Expand All @@ -33,6 +34,8 @@ public class ConfigurationLoader {
protected final TableLoader tableLoader;
protected final MetricLoader metricLoader;

protected final DataSourceMetadataService metadataService;

// Default JodaTime zone to UTC
public static final String TIMEZONE = SYSTEM_CONFIG.getStringProperty(
SYSTEM_CONFIG.getPackageVariableName("timezone"),
Expand All @@ -45,9 +48,15 @@ public class ConfigurationLoader {
* @param dimensionLoader DimensionLoader to load dimensions from
* @param metricLoader MetricLoader to load metrics from
* @param tableLoader TableLoader to load tables from
* @param metadataService datasource metadata service containing segments for building table
*/
@Inject
public ConfigurationLoader(DimensionLoader dimensionLoader, MetricLoader metricLoader, TableLoader tableLoader) {
public ConfigurationLoader(
DimensionLoader dimensionLoader,
MetricLoader metricLoader,
TableLoader tableLoader,
DataSourceMetadataService metadataService
) {
DateTimeZone.setDefault(DateTimeZone.forID(TIMEZONE));

// Set the max lucene query clauses as high as it can go
Expand All @@ -56,6 +65,7 @@ public ConfigurationLoader(DimensionLoader dimensionLoader, MetricLoader metricL
this.dimensionLoader = dimensionLoader;
this.metricLoader = metricLoader;
this.tableLoader = tableLoader;
this.metadataService = metadataService;
}

/**
Expand All @@ -64,7 +74,7 @@ public ConfigurationLoader(DimensionLoader dimensionLoader, MetricLoader metricL
public void load() {
dimensionLoader.loadDimensionDictionary(dictionaries.getDimensionDictionary());
metricLoader.loadMetricDictionary(dictionaries.getMetricDictionary());
tableLoader.loadTableDictionary(dictionaries);
tableLoader.loadTableDictionary(dictionaries, metadataService);

LOG.info("Initialized ConfigurationLoader");
LOG.info(dictionaries.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.yahoo.bard.webservice.data.metric.MetricColumn;
import com.yahoo.bard.webservice.data.metric.MetricDictionary;
import com.yahoo.bard.webservice.druid.model.query.Granularity;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.table.Column;
import com.yahoo.bard.webservice.table.ConcretePhysicalTable;
import com.yahoo.bard.webservice.table.LogicalTable;
Expand Down Expand Up @@ -40,6 +41,8 @@ public abstract class BaseTableLoader implements TableLoader {

protected final DateTimeZone defaultTimeZone;

private DataSourceMetadataService metadataService;

/**
* A table loader using a time context and a default time zone.
*
Expand All @@ -56,9 +59,22 @@ protected BaseTableLoader() {
this(DateTimeZone.UTC);
}

@Override
/**
* Load user configured tables into resource dictionary.
*
* @param dictionaries dictionary to be loaded with user configured tables
*/
public abstract void loadTableDictionary(ResourceDictionaries dictionaries);

/**
 * Load user configured tables into the resource dictionary, capturing the datasource
 * metadata service first so it is available while the tables are being built.
 *
 * @param dictionaries  dictionary to be loaded with user configured tables
 * @param metadataService  datasource metadata service containing segments for building table
 */
@Override
public void loadTableDictionary(
        ResourceDictionaries dictionaries,
        DataSourceMetadataService metadataService
) {
    // Stash the service in a field before delegating; the single-arg overload implemented
    // by subclasses reaches it indirectly (buildPhysicalTable passes it to the table ctor).
    this.metadataService = metadataService;
    this.loadTableDictionary(dictionaries);
}

/**
* Builds a table group.
* <p>
Expand Down Expand Up @@ -277,7 +293,11 @@ protected PhysicalTable loadPhysicalTable(
PhysicalTable existingTable = dictionaries.getPhysicalDictionary().get(definition.getName().asName());
if (existingTable == null) {
// Build the physical table
existingTable = buildPhysicalTable(definition, metricNames, dictionaries.getDimensionDictionary());
existingTable = buildPhysicalTable(
definition,
metricNames,
dictionaries.getDimensionDictionary()
);

// Add the table to the dictionary
LOG.debug("Physical table: {} \n\n" + "Cache: {} ",
Expand Down Expand Up @@ -324,7 +344,8 @@ protected PhysicalTable buildPhysicalTable(
definition.getName(),
definition.getGrain(),
columns,
definition.getLogicalToPhysicalNames()
definition.getLogicalToPhysicalNames(),
metadataService
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.yahoo.bard.webservice.data.config.table;

import com.yahoo.bard.webservice.data.config.ResourceDictionaries;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;

/**
* Defines the core interactions to load logical tables, physical tables, and table groups into the resource
Expand All @@ -14,7 +15,8 @@ public interface TableLoader {
* Load all of the logical tables, their table groups, and the physical tables for those groups.
*
* @param dictionaries The resource dictionaries that will be loaded with table configuration
* @param metadataService datasource metadata service containing segments for building table
*/
void loadTableDictionary(ResourceDictionaries dictionaries);
void loadTableDictionary(ResourceDictionaries dictionaries, DataSourceMetadataService metadataService);

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.druid.timeline.DataSegment;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -139,6 +140,9 @@ public Set<SortedMap<DateTime, Map<String, SegmentInfo>>> getTableSegments(Set<T
* @return A map of column name to a set of available intervals
*/
public Map<String, Set<Interval>> getAvailableIntervalsByTable(TableName physicalTableName) {
    // No segments registered for this table yet: report empty availability instead of an NPE.
    return allSegmentsByColumn.get(physicalTableName) == null
            ? Collections.emptyMap()
            : allSegmentsByColumn.get(physicalTableName).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
import com.yahoo.bard.webservice.druid.model.query.Granularity;
import com.yahoo.bard.webservice.metadata.SegmentMetadata;
import com.yahoo.bard.webservice.table.availability.Availability;
import com.yahoo.bard.webservice.table.availability.ImmutableAvailability;
import com.yahoo.bard.webservice.util.IntervalUtils;

import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.List;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -69,7 +69,7 @@ public BasePhysicalTable(
@Override
public DateTime getTableAlignment() {
return schema.getGranularity().roundFloor(
IntervalUtils.firstMoment(getAvailability().getAvailableIntervals().values()).orElse(new DateTime())
IntervalUtils.firstMoment(getAvailability().getAllAvailableIntervals().values()).orElse(new DateTime())
);
}

Expand Down Expand Up @@ -110,7 +110,9 @@ public PhysicalTableSchema getSchema() {
*/
@Override
public Set<Interval> getIntervalsByColumnName(String columnName) {
return new HashSet<>(getAvailability().getIntervalsByColumnName(columnName));
Set<Interval> availableIntervals = getAvailability().getAvailableIntervalsByName(columnName);
return Objects.isNull(availableIntervals) ? Collections.emptySet() :
getAvailability().getAvailableIntervalsByName(columnName);
}

/**
Expand Down Expand Up @@ -154,9 +156,12 @@ public Set<Column> getColumns() {
*
*/
@Override
public Map<Column, List<Interval>> getAvailableIntervals() {
return getAvailability().getAvailableIntervals();
public Map<Column, Set<Interval>> getAvailableIntervals() {
return getAvailability().getAllAvailableIntervals().entrySet().stream().filter(
entry -> schema.containsPhysicalName(entry.getKey())
).collect(Collectors.toMap(entry -> schema.getColumn(entry.getKey()).get(), Map.Entry::getValue));
}

/**
* Get the time grain from granularity.
*
Expand All @@ -180,17 +185,12 @@ public Granularity getGranularity() {
*
* @param segmentMetadata A map of names of metrics and sets of intervals over which they are valid
* @param dimensionDictionary The dimension dictionary from which to look up dimensions by name
*
* @deprecated no longer needed since we are using `DataSourceMetadataService` to store segment info
*/
@Deprecated
public void resetColumns(SegmentMetadata segmentMetadata, DimensionDictionary dimensionDictionary) {
Map<String, Set<Interval>> dimensionIntervals = segmentMetadata.getDimensionIntervals();
Map<String, Set<Interval>> metricIntervals = segmentMetadata.getMetricIntervals();
setAvailability(new ImmutableAvailability(
name,
schema,
dimensionIntervals,
metricIntervals,
dimensionDictionary
));
return;
}

public void setAvailability(Availability availability) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import com.yahoo.bard.webservice.data.config.names.TableName;
import com.yahoo.bard.webservice.data.time.ZonedTimeGrain;
import com.yahoo.bard.webservice.table.availability.ImmutableAvailability;
import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
import com.yahoo.bard.webservice.table.availability.ConcreteAvailability;

import java.util.Collections;
import java.util.Map;

import javax.validation.constraints.NotNull;
Expand All @@ -22,19 +22,21 @@ public class ConcretePhysicalTable extends BasePhysicalTable {
* @param timeGrain time grain of the table
* @param columns The columns for this table
* @param logicalToPhysicalColumnNames Mappings from logical to physical names
* @param metadataService datasource metadata service containing segments for building table
*/
public ConcretePhysicalTable(
@NotNull TableName name,
@NotNull ZonedTimeGrain timeGrain,
@NotNull Iterable<Column> columns,
@NotNull Map<String, String> logicalToPhysicalColumnNames
@NotNull Map<String, String> logicalToPhysicalColumnNames,
@NotNull DataSourceMetadataService metadataService
) {
super(
name,
timeGrain,
columns,
logicalToPhysicalColumnNames,
new ImmutableAvailability(name, Collections.emptyMap())
new ConcreteAvailability(name, metadataService)
);
}

Expand All @@ -45,6 +47,7 @@ public ConcretePhysicalTable(
* @param timeGrain time grain of the table
* @param columns The columns for this table
* @param logicalToPhysicalColumnNames Mappings from logical to physical names
* @param metadataService datasource metadata service containing segments for building table
*
* @deprecated Should use constructor with TableName instead of String as table name
*/
Expand All @@ -53,9 +56,10 @@ public ConcretePhysicalTable(
@NotNull String name,
@NotNull ZonedTimeGrain timeGrain,
@NotNull Iterable<Column> columns,
@NotNull Map<String, String> logicalToPhysicalColumnNames
@NotNull Map<String, String> logicalToPhysicalColumnNames,
@NotNull DataSourceMetadataService metadataService
) {
this(TableName.of(name), timeGrain, columns, logicalToPhysicalColumnNames);
this(TableName.of(name), timeGrain, columns, logicalToPhysicalColumnNames, metadataService);
}

public String getFactTableName() {
Expand Down
Loading

0 comments on commit 24db2d4

Please sign in to comment.