Skip to content

Commit

Permalink
Merge pull request #413 from uwescience/load_options
Browse files Browse the repository at this point in the history
Adding optional args to load() function.
  • Loading branch information
domoritz committed May 5, 2015
2 parents 3809e64 + ab24b3f commit a96efce
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 15 deletions.
2 changes: 2 additions & 0 deletions examples/load_opts.myl
@@ -0,0 +1,2 @@
t = load("https://s3-us-west-2.amazonaws.com/myria/public-adhoc-TwitterK.csv", csv(schema(column0:int, column1:int), skip=1));
store(t, TwitterK2);
4 changes: 2 additions & 2 deletions examples/standalone.myl
@@ -1,5 +1,5 @@
Emp = load("./examples/emp.csv", id:int, dept_id:int, name:string, salary:int);
Dept = load("./examples/dept.csv", id:int, name:string, manager:int);
Emp = load("./examples/emp.csv", csv(schema(id:int, dept_id:int, name:string, salary:int)));
Dept = load("./examples/dept.csv", csv(schema(id:int, name:string, manager:int)));

out = [from Emp, Dept
where Emp.dept_id == Dept.id AND Emp.salary > 5000
Expand Down
16 changes: 10 additions & 6 deletions raco/algebra.py
Expand Up @@ -1266,15 +1266,17 @@ class FileScan(ZeroaryOperator):

"""Load table data from a file."""

def __init__(self, path=None, _scheme=None):
def __init__(self, path=None, _scheme=None, options={}):
self.path = path
self._scheme = _scheme
self.options = options
ZeroaryOperator.__init__(self)

def __eq__(self, other):
return (ZeroaryOperator.__eq__(self, other)
and self.path == other.path
and self.scheme() == other.scheme())
and self.scheme() == other.scheme()
and self.options == other.options)

def __hash__(self):
return ("%s-%s" % (self.opname(), self.path)).__hash__()
Expand All @@ -1283,9 +1285,11 @@ def shortStr(self):
return "%s(%s)" % (self.opname(), self.path)

def __repr__(self):
return "{op}({path!r}, {sch!r})".format(op=self.opname(),
path=self.path,
sch=self._scheme)
return "{op}({path!r}, {sch!r}, {opt!r})".format(
op=self.opname(),
path=self.path,
sch=self._scheme,
opt=self.options)

def num_tuples(self):
raise NotImplementedError("{op}.num_tuples".format(op=type(self)))
Expand All @@ -1294,7 +1298,7 @@ def copy(self, other):
"""deep copy"""
self.path = other.path
self._scheme = other._scheme

self.options = other.options
ZeroaryOperator.copy(self, other)

def scheme(self):
Expand Down
3 changes: 3 additions & 0 deletions raco/fakedb.py
Expand Up @@ -354,6 +354,9 @@ def scantemp(self, op):
def myriascan(self, op):
return self.scan(op)

def myriafilescan(self, op):
return self.filescan(op)

def myriasink(self, op):
return self.sink(op)

Expand Down
15 changes: 14 additions & 1 deletion raco/language/myrialang.py
Expand Up @@ -174,6 +174,18 @@ def compileme(self):
}


class MyriaFileScan(algebra.FileScan, MyriaOperator):
def compileme(self):
return dict({
"opType": "FileScan",
"source": {
"dataType": "URI",
"uri": self.path,
},
"schema": scheme_to_schema(self.scheme())
}, **self.options)


