Skip to content

Commit

Permalink
update cube/aggregate with notion of a base cube to limit dupe agg ta…
Browse files Browse the repository at this point in the history
…ble creation.
  • Loading branch information
kookster committed Oct 27, 2011
1 parent c21847b commit 3ed6952
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 58 deletions.
139 changes: 81 additions & 58 deletions lib/active_warehouse/aggregate/pipelined_rolap_aggregate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ class PipelinedRolapAggregate < NoAggregate
include RolapCommon

attr_accessor :new_records_only, :new_records_dimension, :new_records_offset, :new_records_record

# Quote +value+ for inclusion in a SQL statement.
#
# Date, DateTime and Time values are first rendered with the :db string
# format; every other value is handed to the connection untouched.
#
# value - the raw value to be quoted
#
# Returns whatever connection.quote returns for the (possibly formatted) value.
def sanitize(value)
  temporal = [Date, DateTime, Time].any? { |klass| value.is_a?(klass) }
  connection.quote(temporal ? value.to_s(:db) : value)
end

def query(*args)
options = parse_query_args(*args)

puts "#{self.class.name}.query(#{options.inspect})"

# throw an error if there is no column and/or row
cstage = options[:cstage] || 0
rstage = options[:rstage] || 0
Expand All @@ -50,7 +51,7 @@ def query(*args)
current_column_name = column_hierarchy[cstage]
full_column_name = "#{column_dimension_name}_#{current_column_name}"
end

row_dimension_name = options[:row_dimension_name] || options[:row]
row_dimension = fact_class.dimension_class(row_dimension_name)
row_hierarchy = dimension_hierarchy(row_dimension_name)
Expand All @@ -62,7 +63,7 @@ def query(*args)
current_row_name = row_hierarchy[rstage]
full_row_name = "#{row_dimension_name}_#{current_row_name}"
end

# if they try to query a hierarchy not in this cube, fallback on super (no_aggregate) query method
ach = options[:column_hierarchy_name]
cch = cube_class.dimensions_hierarchies[column_dimension_name]
Expand All @@ -71,22 +72,22 @@ def query(*args)
if ((ach && (ach != cch)) ||(arh && (arh != crh)))
return super
end

dimension_levels = {}
dimension_levels[column_dimension] = (cstage.to_s == 'all') ? 0 : [(cstage + 1), column_hierarchy.count].min
dimension_levels[row_dimension] = (rstage.to_s == 'all') ? 0 : [(rstage + 1), row_hierarchy.count].min

# build the where clause
where_clause = []

# I don't think I want these
where_clause << "#{full_column_name} is not null" unless cstage == 'all'
where_clause << "#{full_row_name} is not null" unless rstage == 'all'

# process all filters
filters.each do |key, value|
dimension_name, column = key.split('.')

dim_class = fact_class.dimension_class(dimension_name.to_sym)
dim_hierarchy = dimension_hierarchy(dimension_name.to_sym)
dim_level = dim_hierarchy.index(column.to_sym)
Expand All @@ -112,7 +113,7 @@ def query(*args)
current_level = dimension_levels[dim_class] || 0
dimension_levels[dim_class] = [current_level, (dim_level + 1)].max
end

end

end
Expand All @@ -125,41 +126,52 @@ def query(*args)
order = order_fields.join(', ')
end

# see if the levels are all > for any non-base dim
use_base = false
aggregate_options = cube_class.aggregate_options
aggregate_levels = aggregate_dimension_fields.collect{ |dim, levels|
[[(dimension_levels[dim] || 0), levels.count].min, 0].max
l = [[(dimension_levels[dim] || 0), levels.count].min, 0].max
unless create_all_level?(dim, aggregate_options) || (l > 0)
use_base = true
end
l
}

query_table_name = aggregate_rollup_name(aggregate_table_name, aggregate_levels)

