In [15]:
from myria import *

# Myria IPython Extensions

## 1. Loading the Extension

Loading the extension requires the experimental `ipython-myria` branch of the [`Myria-Python` repository](https://github.com/brandonhaynes/myria-python).

In [16]:
%load_ext myria

The myria extension is already loaded. To reload it, use:
  %reload_ext myria


## 2. Configuration Options

An ambient connection is maintained to the Myria API.  The connection parameters are initially drawn from configuration, but may be set explicitly as well (see `%connect` below).

Most users will probably either specify a connection URI explicitly (via `%connect`) or put it in a configuration file.

Here we view the supported set of Myria configuration options:

In [17]:
%config MyriaExtension

MyriaExtension options
--------------------
MyriaExtension.execution_url=<Unicode>
    Current: u'https://demo.myria.cs.washington.edu'
    Myria web API endpoint URL
MyriaExtension.language=<Unicode>
    Current: u'MyriaL'
    Language for Myria queries
MyriaExtension.rest_url=<Unicode>
    Current: u'https://rest.myria.cs.washington.edu:1776'
    Myria REST API endpoint URL
MyriaExtension.timeout=<Int>
    Current: 60
    Query timeout (in seconds)
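To avoid reconnecting in every notebook, the options listed above can presumably be placed in an IPython profile's configuration file. The sketch below assumes that the extension honours the standard traitlets configuration for `MyriaExtension`. The option names and values come from the `%config` output above. The profile path is IPython's default and may differ on your system.

```python
# ~/.ipython/profile_default/ipython_config.py  (default profile path; yours may differ)
c = get_config()  # injected by IPython when it loads this file

# Load the extension automatically at startup (standard IPython option).
c.InteractiveShellApp.extensions = ['myria']

# Option names as reported by `%config MyriaExtension` (assumed to be read from config).
c.MyriaExtension.rest_url = 'https://rest.myria.cs.washington.edu:1776'
c.MyriaExtension.execution_url = 'https://demo.myria.cs.washington.edu'
c.MyriaExtension.language = 'MyriaL'
c.MyriaExtension.timeout = 60
```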


Configuration may also be updated directly within the notebook:

In [18]:
# Update the default query language to be MyriaL 
# (it's already the current language, so this is really a no-op)
%config MyriaExtension.language = 'MyriaL'

## 3. Ambient Connection to Myria

The extension uses the default connection to Myria maintained at `MyriaRelation.DefaultConnection`.  If not already specified in IPython configuration, connection details may be specified explicitly.

View `connect` arguments:

In [19]:
%connect?

Connect to a local server, implicitly assuming that the Myria-Web endpoint is hosted on the same machine:

In [20]:
%connect http://localhost:8753

<myria.connection.MyriaConnection at 0x10c664210>

In [21]:
_._url_start, _.execution_url

('http://localhost:8753', 'http://localhost')
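The tuple above suggests that when only a REST URL is given, the web endpoint defaults to the same scheme and host with the port dropped. That is why the production connection below passes its web URL explicitly. The following sketch reproduces only this observed behaviour. `default_execution_url` is a hypothetical helper, not part of the `myria` package, and the real extension may use different logic.

```python
try:
    from urllib.parse import urlsplit  # Python 3
except ImportError:
    from urlparse import urlsplit      # Python 2


def default_execution_url(rest_url):
    """Hypothetical: derive a Myria-Web URL by dropping the port from a REST URL."""
    parts = urlsplit(rest_url)
    return '%s://%s' % (parts.scheme, parts.hostname)


print(default_execution_url('http://localhost:8753'))  # http://localhost
```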

Connect to the production Myria server, which has separate REST and web URLs:

In [22]:
%connect https://rest.myria.cs.washington.edu:1776 https://myria-web.appspot.com

<myria.connection.MyriaConnection at 0x10b9554d0>

Using the ambient connection:

In [23]:
%%query
T1 = scan(TwitterK);
T2 = [from T1 where $1 = 500 emit $0 as x];
store(T2, Just500);

| | x |
|---|---|
| 0 | 499 |
| 1 | 498 |


## 4. Language Selection

An ambient language is maintained, with an initial value drawn from configuration (`%config MyriaExtension.language`). Convenience magics are also available for MyriaL (`%%myrial`), Datalog (`%datalog`), and SQL (`%%sql`).

Using the "current" language:

In [24]:
%config MyriaExtension.language

u'MyriaL'

In [25]:
%%query
T1 = scan(TwitterK);
T2 = [from T1 where $1 = 500 emit $0 as x];
store(T2, Just500);

| | x |
|---|---|
| 0 | 499 |
| 1 | 498 |


### Datalog

In [26]:
%datalog Just500(column0) :- TwitterK(column0, 500)

| | column0 |
|---|---|
| 0 | 498 |
| 1 | 499 |
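The Datalog rule above keeps the tuples of `TwitterK` whose second column is 500 and emits their first column. This is the same query as the MyriaL `where $1 = 500 emit $0`. As a reading aid, here is the equivalent plain-Python comprehension. The sample tuples are made up for illustration and are not the contents of `TwitterK`.

```python
# Hypothetical sample tuples standing in for TwitterK(column0, column1).
twitter_k = [(499, 500), (498, 500), (12, 7), (500, 499)]

# Just500(column0) :- TwitterK(column0, 500)
# Keep tuples whose second column equals 500, and emit the first column.
just_500 = [column0 for (column0, column1) in twitter_k if column1 == 500]

print(just_500)  # [499, 498]
```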


### MyriaL

In [27]:
%%myrial
T1 = scan(TwitterK);
T2 = [from T1 where $0 = 500 emit $0 as x];
store(T2, Just500);

| | x |
|---|---|
| 0 | 500 |
| 1 | 500 |


### SQL

In [28]:
%%sql
JustX = SELECT $0 AS x FROM SCAN(TwitterK) AS Twitter WHERE $1 = 500;
store(JustX, public:adhoc:JustX);

| | x |
|---|---|
| 0 | 499 |
| 1 | 498 |


## 5. Visualizing Relations & Queries

The new `MyriaRelation` and `MyriaQuery` classes support a `to_dataframe` method when Pandas is available.  This is leveraged within IPython to visualize these entities:

### Relations

In [29]:
MyriaRelation('Just500')

| | x |
|---|---|
| 0 | 500 |
| 1 | 500 |


### Queries

In [30]:
%%myrial
OppData = scan(all_opp_v3);
VctData = scan(all_vct);

OppWithPop = select opp.*, vct.pop
             from OppData as opp,
                  VctData as vct
             where opp.Cruise = vct.Cruise
               and opp.Day = vct.Day
               and opp.File_Id = vct.File_Id
               and opp.Cell_Id = vct.Cell_Id;

PlanktonCount = select Cruise, count(*) as Phytoplankton
                from OppWithPop
                where pop != "beads" and pop != "noise"
                  and fsc_small > 10000;

store(PlanktonCount, public:demo:PlanktonCount);

| | Cruise | Phytoplankton |
|---|---|---|
| 0 | Tokyo_2 | 1824845 |
| 1 | Tokyo_3 | 6088514 |
| 2 | Tokyo_4 | 1353062 |
| 3 | Tokyo_1 | 316049 |


### Accessing Executed Queries in Python

In [31]:
_.to_dataframe()

| | Cruise | Phytoplankton |
|---|---|---|
| 0 | Tokyo_4 | 1353062 |
| 1 | Tokyo_2 | 1824845 |
| 2 | Tokyo_3 | 6088514 |
| 3 | Tokyo_1 | 316049 |
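Out[30] and Out[31] list the same rows in different orders. Code that compares query results should therefore not depend on row order. Here is a minimal sketch using the rows displayed above:

```python
# Rows as displayed in Out[30] and Out[31]: (Cruise, Phytoplankton).
out_30 = [('Tokyo_2', 1824845), ('Tokyo_3', 6088514), ('Tokyo_4', 1353062), ('Tokyo_1', 316049)]
out_31 = [('Tokyo_4', 1353062), ('Tokyo_2', 1824845), ('Tokyo_3', 6088514), ('Tokyo_1', 316049)]

print(out_30 == out_31)                  # False: same rows, different order
print(sorted(out_30) == sorted(out_31))  # True: order-insensitive comparison
```

With pandas, sorting first (for example `df.sort_values('Cruise').reset_index(drop=True)`) has the same effect.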


In [32]:
__.query_id, __.status

(73517, u'SUCCESS')

### Single-line queries may be treated like Python expressions

In [33]:
query = %datalog Just500(column0) :- TwitterK(column0, 500)%
query.status, query.to_dict()

(u'SUCCESS', [{u'column0': 499}, {u'column0': 498}])
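As shown above, `to_dict()` returns a list of row dictionaries, so results can be post-processed with ordinary Python even when pandas is unavailable. For example, here is a minimal sketch that pivots such records into per-column lists. `records_to_columns` is a hypothetical helper, not part of `myria`.

```python
def records_to_columns(records):
    """Pivot a list of row dicts (as returned by to_dict()) into {column: [values]}.

    Assumes every row has the same keys; otherwise the columns will be misaligned.
    """
    columns = {}
    for row in records:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns


records = [{u'column0': 499}, {u'column0': 498}]  # the to_dict() output shown above
print(records_to_columns(records))
```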

## 6. Variable Binding

The IPython extension supports simple variable binding, where the current environment may be referenced from within a query.  (Currently, only simple Python identifiers are replaced.  Boo!)

In [34]:
low, high, destination = 543, 550, 'BoundRelation'

Note that the tokens `@low`, `@high`, and `@destination` are replaced based on values drawn from the current environment:

In [35]:
%%myrial
T1 = scan(TwitterK);
T2 = [from T1 where $0 > @low and $0 < @high emit $1 as x];
store(T2, @destination);

| | x |
|---|---|
| 0 | 989 |
| 1 | 16 |
| 2 | 21 |
| 3 | 20 |
| 4 | 53 |
| 5 | 610 |
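Here is a minimal sketch of the substitution described above. It assumes, as the example suggests, that each `@name` token is replaced by the textual value of the Python variable `name`. `bind_variables` is a hypothetical helper, and the extension's actual implementation (including how it quotes strings) may differ.

```python
import re

# "@" followed by a simple Python identifier, e.g. @low or @destination.
BINDING_PATTERN = re.compile(r'@([A-Za-z_][A-Za-z0-9_]*)')


def bind_variables(query, namespace):
    """Hypothetical helper: replace each @name with str(namespace[name]).

    Tokens whose name is not in the namespace are left untouched.
    """
    def substitute(match):
        name = match.group(1)
        return str(namespace[name]) if name in namespace else match.group(0)
    return BINDING_PATTERN.sub(substitute, query)


query = """T1 = scan(TwitterK);
T2 = [from T1 where $0 > @low and $0 < @high emit $1 as x];
store(T2, @destination);"""

bound = bind_variables(query, {'low': 543, 'high': 550, 'destination': 'BoundRelation'})
print(bound)
```

Inside IPython, the natural namespace to pass would be the user namespace, `get_ipython().user_ns`.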


## 7. Plans and Operators

You can use the `%%plan` magic to compile a plan without immediately executing it. This allows for post-processing and advanced JSON manipulation (e.g., shuffle removal, loop unrolling, join substitution).

In [36]:
%%plan 
T1 = scan(TwitterK);
T2 = [from T1, T1 as x emit x.$0 as x];
store(T2, JustX);

{u'language': u'MyriaL',
 u'logicalRa': u'Store(public:adhoc:JustX)[Apply(x=$2)[CrossProduct[Scan(public:adhoc:TwitterK),Scan(public:adhoc:TwitterK)]]]',
 u'plan': {u'fragments': [{u'operators': [{u'argOperatorId': 1,
      u'opId': 0,
      u'opName': u'MyriaSplitConsumer',
      u'opType': u'LocalMultiwayConsumer'},
     {u'argChild': 0,
      u'emitExpressions': [{u'outputName': u'x',
        u'rootExpressionOperator': {u'columnIdx': 2, u'type': u'VARIABLE'}}],
      u'opId': 2,
      u'opName': u'MyriaApply(x=$2)',
      u'opType': u'Apply'},
     {u'argChild': 2,
      u'argOverwriteTable': True,
      u'opId': 3,
      u'opName': u'MyriaStore(public:adhoc:JustX)',
      u'opType': u'DbInsert',
      u'relationKey': {u'programName': u'adhoc',
       u'relationName': u'JustX',
       u'userName': u'public'}}]},
   {u'operators': [{u'argOperatorId': 5,
      u'opId': 4,
      u'opName': u'MyriaBroadcastConsumer',
      u'opType': u'BroadcastConsumer'},
     {u'opId': 6,
      u'opNa

The plan object exposes its type and language, along with its subplans, fragments, and operators:

In [37]:
plan = _
print '%s (%s) ' % (plan.type, plan.language)
print '#Subplans:  %d' % len(list(plan.subplans))
print '#Fragments: %d' % len(list(plan.fragments))
print '#Operators: %d' % len(list(plan.operators))

AttributeError: 'dict' object has no attribute 'type'
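The `%%plan` output above is plain JSON (the `AttributeError` shows that `_` is a `dict` here), so similar counts can also be computed directly from the nested dictionary. This minimal sketch uses an abbreviated excerpt of Out[36]. Only the fully visible operators are included, and the key names are those shown in the output. Plans containing sequences or loops are probably nested differently.

```python
# Abbreviated excerpt of the Out[36] plan; truncated operators are omitted.
plan_json = {
    'language': 'MyriaL',
    'plan': {'fragments': [
        {'operators': [
            {'opId': 0, 'opName': 'MyriaSplitConsumer', 'opType': 'LocalMultiwayConsumer', 'argOperatorId': 1},
            {'opId': 2, 'opName': 'MyriaApply(x=$2)', 'opType': 'Apply', 'argChild': 0},
            {'opId': 3, 'opName': 'MyriaStore(public:adhoc:JustX)', 'opType': 'DbInsert', 'argChild': 2},
        ]},
        {'operators': [
            {'opId': 4, 'opName': 'MyriaBroadcastConsumer', 'opType': 'BroadcastConsumer', 'argOperatorId': 5},
        ]},
    ]},
}

fragments = plan_json['plan']['fragments']
# Flatten the operators of every fragment into a single list.
operators = [op for fragment in fragments for op in fragment['operators']]

print('Language:   %s' % plan_json['language'])
print('#Fragments: %d' % len(fragments))
print('#Operators: %d' % len(operators))
```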

There are also convenience properties for obtaining commonly sought operators:

In [None]:
print '#Shuffles:  %d' % len(list(plan.shuffles))
print '#Loops:     %d' % len(list(plan.loops))
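On the raw JSON, shuffles can be located with a simple predicate over `opType`. The sketch below assumes that shuffle operators are recognizable by an `opType` beginning with `Shuffle`, as with `ShuffleConsumer` in Out[46]. How `plan.loops` identifies loops is not shown here, so loops are omitted. `find_operators` is a hypothetical helper.

```python
def find_operators(plan_json, predicate):
    """Hypothetical helper: yield every operator dict for which predicate(op) is true."""
    for fragment in plan_json['plan']['fragments']:
        for op in fragment['operators']:
            if predicate(op):
                yield op


# Abbreviated excerpt of the Out[46] plan; truncated operators are omitted.
plan_json = {'plan': {'fragments': [
    {'operators': [
        {'opId': 0, 'opType': 'LocalMultiwayConsumer', 'argOperatorId': 1},
        {'opId': 2, 'opType': 'DbInsert', 'argChild': 0},
    ]},
    {'operators': [
        {'opId': 3, 'opType': 'ShuffleConsumer', 'argOperatorId': 4},
        {'opId': 5, 'opType': 'SingleGroupByAggregate', 'argChild': 3, 'argGroupField': 0},
    ]},
]}}

shuffles = list(find_operators(plan_json, lambda op: op['opType'].startswith('Shuffle')))
# Only the consumer side of the shuffle is visible in this excerpt.
print('#Shuffles:  %d' % len(shuffles))
```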

Plan entities are also queryable via JMESPath:

In [None]:
plan.search('plan.fragments[].operators[?contains(opType, `Insert`)]')
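For comparison, here is a plain-Python counterpart of that search (operators whose `opType` contains `Insert`), flattened across fragments. It uses an abbreviated excerpt of Out[36].

```python
# Abbreviated excerpt of the Out[36] plan; truncated operators are omitted.
plan_json = {'plan': {'fragments': [
    {'operators': [
        {'opId': 0, 'opType': 'LocalMultiwayConsumer'},
        {'opId': 2, 'opType': 'Apply'},
        {'opId': 3, 'opType': 'DbInsert',
         'relationKey': {'programName': 'adhoc', 'relationName': 'JustX', 'userName': 'public'}},
    ]},
    {'operators': [
        {'opId': 4, 'opType': 'BroadcastConsumer'},
    ]},
]}}

# Plain-Python analogue of: plan.fragments[].operators[?contains(opType, `Insert`)]
inserts = [op
           for fragment in plan_json['plan']['fragments']
           for op in fragment['operators']
           if 'Insert' in op['opType']]

print([op['opId'] for op in inserts])  # [3]
```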

### Fragments

Various properties are also exposed at the fragment level:

In [38]:
fragment = list(plan.fragments)[-1]
fragment

AttributeError: 'dict' object has no attribute 'fragments'

In [39]:
print 'Workers:    %s' % fragment.workers
print '#Operators: %d' % len(list(fragment.operators))
print '#Shuffles:  %d' % len(list(fragment.shuffles))

NameError: name 'fragment' is not defined

### Operators

Finally, you can do the same sorts of things with the operators themselves:

In [40]:
operator = fragment.operators.next()
operator

NameError: name 'fragment' is not defined

In [41]:
print 'id:   %d' % operator.id
print 'name: %s' % operator.name
print 'type: %s' % operator.type

NameError: name 'operator' is not defined

Additional metadata are accessible via indexing:

In [42]:
operator['relationKey']['userName'] = 'Brandon'

for pair in operator.items():
    print pair

NameError: name 'operator' is not defined

For debugging and to aid comprehension, you can also view the local neighborhood of a given operator:

In [43]:
operator.neighborhood()

NameError: name 'operator' is not defined

In [44]:
print operator.parent.name
operator.parent.neighborhood()

NameError: name 'operator' is not defined
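In the JSON shown earlier, operators appear to refer to their input in two ways. `argChild` names a child operator, and consumers name their matching producer via `argOperatorId`. Here is a minimal sketch of a neighborhood lookup built on those two keys. `neighborhood` and `index_operators` are hypothetical helpers, not the `myria` methods, and operators with several children (which may use a different key) are not handled.

```python
def index_operators(plan_json):
    """Map opId -> operator dict across all fragments."""
    return {op['opId']: op
            for fragment in plan_json['plan']['fragments']
            for op in fragment['operators']}


def neighborhood(plan_json, op_id):
    """Hypothetical helper: return (input ids, parent ids) for one operator.

    Inputs follow argChild (single-child operators only) and argOperatorId
    (consumer -> producer); parents are operators whose argChild is op_id.
    """
    ops = index_operators(plan_json)
    op = ops[op_id]
    inputs = [op[key] for key in ('argChild', 'argOperatorId') if key in op]
    parents = sorted(other['opId'] for other in ops.values() if other.get('argChild') == op_id)
    return inputs, parents


# Abbreviated excerpt of the Out[46] plan; truncated operators are omitted.
plan_json = {'plan': {'fragments': [
    {'operators': [
        {'opId': 0, 'opType': 'LocalMultiwayConsumer', 'argOperatorId': 1},
        {'opId': 2, 'opType': 'DbInsert', 'argChild': 0},
    ]},
    {'operators': [
        {'opId': 3, 'opType': 'ShuffleConsumer', 'argOperatorId': 4},
        {'opId': 5, 'opType': 'SingleGroupByAggregate', 'argChild': 3},
    ]},
]}}

print(neighborhood(plan_json, 3))  # ([4], [5])
```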

### Working with JSON

You can also continue to work with the original JSON, once you've identified the relevant entities:

In [45]:
import json
print 'Plan JSON:     %s ...' % json.dumps(plan.data)[:50]
print 'Fragment JSON: %s ...' % json.dumps(plan.fragments.next().data)[:50]
print 'Operator JSON: %s ...' % json.dumps(plan.operators.next().data)[:50]

AttributeError: 'dict' object has no attribute 'data'
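Edits like the `relationKey` change in In [42] can also be made directly on the raw dictionary and serialized with the standard `json` module. Here is a sketch using the `DbInsert` operator visible in Out[36], abbreviated to the keys shown there.

```python
import json

# The DbInsert operator from Out[36], abbreviated to the keys shown there.
store_op = {
    'opId': 3,
    'opName': 'MyriaStore(public:adhoc:JustX)',
    'opType': 'DbInsert',
    'argChild': 2,
    'argOverwriteTable': True,
    'relationKey': {'programName': 'adhoc', 'relationName': 'JustX', 'userName': 'public'},
}

# Mutate a nested field, then serialize; sort_keys makes the output deterministic.
store_op['relationKey']['userName'] = 'Brandon'
text = json.dumps(store_op['relationKey'], sort_keys=True)
print(text)  # {"programName": "adhoc", "relationName": "JustX", "userName": "Brandon"}
```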

### Advanced Plan Manipulation

Several high-level plan manipulation operations are exposed:

#### Shuffle Removal

We can remove a shuffle from a plan, and stitch the resulting plan back together.

First, create a plan with a shuffle:

In [46]:
%%plan
T1 = scan(TwitterK);
T2 = [from T1 emit $0, count($1)];
store(T2, Groups);

{u'language': u'MyriaL',
 u'logicalRa': u'Store(public:adhoc:Groups)[Apply(column0=$0,_COLUMN1_=$1)[GroupBy($0; COUNT($1))[Scan(public:adhoc:TwitterK)]]]',
 u'plan': {u'fragments': [{u'operators': [{u'argOperatorId': 1,
      u'opId': 0,
      u'opName': u'MyriaSplitConsumer',
      u'opType': u'LocalMultiwayConsumer'},
     {u'argChild': 0,
      u'argOverwriteTable': True,
      u'opId': 2,
      u'opName': u'MyriaStore(public:adhoc:Groups)',
      u'opType': u'DbInsert',
      u'relationKey': {u'programName': u'adhoc',
       u'relationName': u'Groups',
       u'userName': u'public'}}]},
   {u'operators': [{u'argOperatorId': 4,
      u'opId': 3,
      u'opName': u'MyriaShuffleConsumer',
      u'opType': u'ShuffleConsumer'},
     {u'aggregators': [{u'aggOps': [u'SUM'],
        u'column': 1,
        u'type': u'SingleColumn'}],
      u'argChild': 3,
      u'argGroupField': 0,
      u'opId': 5,
      u'opName': u'MyriaGroupBy($0; SUM($1))',
      u'opType': u'SingleGroupByAggregate'},
 

Let's view the neighborhood of the shuffle to make sure we've identified the correct operator:

In [47]:
shuffle_plan = _
shuffle = shuffle_plan.shuffles.next()
shuffle.neighborhood()

AttributeError: 'dict' object has no attribute 'shuffles'

If we know that all tuples sharing a given `$0` value already reside on the same worker, we can aggregate locally and avoid the cost of shuffling. The following plan captures this assumption:

In [48]:
plans.remove_shuffle(shuffle)
shuffle_plan

AttributeError: 'module' object has no attribute 'remove_shuffle'

In [49]:
# Since our groups weren't _actually_ worker local, this answer will differ from the shuffled version :)
shuffle_plan.execute().to_dataframe()

AttributeError: 'dict' object has no attribute 'execute'
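Conceptually, removing a shuffle means deleting the producer/consumer pair, wiring the consumer's parent directly to the producer's child, and merging the two fragments. The sketch below shows that rewiring on a small toy plan. `remove_shuffle_pair` is hypothetical and makes no claim about how `myria`'s own implementation works. It assumes two things, as in Out[46]: consumers refer to producers via `argOperatorId`, and other operators have a single input given by `argChild`. The producer-side operators (`TableScan`, `ShuffleProducer`) are illustrative, since that part of Out[46] is truncated.

```python
def remove_shuffle_pair(plan_json, consumer_id):
    """Hypothetical sketch: splice out a shuffle consumer and its producer."""
    fragments = plan_json['plan']['fragments']

    def owner(op_id):
        # The fragment that contains the operator with this opId.
        return next(f for f in fragments if any(op['opId'] == op_id for op in f['operators']))

    def find(fragment, op_id):
        return next(op for op in fragment['operators'] if op['opId'] == op_id)

    consumer_fragment = owner(consumer_id)
    producer_id = find(consumer_fragment, consumer_id)['argOperatorId']
    producer_fragment = owner(producer_id)
    producer = find(producer_fragment, producer_id)

    # Rewire: whatever read from the consumer now reads from the producer's child.
    for op in consumer_fragment['operators']:
        if op.get('argChild') == consumer_id:
            op['argChild'] = producer['argChild']

    # Merge both fragments into one, dropping the producer/consumer pair.
    merged = [op for op in producer_fragment['operators'] + consumer_fragment['operators']
              if op['opId'] not in (consumer_id, producer_id)]
    plan_json['plan']['fragments'] = [f for f in fragments
                                      if f is not consumer_fragment and f is not producer_fragment]
    plan_json['plan']['fragments'].append({'operators': merged})
    return plan_json


plan_json = {'plan': {'fragments': [
    {'operators': [  # producer side (illustrative opTypes)
        {'opId': 6, 'opType': 'TableScan'},
        {'opId': 4, 'opType': 'ShuffleProducer', 'argChild': 6},
    ]},
    {'operators': [  # consumer side, as in Out[46]
        {'opId': 3, 'opType': 'ShuffleConsumer', 'argOperatorId': 4},
        {'opId': 5, 'opType': 'SingleGroupByAggregate', 'argChild': 3, 'argGroupField': 0},
    ]},
]}}

remove_shuffle_pair(plan_json, consumer_id=3)
print([(op['opId'], op.get('argChild')) for op in plan_json['plan']['fragments'][0]['operators']])
```

As in the notebook cell above, the rewritten plan is only correct when the grouping values really are worker-local.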

## 8. Profiling

Some initial, tentative support for profiling is available via the `%profile` magic. This probably requires a few tweaks to `Myria-Web` for an optimal user experience.

In [50]:
%connect http://localhost:8753 http://localhost:8080

<myria.connection.MyriaConnection at 0x10c1e4990>

In [51]:
%profile MyriaQuery(28)