class MyriaLimit(algebra.Limit, MyriaOperator):
def compileme(self, inputid):
return {
Expand Down Expand Up @@ -1482,6 +1494,7 @@ def fire(self, expr):
rules.OneToOne(algebra.NaryJoin, MyriaLeapFrogJoin),
rules.OneToOne(algebra.Scan, MyriaScan),
rules.OneToOne(algebra.ScanTemp, MyriaScanTemp),
rules.OneToOne(algebra.FileScan, MyriaFileScan),
rules.OneToOne(algebra.SingletonRelation, MyriaSingleton),
rules.OneToOne(algebra.EmptyRelation, MyriaEmptyRelation),
rules.OneToOne(algebra.UnionAll, MyriaUnionAll),
Expand Down Expand Up @@ -1513,6 +1526,7 @@ class MyriaAlgebra(Algebra):
MyriaHyperShuffleConsumer,
MyriaScan,
MyriaScanTemp,
MyriaFileScan,
MyriaEmptyRelation,
MyriaSingleton
)
Expand Down Expand Up @@ -1766,7 +1780,6 @@ def compile_to_json(raw_query, logical_plan, physical_plan,
# raw_query must be a string
if not isinstance(raw_query, basestring):
raise ValueError("raw query must be a string")

return {"rawQuery": raw_query,
"logicalRa": str(logical_plan),
"language": language,
Expand Down
4 changes: 2 additions & 2 deletions raco/myrial/interpreter.py
Expand Up @@ -97,8 +97,8 @@ def scan(self, rel_key):
return raco.algebra.Scan(rel_key, scheme,
self.catalog.num_tuples(rel_key))

def load(self, path, scheme):
return raco.algebra.FileScan(path, scheme)
def load(self, path, scheme, options):
return raco.algebra.FileScan(path, scheme, options)

def table(self, emit_clause):
"""Emit a single-row table literal."""
Expand Down
50 changes: 48 additions & 2 deletions raco/myrial/parser.py
Expand Up @@ -555,8 +555,9 @@ def p_expression_scan(p):

@staticmethod
def p_expression_load(p):
'expression : LOAD LPAREN STRING_LITERAL COMMA column_def_list RPAREN'
p[0] = ('LOAD', p[3], scheme.Scheme(p[5]))
'expression : LOAD LPAREN STRING_LITERAL COMMA file_parser_fun RPAREN'
schema, options = p[5]
p[0] = ('LOAD', p[3], scheme.Scheme(schema), options)

@staticmethod
def p_relation_key(p):
Expand All @@ -581,6 +582,51 @@ def p_column_def(p):
'column_def : unreserved_id COLON type_name'
p[0] = (p[1], p[3])

@staticmethod
def p_schema_fun(p):
'schema_fun : SCHEMA LPAREN column_def_list RPAREN'
p[0] = p[3]

@staticmethod
def p_file_parser_fun(p):
"""file_parser_fun : file_parser_type LPAREN \
schema_fun COMMA option_list RPAREN
| file_parser_type LPAREN schema_fun RPAREN"""
if len(p) == 7:
schema, options = (p[3], p[5])
else:
schema, options = (p[3], {})
p[0] = (schema, options)

@staticmethod
def p_file_parser_type(p):
'file_parser_type : CSV'
p[0] = p[1]

@staticmethod
def p_option_list(p):
"""option_list : option_list COMMA option
| option"""
if len(p) == 4:
opts = p[1] + [p[3]]
else:
opts = [p[1]]
p[0] = dict(opts)

@staticmethod
def p_option(p):
'option : unreserved_id EQUALS literal_arg'
p[0] = (p[1], p[3])

@staticmethod
def p_literal_arg(p):
"""literal_arg : STRING_LITERAL
| INTEGER_LITERAL
| FLOAT_LITERAL
| TRUE
| FALSE"""
p[0] = p[1]

@staticmethod
def p_type_name(p):
"""type_name : STRING
Expand Down
3 changes: 2 additions & 1 deletion raco/myrial/scanner.py
Expand Up @@ -6,7 +6,8 @@
import raco.myrial.exceptions

keywords = ['WHILE', 'DO', 'DEF', 'APPLY', 'CASE', 'WHEN', 'THEN',
'ELSE', 'END', 'CONST', 'LOAD', 'DUMP', 'UDA', 'TRUE', 'FALSE']
'ELSE', 'END', 'CONST', 'LOAD', 'DUMP', 'CSV', 'SCHEMA',
'UDA', 'TRUE', 'FALSE']

types = ['INT', 'STRING', 'FLOAT', 'BOOLEAN']

Expand Down
3 changes: 2 additions & 1 deletion raco/test_style.py
Expand Up @@ -21,7 +21,8 @@ class StyleTest(unittest.TestCase):

def test_style(self):
"run flake8 with the right arguments and ensure all files pass"
check_output_and_print_stderr(['flake8', '--ignore=F', 'raco'])
check_output_and_print_stderr([
'flake8', '--ignore=F', '--exclude=parsetab.py', 'raco'])

def test_pylint(self):
"run pylint -E to catch obvious errors"
Expand Down

0 comments on commit a96efce

Please sign in to comment.