CompositePhsyicalTable Core Components Refactor #179

Merged
8 commits merged on Mar 27, 2017
CHANGELOG.md: 26 changes (21 additions, 5 deletions)
@@ -8,14 +8,17 @@ pull request if there was one.
Current
-------
### Added:
- [Refactor DatasourceMetadataService to fit composite table needs](https://github.com/yahoo/fili/pull/173)
- [CompositePhsyicalTable Core Components Refactor](https://github.com/yahoo/fili/pull/179)
* Added `ConcretePhysicalTable` and `ConcreteAvailability` to model a table in a Druid datasource and its availability in the new table availability structure
* Added class variables for `DataSourceMetadataService` and `ConfigurationLoader` to `AbstractBinderFactory` so that applications can access them

- [Refactor DatasourceMetaDataService to fit composite table needs](https://github.com/yahoo/fili/pull/173)
* `DataSourceMetadataService` now also stores interval data from segment metadata, as a map keyed by column name, and provides a `getAvailableIntervalsByTable` method to retrieve it (see the sketch below)
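A minimal sketch of how a consumer might read availability back out of the service once a metadata loader has populated it. The key type, value type, and import paths below are assumptions; this changelog names only the method itself.

    import java.util.Map;

    import com.yahoo.bard.webservice.data.config.names.TableName;        // assumed path
    import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;
    import com.yahoo.bard.webservice.util.SimplifiedIntervalList;        // assumed value type

    public class AvailabilityLookupSketch {
        public static void main(String[] args) {
            // TableName.of(...) assumes the static String-to-Name factory added in PR #165.
            TableName tableName = TableName.of("wikipedia");
            DataSourceMetadataService metadataService = new DataSourceMetadataService();

            // After the DataSourceMetadataLoader pushes segment metadata into the
            // service, availability is readable per table, keyed by column name.
            Map<String, SimplifiedIntervalList> intervalsByColumn =
                    metadataService.getAvailableIntervalsByTable(tableName);

            intervalsByColumn.forEach(
                    (column, intervals) -> System.out.println(column + " -> " + intervals));
        }
    }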

- [QueryPlanningConstraint and DataSourceConstraint](https://github.com/yahoo/fili/pull/169)
* Added `QueryPlanningConstraint` to replace the current Matcher and Resolver argument interfaces during query planning
* Added `DataSourceConstraint` to allow implementation of `PartitionedFactTable`'s availability in the near future


- [Major refactor for availability and schemas and tables](https://github.com/yahoo/fili/pull/165)
* `ImmutableAvailability` - provides immutable, typed replacement for maps of column availabilities
* New Table Implementations:
@@ -29,11 +32,10 @@ Current
* `ApiName`, `TableName`: Added static factory from String to Name
* `ErrorMessageFormat` for errors during `ResultSetMapper` cycle


- [Added default base class for all dimension types](https://github.com/yahoo/fili/pull/177)
* Added base classes `DefaultKeyValueStoreDimensionConfig`, `DefaultLookupDimensionConfig` and `DefaultRegisteredLookupDimensionConfig`
to create default dimensions.

- [dateTime based sort feature for the final ResultSet added](https://github.com/yahoo/fili/pull/178)
* Sorting the final `ResultSet` by the `dateTime` column, in ASC or DESC order, is now supported
* Added `DateTimeSortMapper` to sort the time buckets and `DateTimeSortRequestHandler` to inject it into the workflow
@@ -49,6 +51,10 @@ Current

### Changed:

- [CompositePhsyicalTable Core Components Refactor](https://github.com/yahoo/fili/pull/179)
* `TableLoader` now takes an additional constructor argument, a `DataSourceMetadataService`, for creating tables (see the sketch after this list)
* The `findMissingRequestTimeGrainIntervals` method in `PartialDataHandler` now takes a `DataSourceConstraint`
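A rough sketch of the new wiring for an application table loader. Only the fact that a `TableLoader` receives a `DataSourceMetadataService` comes from this entry; the superclass name, constructor shape, and the `loadTableDictionary` signature are assumptions.

    import com.yahoo.bard.webservice.data.config.ResourceDictionaries;    // assumed path
    import com.yahoo.bard.webservice.data.config.table.BaseTableLoader;   // assumed path
    import com.yahoo.bard.webservice.metadata.DataSourceMetadataService;

    // Hypothetical application loader: implementations now receive the metadata
    // service at construction so the physical tables they build can source
    // availability from live segment metadata.
    public class MyTableLoader extends BaseTableLoader {

        public MyTableLoader(DataSourceMetadataService metadataService) {
            super(metadataService);  // assumed superclass constructor
        }

        @Override
        public void loadTableDictionary(ResourceDictionaries dictionaries) {
            // Build ConcretePhysicalTable instances here, passing the metadata
            // service through so their ConcreteAvailability can track segments.
        }
    }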

- [Restored flexibility about columns for query from DruidResponseParser](https://github.com/yahoo/fili/pull/198)
* Immutable schemas prevented custom query types from changing `ResultSetSchema` columns.
* Columns are now sourced from `DruidResponseParser`, with a default implementation on `DruidAggregationQuery`
@@ -61,7 +67,6 @@ Current
* `QueryPlanningConstraint` replaces the `DataApiRequest` and `TemplateDruidQuery` arguments to Matchers and Resolvers during query planning
* Modified the `findMissingTimeGrainIntervals` method in `PartialDataHandler` to take a set of columns instead of a `DataApiRequest` and a `DruidAggregationQuery`


- [Major refactor for availability and schemas and tables](https://github.com/yahoo/fili/pull/165)
* `Schema` and `Table` became interfaces
* `Table` has-a `Schema`
@@ -106,6 +111,9 @@ Current

### Deprecated:

- [CompositePhsyicalTable Core Components Refactor](https://github.com/yahoo/fili/pull/179)
* Deprecated the `setAvailability` method on `BasePhysicalTable` to discourage its use in testing; testing strategies should be refined to avoid it

- [`RequestLog::stopMostRecentTimer` has been deprecated](https://github.com/yahoo/fili/pull/143)
- This method is a part of the infrastructure to support the recently
deprecated `RequestLog::switchTiming`.
@@ -134,6 +142,14 @@ Current


### Removed:
- [CompositePhsyicalTable Core Components Refactor](https://github.com/yahoo/fili/pull/179)
* Removed the deprecated method `findMissingRequestTimeGrainIntervals` from `PartialDataHandler`
* Removed the `permissive_column_availability_enabled` feature flag and the corresponding functionality in `PartialDataHandler`; permissive availability will become a table-level configuration
* Removed `getIntersectSubintervalsForColumns` and `getUnionSubintervalsForColumns` from `PartialDataHandler`, since the logic has been pushed into `Availability`
* Removed the `getIntervalsByColumnName`, `resetColumns`, and `hasLogicalMapping` methods from `PhysicalTable`, since they are no longer needed with the new availability structure
* Removed the `getAvailability` method from `PartialDataHandler`, since the logic has been pushed into `Availability`
* Removed `SegmentMetadataLoader` and its corresponding test class; the loader was deprecated in Fili, as is the Druid endpoint it relies on. Use `DataSourceMetadataLoader` instead (see the migration sketch below)
* Removed the `SegmentMetadataLoaderHealthCheck` and `SegmentMetadataLoaderHealthCheckSpec` classes, since `SegmentMetadataLoader` is no longer available
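For binders that wired up the removed loader, migration follows the datasource-metadata path that survives in the `AbstractBinderFactory` diff below. A hedged sketch, written as it would appear inside an overriding `configure()`; the exact argument order of `buildDataSourceMetadataLoader` is assumed.

    // Before (removed): segment-metadata wiring.
    //   SegmentMetadataLoader segmentMetadataLoader = buildSegmentMetadataLoader(...);
    //   setupPartialData(healthCheckRegistry, segmentMetadataLoader);

    // After: the datasource metadata loader is the supported path.
    DataSourceMetadataLoader dataSourceMetadataLoader = buildDataSourceMetadataLoader(
            metadataDruidWebService,
            loader.getPhysicalTableDictionary(),
            getDataSourceMetadataService(),
            getMappers().getMapper()
    );
    setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);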

- [Major refactor for availability and schemas and tables](https://github.com/yahoo/fili/pull/165)
* Removed `ZonedSchema` (all methods moved to its child class `ResultSetSchema`)
AbstractBinderFactory.java
@@ -13,7 +13,6 @@
import com.yahoo.bard.webservice.application.healthchecks.AllDimensionsLoadedHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DataSourceMetadataLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.DruidDimensionsLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.SegmentMetadataLoaderHealthCheck;
import com.yahoo.bard.webservice.application.healthchecks.VersionHealthCheck;
import com.yahoo.bard.webservice.async.broadcastchannels.BroadcastChannel;
import com.yahoo.bard.webservice.async.broadcastchannels.SimpleBroadcastChannel;
@@ -72,7 +71,6 @@
import com.yahoo.bard.webservice.metadata.QuerySigningService;
import com.yahoo.bard.webservice.metadata.RequestedIntervalsFunction;
import com.yahoo.bard.webservice.metadata.SegmentIntervalsHashIdGenerator;
import com.yahoo.bard.webservice.metadata.SegmentMetadataLoader;
import com.yahoo.bard.webservice.table.LogicalTableDictionary;
import com.yahoo.bard.webservice.table.PhysicalTableDictionary;
import com.yahoo.bard.webservice.table.resolver.DefaultPhysicalTableResolver;
@@ -118,6 +116,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -137,7 +136,6 @@ public abstract class AbstractBinderFactory implements BinderFactory {
private static final Logger LOG = LoggerFactory.getLogger(AbstractBinderFactory.class);
private static final SystemConfig SYSTEM_CONFIG = SystemConfigProvider.getInstance();

public static final String HEALTH_CHECK_NAME_SEGMENT_METADATA = "segment metadata loader";
public static final String HEALTH_CHECK_NAME_DATASOURCE_METADATA = "datasource metadata loader";
public static final String HEALTH_CHECK_NAME_DRUID_DIM_LOADER = "druid dimensions loader";
public static final String HEALTH_CHECK_VERSION = "version";
@@ -167,10 +165,17 @@ public abstract class AbstractBinderFactory implements BinderFactory {
SYSTEM_CONFIG.getPackageVariableName("loader_scheduler_thread_pool_size"),
LOADER_SCHEDULER_THREAD_POOL_SIZE_DEFAULT
);

public static final String DEPRECATED_PERMISSIVE_AVAILABILITY_FLAG = SYSTEM_CONFIG.getPackageVariableName(
"permissive_column_availability_enabled");

public static final String SYSTEM_CONFIG_TIMEZONE_KEY = "timezone";

private ObjectMappersSuite objectMappers;

private DataSourceMetadataService dataSourceMetadataService;
private ConfigurationLoader loader;

private final TaskScheduler loaderScheduler = new TaskScheduler(LOADER_SCHEDULER_THREAD_POOL_SIZE);

/**
@@ -224,10 +229,13 @@ protected void configure() {
FieldConverterSupplier.sketchConverter = initializeSketchConverter();

//Initialize the metrics filter helper
FieldConverterSupplier.metricsFilterSetBuilder = initializeMetricsFilterSetBuilder();

// Build the datasource metadata service containing the data segments
bind(getDataSourceMetadataService()).to(DataSourceMetadataService.class);

// Build the configuration loader and load configuration
ConfigurationLoader loader = buildConfigurationLoader();
loader = getConfigurationLoader();
loader.load();

// Bind the configuration dictionaries
@@ -240,23 +248,28 @@ protected void configure() {
// Bind the request mappers
Map<String, RequestMapper> requestMappers = getRequestMappers(loader.getDictionaries());
bind(requestMappers.getOrDefault(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE,
new DimensionApiRequestMapper(loader.getDictionaries())))
.named(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new DimensionApiRequestMapper(loader.getDictionaries())
)).named(DimensionsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(MetricsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(SlicesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(TablesApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(TablesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(TablesApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(DataApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(DataApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(DataApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

bind(requestMappers.getOrDefault(JobsApiRequest.REQUEST_MAPPER_NAMESPACE,
new NoOpRequestMapper(loader.getDictionaries())))
.named(JobsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);
new NoOpRequestMapper(loader.getDictionaries())
)).named(JobsApiRequest.REQUEST_MAPPER_NAMESPACE).to(RequestMapper.class);

// Setup end points and back end services
setupHealthChecks(healthCheckRegistry, loader.getDimensionDictionary());
@@ -273,23 +286,11 @@ protected void configure() {
bind(PartialDataHandler.class).to(PartialDataHandler.class);
bind(getVolatileIntervalsService()).to(VolatileIntervalsService.class);

DataSourceMetadataService dataSourceMetadataService = buildDataSourceMetadataService();

bind(dataSourceMetadataService).to(DataSourceMetadataService.class);

QuerySigningService<?> querySigningService = buildQuerySigningService(
loader.getPhysicalTableDictionary(),
dataSourceMetadataService
);

SegmentMetadataLoader segmentMetadataLoader = buildSegmentMetadataLoader(
uiDruidWebService,
loader.getPhysicalTableDictionary(),
loader.getDimensionDictionary(),
getMappers().getMapper()
);
setupPartialData(healthCheckRegistry, segmentMetadataLoader);

if (DRUID_COORDINATOR_METADATA.isOn()) {
DataSourceMetadataLoader dataSourceMetadataLoader = buildDataSourceMetadataLoader(
metadataDruidWebService,
@@ -300,6 +301,7 @@

setupDataSourceMetaData(healthCheckRegistry, dataSourceMetadataLoader);
}

bind(querySigningService).to(QuerySigningService.class);

bind(buildJobRowBuilder()).to(JobRowBuilder.class);
@@ -324,6 +326,11 @@
);
setupDruidDimensionsLoader(healthCheckRegistry, druidDimensionsLoader);
}
if (SYSTEM_CONFIG.getBooleanProperty(DEPRECATED_PERMISSIVE_AVAILABILITY_FLAG, false)) {
LOG.warn(
"Permissive column availability feature flag is no longer supported, please use " +
"PermissiveConcretePhysicalTable to enable permissive column availability.");
}
// Call post-binding hook to allow for additional binding
afterBinding(this);
}
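The deprecation warning above names `PermissiveConcretePhysicalTable` as the replacement for the removed feature flag. A hedged construction sketch follows; only the class name comes from this diff, and every constructor parameter below is an assumption for illustration.

    // Hypothetical: replaces setting permissive_column_availability_enabled=true.
    PhysicalTable table = new PermissiveConcretePhysicalTable(
            TableName.of("wikipedia"),        // assumed String-to-Name factory from PR #165
            timeGrain,                        // the table's time grain
            columns,                          // the table's schema columns
            logicalToPhysicalColumnNames,     // logical-to-physical name mapping
            getDataSourceMetadataService()    // the shared metadata service
    );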
@@ -573,12 +580,15 @@ protected DruidFilterBuilder buildDruidFilterBuilder() {
}

/**
* Create a service to store the datasource metadata.
* Get the stored metadata service; if it does not exist yet, create the service and store it.
*
* @return A datasource metadata service
*/
protected DataSourceMetadataService buildDataSourceMetadataService() {
return new DataSourceMetadataService();
protected DataSourceMetadataService getDataSourceMetadataService() {
if (Objects.isNull(dataSourceMetadataService)) {
dataSourceMetadataService = new DataSourceMetadataService();
Collaborator comment:
Should update the javadoc for this method to reflect that it should get one, which may be different from build one. In fact, it's probably OK to indicate that it will always give back the same one, making one if needed.

}
return dataSourceMetadataService;
}
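One way the javadoc could read along the lines the reviewer suggests (illustrative wording, not part of the PR):

    /**
     * Get the datasource metadata service.
     * <p>
     * Always returns the same instance, creating it lazily on first use.
     *
     * @return The shared datasource metadata service
     */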

/**
@@ -617,33 +627,6 @@ protected Map<Class, RequestedIntervalsFunction> buildSigningFunctions() {
return signingFunctions;
}

/**
* Build a segment metadata loader.
*
* @param webService The web service used by the timer to query partial data
* @param physicalTableDictionary The table to update dimensions on
* @param dimensionDictionary The dimensions to update
* @param mapper The object mapper to process segment metadata json
*
* @return A segment metadata loader
*
* @deprecated The http endpoints in Druid that this service relies on have been deprecated
*/
@Deprecated
protected SegmentMetadataLoader buildSegmentMetadataLoader(
DruidWebService webService,
PhysicalTableDictionary physicalTableDictionary,
DimensionDictionary dimensionDictionary,
ObjectMapper mapper
) {
return new SegmentMetadataLoader(
physicalTableDictionary,
dimensionDictionary,
webService,
mapper
);
}

/**
* Build a datasource metadata loader.
*
@@ -689,26 +672,6 @@ protected DruidDimensionsLoader buildDruidDimensionsLoader(
);
}

/**
* Schedule a segment metadata loader and register its health check.
*
* @param healthCheckRegistry The health check registry to register partial data health checks
* @param segmentMetadataLoader The segment metadata loader to use for partial data monitoring and health checks
*/
protected final void setupPartialData(
HealthCheckRegistry healthCheckRegistry,
SegmentMetadataLoader segmentMetadataLoader
) {
scheduleLoader(segmentMetadataLoader);

// Register Segment metadata loader health check
HealthCheck segMetaLoaderHealthCheck = new SegmentMetadataLoaderHealthCheck(
segmentMetadataLoader,
SEG_LOADER_HC_LAST_RUN_PERIOD_MILLIS
);
healthCheckRegistry.register(HEALTH_CHECK_NAME_SEGMENT_METADATA, segMetaLoaderHealthCheck);
}

/**
* Schedule a datasource metadata loader and register its health check.
*
@@ -831,15 +794,15 @@ protected final List<FeatureFlag> collectFeatureFlags(List<Class<? extends Featu
}

/**
* Build an application specific configuration loader initialized with pluggable loaders.
* Get the application-specific configuration loader; if it does not exist yet, initialize it with the pluggable loaders.
*
* @return A configuration loader instance
*/
protected final ConfigurationLoader buildConfigurationLoader() {
DimensionLoader dimensionLoader = getDimensionLoader();
TableLoader tableLoader = getTableLoader();
MetricLoader metricLoader = getMetricLoader();
return buildConfigurationLoader(dimensionLoader, metricLoader, tableLoader);
protected final ConfigurationLoader getConfigurationLoader() {
if (Objects.isNull(loader)) {
loader = buildConfigurationLoader(getDimensionLoader(), getMetricLoader(), getTableLoader());
}
return loader;
}

/**
@@ -873,7 +836,7 @@ protected DimensionLoader getDimensionLoader() {
/**
* The Builder to be used to serialize a JobRow into the job to be returned to the user.
*
* @return A DefaultJobPayloadBuilder
*/
protected JobPayloadBuilder buildJobPayloadBuilder() {
return new DefaultJobPayloadBuilder();
@@ -1046,13 +1009,13 @@ protected void afterBinding(AbstractBinder abstractBinder) {
protected void scheduleLoader(Loader<?> loader) {
loader.setFuture(
loader.isPeriodic ?
loaderScheduler.scheduleAtFixedRate(
loader,
loader.getDefinedDelay(),
loader.getDefinedPeriod(),
TimeUnit.MILLISECONDS
) :
loaderScheduler.schedule(loader, loader.getDefinedDelay(), TimeUnit.MILLISECONDS)
);
}
