Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobin Baker committed Mar 8, 2017
1 parent 09bf52c commit 1613e3d
Showing 1 changed file with 49 additions and 47 deletions.
96 changes: 49 additions & 47 deletions integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def test(self):
emptyrelation = load('{}', csv(schema(foo:string, bar:int)));
store(emptyrelation, emptyrelation);
""".format(self.get_file_url('testdata/filescan/empty.txt'))
print program
expected = []
query = MyriaQuery.submit(program)
self.assertEqual(query.status, 'SUCCESS')
Expand Down Expand Up @@ -206,14 +205,14 @@ def get_join_circle_program(self, A0, B0, C0):
""" % (A0, B0, C0)

def get_join_chain_expected(self, R, A0, B0, C0):
A = Set(A0)
B = Set(B0)
C = Set(C0)
A = Set(A0)
B = Set(B0)
C = Set(C0)
while True:
change = False
for i in range(self.MAXN):
for j in range(self.MAXN):
for k in range(self.MAXN):
for i in range(self.MAXN):
for j in range(self.MAXN):
for k in range(self.MAXN):
if (j, i) in R and (i, k) in A and (j, k) not in A:
change = True
A.add((j, k))
Expand All @@ -223,18 +222,19 @@ def get_join_chain_expected(self, R, A0, B0, C0):
if (j, i) in B and (i, k) in C and (j, k) not in C:
change = True
C.add((j, k))
if not change: break
return [{'src':x, 'dst':y} for x, y in C]
if not change:
break
return [{'src': x, 'dst': y} for x, y in C]

def get_join_circle_expected(self, A0, B0, C0):
A = Set(A0)
B = Set(B0)
C = Set(C0)
A = Set(A0)
B = Set(B0)
C = Set(C0)
while True:
change = False
for i in range(self.MAXN):
for j in range(self.MAXN):
for k in range(self.MAXN):
for i in range(self.MAXN):
for j in range(self.MAXN):
for k in range(self.MAXN):
if (j, i) in C and (i, k) in A and (j, k) not in A:
change = True
A.add((j, k))
Expand All @@ -244,7 +244,8 @@ def get_join_circle_expected(self, A0, B0, C0):
if (j, i) in B and (i, k) in C and (j, k) not in C:
change = True
C.add((j, k))
if not change: break
if not change:
break
return [{'src':x, 'dst':y} for x, y in C]

def genData(self, fin):
Expand All @@ -258,7 +259,10 @@ def genData(self, fin):
return edges

def test(self):
with NamedTemporaryFile(suffix='.csv') as R, NamedTemporaryFile(suffix='.csv') as A0, NamedTemporaryFile(suffix='.csv') as B0, NamedTemporaryFile(suffix='.csv') as C0:
with NamedTemporaryFile(suffix='.csv') as R, \
NamedTemporaryFile(suffix='.csv') as A0, \
NamedTemporaryFile(suffix='.csv') as B0, \
NamedTemporaryFile(suffix='.csv') as C0:
R_data = self.genData(R)
A0_data = self.genData(A0)
B0_data = self.genData(B0)
Expand All @@ -271,7 +275,6 @@ def test(self):
self.assertEqual(query.status, 'SUCCESS')
results = MyriaRelation('public:adhoc:joinCircle').to_dict()
self.assertListOfDictsEqual(results, self.get_join_circle_expected(A0_data, B0_data, C0_data))



class IncorrectDelimiterTest(MyriaTestBase):
Expand Down Expand Up @@ -313,7 +316,7 @@ def test(self):

# delete relation and check the catalog
relation.delete()
self.assertRaises(MyriaError,self.connection.dataset,relation.qualified_name)
self.assertRaises(MyriaError, self.connection.dataset, relation.qualified_name)


class RoundRobinAggregateTest(MyriaTestBase):
Expand All @@ -329,40 +332,39 @@ def test(self):
join_query = MyriaQuery.submit_plan(join_json).wait_for_completion()
self.assertEqual(join_query.status, 'SUCCESS')


class BroadcastTest(MyriaTestBase):
def test(self):
twitterData = self.get_file_url('testdata/twitter/TwitterK.csv')
loadData="""
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t emit *];
store(T2, twitterOriginal);
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t where t.a = 17 emit *];
store(T2, twitterSubsetNotBroadcast);
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t where t.a = 17 emit *];
store(T2, twitterSubsetBroadcast, broadcast());
""".format(twitterData, twitterData, twitterData)

MyriaQuery.submit(loadData)

twitterData = self.get_file_url('testdata/twitter/TwitterK.csv')
loadData = """
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t emit *];
store(T2, twitterOriginal);
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t where t.a = 17 emit *];
store(T2, twitterSubsetNotBroadcast);
T1 = load('{}',csv(schema(a:int,b:int)));
T2 = [from T1 as t where t.a = 17 emit *];
store(T2, twitterSubsetBroadcast, broadcast());
""".format(twitterData, twitterData, twitterData)
MyriaQuery.submit(loadData)
notBroadcastJoin = """
T1 = [from scan(twitterOriginal) as t1, scan(twitterSubsetNotBroadcast) as t2
where t1.a = t2.a emit *];
store(T1, finalNotBroadcast);
"""
T1 = [from scan(twitterOriginal) as t1, scan(twitterSubsetNotBroadcast) as t2
where t1.a = t2.a emit *];
store(T1, finalNotBroadcast);
"""
originalResult = MyriaQuery.submit(notBroadcastJoin)

broadcastJoin = """
T1 = [from scan(twitterOriginal) as t1, scan(twitterSubsetBroadcast) as t2
where t1.a = t2.a emit *];
store(T1, finalBroadcast);
"""
broadcastJoin = """
T1 = [from scan(twitterOriginal) as t1, scan(twitterSubsetBroadcast) as t2
where t1.a = t2.a emit *];
store(T1, finalBroadcast);
"""
broadcastResult = MyriaQuery.submit(broadcastJoin)
self.assertListOfDictsEqual(
originalResult.to_dict(), broadcastResult.to_dict())

self.assertListOfDictsEqual(originalResult.to_dict(), broadcastResult.to_dict())

if __name__ == '__main__':
unittest.main()

0 comments on commit 1613e3d

Please sign in to comment.