# build the SQL query
sql = ''
sql << "SELECT\n"
sql << "#{full_column_name} AS '#{current_column_name}',\n"
sql << "#{full_row_name} AS '#{current_row_name}',\n"
sql << (aggregate_fields.collect{|c| "#{c.strategy_name == :avg ? :avg : :sum}(#{c.label_for_table}) AS '#{c.label}'"}.join(",\n") + "\n")
sql << "FROM #{query_table_name}\n"
sql << "WHERE (#{where_clause.join(") AND\n(")})\n"
sql << "AND (#{sanitize(conditions)})\n" if conditions
sql << "GROUP BY #{full_column_name}, #{full_row_name}\n"
sql << "ORDER BY #{order}\n" if order
sql << "LIMIT #{limit}\n" if limit

# execute the query and return the results as a CubeQueryResult object
result = ActiveWarehouse::CubeQueryResult.new(aggregate_fields)
rows = connection.select_all(sql)
rows.each do |row|
result.add_data(row.delete(current_row_name.to_s),
row.delete(current_column_name.to_s),
row) # the rest of the members of row are the fact columns
# puts "should you use the base for this query? #{use_base}"
if use_base
aggregate_options[:base].query(*args)
else
query_table_name = aggregate_rollup_name(aggregate_table_name, aggregate_levels)

# build the SQL query
sql = ''
sql << "SELECT\n"
sql << "#{full_column_name} AS '#{current_column_name}',\n"
sql << "#{full_row_name} AS '#{current_row_name}',\n"
sql << (aggregate_fields.collect{|c| "#{c.strategy_name == :avg ? :avg : :sum}(#{c.label_for_table}) AS '#{c.label}'"}.join(",\n") + "\n")
sql << "FROM #{query_table_name}\n"
sql << "WHERE (#{where_clause.join(") AND\n(")})\n"
sql << "AND (#{sanitize(conditions)})\n" if conditions
sql << "GROUP BY #{full_column_name}, #{full_row_name}\n"
sql << "ORDER BY #{order}\n" if order
sql << "LIMIT #{limit}\n" if limit

# execute the query and return the results as a CubeQueryResult object
result = ActiveWarehouse::CubeQueryResult.new(aggregate_fields)
rows = connection.select_all(sql)
rows.each do |row|
result.add_data(row.delete(current_row_name.to_s),
row.delete(current_column_name.to_s),
row) # the rest of the members of row are the fact columns
end
result
end
result
end

# Build and populate the data store
def populate(options={})
# puts "PipelinedRolapAggregate::populate #{options.inspect}"
@new_records_record = nil

# see if the options mean to do new records only
if(options[:new_records_only])
# need to know the name of the dimension and field to use to find new only
Expand All @@ -174,13 +186,16 @@ def populate(options={})
end

def create_and_populate_aggregate(options={})
# puts "PipelinedRolapAggregate::create_and_populate_aggregate #{options.inspect}"
puts "PipelinedRolapAggregate::create_and_populate_aggregate #{options.inspect}"
base_name = aggregate_table_name
dimension_fields = aggregate_dimension_fields
aggregate_levels = dimension_fields.collect{|dim, levels|
(0..levels.count).collect.reverse
aggregate_levels = dimension_fields.collect{|dim, levels|
min_level = create_all_level?(dim, options) ? 0 : 1
(min_level..levels.count).collect.reverse
}.sequence

puts "aggregate_levels:\n#{aggregate_levels.inspect}"

# first time through use the fact table, don't after that
options.merge!({:use_fact => true})
aggregate_levels.each do |levels|
Expand All @@ -190,7 +205,14 @@ def create_and_populate_aggregate(options={})
end

end


# Decide whether the "all" level (level 0) applies to +dim+ in this aggregate.
#
# dim     - a dimension, as yielded from aggregate_dimension_fields
# options - options hash; its optional :base entry must respond to
#           dimension_classes
#
# Returns true when no base is defined (all levels are always made) or when
# +dim+ is one of the base's dimension classes; returns false when a base is
# defined and +dim+ is not part of it.
def create_all_level?(dim, options)
  base = options[:base]

  # No base defined: always make all levels. Previously this case returned
  # nil, which skipped the "all" level and sent callers to a base that does
  # not exist.
  return true unless base

  # With a base, only the dimensions the base also has need the "all" level.
  base.dimension_classes.include?(dim)
end

# Build and populate a table that groups by all of the dimension columns.
# It should include every column from the hierarchies in the dimension.
# Should it have the column id levels? Only if they are in the hierarchy.
Expand All @@ -202,28 +224,29 @@ def create_aggregate_table(base_name, dimension_fields, current_levels, options)

table_name = aggregate_rollup_name(base_name, current_levels)
table_options = options[:aggregate_table_options] || {}

# # truncate if configured to, otherwise, just pile it on.
if (options[:truncate] && connection.tables.include?(table_name))
connection.drop_table(table_name)
end

unique_index_columns = []
index_columns = []

if !connection.tables.include?(table_name)
aggregate_table_options = (options[:aggregate_table_options] || {}).merge({:id => false})
puts "create_table: #{table_name}"
connection.create_table(table_name, aggregate_table_options) do |t|
dimension_fields.each_with_index do |pair, i|
dim = pair.first
levels = pair.last
max_level = current_levels[i]
# puts "create_aggregate_table: dim.name = #{dim.name}, max = #{max_level}, i = #{i}"
puts "dim.name = #{dim.name}, max = #{max_level}, i = #{i}"
levels.each_with_index do |field, j|
break if (j >= max_level)
column_options = {:null=>false}
# unique_index_columns << field.label if (j == (max_level-1))

# if it is a string or text column, then include the limit with the options
if [:string, :text].include?(field.column_type)
column_options[:limit] = field.limit
Expand All @@ -237,14 +260,14 @@ def create_aggregate_table(base_name, dimension_fields, current_levels, options)
t.column(field.label, field.column_type, column_options)
end
end

aggregate_fields.each do |field|
af_opts = {}

# By default the aggregate field column type will be a count
aggregate_type = :integer
af_opts[:limit] = 8

# But, if type is a decimal, and you do a sum or avg (not a count) then keep it a decimal
if [:float, :decimal].include?(field.type) && field.strategy_name != :count
af_opts[:limit] = field.type == :integer ? 8 : field.limit
Expand All @@ -257,7 +280,7 @@ def create_aggregate_table(base_name, dimension_fields, current_levels, options)
end

end

# add index per dimension here (not for aggregate fields)
index_columns.each{ |dimension_column|
puts "making index for: #{table_name} on: #{dimension_column}"
Expand All @@ -269,11 +292,11 @@ def create_aggregate_table(base_name, dimension_fields, current_levels, options)
puts "making unique index for: #{table_name} on: #{unique_index_columns.inspect}"
connection.add_index(table_name, unique_index_columns, :unique => true, :name => "by_unique_dims")
end

puts "create_aggregate_table end"
table_name
end

end

def populate_aggregate_table(base_name, dimension_fields, current_levels, options={})
Expand Down Expand Up @@ -360,13 +383,13 @@ def populate_aggregate_table(base_name, dimension_fields, current_levels, option
end

if options[:use_fact]
dimension_column_names << "coalesce(#{field.table_alias}.#{field.name}, #{field_default}) as #{field.table_alias}_#{field.name}"
load_dimension_column_names << "#{field.table_alias}_#{field.name}"
dimension_column_group_names << "#{field.table_alias}.#{field.name}"
dimension_column_names << "coalesce(#{field.table_alias}.#{field.name}, #{field_default}) as #{field.table_alias}_#{field.name}"
load_dimension_column_names << "#{field.table_alias}_#{field.name}"
dimension_column_group_names << "#{field.table_alias}.#{field.name}"
else
dimension_column_names << field.label
load_dimension_column_names << field.label
dimension_column_group_names << field.label
dimension_column_names << field.label
load_dimension_column_names << field.label
dimension_column_group_names << field.label
end
end

Expand Down
2 changes: 2 additions & 0 deletions lib/active_warehouse/aggregate/rolap_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ def aggregated_fact_column_sql
# Shortcut for the cube class's aggregate_fields class method.
# The result is an array of AggregateField instances, i.e. the fact
# columns taken from the fact table.
# PipelinedRolapAggregate relies on this accessor.
def aggregate_fields
  cube = cube_class
  cube.aggregate_fields
end

# The SQL fragment for tables and joins which is used during the population
# of the "flattened" cube
# This is used by PipelinedRolapAggregate
def tables_and_joins
sql = "#{fact_class.table_name}"
cube_class.dimensions_hierarchies.each do |dimension_name, hierarchy_name|
Expand Down

0 comments on commit 3ed6952

Please sign in to comment.