diff --git a/c++/scan_code.py b/c++/scan_code.py index 1b0af22c..3ea36f97 100644 --- a/c++/scan_code.py +++ b/c++/scan_code.py @@ -1,5 +1,5 @@ import raco.algebra as alg -import raco.clang as clang +import raco.language.clang as clang import raco.boolean as rbool @@ -311,7 +311,7 @@ def visit (self,n): #generate code for a join chain elif isinstance(n,alg.NaryOperator) : - if isinstance(n,clang.FilteringNLJoinChain) : + if isinstance(n, clang.FilteringNLJoinChain) : for arg in n.args : self.visit(arg) self.generate_join_chain(n) diff --git a/c_test_environment/clang_tests.py b/c_test_environment/clang_tests.py index df08db89..95d4a765 100644 --- a/c_test_environment/clang_tests.py +++ b/c_test_environment/clang_tests.py @@ -3,7 +3,7 @@ from testquery import ClangRunner from generate_test_relations import generate_default from generate_test_relations import need_generate -from raco.language import CCAlgebra +from raco.language.clang import CCAlgebra from platform_tests import PlatformTest import sys diff --git a/c_test_environment/grappalang_tests.py b/c_test_environment/grappalang_tests.py index 15ac239f..7cefc570 100644 --- a/c_test_environment/grappalang_tests.py +++ b/c_test_environment/grappalang_tests.py @@ -3,7 +3,7 @@ from testquery import GrappalangRunner from generate_test_relations import generate_default from generate_test_relations import need_generate -from raco.language import GrappaAlgebra +from raco.language.grappalang import GrappaAlgebra from platform_tests import PlatformTest from nose.plugins.skip import SkipTest diff --git a/examples/emitcode.py b/examples/emitcode.py index 3b48e6fa..231f5be1 100644 --- a/examples/emitcode.py +++ b/examples/emitcode.py @@ -1,9 +1,7 @@ from raco import RACompiler -from raco.algebra import LogicalAlgebra -import raco.algebra as algebra from raco.compile import compile -from raco.grappalang import GrappaShuffleHashJoin, GrappaSymmetricHashJoin, GrappaHashJoin -import raco.rules as rules +from raco.language.grappalang import (GrappaShuffleHashJoin, + GrappaSymmetricHashJoin) import raco.viz as viz import logging @@ -19,7 +17,7 @@ def hack_plan(alg, plan): alg.set_join_type(GrappaSymmetricHashJoin) elif plan == "shuf": alg.set_join_type(GrappaShuffleHashJoin) - + def emitCode(query, name, algType, plan=""): alg = algType() hack_plan(alg, plan) @@ -42,7 +40,7 @@ def emitCode(query, name, algType, plan=""): dlog.optimize(target=alg, eliminate_common_subexpressions=False) LOG.info("physical: %s",dlog.physicalplan) - + print dlog.physicalplan physical_dot = viz.operator_to_dot(dlog.physicalplan) with open("%s.physical.dot"%(name), 'w') as dwf: @@ -52,7 +50,7 @@ def emitCode(query, name, algType, plan=""): code = "" code += comment("Query " + query) code += compile(dlog.physicalplan) - + fname = name+'.cpp' with open(fname, 'w') as f: f.write(code) diff --git a/examples/sp2bench/sp2bench_rdf_long.py b/examples/sp2bench/sp2bench_rdf_long.py index 48bd7afd..2aad3445 100644 --- a/examples/sp2bench/sp2bench_rdf_long.py +++ b/examples/sp2bench/sp2bench_rdf_long.py @@ -1,6 +1,6 @@ from emitcode import emitCode -import raco.algebra as algebra -from raco.language import CCAlgebra, GrappaAlgebra +from raco.language.grappalang import GrappaAlgebra +from raco.language.clang import CCAlgebra import logging logging.basicConfig(level=logging.DEBUG) @@ -49,7 +49,7 @@ # TODO be sure DISTINCT -# syntactically join with equality; +# syntactically join with equality; #queries['Q5a'] = """A(person, name) :- %(tr)s(article, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://localhost/vocabulary/bench/Article'), queries['Q5a'] = """A(person, name) :- %(tr)s(article, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://localhost/vocabulary/bench/Article'), %(tr)s(article, 'http://purl.org/dc/elements/1.1/creator', person), @@ -72,24 +72,24 @@ # TODO: Q7 requires double negation -#TODO: enable Q8, after dealing with HashJoin( $0 != $7 ) type of cases -#queries['Q8'] = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), -_ = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), - %(tr)s(erdoes, 'http://xmlns.com/foaf/0.1/name', "Paul Erdoes") +#TODO: enable Q8, after dealing with HashJoin( $0 != $7 ) type of cases +#queries['Q8'] = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), +_ = """Erdoes(erdoes) :- %(tr)s(erdoes, 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://xmlns.com/foaf/0.1/Person'), + %(tr)s(erdoes, 'http://xmlns.com/foaf/0.1/name', "Paul Erdoes") A(name) :- Erdoes(erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(doc2, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(doc2, 'http://purl.org/dc/elements/1.1/creator', author2), %(tr)s(author2, 'http://xmlns.com/foaf/0.1/name', name), - author != erdoes, - doc2 != doc, + author != erdoes, + doc2 != doc, author2 != erdoes, author2 != author - + A(name) :- Erdoes(erdoes), %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', erdoes), - %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), + %(tr)s(doc, 'http://purl.org/dc/elements/1.1/creator', author), %(tr)s(author, 'http://xmlns.com/foaf/0.1/name', name), author != erdoes""" #TODO be sure DISTINCT @@ -106,7 +106,7 @@ queries['Q11'] = """A(ee) :- %(tr)s(publication, 'http://www.w3.org/2000/01/rdf-schema#seeAlso', ee)""" #TODO order by, limit, offset - + alg = CCAlgebra prefix="" import sys diff --git a/expressiontest.py b/expressiontest.py deleted file mode 100644 index 45ace5cd..00000000 --- a/expressiontest.py +++ /dev/null @@ -1,60 +0,0 @@ -import raco.expression as e -from raco import RACompiler -from raco.language import MyriaAlgebra -from raco.scheme import Scheme -from raco.algebra import Select, Scan, Join, Apply, LogicalAlgebra -from raco.boolean import EQ, AND, OR, StringLiteral, NumericLiteral -from sampledb import btc_schema, Rr -from raco.compile import compile, optimize -import raco.catalog - -plus = e.PLUS(e.UnnamedAttributeRef(0), e.NamedAttributeRef("x")) -print plus - -minus = e.MINUS(e.Literal(2), e.Literal(5)) -print minus - -absf = e.ABS(e.UnnamedAttributeRef(2)) -print absf - -mix = e.DIVIDE(e.TIMES(plus, absf), minus) -print mix - - -def testRA(): - sch = Scheme([("y",float), ("x",int)]) - R = raco.catalog.Relation("R", sch) - J = Join(EQ(e.UnnamedAttributeRef(0), e.NamedAttributeRef("x")), Scan(R), Scan(R)) - A = Apply(J, z=e.PLUS(e.NamedAttributeRef("x"), e.NamedAttributeRef("y")), w=e.UnnamedAttributeRef(3)) - exprs = optimize([('A',A)], target=MyriaAlgebra(), source=LogicalAlgebra) - print exprs - print compile(exprs) - -def testDatalog(): - query = "A(x) :- R(x,y),S(y,z)" - - print "/*\n%s\n*/" % str(query) - - # Create a compiler object - dlog = RACompiler() - - # parse the query - dlog.fromDatalog(query) - print "************ LOGICAL PLAN *************" - print dlog.logicalplan - print - - # Optimize the query, includes producing a physical plan - print "************ PHYSICAL PLAN *************" - dlog.optimize(target=MyriaAlgebra(), eliminate_common_subexpressions=False) - print dlog.physicalplan - print - - # generate code in the target language - code = dlog.compile() - print "************ CODE *************" - print code - print - - -testRA() diff --git a/logical_ra_pb2.py b/logical_ra_pb2.py deleted file mode 100644 index c66ea291..00000000 --- a/logical_ra_pb2.py +++ /dev/null @@ -1,401 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! - -from protobuf import descriptor -from protobuf import message -from protobuf import reflection -from protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - - - -DESCRIPTOR = descriptor.FileDescriptor( - name='logical_ra.proto', - package='', - serialized_pb='\n\x10logical_ra.proto\"L\n\x15LogicalRaQueryMessage\x12\x0c\n\x04name\x18\x01 \x02(\t\x12%\n\toperators\x18\x03 \x03(\x0b\x32\x12.LogicalRaOperator\"\xd8\x02\n\x11LogicalRaOperator\x12\x36\n\x04type\x18\x01 \x02(\x0e\x32(.LogicalRaOperator.LogicalRaOperatorType\x12\x0c\n\x04name\x18\x02 \x02(\t\x12\x1a\n\x04scan\x18\x03 \x01(\x0b\x32\x0c.LogicalScan\x12\x1e\n\x06select\x18\x04 \x01(\x0b\x32\x0e.LogicalSelect\x12 \n\x07project\x18\x05 \x01(\x0b\x32\x0f.LogicalProject\x12\"\n\x08\x65quijoin\x18\x06 \x01(\x0b\x32\x10.LogicalEquiJoin\x12\x1c\n\x05\x63ross\x18\x07 \x01(\x0b\x32\r.LogicalCross\"]\n\x15LogicalRaOperatorType\x12\x08\n\x04SCAN\x10\x00\x12\x0b\n\x07PROJECT\x10\x01\x12\x08\n\x04JOIN\x10\x02\x12\n\n\x06SELECT\x10\x03\x12\x0c\n\x08\x45QUIJOIN\x10\x04\x12\t\n\x05\x43ROSS\x10\x05\"\x1f\n\x0bLogicalScan\x12\x10\n\x08relation\x18\x01 \x02(\t\"5\n\rLogicalSelect\x12\x11\n\tchildName\x18\x01 \x02(\t\x12\x11\n\tcondition\x18\x02 \x02(\t\"4\n\x0eLogicalProject\x12\x11\n\tchildName\x18\x01 \x02(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\x05\"k\n\x0fLogicalEquiJoin\x12\x15\n\rleftChildName\x18\x01 \x02(\t\x12\x13\n\x0bleftColumns\x18\x02 \x03(\x05\x12\x16\n\x0erightChildName\x18\x03 \x02(\t\x12\x14\n\x0crightColumns\x18\x04 \x03(\x05\"=\n\x0cLogicalCross\x12\x15\n\rleftChildName\x18\x01 \x02(\t\x12\x16\n\x0erightChildName\x18\x02 \x02(\tB6\n$edu.washington.escience.myriad.protoB\x0eLogicalRaProto') - - - -_LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE = descriptor.EnumDescriptor( - name='LogicalRaOperatorType', - full_name='LogicalRaOperator.LogicalRaOperatorType', - filename=None, - file=DESCRIPTOR, - values=[ - descriptor.EnumValueDescriptor( - name='SCAN', index=0, number=0, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='PROJECT', index=1, number=1, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='JOIN', index=2, number=2, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='SELECT', index=3, number=3, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='EQUIJOIN', index=4, number=4, - options=None, - type=None), - descriptor.EnumValueDescriptor( - name='CROSS', index=5, number=5, - options=None, - type=None), - ], - containing_type=None, - options=None, - serialized_start=350, - serialized_end=443, -) - - -_LOGICALRAQUERYMESSAGE = descriptor.Descriptor( - name='LogicalRaQueryMessage', - full_name='LogicalRaQueryMessage', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='name', full_name='LogicalRaQueryMessage.name', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='operators', full_name='LogicalRaQueryMessage.operators', index=1, - number=3, type=11, cpp_type=10, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=20, - serialized_end=96, -) - - -_LOGICALRAOPERATOR = descriptor.Descriptor( - name='LogicalRaOperator', - full_name='LogicalRaOperator', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='type', full_name='LogicalRaOperator.type', index=0, - number=1, type=14, cpp_type=8, label=2, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='name', full_name='LogicalRaOperator.name', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='scan', full_name='LogicalRaOperator.scan', index=2, - number=3, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='select', full_name='LogicalRaOperator.select', index=3, - number=4, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='project', full_name='LogicalRaOperator.project', index=4, - number=5, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='equijoin', full_name='LogicalRaOperator.equijoin', index=5, - number=6, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='cross', full_name='LogicalRaOperator.cross', index=6, - number=7, type=11, cpp_type=10, label=1, - has_default_value=False, default_value=None, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - _LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE, - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=99, - serialized_end=443, -) - - -_LOGICALSCAN = descriptor.Descriptor( - name='LogicalScan', - full_name='LogicalScan', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='relation', full_name='LogicalScan.relation', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=445, - serialized_end=476, -) - - -_LOGICALSELECT = descriptor.Descriptor( - name='LogicalSelect', - full_name='LogicalSelect', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='childName', full_name='LogicalSelect.childName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='condition', full_name='LogicalSelect.condition', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=478, - serialized_end=531, -) - - -_LOGICALPROJECT = descriptor.Descriptor( - name='LogicalProject', - full_name='LogicalProject', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='childName', full_name='LogicalProject.childName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='columns', full_name='LogicalProject.columns', index=1, - number=2, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=533, - serialized_end=585, -) - - -_LOGICALEQUIJOIN = descriptor.Descriptor( - name='LogicalEquiJoin', - full_name='LogicalEquiJoin', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='leftChildName', full_name='LogicalEquiJoin.leftChildName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='leftColumns', full_name='LogicalEquiJoin.leftColumns', index=1, - number=2, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightChildName', full_name='LogicalEquiJoin.rightChildName', index=2, - number=3, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightColumns', full_name='LogicalEquiJoin.rightColumns', index=3, - number=4, type=5, cpp_type=1, label=3, - has_default_value=False, default_value=[], - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=587, - serialized_end=694, -) - - -_LOGICALCROSS = descriptor.Descriptor( - name='LogicalCross', - full_name='LogicalCross', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - descriptor.FieldDescriptor( - name='leftChildName', full_name='LogicalCross.leftChildName', index=0, - number=1, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - descriptor.FieldDescriptor( - name='rightChildName', full_name='LogicalCross.rightChildName', index=1, - number=2, type=9, cpp_type=9, label=2, - has_default_value=False, default_value=unicode("", "utf-8"), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - extension_ranges=[], - serialized_start=696, - serialized_end=757, -) - -_LOGICALRAQUERYMESSAGE.fields_by_name['operators'].message_type = _LOGICALRAOPERATOR -_LOGICALRAOPERATOR.fields_by_name['type'].enum_type = _LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE -_LOGICALRAOPERATOR.fields_by_name['scan'].message_type = _LOGICALSCAN -_LOGICALRAOPERATOR.fields_by_name['select'].message_type = _LOGICALSELECT -_LOGICALRAOPERATOR.fields_by_name['project'].message_type = _LOGICALPROJECT -_LOGICALRAOPERATOR.fields_by_name['equijoin'].message_type = _LOGICALEQUIJOIN -_LOGICALRAOPERATOR.fields_by_name['cross'].message_type = _LOGICALCROSS -_LOGICALRAOPERATOR_LOGICALRAOPERATORTYPE.containing_type = _LOGICALRAOPERATOR; -DESCRIPTOR.message_types_by_name['LogicalRaQueryMessage'] = _LOGICALRAQUERYMESSAGE -DESCRIPTOR.message_types_by_name['LogicalRaOperator'] = _LOGICALRAOPERATOR -DESCRIPTOR.message_types_by_name['LogicalScan'] = _LOGICALSCAN -DESCRIPTOR.message_types_by_name['LogicalSelect'] = _LOGICALSELECT -DESCRIPTOR.message_types_by_name['LogicalProject'] = _LOGICALPROJECT -DESCRIPTOR.message_types_by_name['LogicalEquiJoin'] = _LOGICALEQUIJOIN -DESCRIPTOR.message_types_by_name['LogicalCross'] = _LOGICALCROSS - -class LogicalRaQueryMessage(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALRAQUERYMESSAGE - - # @@protoc_insertion_point(class_scope:LogicalRaQueryMessage) - -class LogicalRaOperator(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALRAOPERATOR - - # @@protoc_insertion_point(class_scope:LogicalRaOperator) - -class LogicalScan(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALSCAN - - # @@protoc_insertion_point(class_scope:LogicalScan) - -class LogicalSelect(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALSELECT - - # @@protoc_insertion_point(class_scope:LogicalSelect) - -class LogicalProject(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALPROJECT - - # @@protoc_insertion_point(class_scope:LogicalProject) - -class LogicalEquiJoin(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALEQUIJOIN - - # @@protoc_insertion_point(class_scope:LogicalEquiJoin) - -class LogicalCross(message.Message): - __metaclass__ = reflection.GeneratedProtocolMessageType - DESCRIPTOR = _LOGICALCROSS - - # @@protoc_insertion_point(class_scope:LogicalCross) - -# @@protoc_insertion_point(module_scope) diff --git a/raco/__init__.py b/raco/__init__.py index 6074245c..b7ef3164 100644 --- a/raco/__init__.py +++ b/raco/__init__.py @@ -1,6 +1,6 @@ from raco.datalog.grammar import parse -from raco.language import MyriaLeftDeepTreeAlgebra -from raco.algebra import LogicalAlgebra +from raco.language.myrialang import MyriaLeftDeepTreeAlgebra +from raco.language.logical import LogicalAlgebra from raco.compile import optimize import logging diff --git a/raco/algebra.py b/raco/algebra.py index 5e7555cf..51724e51 100644 --- a/raco/algebra.py +++ b/raco/algebra.py @@ -1346,16 +1346,4 @@ def rewrite_node(node): else: return node.apply(rewrite_node) - return rewrite_node(dest_op) - - -class LogicalAlgebra(object): - operators = [ - Join, - Select, - Scan - ] - - @staticmethod - def opt_rules(): - return [] + return rewrite_node(dest_op) \ No newline at end of file diff --git a/raco/catalog.py b/raco/catalog.py index 9fa330c6..b8bb8540 100644 --- a/raco/catalog.py +++ b/raco/catalog.py @@ -31,6 +31,10 @@ class Catalog(object): def get_num_servers(self): """ Return number of servers in myria deployment """ + @abstractmethod + def get_scheme(self, rel_key): + """ Return scheme of tuples of rel_key """ + @abstractmethod def num_tuples(self, rel_key): """ Return number of tuples of rel_key """ @@ -54,3 +58,6 @@ def num_tuples(self, rel_key): if rel_key in self.cached: return self.cached[rel_key] return DEFAULT_CARDINALITY + + def get_scheme(self, rel_key): + raise NotImplementedError() diff --git a/raco/datalog/datalog_test.py b/raco/datalog/datalog_test.py index 900eaa96..76ed382c 100644 --- a/raco/datalog/datalog_test.py +++ b/raco/datalog/datalog_test.py @@ -3,8 +3,9 @@ import raco.fakedb from raco import RACompiler -from raco.language import MyriaLeftDeepTreeAlgebra, MyriaHyperCubeAlgebra -from raco.myrialang import compile_to_json +from raco.language.myrialang import (compile_to_json, + MyriaLeftDeepTreeAlgebra, + MyriaHyperCubeAlgebra) from raco.catalog import FakeCatalog diff --git a/raco/datalog/query_tests.py b/raco/datalog/query_tests.py index e965ff54..3188d155 100644 --- a/raco/datalog/query_tests.py +++ b/raco/datalog/query_tests.py @@ -3,7 +3,7 @@ import raco.scheme as scheme import raco.datalog.datalog_test as datalog_test from raco import types -from raco.language import MyriaHyperCubeAlgebra +from raco.language.myrialang import MyriaHyperCubeAlgebra class TestQueryFunctions(datalog_test.DatalogTestCase): diff --git a/raco/language.py b/raco/language/__init__.py similarity index 93% rename from raco/language.py rename to raco/language/__init__.py index 5b30e957..90979b70 100644 --- a/raco/language.py +++ b/raco/language/__init__.py @@ -1,6 +1,5 @@ -from raco import expression - from abc import ABCMeta, abstractmethod +import raco.expression as expression import logging LOG = logging.getLogger(__name__) @@ -174,13 +173,4 @@ def visit_TIMES(self, binaryexpr): def visit_NEG(self, unaryexpr): inputexpr = self.stack.pop() - self.stack.append(self.language.negative(inputexpr)) - - -# import everything from each language -from raco.pythonlang import PythonAlgebra -from raco.pseudocodelang import PseudoCodeAlgebra -from raco.clang import CCAlgebra -from raco.myrialang import MyriaLeftDeepTreeAlgebra -from raco.myrialang import MyriaHyperCubeAlgebra -from raco.grappalang import GrappaAlgebra + self.stack.append(self.language.negative(inputexpr)) \ No newline at end of file diff --git a/raco/c_templates/ascii_scan.template b/raco/language/c_templates/ascii_scan.template similarity index 100% rename from raco/c_templates/ascii_scan.template rename to raco/language/c_templates/ascii_scan.template diff --git a/raco/c_templates/base_query.template b/raco/language/c_templates/base_query.template similarity index 100% rename from raco/c_templates/base_query.template rename to raco/language/c_templates/base_query.template diff --git a/raco/c_templates/binary_scan.template b/raco/language/c_templates/binary_scan.template similarity index 100% rename from raco/c_templates/binary_scan.template rename to raco/language/c_templates/binary_scan.template diff --git a/raco/c_templates/emit_joined_tuple.template b/raco/language/c_templates/emit_joined_tuple.template similarity index 100% rename from raco/c_templates/emit_joined_tuple.template rename to raco/language/c_templates/emit_joined_tuple.template diff --git a/raco/c_templates/filtering_nestedloop_hashjoin_chain.template b/raco/language/c_templates/filtering_nestedloop_hashjoin_chain.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_hashjoin_chain.template rename to raco/language/c_templates/filtering_nestedloop_hashjoin_chain.template diff --git a/raco/c_templates/filtering_nestedloop_join.template b/raco/language/c_templates/filtering_nestedloop_join.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_join.template rename to raco/language/c_templates/filtering_nestedloop_join.template diff --git a/raco/c_templates/filtering_nestedloop_join_chain.template b/raco/language/c_templates/filtering_nestedloop_join_chain.template similarity index 100% rename from raco/c_templates/filtering_nestedloop_join_chain.template rename to raco/language/c_templates/filtering_nestedloop_join_chain.template diff --git a/raco/c_templates/filteringhashjoin.template b/raco/language/c_templates/filteringhashjoin.template similarity index 100% rename from raco/c_templates/filteringhashjoin.template rename to raco/language/c_templates/filteringhashjoin.template diff --git a/raco/c_templates/hashjoin.template b/raco/language/c_templates/hashjoin.template similarity index 100% rename from raco/c_templates/hashjoin.template rename to raco/language/c_templates/hashjoin.template diff --git a/raco/c_templates/join_simple_hash_twopass.template b/raco/language/c_templates/join_simple_hash_twopass.template similarity index 100% rename from raco/c_templates/join_simple_hash_twopass.template rename to raco/language/c_templates/join_simple_hash_twopass.template diff --git a/raco/c_templates/precount_select.template b/raco/language/c_templates/precount_select.template similarity index 100% rename from raco/c_templates/precount_select.template rename to raco/language/c_templates/precount_select.template diff --git a/raco/c_templates/scan.template b/raco/language/c_templates/scan.template similarity index 100% rename from raco/c_templates/scan.template rename to raco/language/c_templates/scan.template diff --git a/raco/c_templates/select_simple_twopass.template b/raco/language/c_templates/select_simple_twopass.template similarity index 100% rename from raco/c_templates/select_simple_twopass.template rename to raco/language/c_templates/select_simple_twopass.template diff --git a/raco/clang.py b/raco/language/clang.py similarity index 98% rename from raco/clang.py rename to raco/language/clang.py index 84fdb88b..776384d6 100644 --- a/raco/clang.py +++ b/raco/language/clang.py @@ -3,13 +3,12 @@ from raco import algebra from raco import expression -from raco.language import Language +from raco.language import Language, clangcommon from raco import rules from raco.pipelines import Pipelined -from raco.clangcommon import StagedTupleRef -from raco import clangcommon +from raco.language.clangcommon import StagedTupleRef -from algebra import gensym +from raco.algebra import gensym import logging @@ -174,7 +173,7 @@ def new_tuple_ref(self, sym, scheme): return CStagedTupleRef(sym, scheme) -from algebra import UnaryOperator +from raco.algebra import UnaryOperator class MemoryScan(algebra.UnaryOperator, CCOperator): diff --git a/raco/clangcommon.py b/raco/language/clangcommon.py similarity index 96% rename from raco/clangcommon.py rename to raco/language/clangcommon.py index 53eb6dc9..1df24497 100644 --- a/raco/clangcommon.py +++ b/raco/language/clangcommon.py @@ -5,8 +5,8 @@ from raco import algebra from raco import expression from raco import catalog -from algebra import gensym -from expression.expression import UnnamedAttributeRef +from raco.algebra import gensym +from raco.expression import UnnamedAttributeRef import logging LOG = logging.getLogger(__name__) @@ -35,36 +35,36 @@ def ct(s): # TODO: # The following is actually a staged materialized tuple ref. -# we should also add a staged reference tuple ref that just has relationsymbol and row +# we should also add a staged reference tuple ref that just has relationsymbol and row class StagedTupleRef: nextid = 0 - + @classmethod def genname(cls): # use StagedTupleRef so everyone shares one mutable copy of nextid - x = StagedTupleRef.nextid + x = StagedTupleRef.nextid StagedTupleRef.nextid+=1 return "t_%03d" % x - + def __init__(self, relsym, scheme): self.name = self.genname() self.relsym = relsym self.scheme = scheme self.__typename = None - + def getTupleTypename(self): - if self.__typename==None: + if self.__typename is None: fields = "" relsym = self.relsym for i in range(0, len(self.scheme)): fieldnum = i fields += "_%(fieldnum)s" % locals() - + self.__typename = "MaterializedTupleRef_%(relsym)s%(fields)s" % locals() - + return self.__typename - + def generateDefinition(self): fielddeftemplate = """int64_t _fields[%(numfields)s]; """ @@ -78,15 +78,15 @@ class %(tupletypename)s { int64_t get(int field) const { return _fields[field]; } - + void set(int field, int64_t val) { _fields[field] = val; } - + int numFields() const { return %(numfields)s; } - + %(tupletypename)s () { // no-op } @@ -94,7 +94,7 @@ class %(tupletypename)s { %(tupletypename)s (std::vector vals) { for (int i=0; i Select, Select" - - -class MergeSelects(rules.Rule): - """Merge consecutive Selects into a single conjunctive selection.""" - - def fire(self, op): - if not isinstance(op, algebra.Select): - return op - - while isinstance(op.input, algebra.Select): - conjunc = expression.AND(op.condition, op.input.condition) - op = algebra.Select(conjunc, op.input.input) - - return op - - def __str__(self): - return "Select, Select => Select" - - class ProjectToDistinctColumnSelect(rules.Rule): def fire(self, expr): # If not a Project, who cares? @@ -1175,259 +1132,6 @@ def fire(self, expr): return colSelect -def is_column_equality_comparison(cond): - """Return a tuple of column indexes if the condition is an equality test. - """ - - if (isinstance(cond, expression.EQ) and - isinstance(cond.left, UnnamedAttributeRef) and - isinstance(cond.right, UnnamedAttributeRef)): - return cond.left.position, cond.right.position - else: - return None - - -class PushApply(rules.Rule): - """Many Applies in MyriaL are added to select fewer columns from the - input. In some of these cases, we can do less work in the children by - preventing them from producing columns we will then immediately drop. - - Currently, this rule: - - merges consecutive Apply operations into one Apply, possibly dropping - some of the produced columns along the way. - - makes ProjectingJoin only produce columns that are later read. - TODO: drop the Apply if the column-selection pushed into the - ProjectingJoin is everything the Apply was doing. See note below. - """ - - def fire(self, op): - if not isinstance(op, algebra.Apply): - return op - - child = op.input - - if isinstance(child, algebra.Apply): - in_scheme = child.scheme() - child_in_scheme = child.input.scheme() - names, emits = zip(*op.emitters) - emits = [to_unnamed_recursive(e, in_scheme) - for e in emits] - child_emits = [to_unnamed_recursive(e[1], child_in_scheme) - for e in child.emitters] - - def convert(n): - if isinstance(n, expression.UnnamedAttributeRef): - n = child_emits[n.position] - else: - n.apply(convert) - return n - - emits = [convert(copy.deepcopy(e)) for e in emits] - - new_apply = algebra.Apply(emitters=zip(names, emits), - input=child.input) - return self.fire(new_apply) - - elif isinstance(child, algebra.ProjectingJoin): - in_scheme = child.scheme() - names, emits = zip(*op.emitters) - emits = [to_unnamed_recursive(e, in_scheme) - for e in emits] - accessed = sorted(set(itertools.chain(*(accessed_columns(e) - for e in emits)))) - index_map = {a: i for (i, a) in enumerate(accessed)} - child.output_columns = [child.output_columns[i] for i in accessed] - for e in emits: - expression.reindex_expr(e, index_map) - # TODO(dhalperi) we may not need the Apply if all it did was rename - # and/or select certain columns. Figure out these cases and omit - # the Apply - return algebra.Apply(emitters=zip(names, emits), - input=child) - - return op - - def __str__(self): - return 'Push Apply into Apply, ProjectingJoin' - - -class RemoveUnusedColumns(rules.Rule): - """For operators that construct new tuples (e.g., GroupBy or Join), we are - guaranteed that any columns from an input tuple that are ignored (neither - used internally nor to produce the output columns) cannot be used higher - in the query tree. For these cases, this rule will prepend an Apply that - keeps only the referenced columns. The goal is that after this rule, - a subsequent invocation of PushApply will be able to push that - column-selection operation further down the tree.""" - - def fire(self, op): - if isinstance(op, algebra.GroupBy): - child = op.input - child_scheme = child.scheme() - grp_list = [to_unnamed_recursive(g, child_scheme) - for g in op.grouping_list] - agg_list = [to_unnamed_recursive(a, child_scheme) - for a in op.aggregate_list] - agg = [accessed_columns(a) for a in agg_list] - pos = [g.position for g in grp_list] - accessed = sorted(set(itertools.chain(*(agg + [pos])))) - if not accessed: - # Bug #207: COUNTALL() does not access any columns. So if the - # query is just a COUNT(*), we would generate an empty Apply. - # If this happens, just keep the first column of the input. - accessed = [0] - if len(accessed) != len(child_scheme): - emitters = [(None, UnnamedAttributeRef(i)) for i in accessed] - new_apply = algebra.Apply(emitters, child) - index_map = {a: i for (i, a) in enumerate(accessed)} - for agg_expr in itertools.chain(grp_list, agg_list): - expression.reindex_expr(agg_expr, index_map) - op.grouping_list = grp_list - op.aggregate_list = agg_list - op.input = new_apply - return op - elif isinstance(op, algebra.ProjectingJoin): - l_scheme = op.left.scheme() - r_scheme = op.right.scheme() - in_scheme = l_scheme + r_scheme - condition = to_unnamed_recursive(op.condition, in_scheme) - column_list = [to_unnamed_recursive(c, in_scheme) - for c in op.output_columns] - - accessed = (accessed_columns(condition) - | set(c.position for c in op.output_columns)) - if len(accessed) == len(in_scheme): - return op - - accessed = sorted(accessed) - left = [a for a in accessed if a < len(l_scheme)] - if len(left) < len(l_scheme): - emits = [(None, UnnamedAttributeRef(a)) for a in left] - apply = algebra.Apply(emits, op.left) - op.left = apply - right = [a - len(l_scheme) for a in accessed - if a >= len(l_scheme)] - if len(right) < len(r_scheme): - emits = [(None, UnnamedAttributeRef(a)) for a in right] - apply = algebra.Apply(emits, op.right) - op.right = apply - index_map = {a: i for (i, a) in enumerate(accessed)} - expression.reindex_expr(condition, index_map) - [expression.reindex_expr(c, index_map) for c in column_list] - op.condition = condition - op.output_columns = column_list - return op - - return op - - def __str__(self): - return 'Remove unused columns' - - -class PushSelects(rules.Rule): - """Push selections.""" - - @staticmethod - def descend_tree(op, cond): - """Recursively push a selection condition down a tree of operators. - - :param op: The root of an operator tree - :type op: raco.algebra.Operator - :type cond: The selection condition - :type cond: raco.expression.expression - - :return: A (possibly modified) operator. - """ - - if isinstance(op, algebra.Select): - # Keep pushing; selects are commutative - op.input = PushSelects.descend_tree(op.input, cond) - return op - elif isinstance(op, algebra.CompositeBinaryOperator): - # Joins and cross-products; consider conversion to an equijoin - left_len = len(op.left.scheme()) - accessed = accessed_columns(cond) - in_left = [col < left_len for col in accessed] - if all(in_left): - # Push the select into the left sub-tree. - op.left = PushSelects.descend_tree(op.left, cond) - return op - elif not any(in_left): - # Push into right subtree; rebase column indexes - expression.rebase_expr(cond, left_len) - op.right = PushSelects.descend_tree(op.right, cond) - return op - else: - # Selection includes both children; attempt to create an - # equijoin condition - cols = is_column_equality_comparison(cond) - if cols: - return op.add_equijoin_condition(cols[0], cols[1]) - elif isinstance(op, algebra.Apply): - # Convert accessed to a list from a set to ensure consistent order - accessed = list(accessed_columns(cond)) - accessed_emits = [op.emitters[i][1] for i in accessed] - if all(isinstance(e, expression.AttributeRef) - for e in accessed_emits): - unnamed_emits = [expression.toUnnamed(e, op.input.scheme()) - for e in accessed_emits] - # This condition only touches columns that are copied verbatim - # from the child, so we can push it. - index_map = {a: e.position - for (a, e) in zip(accessed, unnamed_emits)} - expression.reindex_expr(cond, index_map) - op.input = PushSelects.descend_tree(op.input, cond) - return op - elif isinstance(op, algebra.GroupBy): - # Convert accessed to a list from a set to ensure consistent order - accessed = list(accessed_columns(cond)) - if all((a < len(op.grouping_list)) for a in accessed): - accessed_grps = [op.grouping_list[a] for a in accessed] - # This condition only touches columns that are copied verbatim - # from the child (grouping keys), so we can push it. - assert all(isinstance(e, expression.AttributeRef) - for e in op.grouping_list) - unnamed_grps = [expression.toUnnamed(e, op.input.scheme()) - for e in accessed_grps] - index_map = {a: e.position - for (a, e) in zip(accessed, unnamed_grps)} - expression.reindex_expr(cond, index_map) - op.input = PushSelects.descend_tree(op.input, cond) - return op - - # Can't push any more: instantiate the selection - new_op = algebra.Select(cond, op) - new_op.has_been_pushed = True - return new_op - - def fire(self, op): - if not isinstance(op, algebra.Select): - return op - if op.has_been_pushed: - return op - - new_op = PushSelects.descend_tree(op.input, op.condition) - - # The new root may also be a select, so fire the rule recursively - return self.fire(new_op) - - def __str__(self): - return ("Select, Cross/Join => Join;" - + " Select, Apply => Apply, Select;" - + " Select, GroupBy => GroupBy, Select") - - -class RemoveTrivialSequences(rules.Rule): - def fire(self, expr): - if not isinstance(expr, algebra.Sequence): - return expr - - if len(expr.args) == 1: - return expr.args[0] - else: - return expr - - class MergeToNaryJoin(rules.Rule): """Merge consecutive binary join into a single multiway join Note: this code assumes that the binary joins form a left deep tree @@ -1518,16 +1222,16 @@ def fire(self, expr): # logical groups of catalog transparent rules # 1. this must be applied first -remove_trivial_sequences = [RemoveTrivialSequences()] +remove_trivial_sequences = [rules.RemoveTrivialSequences()] # 2. simple group by simple_group_by = [rules.SimpleGroupBy()] # 3. push down selection push_select = [ - SplitSelects(), - PushSelects(), - MergeSelects() + rules.SplitSelects(), + rules.PushSelects(), + rules.MergeSelects() ] # 4. push projection @@ -1540,11 +1244,11 @@ def fire(self, expr): push_apply = [ # These really ought to be run until convergence. # For now, run twice and finish with PushApply. - PushApply(), - RemoveUnusedColumns(), - PushApply(), - RemoveUnusedColumns(), - PushApply(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply(), + rules.RemoveUnusedColumns(), + rules.PushApply(), ] # 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra diff --git a/raco/pseudocodelang.py b/raco/language/pseudocodelang.py similarity index 100% rename from raco/pseudocodelang.py rename to raco/language/pseudocodelang.py diff --git a/raco/pythonlang.py b/raco/language/pythonlang.py similarity index 100% rename from raco/pythonlang.py rename to raco/language/pythonlang.py diff --git a/raco/language/sql/__init__.py b/raco/language/sql/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/raco/language/sql/catalog.py b/raco/language/sql/catalog.py new file mode 100644 index 00000000..e2153cba --- /dev/null +++ b/raco/language/sql/catalog.py @@ -0,0 +1,176 @@ +""" +A RACO language to compile expressions to SQL. +""" + +from sqlalchemy import (Column, Table, MetaData, Integer, String, + Float, create_engine, select, func) + +import raco.algebra as algebra +from raco.catalog import Catalog +import raco.expression as expression +import raco.scheme as scheme +import raco.types as types + + +type_to_raco = {Integer: types.LONG_TYPE, + String: types.STRING_TYPE, + Float: types.FLOAT_TYPE} + + +raco_to_type = {types.LONG_TYPE: Integer, + types.INT_TYPE: Integer, + types.STRING_TYPE: String, + types.FLOAT_TYPE: Float, + types.DOUBLE_TYPE: Float} + + +class SQLCatalog(Catalog): + def __init__(self, engine=None): + if not engine: + self.engine = create_engine('sqlite:///:memory:', echo=True) + else: + self.engine = engine + self.metadata = MetaData() + + def get_num_servers(self): + """ Return number of servers in myria deployment """ + return 1 + + def num_tuples(self, rel_key): + """ Return number of tuples of rel_key """ + table = self.metadata.tables[str(rel_key)] + return self.engine.execute(table.count()).scalar() + + def get_scheme(self, rel_key): + table = self.metadata.tables[str(rel_key)] + return scheme.Scheme((c.name, type_to_raco[type(c.type)]) + for c in table.columns) + + def add_table(self, name, schema, tuples=None): + columns = [Column(n, raco_to_type[t](), nullable=False) + for n, t in schema.attributes] + table = Table(name, self.metadata, *columns) + table.create(self.engine) + if tuples: + tuples = [{n: v for n, v in zip(schema.get_names(), tup)} + for tup in tuples] + self.engine.execute(table.insert(), tuples) + + def _convert_expr(self, cols, expr, input_scheme): + if isinstance(expr, expression.AttributeRef): + return self._convert_attribute_ref(cols, expr, input_scheme) + if isinstance(expr, expression.ZeroaryOperator): + return self._convert_zeroary_expr(cols, expr, input_scheme) + if isinstance(expr, expression.UnaryOperator): + return self._convert_unary_expr(cols, expr, input_scheme) + if isinstance(expr, expression.BinaryOperator): + return self._convert_binary_expr(cols, expr, input_scheme) + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_attribute_ref(self, cols, expr, input_scheme): + print cols + print expr + if isinstance(expr, expression.UnnamedAttributeRef): + return cols[expr.position] + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_zeroary_expr(self, cols, expr, input_scheme): + if isinstance(expr, expression.COUNTALL): + return func.count(cols[0]) + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_unary_expr(self, cols, expr, input_scheme): + input = self._convert_expr(cols, expr.input, input_scheme) + if isinstance(expr, expression.MAX): + return func.max(input) + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _convert_binary_expr(self, cols, expr, input_scheme): + left = self._convert_expr(cols, expr.left, input_scheme) + right = self._convert_expr(cols, expr.right, input_scheme) + + if isinstance(expr, expression.AND): + return left & right + if isinstance(expr, expression.OR): + return left | right + if isinstance(expr, expression.EQ): + return left == right + if isinstance(expr, expression.NEQ): + return left != right + if isinstance(expr, expression.LT): + return left < right + if isinstance(expr, expression.LTEQ): + return left <= right + if isinstance(expr, expression.GT): + return left > right + if isinstance(expr, expression.GTEQ): + return left >= right + + raise NotImplementedError("expression {} to sql".format(type(expr))) + + def _get_zeroary_sql(self, plan): + if isinstance(plan, algebra.Scan): + return self.metadata.tables[str(plan.relation_key)].select() + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def _get_unary_sql(self, plan): + input = self.get_sql(plan.input).alias("input") + input_sch = plan.input.scheme() + cols = list(input.c) + + if isinstance(plan, algebra.Select): + cond = self._convert_expr(cols, plan.condition, input_sch) + return input.select(cond) + + elif isinstance(plan, algebra.Apply): + clause = [self._convert_expr(cols, e, input_sch).label(name) + for (name, e) in plan.emitters] + return select(clause) + + elif isinstance(plan, algebra.GroupBy): + a = [self._convert_expr(cols, e, input_sch) + for e in plan.aggregate_list] + g = [self._convert_expr(cols, e, input_sch) + for e in plan.grouping_list] + sel = select(g + a) + if not plan.grouping_list: + return sel + return sel.group_by(*g) + + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def _get_binary_sql(self, plan): + # Use aliases to resolve duplicate names + left = self.get_sql(plan.left).alias("left") + right = self.get_sql(plan.right).alias("right") + all_cols = left.c + right.c + all_sch = plan.left.scheme() + plan.right.scheme() + assert len(all_cols) == len(all_sch) + + if isinstance(plan, algebra.CrossProduct): + out_cols = zip(all_cols, all_sch.get_names()) + return select([col.label(name) for col, name in out_cols]) + + elif isinstance(plan, algebra.ProjectingJoin): + cond = self._convert_expr(all_cols, plan.condition, all_sch) + return left.join(right, cond) + + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def get_sql(self, plan): + if isinstance(plan, algebra.ZeroaryOperator): + return self._get_zeroary_sql(plan) + elif isinstance(plan, algebra.UnaryOperator): + return self._get_unary_sql(plan) + elif isinstance(plan, algebra.BinaryOperator): + return self._get_binary_sql(plan) + elif isinstance(plan, algebra.NaryOperator): + return self._get_nary_sql(plan) + raise NotImplementedError("convert {op} to sql".format(op=type(plan))) + + def evaluate(self, plan): + statement = self.get_sql(plan) + print statement + return (tuple(t) for t in self.engine.execute(statement)) \ No newline at end of file diff --git a/raco/language/sql/test_case.py b/raco/language/sql/test_case.py new file mode 100644 index 00000000..2d55538a --- /dev/null +++ b/raco/language/sql/test_case.py @@ -0,0 +1,54 @@ +from collections import Counter +import unittest + +import raco.algebra as algebra +from raco.compile import optimize_by_rules +from raco.language.logical import OptLogicalAlgebra +import raco.myrial.interpreter as interpreter +import raco.myrial.parser as parser +import raco.scheme as scheme +from .catalog import SQLCatalog +import raco.types as types + + +class SQLTestCase(unittest.TestCase): + """A base for testing the compilation of RACO programs to SQL queries""" + + emp_table = [ + # id dept_id name salary + (1, 2, "Bill Howe", 25000), + (2, 1, "Dan Halperin", 90000), + (3, 1, "Andrew Whitaker", 5000), + (4, 2, "Shumo Chu", 5000), + (5, 1, "Victor Almeida", 25000), + (6, 3, "Dan Suciu", 90000), + (7, 1, "Magdalena Balazinska", 25000)] + + emp_schema = scheme.Scheme([("id", types.INT_TYPE), + ("dept_id", types.INT_TYPE), + ("name", types.STRING_TYPE), + ("salary", types.LONG_TYPE)]) + + emp_key = "public:adhoc:employee" + + def setUp(self): + # SQLAlchemy + self.db = SQLCatalog() + self.db.add_table(self.emp_key, self.emp_schema, self.emp_table) + # MyriaL + self.parser = parser.Parser() + self.processor = interpreter.StatementProcessor(self.db) + + def query_to_phys_plan(self, query): + statements = self.parser.parse(query) + self.processor.evaluate(statements) + p = self.processor.get_logical_plan() + p = optimize_by_rules(p, OptLogicalAlgebra.opt_rules()) + if isinstance(p, (algebra.Store, algebra.StoreTemp)): + p = p.input + return p + + def execute(self, query, expected): + p = self.query_to_phys_plan(query) + ans = self.db.evaluate(p) + self.assertEquals(expected, Counter(ans)) \ No newline at end of file diff --git a/raco/language/sql/test_sql.py b/raco/language/sql/test_sql.py new file mode 100644 index 00000000..edbb9537 --- /dev/null +++ b/raco/language/sql/test_sql.py @@ -0,0 +1,93 @@ +from collections import Counter, defaultdict +from raco.language.sql.test_case import SQLTestCase + +import raco.scheme as scheme +import raco.types as types + + +class TestScheme(SQLTestCase): + """Test that we can convert Raco schemes to SQL Alchemy schemes""" + def test_simple_scheme(self): + sch = scheme.Scheme() + sch.addAttribute('w', types.FLOAT_TYPE) + sch.addAttribute('x', types.INT_TYPE) + sch.addAttribute('y', types.LONG_TYPE) + sch.addAttribute('z', types.STRING_TYPE) + + self.db.add_table('simple', sch) + + sch2 = self.db.get_scheme('simple') + self.assertEquals(sch, sch2) + + +class TestQuery(SQLTestCase): + """Test actually compiling plans to SQL.""" + def test_catalog(self): + self.assertEquals(len(self.emp_table), + self.db.num_tuples(self.emp_key)) + + def test_simple_scan(self): + query = """x = scan({emp}); + store(x, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter(self.emp_table) + self.execute(query, expected) + + def test_column_select(self): + query = """x = scan({emp}); + y = [from x emit $2]; + store(y, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((x[2],) for x in self.emp_table) + self.execute(query, expected) + + def test_rename_column_select(self): + query = """x = scan({emp}); + y = [from x emit $0 as a, $0 as b]; + store(y, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((x[0], x[0]) for x in self.emp_table) + self.execute(query, expected) + + def test_count_query(self): + query = """x = countall(scan({emp})); + store(x, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter([(len(self.emp_table),)]) + self.execute(query, expected) + + def test_theta_join_query(self): + query = """x = scan({emp}); + y = scan({emp}); + z = [from x,y where x.salary < y.salary emit x.*]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + expected = Counter((a, b, c, d) + for (a, b, c, d) in self.emp_table + for (_, _, _, d2) in self.emp_table + if d < d2) + self.execute(query, expected) + + def test_join_query(self): + query = """x = scan({emp}); + y = scan({emp}); + z = [from x,y where x.salary = y.salary emit count(*) as cnt]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + size = len([1 + for (_, _, _, d) in self.emp_table + for (_, _, _, d2) in self.emp_table + if d == d2]) + expected = Counter([(size,)]) + self.execute(query, expected) + + def test_complex_agg_query(self): + query = """x = scan({emp}); + z = [from x emit dept_id, max(salary) as max_salary]; + store(z, OUTPUT);""".format(emp=self.emp_key) + + d = defaultdict(int) + for _, dept_id, _, salary in self.emp_table: + d[dept_id] = max(d[dept_id], salary) + expected = Counter(d.items()) + self.execute(query, expected) diff --git a/raco/myrial/interpreter.py b/raco/myrial/interpreter.py index 54c49318..4a44ed28 100644 --- a/raco/myrial/interpreter.py +++ b/raco/myrial/interpreter.py @@ -7,9 +7,10 @@ import raco.expression import raco.catalog import raco.scheme -from raco.language import MyriaLeftDeepTreeAlgebra, MyriaHyperCubeAlgebra -from raco.algebra import LogicalAlgebra -from raco.myrialang import compile_to_json +from raco.language.logical import LogicalAlgebra, OptLogicalAlgebra +from raco.language.myrialang import (MyriaLeftDeepTreeAlgebra, + MyriaHyperCubeAlgebra) +from raco.language.myrialang import compile_to_json from raco.compile import optimize from raco import relation_key diff --git a/raco/myrial/myrial_test.py b/raco/myrial/myrial_test.py index 8aa63016..a4be6a1c 100644 --- a/raco/myrial/myrial_test.py +++ b/raco/myrial/myrial_test.py @@ -1,7 +1,7 @@ import json import unittest -from raco.myrialang import compile_to_json +from raco.language.myrialang import compile_to_json import raco.fakedb import raco.myrial.interpreter as interpreter import raco.myrial.parser as parser diff --git a/raco/myrial/optimizer_tests.py b/raco/myrial/optimizer_tests.py index 1801e874..ec003896 100644 --- a/raco/myrial/optimizer_tests.py +++ b/raco/myrial/optimizer_tests.py @@ -5,16 +5,16 @@ from raco.algebra import * from raco.expression import NamedAttributeRef as AttRef from raco.expression import UnnamedAttributeRef as AttIndex -from raco.myrialang import (MyriaShuffleConsumer, MyriaShuffleProducer, - MyriaHyperShuffleProducer) -from raco.language import MyriaLeftDeepTreeAlgebra -from raco.language import MyriaHyperCubeAlgebra -from raco.algebra import LogicalAlgebra +from raco.language.myrialang import (MyriaShuffleConsumer, + MyriaShuffleProducer, + MyriaHyperShuffleProducer) +from raco.language.myrialang import (MyriaLeftDeepTreeAlgebra, + MyriaHyperCubeAlgebra) +from raco.language.logical import LogicalAlgebra from raco.compile import optimize from raco import relation_key from raco.catalog import FakeCatalog -import raco.expression as expression import raco.scheme as scheme import raco.myrial.myrial_test as myrial_test from raco import types diff --git a/raco/nary_join_rules_test.py b/raco/nary_join_rules_test.py index a49301c0..971645f1 100644 --- a/raco/nary_join_rules_test.py +++ b/raco/nary_join_rules_test.py @@ -1,8 +1,8 @@ -import myrialang from raco import RACompiler import unittest import algebra from raco.catalog import FakeCatalog +from raco.language import myrialang class testNaryJoin(unittest.TestCase): diff --git a/raco/operator_test.py b/raco/operator_test.py index 0bb5db67..6d8ba325 100644 --- a/raco/operator_test.py +++ b/raco/operator_test.py @@ -3,6 +3,7 @@ import raco.fakedb from raco.relation_key import RelationKey +from raco.language.logical import LogicalAlgebra from raco.algebra import * from raco.expression import * import raco.relation_key as relation_key @@ -76,7 +77,8 @@ def test_running_mean_stateful_apply(self): self.assertEqual([x[0] for x in result][-1], 37857) # test whether we can generate json without errors - from myrialang import compile_to_json, MyriaLeftDeepTreeAlgebra + from raco.language.myrialang import (compile_to_json, + MyriaLeftDeepTreeAlgebra) from compile import optimize import json json_string = json.dumps(compile_to_json("", None, optimize(store, MyriaLeftDeepTreeAlgebra(), LogicalAlgebra))) # noqa diff --git a/raco/rules.py b/raco/rules.py index 9c7514f5..e75c7103 100644 --- a/raco/rules.py +++ b/raco/rules.py @@ -1,8 +1,11 @@ from raco import algebra from raco import expression -from expression import UnnamedAttributeRef +from expression import (accessed_columns, UnnamedAttributeRef, + to_unnamed_recursive) from abc import ABCMeta, abstractmethod +import copy +import itertools class Rule(object): @@ -156,3 +159,299 @@ def is_simple_agg_expr(agg): # are mutating the objects it contains when we modify grp_expr or # agg_expr in the above for loops. return expr + + +class RemoveTrivialSequences(Rule): + def fire(self, expr): + if not isinstance(expr, algebra.Sequence): + return expr + + if len(expr.args) == 1: + return expr.args[0] + else: + return expr + + +class SplitSelects(Rule): + """Replace AND clauses with multiple consecutive selects.""" + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + + conjuncs = expression.extract_conjuncs(op.condition) + assert conjuncs # Must be at least 1 + + # Normalize named references to integer indexes + scheme = op.scheme() + conjuncs = [to_unnamed_recursive(c, scheme) + for c in conjuncs] + + op.condition = conjuncs[0] + op.has_been_pushed = False + for conjunc in conjuncs[1:]: + op = algebra.Select(conjunc, op) + op.has_been_pushed = False + return op + + def __str__(self): + return "Select => Select, Select" + + +class PushSelects(Rule): + """Push selections.""" + + @staticmethod + def is_column_equality_comparison(cond): + """Return a tuple of column indexes if the condition is an equality test. + """ + + if (isinstance(cond, expression.EQ) and + isinstance(cond.left, UnnamedAttributeRef) and + isinstance(cond.right, UnnamedAttributeRef)): + return cond.left.position, cond.right.position + else: + return None + + @staticmethod + def descend_tree(op, cond): + """Recursively push a selection condition down a tree of operators. + + :param op: The root of an operator tree + :type op: raco.algebra.Operator + :type cond: The selection condition + :type cond: raco.expression.expression + + :return: A (possibly modified) operator. + """ + + if isinstance(op, algebra.Select): + # Keep pushing; selects are commutative + op.input = PushSelects.descend_tree(op.input, cond) + return op + elif isinstance(op, algebra.CompositeBinaryOperator): + # Joins and cross-products; consider conversion to an equijoin + left_len = len(op.left.scheme()) + accessed = accessed_columns(cond) + in_left = [col < left_len for col in accessed] + if all(in_left): + # Push the select into the left sub-tree. + op.left = PushSelects.descend_tree(op.left, cond) + return op + elif not any(in_left): + # Push into right subtree; rebase column indexes + expression.rebase_expr(cond, left_len) + op.right = PushSelects.descend_tree(op.right, cond) + return op + else: + # Selection includes both children; attempt to create an + # equijoin condition + cols = PushSelects.is_column_equality_comparison(cond) + if cols: + return op.add_equijoin_condition(cols[0], cols[1]) + elif isinstance(op, algebra.Apply): + # Convert accessed to a list from a set to ensure consistent order + accessed = list(accessed_columns(cond)) + accessed_emits = [op.emitters[i][1] for i in accessed] + if all(isinstance(e, expression.AttributeRef) + for e in accessed_emits): + unnamed_emits = [expression.toUnnamed(e, op.input.scheme()) + for e in accessed_emits] + # This condition only touches columns that are copied verbatim + # from the child, so we can push it. + index_map = {a: e.position + for (a, e) in zip(accessed, unnamed_emits)} + expression.reindex_expr(cond, index_map) + op.input = PushSelects.descend_tree(op.input, cond) + return op + elif isinstance(op, algebra.GroupBy): + # Convert accessed to a list from a set to ensure consistent order + accessed = list(accessed_columns(cond)) + if all((a < len(op.grouping_list)) for a in accessed): + accessed_grps = [op.grouping_list[a] for a in accessed] + # This condition only touches columns that are copied verbatim + # from the child (grouping keys), so we can push it. + assert all(isinstance(e, expression.AttributeRef) + for e in op.grouping_list) + unnamed_grps = [expression.toUnnamed(e, op.input.scheme()) + for e in accessed_grps] + index_map = {a: e.position + for (a, e) in zip(accessed, unnamed_grps)} + expression.reindex_expr(cond, index_map) + op.input = PushSelects.descend_tree(op.input, cond) + return op + + # Can't push any more: instantiate the selection + new_op = algebra.Select(cond, op) + new_op.has_been_pushed = True + return new_op + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + if op.has_been_pushed: + return op + + new_op = PushSelects.descend_tree(op.input, op.condition) + + # The new root may also be a select, so fire the rule recursively + return self.fire(new_op) + + def __str__(self): + return ("Select, Cross/Join => Join;" + + " Select, Apply => Apply, Select;" + + " Select, GroupBy => GroupBy, Select") + + +class MergeSelects(Rule): + """Merge consecutive Selects into a single conjunctive selection.""" + + def fire(self, op): + if not isinstance(op, algebra.Select): + return op + + while isinstance(op.input, algebra.Select): + conjunc = expression.AND(op.condition, op.input.condition) + op = algebra.Select(conjunc, op.input.input) + + return op + + def __str__(self): + return "Select, Select => Select" + + +class PushApply(Rule): + """Many Applies in MyriaL are added to select fewer columns from the + input. In some of these cases, we can do less work in the children by + preventing them from producing columns we will then immediately drop. + + Currently, this rule: + - merges consecutive Apply operations into one Apply, possibly dropping + some of the produced columns along the way. + - makes ProjectingJoin only produce columns that are later read. + TODO: drop the Apply if the column-selection pushed into the + ProjectingJoin is everything the Apply was doing. See note below. + """ + + def fire(self, op): + if not isinstance(op, algebra.Apply): + return op + + child = op.input + + if isinstance(child, algebra.Apply): + in_scheme = child.scheme() + child_in_scheme = child.input.scheme() + names, emits = zip(*op.emitters) + emits = [to_unnamed_recursive(e, in_scheme) + for e in emits] + child_emits = [to_unnamed_recursive(e[1], child_in_scheme) + for e in child.emitters] + + def convert(n): + if isinstance(n, expression.UnnamedAttributeRef): + n = child_emits[n.position] + else: + n.apply(convert) + return n + + emits = [convert(copy.deepcopy(e)) for e in emits] + + new_apply = algebra.Apply(emitters=zip(names, emits), + input=child.input) + return self.fire(new_apply) + + elif isinstance(child, algebra.ProjectingJoin): + in_scheme = child.scheme() + names, emits = zip(*op.emitters) + emits = [to_unnamed_recursive(e, in_scheme) + for e in emits] + accessed = sorted(set(itertools.chain(*(accessed_columns(e) + for e in emits)))) + index_map = {a: i for (i, a) in enumerate(accessed)} + child.output_columns = [child.output_columns[i] for i in accessed] + for e in emits: + expression.reindex_expr(e, index_map) + # TODO(dhalperi) we may not need the Apply if all it did was rename + # and/or select certain columns. Figure out these cases and omit + # the Apply + return algebra.Apply(emitters=zip(names, emits), + input=child) + + return op + + def __str__(self): + return 'Push Apply into Apply, ProjectingJoin' + + +class RemoveUnusedColumns(Rule): + """For operators that construct new tuples (e.g., GroupBy or Join), we are + guaranteed that any columns from an input tuple that are ignored (neither + used internally nor to produce the output columns) cannot be used higher + in the query tree. For these cases, this rule will prepend an Apply that + keeps only the referenced columns. The goal is that after this rule, + a subsequent invocation of PushApply will be able to push that + column-selection operation further down the tree.""" + + def fire(self, op): + if isinstance(op, algebra.GroupBy): + child = op.input + child_scheme = child.scheme() + grp_list = [to_unnamed_recursive(g, child_scheme) + for g in op.grouping_list] + agg_list = [to_unnamed_recursive(a, child_scheme) + for a in op.aggregate_list] + agg = [accessed_columns(a) for a in agg_list] + pos = [g.position for g in grp_list] + accessed = sorted(set(itertools.chain(*(agg + [pos])))) + if not accessed: + # Bug #207: COUNTALL() does not access any columns. So if the + # query is just a COUNT(*), we would generate an empty Apply. + # If this happens, just keep the first column of the input. + accessed = [0] + if len(accessed) != len(child_scheme): + emitters = [(None, UnnamedAttributeRef(i)) for i in accessed] + new_apply = algebra.Apply(emitters, child) + index_map = {a: i for (i, a) in enumerate(accessed)} + for agg_expr in itertools.chain(grp_list, agg_list): + expression.reindex_expr(agg_expr, index_map) + op.grouping_list = grp_list + op.aggregate_list = agg_list + op.input = new_apply + return op + elif isinstance(op, algebra.ProjectingJoin): + l_scheme = op.left.scheme() + r_scheme = op.right.scheme() + in_scheme = l_scheme + r_scheme + condition = to_unnamed_recursive(op.condition, in_scheme) + column_list = [to_unnamed_recursive(c, in_scheme) + for c in op.output_columns] + + accessed = (accessed_columns(condition) + | set(c.position for c in op.output_columns)) + if len(accessed) == len(in_scheme): + return op + + accessed = sorted(accessed) + left = [a for a in accessed if a < len(l_scheme)] + if len(left) < len(l_scheme): + emits = [(None, UnnamedAttributeRef(a)) for a in left] + apply = algebra.Apply(emits, op.left) + op.left = apply + right = [a - len(l_scheme) for a in accessed + if a >= len(l_scheme)] + if len(right) < len(r_scheme): + emits = [(None, UnnamedAttributeRef(a)) for a in right] + apply = algebra.Apply(emits, op.right) + op.right = apply + index_map = {a: i for (i, a) in enumerate(accessed)} + expression.reindex_expr(condition, index_map) + [expression.reindex_expr(c, index_map) for c in column_list] + op.condition = condition + op.output_columns = column_list + return op + + return op + + def __str__(self): + return 'Remove unused columns' \ No newline at end of file diff --git a/raco/scheme.py b/raco/scheme.py index 24844d7a..5727e341 100644 --- a/raco/scheme.py +++ b/raco/scheme.py @@ -44,6 +44,10 @@ def get_types(self): """Return a list of the types in this scheme.""" return [_type for name, _type in self.attributes] + def get_names(self): + """Return a list of the names in this scheme.""" + return [name for name, _type in self.attributes] + def typecheck(self, tup): rmap = raco.types.reverse_python_type_map try: diff --git a/setup.py b/setup.py index d2f580fe..405df59d 100644 --- a/setup.py +++ b/setup.py @@ -35,6 +35,6 @@ def find_packages(path=".", base="", exclude=None): url='https://github.com/uwescience/datalogcompiler', packages=find_packages(exclude=['clang']), package_data={'': ['c_templates/*.template','grappa_templates/*.template']}, - install_requires=['networkx', 'ply', 'pyparsing'], + install_requires=['networkx', 'ply', 'pyparsing', 'SQLAlchemy'], scripts=['scripts/myrial'] )