From 3b34d7df0b961cc27a3629def5fdc8fbb838f4fc Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Mon, 30 Jun 2014 14:31:52 -0700 Subject: [PATCH 1/7] raco: move languages into language subpackage This is somewhat like the refactoring I did to move all the expressions into the expression subpackage. This time, I'm trying to consolidate all the languages. --- c++/scan_code.py | 4 +- c_test_environment/clang_tests.py | 2 +- c_test_environment/grappalang_tests.py | 2 +- examples/sp2bench/sp2bench_rdf_long.py | 22 +++--- raco/__init__.py | 2 +- raco/datalog/datalog_test.py | 5 +- raco/datalog/query_tests.py | 2 +- raco/{language.py => language/__init__.py} | 14 +--- .../c_templates/ascii_scan.template | 0 .../c_templates/base_query.template | 0 .../c_templates/binary_scan.template | 0 .../c_templates/emit_joined_tuple.template | 0 ...ltering_nestedloop_hashjoin_chain.template | 0 .../filtering_nestedloop_join.template | 0 .../filtering_nestedloop_join_chain.template | 0 .../c_templates/filteringhashjoin.template | 0 .../c_templates/hashjoin.template | 0 .../join_simple_hash_twopass.template | 0 .../c_templates/precount_select.template | 0 raco/{ => language}/c_templates/scan.template | 0 .../select_simple_twopass.template | 0 raco/{ => language}/clang.py | 9 ++- raco/{ => language}/clangcommon.py | 68 +++++++++---------- .../grappa_templates/base_query.template | 0 raco/{ => language}/grappalang.py | 9 ++- raco/{ => language}/myrialang.py | 4 +- raco/{ => language}/pseudocodelang.py | 0 raco/{ => language}/pythonlang.py | 0 raco/myrial/interpreter.py | 5 +- raco/myrial/myrial_test.py | 2 +- raco/myrial/optimizer_tests.py | 10 +-- raco/nary_join_rules_test.py | 2 +- raco/operator_test.py | 3 +- 33 files changed, 78 insertions(+), 87 deletions(-) rename raco/{language.py => language/__init__.py} (93%) rename raco/{ => language}/c_templates/ascii_scan.template (100%) rename raco/{ => language}/c_templates/base_query.template (100%) rename raco/{ => language}/c_templates/binary_scan.template (100%) rename raco/{ => language}/c_templates/emit_joined_tuple.template (100%) rename raco/{ => language}/c_templates/filtering_nestedloop_hashjoin_chain.template (100%) rename raco/{ => language}/c_templates/filtering_nestedloop_join.template (100%) rename raco/{ => language}/c_templates/filtering_nestedloop_join_chain.template (100%) rename raco/{ => language}/c_templates/filteringhashjoin.template (100%) rename raco/{ => language}/c_templates/hashjoin.template (100%) rename raco/{ => language}/c_templates/join_simple_hash_twopass.template (100%) rename raco/{ => language}/c_templates/precount_select.template (100%) rename raco/{ => language}/c_templates/scan.template (100%) rename raco/{ => language}/c_templates/select_simple_twopass.template (100%) rename raco/{ => language}/clang.py (98%) rename raco/{ => language}/clangcommon.py (96%) rename raco/{ => language}/grappa_templates/base_query.template (100%) rename raco/{ => language}/grappalang.py (99%) rename raco/{ => language}/myrialang.py (99%) rename raco/{ => language}/pseudocodelang.py (100%) rename raco/{ => language}/pythonlang.py (100%) diff --git a/c++/scan_code.py b/c++/scan_code.py index 1b0af22c..3ea36f97 100644 --- a/c++/scan_code.py +++ b/c++/scan_code.py @@ -1,5 +1,5 @@ import raco.algebra as alg -import raco.clang as clang +import raco.language.clang as clang import raco.boolean as rbool @@ -311,7 +311,7 @@ def visit (self,n): #generate code for a join chain elif isinstance(n,alg.NaryOperator) : - if isinstance(n,clang.FilteringNLJoinChain) : + if isinstance(n, clang.FilteringNLJoinChain) : for arg in n.args : self.visit(arg) self.generate_join_chain(n) diff --git a/c_test_environment/clang_tests.py b/c_test_environment/clang_tests.py index e177d4aa..1ad8448a 100644 --- a/c_test_environment/clang_tests.py +++ b/c_test_environment/clang_tests.py @@ -3,7 +3,7 @@ from testquery import ClangRunner from generate_test_relations import generate_default from generate_test_relations import need_generate -from raco.language import CCAlgebra +from raco.language.clang import CCAlgebra from platform_tests import PlatformTest import sys diff --git a/c_test_environment/grappalang_tests.py b/c_test_environment/grappalang_tests.py index 15ac239f..7cefc570 100644 --- a/c_test_environment/grappalang_tests.py +++ b/c_test_environment/grappalang_tests.py @@ -3,7 +3,7 @@ from testquery import GrappalangRunner from generate_test_relations import generate_default from generate_test_relations import need_generate -from raco.language import GrappaAlgebra +from raco.language.grappalang import GrappaAlgebra from platform_tests import PlatformTest from nose.plugins.skip import SkipTest diff --git a/examples/sp2bench/sp2bench_rdf_long.py b/examples/sp2bench/sp2bench_rdf_long.py index d55c8238..d08c40b0 100644 --- a/examples/sp2bench/sp2bench_rdf_long.py +++ b/examples/sp2bench/sp2bench_rdf_long.py @@ -1,7 +1,7 @@ import emitcode import raco.algebra as algebra import raco.rules as rules -from raco.grappalang import GrappaSymmetricHashJoin, GrappaHashJoin +from raco.language.grappalang import GrappaSymmetricHashJoin, GrappaHashJoin from raco.language import CCAlgebra, GrappaAlgebra @@ -48,7 +48,7 @@ # TODO be sure DISTINCT -# syntactically join with equality; +# syntactically join with equality; #queries['Q5a'] = """A(person, name) :- %(tr)s(article, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://localhost/vocabulary/bench/Article'), queries['Q5a'] = """A(person, name) :- %(tr)s(article, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://localhost/vocabulary/bench/Article'), %(tr)s(article, 'http://purl.org/dc/elements/1.1/creator', person), @@ -71,24 +71,24 @@ # TODO: Q7 requires double negation -#TODO: enable Q8, after dealing with HashJoin( $0 != $7 ) type of cases -#queries['Q8'] = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), -_ = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), - %(tr)s(erdoes, 'http://xmlns.com/foaf/0.1/name', "Paul Erdoes") +#TODO: enable Q8, after dealing with HashJoin( $0 != $7 ) type of cases +#queries['Q8'] = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), +_ = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), + %(tr)s(erdoes, 'http://xmlns.com/foaf/0.1/name', "Paul Erdoes") A(name) :- Erdoes(erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(doc2, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(doc2, 'http://purl.org/dc/elements/1.1/creator', author2), %(tr)s(author2, 'http://xmlns.com/foaf/0.1/name', name), - author != erdoes, - doc2 != doc, + author != erdoes, + doc2 != doc, author2 != erdoes, author2 != author - + A(name) :- Erdoes(erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', erdoes), - %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), + %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(author, 'http://xmlns.com/foaf/0.1/name', name), author != erdoes""" #TODO be sure DISTINCT @@ -105,7 +105,7 @@ queries['Q11'] = """A(ee) :- %(tr)s(publication, 'http://www.w3.org/2000/01/rdf-schema#seeAlso', ee)""" #TODO order by, limit, offset - + alg = CCAlgebra prefix="" import sys diff --git a/raco/__init__.py b/raco/__init__.py index 6074245c..9239e653 100644 --- a/raco/__init__.py +++ b/raco/__init__.py @@ -1,5 +1,5 @@ from raco.datalog.grammar import parse -from raco.language import MyriaLeftDeepTreeAlgebra +from raco.language.myrialang import MyriaLeftDeepTreeAlgebra from raco.algebra import LogicalAlgebra from raco.compile import optimize diff --git a/raco/datalog/datalog_test.py b/raco/datalog/datalog_test.py index 900eaa96..76ed382c 100644 --- a/raco/datalog/datalog_test.py +++ b/raco/datalog/datalog_test.py @@ -3,8 +3,9 @@ import raco.fakedb from raco import RACompiler -from raco.language import MyriaLeftDeepTreeAlgebra, MyriaHyperCubeAlgebra -from raco.myrialang import compile_to_json +from raco.language.myrialang import (compile_to_json, + MyriaLeftDeepTreeAlgebra, + MyriaHyperCubeAlgebra) from raco.catalog import FakeCatalog diff --git a/raco/datalog/query_tests.py b/raco/datalog/query_tests.py index 32f1fef1..8be2d19a 100644 --- a/raco/datalog/query_tests.py +++ b/raco/datalog/query_tests.py @@ -3,7 +3,7 @@ import raco.scheme as scheme import raco.datalog.datalog_test as datalog_test from raco import types -from raco.language import MyriaHyperCubeAlgebra +from raco.language.myrialang import MyriaHyperCubeAlgebra class TestQueryFunctions(datalog_test.DatalogTestCase): diff --git a/raco/language.py b/raco/language/__init__.py similarity index 93% rename from raco/language.py rename to raco/language/__init__.py index 5b30e957..90979b70 100644 --- a/raco/language.py +++ b/raco/language/__init__.py @@ -1,6 +1,5 @@ -from raco import expression - from abc import ABCMeta, abstractmethod +import raco.expression as expression import logging LOG = logging.getLogger(__name__) @@ -174,13 +173,4 @@ def visit_TIMES(self, binaryexpr): def visit_NEG(self, unaryexpr): inputexpr = self.stack.pop() - self.stack.append(self.language.negative(inputexpr)) - - -# import everything from each language -from raco.pythonlang import PythonAlgebra -from raco.pseudocodelang import PseudoCodeAlgebra -from raco.clang import CCAlgebra -from raco.myrialang import MyriaLeftDeepTreeAlgebra -from raco.myrialang import MyriaHyperCubeAlgebra -from raco.grappalang import GrappaAlgebra + self.stack.append(self.language.negative(inputexpr)) \ No newline at end of file diff --git a/raco/c_templates/ascii_scan.template b/raco/language/c_templates/ascii_scan.template similarity index 100% rename from raco/c_templates/ascii_scan.template rename to raco/language/c_templates/ascii_scan.template diff --git a/raco/c_templates/base_query.template b/raco/language/c_templates/base_query.template similarity index 100% rename from raco/c_templates/base_query.template rename to raco/language/c_templates/base_query.template diff --git a/raco/c_templates/binary_scan.template b/raco/language/c_templates/binary_scan.template similarity index 100% rename from raco/c_templates/binary_scan.template rename to raco/language/c_templates/binary_scan.template diff --git a/raco/c_templates/emit_joined_tuple.template b/raco/language/c_templates/emit_joined_tuple.template similarity index 100% rename from raco/c_templates/emit_joined_tuple.template rename to raco/language/c_templates/emit_joined_tuple.template diff --git a/raco/c_templates/filtering_nestedloop_hashjoin_chain.template b/raco/language/c_templates/filtering_nestedloop_hashjoin_chain.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_hashjoin_chain.template rename to raco/language/c_templates/filtering_nestedloop_hashjoin_chain.template diff --git a/raco/c_templates/filtering_nestedloop_join.template b/raco/language/c_templates/filtering_nestedloop_join.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_join.template rename to raco/language/c_templates/filtering_nestedloop_join.template diff --git a/raco/c_templates/filtering_nestedloop_join_chain.template b/raco/language/c_templates/filtering_nestedloop_join_chain.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_join_chain.template rename to raco/language/c_templates/filtering_nestedloop_join_chain.template diff --git a/raco/c_templates/filteringhashjoin.template b/raco/language/c_templates/filteringhashjoin.template similarity index 100% rename from raco/c_templates/filteringhashjoin.template rename to raco/language/c_templates/filteringhashjoin.template diff --git a/raco/c_templates/hashjoin.template b/raco/language/c_templates/hashjoin.template similarity index 100% rename from raco/c_templates/hashjoin.template rename to raco/language/c_templates/hashjoin.template diff --git a/raco/c_templates/join_simple_hash_twopass.template b/raco/language/c_templates/join_simple_hash_twopass.template similarity index 100% rename from raco/c_templates/join_simple_hash_twopass.template rename to raco/language/c_templates/join_simple_hash_twopass.template diff --git a/raco/c_templates/precount_select.template b/raco/language/c_templates/precount_select.template similarity index 100% rename from raco/c_templates/precount_select.template rename to raco/language/c_templates/precount_select.template diff --git a/raco/c_templates/scan.template b/raco/language/c_templates/scan.template similarity index 100% rename from raco/c_templates/scan.template rename to raco/language/c_templates/scan.template diff --git a/raco/c_templates/select_simple_twopass.template b/raco/language/c_templates/select_simple_twopass.template similarity index 100% rename from raco/c_templates/select_simple_twopass.template rename to raco/language/c_templates/select_simple_twopass.template diff --git a/raco/clang.py b/raco/language/clang.py similarity index 98% rename from raco/clang.py rename to raco/language/clang.py index 8c50154d..4a2cd219 100644 --- a/raco/clang.py +++ b/raco/language/clang.py @@ -3,13 +3,12 @@ from raco import algebra from raco import expression -from raco.language import Language +from raco.language import Language, clangcommon from raco import rules from raco.pipelines import Pipelined -from raco.clangcommon import StagedTupleRef -from raco import clangcommon +from raco.language.clangcommon import StagedTupleRef -from algebra import gensym +from raco.algebra import gensym import logging @@ -169,7 +168,7 @@ def new_tuple_ref(self, sym, scheme): return CStagedTupleRef(sym, scheme) -from algebra import UnaryOperator +from raco.algebra import UnaryOperator class MemoryScan(algebra.UnaryOperator, CCOperator): diff --git a/raco/clangcommon.py b/raco/language/clangcommon.py similarity index 96% rename from raco/clangcommon.py rename to raco/language/clangcommon.py index ab991cdb..71de112f 100644 --- a/raco/clangcommon.py +++ b/raco/language/clangcommon.py @@ -5,8 +5,8 @@ from raco import algebra from raco import expression from raco import catalog -from algebra import gensym -from expression.expression import UnnamedAttributeRef +from raco.algebra import gensym +from raco.expression import UnnamedAttributeRef import logging LOG = logging.getLogger(__name__) @@ -35,23 +35,23 @@ def ct(s): # TODO: # The following is actually a staged materialized tuple ref. -# we should also add a staged reference tuple ref that just has relationsymbol and row +# we should also add a staged reference tuple ref that just has relationsymbol and row class StagedTupleRef: nextid = 0 - + @classmethod def genname(cls): # use StagedTupleRef so everyone shares one mutable copy of nextid - x = StagedTupleRef.nextid + x = StagedTupleRef.nextid StagedTupleRef.nextid+=1 return "t_%03d" % x - + def __init__(self, relsym, scheme): self.name = self.genname() self.relsym = relsym self.scheme = scheme self.__typename = None - + def getTupleTypename(self): if self.__typename==None: fields = "" @@ -59,12 +59,12 @@ def getTupleTypename(self): for i in range(0, len(self.scheme)): fieldnum = i fields += "_%(fieldnum)s" % locals() - + self.__typename = "MaterializedTupleRef_%(relsym)s%(fields)s" % locals() - + return self.__typename - + def generateDefinition(self): fielddeftemplate = """int64_t _fields[%(numfields)s]; """ @@ -78,15 +78,15 @@ class %(tupletypename)s { int64_t get(int field) const { return _fields[field]; } - + void set(int field, int64_t val) { _fields[field] = val; } - + int numFields() const { return %(numfields)s; } - + %(tupletypename)s () { // no-op } @@ -94,7 +94,7 @@ class %(tupletypename)s { %(tupletypename)s (std::vector vals) { for (int i=0; i Date: Mon, 30 Jun 2014 18:33:30 -0700 Subject: [PATCH 2/7] raco: refactor rules and logical algebra 1. Move the myrialang rules that have nothing to do with Myria per se to the rules file. 2. Move the LogicalAlgebra from algebra.py to language/ package. 3. Create a OptLogicalAlgebra class that is an optimized logical algebra. --- examples/emitcode.py | 6 +- expressiontest.py | 60 ----- logical_ra_pb2.py | 401 --------------------------------- raco/__init__.py | 2 +- raco/algebra.py | 14 +- raco/language/clangcommon.py | 2 +- raco/language/logical.py | 24 ++ raco/language/myrialang.py | 314 +------------------------- raco/myrial/interpreter.py | 2 +- raco/myrial/optimizer_tests.py | 2 +- raco/operator_test.py | 1 + raco/rules.py | 301 ++++++++++++++++++++++++- 12 files changed, 342 insertions(+), 787 deletions(-) delete mode 100644 expressiontest.py delete mode 100644 logical_ra_pb2.py create mode 100644 raco/language/logical.py diff --git a/examples/emitcode.py b/examples/emitcode.py index 1ba64f2a..67186e2a 100644 --- a/examples/emitcode.py +++ b/examples/emitcode.py @@ -1,5 +1,5 @@ from raco import RACompiler -from raco.algebra import LogicalAlgebra +from raco.language.logical import LogicalAlgebra from raco.compile import compile import raco.viz as viz @@ -28,7 +28,7 @@ def emitCode(query, name, algebra): dlog.optimize(target=algebra, eliminate_common_subexpressions=False) LOG.info("physical: %s",dlog.physicalplan) - + print dlog.physicalplan physical_dot = viz.operator_to_dot(dlog.physicalplan) with open("%s.physical.dot"%(name), 'w') as dwf: @@ -38,7 +38,7 @@ def emitCode(query, name, algebra): code = "" code += comment("Query " + query) code += compile(dlog.physicalplan) - + fname = name+'.cpp' with open(fname, 'w') as f: f.write(code) diff --git a/expressiontest.py b/expressiontest.py deleted file mode 100644 index 45ace5cd..00000000 --- a/expressiontest.py +++ /dev/null @@ -1,60 +0,0 @@ -import raco.expression as e -from raco import RACompiler -from raco.language import MyriaAlgebra -from raco.scheme import Scheme -from raco.algebra import Select, Scan, Join, Apply, LogicalAlgebra -from raco.boolean import EQ, AND, OR, StringLiteral, NumericLiteral -from sampledb import btc_schema, Rr -from raco.compile import compile, optimize -import raco.catalog - -plus = e.PLUS(e.UnnamedAttributeRef(0), e.NamedAttributeRef("x")) -print plus - -minus = e.MINUS(e.Literal(2), e.Literal(5)) -print minus - -absf = e.ABS(e.UnnamedAttributeRef(2)) -print absf - -mix = e.DIVIDE(e.TIMES(plus, absf), minus) -print mix - - -def testRA(): - sch = Scheme([("y",float), ("x",int)]) - R = raco.catalog.Relation("R", sch) - J = Join(EQ(e.UnnamedAttributeRef(0), e.NamedAttributeRef("x")), Scan(R), Scan(R)) - A = Apply(J, z=e.PLUS(e.NamedAttributeRef("x"), e.NamedAttributeRef("y")), w=e.UnnamedAttributeRef(3)) - exprs = optimize([('A',A)], target=MyriaAlgebra(), source=LogicalAlgebra) - print exprs - print compile(exprs) - -def testDatalog(): - query = "A(x) :- R(x,y),S(y,z)" - - print "/*\n%s\n*/" % str(query) - - # Create a compiler object - dlog = RACompiler() - - # parse the query - dlog.fromDatalog(query) - print "************ LOGICAL PLAN *************" - print dlog.logicalplan - print - - # Optimize the query, includes producing a physical plan - print "************ PHYSICAL PLAN *************" - dlog.optimize(target=MyriaAlgebra(), eliminate_common_subexpressions=False) - print dlog.physicalplan - print - - # generate code in the target language - code = dlog.compile() - print "************ CODE *************" - print code - print - - -testRA() diff --git a/logical_ra_pb2.py b/logical_ra_pb2.py deleted file mode 100644 index c66ea291..00000000 --- a/logical_ra_pb2.py +++ /dev/null @@ -1,401 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from protobuf import descriptor -from protobuf import message -from protobuf import reflection -from protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - - -DESCRIPTOR = descriptor.FileDescriptor( - name='logical_ra.proto', - package='', - serialized_pb='\n\x10logical_ra.proto\"L\n\x15LogicalRaQueryMessage\x12\x0c\n\x04name\x18\x01 \x02(\t\x12%\n\toperators\x18\x03 \x03(\x0b\x32\x12.LogicalRaOperator\"\xd8\x02\n\x11LogicalRaOperator\x12\x36\n\x04type\x18\x01 \x02(\x0e\x32(.LogicalRaOperator.LogicalRaOperatorType\x12\x0c\n\x04name\x18\x02 \x02(\t\x12\x1a\n\x04scan\x18\x03 \x01(\x0b\x32\x0c.LogicalScan\x12\x1e\n\x06select\x18\x04 \x01(\x0b\x32\x0e.LogicalSelect\x12 \n\x07project\x18\x05 \x01(\x0b\x32\x0f.LogicalProject\x12\"\n\x08\x65quijoin\x18\x06 \x01(\x0b\x32\x10.LogicalEquiJoin\x12\x1c\n\x05\x63ross\x18\x07 \x01(\x0b\x32\r.LogicalCross\"]\n\x15LogicalRaOperatorType\x12\x08\n\x04SCAN\x10\x00\x12\x0b\n\x07PROJECT\x10\x01\x12\x08\n\x04JOIN\x10\x02\x12\n\n\x06SELECT\x10\x03\x12\x0c\n\x08\x45QUIJOIN\x10\x04\x12\t\n\x05\x43ROSS\x10\x05\"\x1f\n\x0bLogicalScan\x12\x10\n\x08relation\x18\x01 \x02(\t\"5\n\rLogicalSelect\x12\x11\n\tchildName\x18\x01 \x02(\t\x12\x11\n\tcondition\x18\x02 \x02(\t\"4\n\x0eLogicalProject\x12\x11\n\tchildName\x18\x01 \x02(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\x05\"k\n\x0fLogicalEquiJoin\x12\x15\n\rleftChildName\x18\x01 \x02(\t\x12\x13\n\x0bleftColumns\x18\x02 \x03(\x05\x12\x16\n\x0erightChildName\x18\x03 \x02(\t\x12\x14\n\x0crightColumns\x18\x04 \x03(\x05\"=\n\x0cLogicalCross\x12\x15\n\rleftChildName\x18\x01 \x02(\t\x12\x16\n\x0erightChildName\x18\x02 \x02(\tB6\n$edu.washington.escience.myriad.protoB\x0eLogicalRaProto') - - - -_LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE = descriptor.EnumDescriptor( - name='LogicalRaOperatorType', - full_name='LogicalRaOperator.LogicalRaOperatorType', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='SCAN', index=0, number=0, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='PROJECT', index=1, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='JOIN', index=2, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SELECT', index=3, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='EQUIJOIN', index=4, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CROSS', index=5, number=5, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=350, - serialized_end=443, -) - - -_LOGICALRAQUERYMESSAGE = descriptor.Descriptor( - name='LogicalRaQueryMessage', - full_name='LogicalRaQueryMessage', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='name', full_name='LogicalRaQueryMessage.name', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='operators', full_name='LogicalRaQueryMessage.operators', index=1, - number=3, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=20, - serialized_end=96, -) - - -_LOGICALRAOPERATOR = descriptor.Descriptor( - name='LogicalRaOperator', - full_name='LogicalRaOperator', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='type', full_name='LogicalRaOperator.type', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='name', full_name='LogicalRaOperator.name', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='scan', full_name='LogicalRaOperator.scan', index=2, - number=3, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='select', full_name='LogicalRaOperator.select', index=3, - number=4, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='project', full_name='LogicalRaOperator.project', index=4, - number=5, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='equijoin', full_name='LogicalRaOperator.equijoin', index=5, - number=6, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='cross', full_name='LogicalRaOperator.cross', index=6, - number=7, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=99, - serialized_end=443, -) - - -_LOGICALSCAN = descriptor.Descriptor( - name='LogicalScan', - full_name='LogicalScan', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='relation', full_name='LogicalScan.relation', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=445, - serialized_end=476, -) - - -_LOGICALSELECT = descriptor.Descriptor( - name='LogicalSelect', - full_name='LogicalSelect', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='childName', full_name='LogicalSelect.childName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='condition', full_name='LogicalSelect.condition', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=478, - serialized_end=531, -) - - -_LOGICALPROJECT = descriptor.Descriptor( - name='LogicalProject', - full_name='LogicalProject', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='childName', full_name='LogicalProject.childName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='columns', full_name='LogicalProject.columns', index=1, - number=2, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=533, - serialized_end=585, -) - - -_LOGICALEQUIJOIN = descriptor.Descriptor( - name='LogicalEquiJoin', - full_name='LogicalEquiJoin', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='leftChildName', full_name='LogicalEquiJoin.leftChildName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='leftColumns', full_name='LogicalEquiJoin.leftColumns', index=1, - number=2, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightChildName', full_name='LogicalEquiJoin.rightChildName', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightColumns', full_name='LogicalEquiJoin.rightColumns', index=3, - number=4, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=587, - serialized_end=694, -) - - -_LOGICALCROSS = descriptor.Descriptor( - name='LogicalCross', - full_name='LogicalCross', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='leftChildName', full_name='LogicalCross.leftChildName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightChildName', full_name='LogicalCross.rightChildName', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=696, - serialized_end=757, -) - -_LOGICALRAQUERYMESSAGE.fields_by_name['operators'].message_type = _LOGICALRAOPERATOR -_LOGICALRAOPERATOR.fields_by_name['type'].enum_type = _LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE -_LOGICALRAOPERATOR.fields_by_name['scan'].message_type = _LOGICALSCAN -_LOGICALRAOPERATOR.fields_by_name['select'].message_type = _LOGICALSELECT -_LOGICALRAOPERATOR.fields_by_name['project'].message_type = _LOGICALPROJECT -_LOGICALRAOPERATOR.fields_by_name['equijoin'].message_type = _LOGICALEQUIJOIN -_LOGICALRAOPERATOR.fields_by_name['cross'].message_type = _LOGICALCROSS -_LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE.containing_type = _LOGICALRAOPERATOR; -DESCRIPTOR.message_types_by_name['LogicalRaQueryMessage'] = _LOGICALRAQUERYMESSAGE -DESCRIPTOR.message_types_by_name['LogicalRaOperator'] = _LOGICALRAOPERATOR -DESCRIPTOR.message_types_by_name['LogicalScan'] = _LOGICALSCAN -DESCRIPTOR.message_types_by_name['LogicalSelect'] = _LOGICALSELECT -DESCRIPTOR.message_types_by_name['LogicalProject'] = _LOGICALPROJECT -DESCRIPTOR.message_types_by_name['LogicalEquiJoin'] = _LOGICALEQUIJOIN -DESCRIPTOR.message_types_by_name['LogicalCross'] = _LOGICALCROSS - -class LogicalRaQueryMessage(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALRAQUERYMESSAGE - - # @@protoc_insertion_point(class_scope:LogicalRaQueryMessage) - -class LogicalRaOperator(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALRAOPERATOR - - # @@protoc_insertion_point(class_scope:LogicalRaOperator) - -class LogicalScan(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALSCAN - - # @@protoc_insertion_point(class_scope:LogicalScan) - -class LogicalSelect(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALSELECT - - # @@protoc_insertion_point(class_scope:LogicalSelect) - -class LogicalProject(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALPROJECT - - # @@protoc_insertion_point(class_scope:LogicalProject) - -class LogicalEquiJoin(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALEQUIJOIN - - # @@protoc_insertion_point(class_scope:LogicalEquiJoin) - -class LogicalCross(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALCROSS - - # @@protoc_insertion_point(class_scope:LogicalCross) - -# @@protoc_insertion_point(module_scope) diff --git a/raco/__init__.py b/raco/__init__.py index 9239e653..b7ef3164 100644 --- a/raco/__init__.py +++ b/raco/__init__.py @@ -1,6 +1,6 @@ from raco.datalog.grammar import parse from raco.language.myrialang import MyriaLeftDeepTreeAlgebra -from raco.algebra import LogicalAlgebra +from raco.language.logical import LogicalAlgebra from raco.compile import optimize import logging diff --git a/raco/algebra.py b/raco/algebra.py index 5e7555cf..51724e51 100644 --- a/raco/algebra.py +++ b/raco/algebra.py @@ -1346,16 +1346,4 @@ def rewrite_node(node): else: return node.apply(rewrite_node) - return rewrite_node(dest_op) - - -class LogicalAlgebra(object): - operators = [ - Join, - Select, - Scan - ] - - @staticmethod - def opt_rules(): - return [] + return rewrite_node(dest_op) \ No newline at end of file diff --git a/raco/language/clangcommon.py b/raco/language/clangcommon.py index 71de112f..807f1af7 100644 --- a/raco/language/clangcommon.py +++ b/raco/language/clangcommon.py @@ -53,7 +53,7 @@ def __init__(self, relsym, scheme): self.__typename = None def getTupleTypename(self): - if self.__typename==None: + if self.__typename is None: fields = "" relsym = self.relsym for i in range(0, len(self.scheme)): diff --git a/raco/language/logical.py b/raco/language/logical.py new file mode 100644 index 00000000..c0bada56 --- /dev/null +++ b/raco/language/logical.py @@ -0,0 +1,24 @@ +import raco.rules as rules + + +class LogicalAlgebra(object): + @staticmethod + def opt_rules(): + return [] + + +class OptLogicalAlgebra(object): + @staticmethod + def opt_rules(): + return [rules.RemoveTrivialSequences(), + rules.SimpleGroupBy(), + rules.SplitSelects(), + rules.PushSelects(), + rules.MergeSelects(), + rules.ProjectingJoin(), + rules.JoinToProjectingJoin(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply()] \ No newline at end of file diff --git a/raco/language/myrialang.py b/raco/language/myrialang.py index c3015806..3cac0211 100644 --- a/raco/language/myrialang.py +++ b/raco/language/myrialang.py @@ -1116,49 +1116,6 @@ def resolve_finalizer_expr(logical_agg, pos): return algebra.Apply(gmappings + fmappings, op_out) -class SplitSelects(rules.Rule): - """Replace AND clauses with multiple consecutive selects.""" - - def fire(self, op): - if not isinstance(op, algebra.Select): - return op - - conjuncs = expression.extract_conjuncs(op.condition) - assert conjuncs # Must be at least 1 - - # Normalize named references to integer indexes - scheme = op.scheme() - conjuncs = [to_unnamed_recursive(c, scheme) - for c in conjuncs] - - op.condition = conjuncs[0] - op.has_been_pushed = False - for conjunc in conjuncs[1:]: - op = algebra.Select(conjunc, op) - op.has_been_pushed = False - return op - - def __str__(self): - return "Select => Select, Select" - - -class MergeSelects(rules.Rule): - """Merge consecutive Selects into a single conjunctive selection.""" - - def fire(self, op): - if not isinstance(op, algebra.Select): - return op - - while isinstance(op.input, algebra.Select): - conjunc = expression.AND(op.condition, op.input.condition) - op = algebra.Select(conjunc, op.input.input) - - return op - - def __str__(self): - return "Select, Select => Select" - - class ProjectToDistinctColumnSelect(rules.Rule): def fire(self, expr): # If not a Project, who cares? @@ -1175,259 +1132,6 @@ def fire(self, expr): return colSelect -def is_column_equality_comparison(cond): - """Return a tuple of column indexes if the condition is an equality test. - """ - - if (isinstance(cond, expression.EQ) and - isinstance(cond.left, UnnamedAttributeRef) and - isinstance(cond.right, UnnamedAttributeRef)): - return cond.left.position, cond.right.position - else: - return None - - -class PushApply(rules.Rule): - """Many Applies in MyriaL are added to select fewer columns from the - input. In some of these cases, we can do less work in the children by - preventing them from producing columns we will then immediately drop. - - Currently, this rule: - - merges consecutive Apply operations into one Apply, possibly dropping - some of the produced columns along the way. - - makes ProjectingJoin only produce columns that are later read. - TODO: drop the Apply if the column-selection pushed into the - ProjectingJoin is everything the Apply was doing. See note below. - """ - - def fire(self, op): - if not isinstance(op, algebra.Apply): - return op - - child = op.input - - if isinstance(child, algebra.Apply): - in_scheme = child.scheme() - child_in_scheme = child.input.scheme() - names, emits = zip(*op.emitters) - emits = [to_unnamed_recursive(e, in_scheme) - for e in emits] - child_emits = [to_unnamed_recursive(e[1], child_in_scheme) - for e in child.emitters] - - def convert(n): - if isinstance(n, expression.UnnamedAttributeRef): - n = child_emits[n.position] - else: - n.apply(convert) - return n - - emits = [convert(copy.deepcopy(e)) for e in emits] - - new_apply = algebra.Apply(emitters=zip(names, emits), - input=child.input) - return self.fire(new_apply) - - elif isinstance(child, algebra.ProjectingJoin): - in_scheme = child.scheme() - names, emits = zip(*op.emitters) - emits = [to_unnamed_recursive(e, in_scheme) - for e in emits] - accessed = sorted(set(itertools.chain(*(accessed_columns(e) - for e in emits)))) - index_map = {a: i for (i, a) in enumerate(accessed)} - child.output_columns = [child.output_columns[i] for i in accessed] - for e in emits: - expression.reindex_expr(e, index_map) - # TODO(dhalperi) we may not need the Apply if all it did was rename - # and/or select certain columns. Figure out these cases and omit - # the Apply - return algebra.Apply(emitters=zip(names, emits), - input=child) - - return op - - def __str__(self): - return 'Push Apply into Apply, ProjectingJoin' - - -class RemoveUnusedColumns(rules.Rule): - """For operators that construct new tuples (e.g., GroupBy or Join), we are - guaranteed that any columns from an input tuple that are ignored (neither - used internally nor to produce the output columns) cannot be used higher - in the query tree. For these cases, this rule will prepend an Apply that - keeps only the referenced columns. The goal is that after this rule, - a subsequent invocation of PushApply will be able to push that - column-selection operation further down the tree.""" - - def fire(self, op): - if isinstance(op, algebra.GroupBy): - child = op.input - child_scheme = child.scheme() - grp_list = [to_unnamed_recursive(g, child_scheme) - for g in op.grouping_list] - agg_list = [to_unnamed_recursive(a, child_scheme) - for a in op.aggregate_list] - agg = [accessed_columns(a) for a in agg_list] - pos = [g.position for g in grp_list] - accessed = sorted(set(itertools.chain(*(agg + [pos])))) - if not accessed: - # Bug #207: COUNTALL() does not access any columns. So if the - # query is just a COUNT(*), we would generate an empty Apply. - # If this happens, just keep the first column of the input. - accessed = [0] - if len(accessed) != len(child_scheme): - emitters = [(None, UnnamedAttributeRef(i)) for i in accessed] - new_apply = algebra.Apply(emitters, child) - index_map = {a: i for (i, a) in enumerate(accessed)} - for agg_expr in itertools.chain(grp_list, agg_list): - expression.reindex_expr(agg_expr, index_map) - op.grouping_list = grp_list - op.aggregate_list = agg_list - op.input = new_apply - return op - elif isinstance(op, algebra.ProjectingJoin): - l_scheme = op.left.scheme() - r_scheme = op.right.scheme() - in_scheme = l_scheme + r_scheme - condition = to_unnamed_recursive(op.condition, in_scheme) - column_list = [to_unnamed_recursive(c, in_scheme) - for c in op.output_columns] - - accessed = (accessed_columns(condition) - | set(c.position for c in op.output_columns)) - if len(accessed) == len(in_scheme): - return op - - accessed = sorted(accessed) - left = [a for a in accessed if a < len(l_scheme)] - if len(left) < len(l_scheme): - emits = [(None, UnnamedAttributeRef(a)) for a in left] - apply = algebra.Apply(emits, op.left) - op.left = apply - right = [a - len(l_scheme) for a in accessed - if a >= len(l_scheme)] - if len(right) < len(r_scheme): - emits = [(None, UnnamedAttributeRef(a)) for a in right] - apply = algebra.Apply(emits, op.right) - op.right = apply - index_map = {a: i for (i, a) in enumerate(accessed)} - expression.reindex_expr(condition, index_map) - [expression.reindex_expr(c, index_map) for c in column_list] - op.condition = condition - op.output_columns = column_list - return op - - return op - - def __str__(self): - return 'Remove unused columns' - - -class PushSelects(rules.Rule): - """Push selections.""" - - @staticmethod - def descend_tree(op, cond): - """Recursively push a selection condition down a tree of operators. - - :param op: The root of an operator tree - :type op: raco.algebra.Operator - :type cond: The selection condition - :type cond: raco.expression.expression - - :return: A (possibly modified) operator. - """ - - if isinstance(op, algebra.Select): - # Keep pushing; selects are commutative - op.input = PushSelects.descend_tree(op.input, cond) - return op - elif isinstance(op, algebra.CompositeBinaryOperator): - # Joins and cross-products; consider conversion to an equijoin - left_len = len(op.left.scheme()) - accessed = accessed_columns(cond) - in_left = [col < left_len for col in accessed] - if all(in_left): - # Push the select into the left sub-tree. - op.left = PushSelects.descend_tree(op.left, cond) - return op - elif not any(in_left): - # Push into right subtree; rebase column indexes - expression.rebase_expr(cond, left_len) - op.right = PushSelects.descend_tree(op.right, cond) - return op - else: - # Selection includes both children; attempt to create an - # equijoin condition - cols = is_column_equality_comparison(cond) - if cols: - return op.add_equijoin_condition(cols[0], cols[1]) - elif isinstance(op, algebra.Apply): - # Convert accessed to a list from a set to ensure consistent order - accessed = list(accessed_columns(cond)) - accessed_emits = [op.emitters[i][1] for i in accessed] - if all(isinstance(e, expression.AttributeRef) - for e in accessed_emits): - unnamed_emits = [expression.toUnnamed(e, op.input.scheme()) - for e in accessed_emits] - # This condition only touches columns that are copied verbatim - # from the child, so we can push it. - index_map = {a: e.position - for (a, e) in zip(accessed, unnamed_emits)} - expression.reindex_expr(cond, index_map) - op.input = PushSelects.descend_tree(op.input, cond) - return op - elif isinstance(op, algebra.GroupBy): - # Convert accessed to a list from a set to ensure consistent order - accessed = list(accessed_columns(cond)) - if all((a < len(op.grouping_list)) for a in accessed): - accessed_grps = [op.grouping_list[a] for a in accessed] - # This condition only touches columns that are copied verbatim - # from the child (grouping keys), so we can push it. - assert all(isinstance(e, expression.AttributeRef) - for e in op.grouping_list) - unnamed_grps = [expression.toUnnamed(e, op.input.scheme()) - for e in accessed_grps] - index_map = {a: e.position - for (a, e) in zip(accessed, unnamed_grps)} - expression.reindex_expr(cond, index_map) - op.input = PushSelects.descend_tree(op.input, cond) - return op - - # Can't push any more: instantiate the selection - new_op = algebra.Select(cond, op) - new_op.has_been_pushed = True - return new_op - - def fire(self, op): - if not isinstance(op, algebra.Select): - return op - if op.has_been_pushed: - return op - - new_op = PushSelects.descend_tree(op.input, op.condition) - - # The new root may also be a select, so fire the rule recursively - return self.fire(new_op) - - def __str__(self): - return ("Select, Cross/Join => Join;" - + " Select, Apply => Apply, Select;" - + " Select, GroupBy => GroupBy, Select") - - -class RemoveTrivialSequences(rules.Rule): - def fire(self, expr): - if not isinstance(expr, algebra.Sequence): - return expr - - if len(expr.args) == 1: - return expr.args[0] - else: - return expr - - class MergeToNaryJoin(rules.Rule): """Merge consecutive binary join into a single multiway join Note: this code assumes that the binary joins form a left deep tree @@ -1518,16 +1222,16 @@ def fire(self, expr): # logical groups of catalog transparent rules # 1. this must be applied first -remove_trivial_sequences = [RemoveTrivialSequences()] +remove_trivial_sequences = [rules.RemoveTrivialSequences()] # 2. simple group by simple_group_by = [rules.SimpleGroupBy()] # 3. push down selection push_select = [ - SplitSelects(), - PushSelects(), - MergeSelects() + rules.SplitSelects(), + rules.PushSelects(), + rules.MergeSelects() ] # 4. push projection @@ -1540,11 +1244,11 @@ def fire(self, expr): push_apply = [ # These really ought to be run until convergence. # For now, run twice and finish with PushApply. - PushApply(), - RemoveUnusedColumns(), - PushApply(), - RemoveUnusedColumns(), - PushApply(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply(), ] # 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra diff --git a/raco/myrial/interpreter.py b/raco/myrial/interpreter.py index a6f7aa25..4a44ed28 100644 --- a/raco/myrial/interpreter.py +++ b/raco/myrial/interpreter.py @@ -7,9 +7,9 @@ import raco.expression import raco.catalog import raco.scheme +from raco.language.logical import LogicalAlgebra, OptLogicalAlgebra from raco.language.myrialang import (MyriaLeftDeepTreeAlgebra, MyriaHyperCubeAlgebra) -from raco.algebra import LogicalAlgebra from raco.language.myrialang import compile_to_json from raco.compile import optimize from raco import relation_key diff --git a/raco/myrial/optimizer_tests.py b/raco/myrial/optimizer_tests.py index 481232a1..ec003896 100644 --- a/raco/myrial/optimizer_tests.py +++ b/raco/myrial/optimizer_tests.py @@ -10,7 +10,7 @@ MyriaHyperShuffleProducer) from raco.language.myrialang import (MyriaLeftDeepTreeAlgebra, MyriaHyperCubeAlgebra) -from raco.algebra import LogicalAlgebra +from raco.language.logical import LogicalAlgebra from raco.compile import optimize from raco import relation_key from raco.catalog import FakeCatalog diff --git a/raco/operator_test.py b/raco/operator_test.py index 2208e937..6d8ba325 100644 --- a/raco/operator_test.py +++ b/raco/operator_test.py @@ -3,6 +3,7 @@ import raco.fakedb from raco.relation_key import RelationKey +from raco.language.logical import LogicalAlgebra from raco.algebra import * from raco.expression import * import raco.relation_key as relation_key diff --git a/raco/rules.py b/raco/rules.py index 9c7514f5..e75c7103 100644 --- a/raco/rules.py +++ b/raco/rules.py @@ -1,8 +1,11 @@ from raco import algebra from raco import expression -from expression import UnnamedAttributeRef +from expression import (accessed_columns, UnnamedAttributeRef, + to_unnamed_recursive) from abc import ABCMeta, abstractmethod +import copy +import itertools class Rule(object): @@ -156,3 +159,299 @@ def is_simple_agg_expr(agg): # are mutating the objects it contains when we modify grp_expr or # agg_expr in the above for loops. return expr + + +class RemoveTrivialSequences(Rule): + def fire(self, expr): + if not isinstance(expr, algebra.Sequence): + return expr + + if len(expr.args) == 1: + return expr.args[0] + else: + return expr + + +class SplitSelects(Rule): + """Replace AND clauses with multiple consecutive selects.""" + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + + conjuncs = expression.extract_conjuncs(op.condition) + assert conjuncs # Must be at least 1 + + # Normalize named references to integer indexes + scheme = op.scheme() + conjuncs = [to_unnamed_recursive(c, scheme) + for c in conjuncs] + + op.condition = conjuncs[0] + op.has_been_pushed = False + for conjunc in conjuncs[1:]: + op = algebra.Select(conjunc, op) + op.has_been_pushed = False + return op + + def __str__(self): + return "Select => Select, Select" + + +class PushSelects(Rule): + """Push selections.""" + + @staticmethod + def is_column_equality_comparison(cond): + """Return a tuple of column indexes if the condition is an equality test. + """ + + if (isinstance(cond, expression.EQ) and + isinstance(cond.left, UnnamedAttributeRef) and + isinstance(cond.right, UnnamedAttributeRef)): + return cond.left.position, cond.right.position + else: + return None + + @staticmethod + def descend_tree(op, cond): + """Recursively push a selection condition down a tree of operators. + + :param op: The root of an operator tree + :type op: raco.algebra.Operator + :type cond: The selection condition + :type cond: raco.expression.expression + + :return: A (possibly modified) operator. + """ + + if isinstance(op, algebra.Select): + # Keep pushing; selects are commutative + op.input = PushSelects.descend_tree(op.input, cond) + return op + elif isinstance(op, algebra.CompositeBinaryOperator): + # Joins and cross-products; consider conversion to an equijoin + left_len = len(op.left.scheme()) + accessed = accessed_columns(cond) + in_left = [col < left_len for col in accessed] + if all(in_left): + # Push the select into the left sub-tree. + op.left = PushSelects.descend_tree(op.left, cond) + return op + elif not any(in_left): + # Push into right subtree; rebase column indexes + expression.rebase_expr(cond, left_len) + op.right = PushSelects.descend_tree(op.right, cond) + return op + else: + # Selection includes both children; attempt to create an + # equijoin condition + cols = PushSelects.is_column_equality_comparison(cond) + if cols: + return op.add_equijoin_condition(cols[0], cols[1]) + elif isinstance(op, algebra.Apply): + # Convert accessed to a list from a set to ensure consistent order + accessed = list(accessed_columns(cond)) + accessed_emits = [op.emitters[i][1] for i in accessed] + if all(isinstance(e, expression.AttributeRef) + for e in accessed_emits): + unnamed_emits = [expression.toUnnamed(e, op.input.scheme()) + for e in accessed_emits] + # This condition only touches columns that are copied verbatim + # from the child, so we can push it. + index_map = {a: e.position + for (a, e) in zip(accessed, unnamed_emits)} + expression.reindex_expr(cond, index_map) + op.input = PushSelects.descend_tree(op.input, cond) + return op + elif isinstance(op, algebra.GroupBy): + # Convert accessed to a list from a set to ensure consistent order + accessed = list(accessed_columns(cond)) + if all((a < len(op.grouping_list)) for a in accessed): + accessed_grps = [op.grouping_list[a] for a in accessed] + # This condition only touches columns that are copied verbatim + # from the child (grouping keys), so we can push it. + assert all(isinstance(e, expression.AttributeRef) + for e in op.grouping_list) + unnamed_grps = [expression.toUnnamed(e, op.input.scheme()) + for e in accessed_grps] + index_map = {a: e.position + for (a, e) in zip(accessed, unnamed_grps)} + expression.reindex_expr(cond, index_map) + op.input = PushSelects.descend_tree(op.input, cond) + return op + + # Can't push any more: instantiate the selection + new_op = algebra.Select(cond, op) + new_op.has_been_pushed = True + return new_op + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + if op.has_been_pushed: + return op + + new_op = PushSelects.descend_tree(op.input, op.condition) + + # The new root may also be a select, so fire the rule recursively + return self.fire(new_op) + + def __str__(self): + return ("Select, Cross/Join => Join;" + + " Select, Apply => Apply, Select;" + + " Select, GroupBy => GroupBy, Select") + + +class MergeSelects(Rule): + """Merge consecutive Selects into a single conjunctive selection.""" + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + + while isinstance(op.input, algebra.Select): + conjunc = expression.AND(op.condition, op.input.condition) + op = algebra.Select(conjunc, op.input.input) + + return op + + def __str__(self): + return "Select, Select => Select" + + +class PushApply(Rule): + """Many Applies in MyriaL are added to select fewer columns from the + input. In some of these cases, we can do less work in the children by + preventing them from producing columns we will then immediately drop. + + Currently, this rule: + - merges consecutive Apply operations into one Apply, possibly dropping + some of the produced columns along the way. + - makes ProjectingJoin only produce columns that are later read. + TODO: drop the Apply if the column-selection pushed into the + ProjectingJoin is everything the Apply was doing. See note below. + """ + + def fire(self, op): + if not isinstance(op, algebra.Apply): + return op + + child = op.input + + if isinstance(child, algebra.Apply): + in_scheme = child.scheme() + child_in_scheme = child.input.scheme() + names, emits = zip(*op.emitters) + emits = [to_unnamed_recursive(e, in_scheme) + for e in emits] + child_emits = [to_unnamed_recursive(e[1], child_in_scheme) + for e in child.emitters] + + def convert(n): + if isinstance(n, expression.UnnamedAttributeRef): + n = child_emits[n.position] + else: + n.apply(convert) + return n + + emits = [convert(copy.deepcopy(e)) for e in emits] + + new_apply = algebra.Apply(emitters=zip(names, emits), + input=child.input) + return self.fire(new_apply) + + elif isinstance(child, algebra.ProjectingJoin): + in_scheme = child.scheme() + names, emits = zip(*op.emitters) + emits = [to_unnamed_recursive(e, in_scheme) + for e in emits] + accessed = sorted(set(itertools.chain(*(accessed_columns(e) + for e in emits)))) + index_map = {a: i for (i, a) in enumerate(accessed)} + child.output_columns = [child.output_columns[i] for i in accessed] + for e in emits: + expression.reindex_expr(e, index_map) + # TODO(dhalperi) we may not need the Apply if all it did was rename + # and/or select certain columns. Figure out these cases and omit + # the Apply + return algebra.Apply(emitters=zip(names, emits), + input=child) + + return op + + def __str__(self): + return 'Push Apply into Apply, ProjectingJoin' + + +class RemoveUnusedColumns(Rule): + """For operators that construct new tuples (e.g., GroupBy or Join), we are + guaranteed that any columns from an input tuple that are ignored (neither + used internally nor to produce the output columns) cannot be used higher + in the query tree. For these cases, this rule will prepend an Apply that + keeps only the referenced columns. The goal is that after this rule, + a subsequent invocation of PushApply will be able to push that + column-selection operation further down the tree.""" + + def fire(self, op): + if isinstance(op, algebra.GroupBy): + child = op.input + child_scheme = child.scheme() + grp_list = [to_unnamed_recursive(g, child_scheme) + for g in op.grouping_list] + agg_list = [to_unnamed_recursive(a, child_scheme) + for a in op.aggregate_list] + agg = [accessed_columns(a) for a in agg_list] + pos = [g.position for g in grp_list] + accessed = sorted(set(itertools.chain(*(agg + [pos])))) + if not accessed: + # Bug #207: COUNTALL() does not access any columns. So if the + # query is just a COUNT(*), we would generate an empty Apply. + # If this happens, just keep the first column of the input. + accessed = [0] + if len(accessed) != len(child_scheme): + emitters = [(None, UnnamedAttributeRef(i)) for i in accessed] + new_apply = algebra.Apply(emitters, child) + index_map = {a: i for (i, a) in enumerate(accessed)} + for agg_expr in itertools.chain(grp_list, agg_list): + expression.reindex_expr(agg_expr, index_map) + op.grouping_list = grp_list + op.aggregate_list = agg_list + op.input = new_apply + return op + elif isinstance(op, algebra.ProjectingJoin): + l_scheme = op.left.scheme() + r_scheme = op.right.scheme() + in_scheme = l_scheme + r_scheme + condition = to_unnamed_recursive(op.condition, in_scheme) + column_list = [to_unnamed_recursive(c, in_scheme) + for c in op.output_columns] + + accessed = (accessed_columns(condition) + | set(c.position for c in op.output_columns)) + if len(accessed) == len(in_scheme): + return op + + accessed = sorted(accessed) + left = [a for a in accessed if a < len(l_scheme)] + if len(left) < len(l_scheme): + emits = [(None, UnnamedAttributeRef(a)) for a in left] + apply = algebra.Apply(emits, op.left) + op.left = apply + right = [a - len(l_scheme) for a in accessed + if a >= len(l_scheme)] + if len(right) < len(r_scheme): + emits = [(None, UnnamedAttributeRef(a)) for a in right] + apply = algebra.Apply(emits, op.right) + op.right = apply + index_map = {a: i for (i, a) in enumerate(accessed)} + expression.reindex_expr(condition, index_map) + [expression.reindex_expr(c, index_map) for c in column_list] + op.condition = condition + op.output_columns = column_list + return op + + return op + + def __str__(self): + return 'Remove unused columns' \ No newline at end of file From c64ad5402197c6af8c77a568dff46f04a535b98b Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Mon, 30 Jun 2014 19:08:34 -0700 Subject: [PATCH 3/7] catalog: add get_scheme as abstract method --- raco/catalog.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/raco/catalog.py b/raco/catalog.py index 9fa330c6..b8bb8540 100644 --- a/raco/catalog.py +++ b/raco/catalog.py @@ -31,6 +31,10 @@ class Catalog(object): def get_num_servers(self): """ Return number of servers in myria deployment """ + @abstractmethod + def get_scheme(self, rel_key): + """ Return scheme of tuples of rel_key """ + @abstractmethod def num_tuples(self, rel_key): """ Return number of tuples of rel_key """ @@ -54,3 +58,6 @@ def num_tuples(self, rel_key): if rel_key in self.cached: return self.cached[rel_key] return DEFAULT_CARDINALITY + + def get_scheme(self, rel_key): + raise NotImplementedError() From 7ecda6c65d3be435a667023116b929bb6beda7f2 Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Mon, 30 Jun 2014 23:59:19 -0700 Subject: [PATCH 4/7] scheme: add a get_names function --- raco/scheme.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/raco/scheme.py b/raco/scheme.py index 24844d7a..5727e341 100644 --- a/raco/scheme.py +++ b/raco/scheme.py @@ -44,6 +44,10 @@ def get_types(self): """Return a list of the types in this scheme.""" return [_type for name, _type in self.attributes] + def get_names(self): + """Return a list of the names in this scheme.""" + return [name for name, _type in self.attributes] + def typecheck(self, tup): rmap = raco.types.reverse_python_type_map try: From 3ba09ca8d51037ae49ad7a72fde1b407a37fea07 Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Mon, 30 Jun 2014 17:56:12 -0700 Subject: [PATCH 5/7] sql backend: basic partial implementation and some tests --- raco/language/sql/__init__.py | 0 raco/language/sql/catalog.py | 170 +++++++++++++++++++++++++++++++++ raco/language/sql/test_case.py | 54 +++++++++++ raco/language/sql/test_sql.py | 93 ++++++++++++++++++ setup.py | 2 +- 5 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 raco/language/sql/__init__.py create mode 100644 raco/language/sql/catalog.py create mode 100644 raco/language/sql/test_case.py create mode 100644 raco/language/sql/test_sql.py diff --git a/raco/language/sql/__init__.py b/raco/language/sql/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/raco/language/sql/catalog.py b/raco/language/sql/catalog.py new file mode 100644 index 00000000..203b4828 --- /dev/null +++ b/raco/language/sql/catalog.py @@ -0,0 +1,170 @@ +from sqlalchemy import (Column, Table, MetaData, Integer, String, + Float, create_engine, select, func) + +import raco.algebra as algebra +from raco.catalog import Catalog +import raco.expression as expression +import raco.scheme as scheme +import raco.types as types + + +type_to_raco = {Integer: types.LONG_TYPE, + String: types.STRING_TYPE, + Float: types.FLOAT_TYPE} + + +raco_to_type = {types.LONG_TYPE: Integer, + types.INT_TYPE: Integer, + types.STRING_TYPE: String, + types.FLOAT_TYPE: Float, + types.DOUBLE_TYPE: Float} + + +class SQLCatalog(Catalog): + def __init__(self, engine=None): + if not engine: + self.engine = create_engine('sqlite:///:memory:', echo=True) + else: + self.engine = engine + self.metadata = MetaData() + + def get_num_servers(self): + """ Return number of servers in myria deployment """ + return 1 + + def num_tuples(self, rel_key): + """ Return number of tuples of rel_key """ + table = self.metadata.tables[str(rel_key)] + return self.engine.execute(table.count()).scalar() + + def get_scheme(self, rel_key): + table = self.metadata.tables[str(rel_key)] + return scheme.Scheme((c.name, type_to_raco[type(c.type)]) + for c in table.columns) + + def add_table(self, name, schema, tuples=None): + columns = [Column(n, raco_to_type[t](), nullable=False) + for n, t in schema.attributes] + table = Table(name, self.metadata, *columns) + table.create(self.engine) + if tuples: + self.engine.execute(table.insert().values(tuples)) + + def _convert_expr(self, cols, expr, input_scheme): + if isinstance(expr, expression.AttributeRef): + return self._convert_attribute_ref(cols, expr, input_scheme) + if isinstance(expr, expression.ZeroaryOperator): + return self._convert_zeroary_expr(cols, expr, input_scheme) + if isinstance(expr, expression.UnaryOperator): + return self._convert_unary_expr(cols, expr, input_scheme) + if isinstance(expr, expression.BinaryOperator): + return self._convert_binary_expr(cols, expr, input_scheme) + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_attribute_ref(self, cols, expr, input_scheme): + print cols + print expr + if isinstance(expr, expression.UnnamedAttributeRef): + return cols[expr.position] + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_zeroary_expr(self, cols, expr, input_scheme): + if isinstance(expr, expression.COUNTALL): + return func.count(cols[0]) + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_unary_expr(self, cols, expr, input_scheme): + input = self._convert_expr(cols, expr.input, input_scheme) + if isinstance(expr, expression.MAX): + return func.max(input) + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_binary_expr(self, cols, expr, input_scheme): + left = self._convert_expr(cols, expr.left, input_scheme) + right = self._convert_expr(cols, expr.right, input_scheme) + + if isinstance(expr, expression.AND): + return left & right + if isinstance(expr, expression.OR): + return left | right + if isinstance(expr, expression.EQ): + return left == right + if isinstance(expr, expression.NEQ): + return left != right + if isinstance(expr, expression.LT): + return left < right + if isinstance(expr, expression.LTEQ): + return left <= right + if isinstance(expr, expression.GT): + return left > right + if isinstance(expr, expression.GTEQ): + return left >= right + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _get_zeroary_sql(self, plan): + if isinstance(plan, algebra.Scan): + return self.metadata.tables[str(plan.relation_key)].select() + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def _get_unary_sql(self, plan): + input = self.get_sql(plan.input).alias("input") + input_sch = plan.input.scheme() + cols = list(input.c) + + if isinstance(plan, algebra.Select): + cond = self._convert_expr(cols, plan.condition, input_sch) + return input.select(cond) + + elif isinstance(plan, algebra.Apply): + clause = [self._convert_expr(cols, e, input_sch).label(name) + for (name, e) in plan.emitters] + return select(clause) + + elif isinstance(plan, algebra.GroupBy): + a = [self._convert_expr(cols, e, input_sch) + for e in plan.aggregate_list] + g = [self._convert_expr(cols, e, input_sch) + for e in plan.grouping_list] + sel = select(g + a) + if not plan.grouping_list: + return sel + return sel.group_by(*g) + + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def _get_binary_sql(self, plan): + # Use aliases to resolve duplicate names + left = self.get_sql(plan.left).alias("left") + right = self.get_sql(plan.right).alias("right") + all_cols = left.c + right.c + all_sch = plan.left.scheme() + plan.right.scheme() + assert len(all_cols) == len(all_sch) + + if isinstance(plan, algebra.CrossProduct): + out_cols = zip(all_cols, all_sch.get_names()) + return select([col.label(name) for col, name in out_cols]) + + elif isinstance(plan, algebra.ProjectingJoin): + cond = self._convert_expr(all_cols, plan.condition, all_sch) + return left.join(right, cond) + + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def get_sql(self, plan): + if isinstance(plan, algebra.ZeroaryOperator): + return self._get_zeroary_sql(plan) + elif isinstance(plan, algebra.UnaryOperator): + return self._get_unary_sql(plan) + elif isinstance(plan, algebra.BinaryOperator): + return self._get_binary_sql(plan) + elif isinstance(plan, algebra.NaryOperator): + return self._get_nary_sql(plan) + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def evaluate(self, plan): + statement = self.get_sql(plan) + print statement + return (tuple(t) for t in self.engine.execute(statement)) \ No newline at end of file diff --git a/raco/language/sql/test_case.py b/raco/language/sql/test_case.py new file mode 100644 index 00000000..2d55538a --- /dev/null +++ b/raco/language/sql/test_case.py @@ -0,0 +1,54 @@ +from collections import Counter +import unittest + +import raco.algebra as algebra +from raco.compile import optimize_by_rules +from raco.language.logical import OptLogicalAlgebra +import raco.myrial.interpreter as interpreter +import raco.myrial.parser as parser +import raco.scheme as scheme +from .catalog import SQLCatalog +import raco.types as types + + +class SQLTestCase(unittest.TestCase): + """A base for testing the compilation of RACO programs to SQL queries""" + + emp_table = [ + # id dept_id name salary + (1, 2, "Bill Howe", 25000), + (2, 1, "Dan Halperin", 90000), + (3, 1, "Andrew Whitaker", 5000), + (4, 2, "Shumo Chu", 5000), + (5, 1, "Victor Almeida", 25000), + (6, 3, "Dan Suciu", 90000), + (7, 1, "Magdalena Balazinska", 25000)] + + emp_schema = scheme.Scheme([("id", types.INT_TYPE), + ("dept_id", types.INT_TYPE), + ("name", types.STRING_TYPE), + ("salary", types.LONG_TYPE)]) + + emp_key = "public:adhoc:employee" + + def setUp(self): + # SQLAlchemy + self.db = SQLCatalog() + self.db.add_table(self.emp_key, self.emp_schema, self.emp_table) + # MyriaL + self.parser = parser.Parser() + self.processor = interpreter.StatementProcessor(self.db) + + def query_to_phys_plan(self, query): + statements = self.parser.parse(query) + self.processor.evaluate(statements) + p = self.processor.get_logical_plan() + p = optimize_by_rules(p, OptLogicalAlgebra.opt_rules()) + if isinstance(p, (algebra.Store, algebra.StoreTemp)): + p = p.input + return p + + def execute(self, query, expected): + p = self.query_to_phys_plan(query) + ans = self.db.evaluate(p) + self.assertEquals(expected, Counter(ans)) \ No newline at end of file diff --git a/raco/language/sql/test_sql.py b/raco/language/sql/test_sql.py new file mode 100644 index 00000000..edbb9537 --- /dev/null +++ b/raco/language/sql/test_sql.py @@ -0,0 +1,93 @@ +from collections import Counter, defaultdict +from raco.language.sql.test_case import SQLTestCase + +import raco.scheme as scheme +import raco.types as types + + +class TestScheme(SQLTestCase): + """Test that we can convert Raco schemes to SQL Alchemy schemes""" + def test_simple_scheme(self): + sch = scheme.Scheme() + sch.addAttribute('w', types.FLOAT_TYPE) + sch.addAttribute('x', types.INT_TYPE) + sch.addAttribute('y', types.LONG_TYPE) + sch.addAttribute('z', types.STRING_TYPE) + + self.db.add_table('simple', sch) + + sch2 = self.db.get_scheme('simple') + self.assertEquals(sch, sch2) + + +class TestQuery(SQLTestCase): + """Test actually compiling plans to SQL.""" + def test_catalog(self): + self.assertEquals(len(self.emp_table), + self.db.num_tuples(self.emp_key)) + + def test_simple_scan(self): + query = """x = scan({emp}); + store(x, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter(self.emp_table) + self.execute(query, expected) + + def test_column_select(self): + query = """x = scan({emp}); + y = [from x emit $2]; + store(y, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((x[2],) for x in self.emp_table) + self.execute(query, expected) + + def test_rename_column_select(self): + query = """x = scan({emp}); + y = [from x emit $0 as a, $0 as b]; + store(y, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((x[0], x[0]) for x in self.emp_table) + self.execute(query, expected) + + def test_count_query(self): + query = """x = countall(scan({emp})); + store(x, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter([(len(self.emp_table),)]) + self.execute(query, expected) + + def test_theta_join_query(self): + query = """x = scan({emp}); + y = scan({emp}); + z = [from x,y where x.salary < y.salary emit x.*]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((a, b, c, d) + for (a, b, c, d) in self.emp_table + for (_, _, _, d2) in self.emp_table + if d < d2) + self.execute(query, expected) + + def test_join_query(self): + query = """x = scan({emp}); + y = scan({emp}); + z = [from x,y where x.salary = y.salary emit count(*) as cnt]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + size = len([1 + for (_, _, _, d) in self.emp_table + for (_, _, _, d2) in self.emp_table + if d == d2]) + expected = Counter([(size,)]) + self.execute(query, expected) + + def test_complex_agg_query(self): + query = """x = scan({emp}); + z = [from x emit dept_id, max(salary) as max_salary]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + d = defaultdict(int) + for _, dept_id, _, salary in self.emp_table: + d[dept_id] = max(d[dept_id], salary) + expected = Counter(d.items()) + self.execute(query, expected) diff --git a/setup.py b/setup.py index d2f580fe..405df59d 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,6 @@ def find_packages(path=".", base="", exclude=None): url='https://github.com/uwescience/datalogcompiler', packages=find_packages(exclude=['clang']), package_data={'': ['c_templates/*.template','grappa_templates/*.template']}, - install_requires=['networkx', 'ply', 'pyparsing'], + install_requires=['networkx', 'ply', 'pyparsing', 'SQLAlchemy'], scripts=['scripts/myrial'] ) From 625f1972b029965e41dbbbe8edc2d945149afdb6 Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Tue, 1 Jul 2014 11:25:30 -0700 Subject: [PATCH 6/7] travis: fix sqlite3 in ci environment --- raco/language/sql/catalog.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/raco/language/sql/catalog.py b/raco/language/sql/catalog.py index 203b4828..b9c763b9 100644 --- a/raco/language/sql/catalog.py +++ b/raco/language/sql/catalog.py @@ -48,7 +48,9 @@ def add_table(self, name, schema, tuples=None): table = Table(name, self.metadata, *columns) table.create(self.engine) if tuples: - self.engine.execute(table.insert().values(tuples)) + tuples = [{n: v for n, v in zip(schema.get_names(), tup)} + for tup in tuples] + self.engine.execute(table.insert(), tuples) def _convert_expr(self, cols, expr, input_scheme): if isinstance(expr, expression.AttributeRef): From 3cf3dea2c0873432ecfdb6dcf8a3038cf7a3a2a4 Mon Sep 17 00:00:00 2001 From: Daniel Halperin Date: Tue, 1 Jul 2014 22:11:18 -0700 Subject: [PATCH 7/7] sql.catalog: add a module-level comment --- raco/language/sql/catalog.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/raco/language/sql/catalog.py b/raco/language/sql/catalog.py index b9c763b9..e2153cba 100644 --- a/raco/language/sql/catalog.py +++ b/raco/language/sql/catalog.py @@ -1,3 +1,7 @@ +""" +A RACO language to compile expressions to SQL. +""" + from sqlalchemy import (Column, Table, MetaData, Integer, String, Float, create_engine, select, func)