In [3]:
# Xcalar Notebook Connector
# 
# Connects this Jupyter Notebook to the Xcalar Workbook <New Workbook>
#
# To use any data from your Xcalar Workbook, run this snippet before any other
# Xcalar snippets in this notebook.
# 
# A best practice is not to edit this cell.
#
# If you wish to use this Jupyter Notebook with a different Xcalar Workbook 
# delete this cell and click CODE SNIPPETS --> Connect to Xcalar Workbook.

%matplotlib inline

# Import third-party modules to facilitate data work.
import pandas as pd
import matplotlib.pyplot as plt

# Importing Xcalar packages and modules. 
# For more information, search and post questions on discourse.xcalar.com
from xcalar.external.LegacyApi.XcalarApi import XcalarApi
from xcalar.external.LegacyApi.Session import Session
from xcalar.external.LegacyApi.WorkItem import *
from xcalar.external.LegacyApi.Operators import *
from xcalar.external.LegacyApi.Retina import *

# Create an XcalarApi object (client handle; runQuery below submits and
# deletes queries through it).
xcalarApi = XcalarApi()
# Connect to the current workbook that you are in.
# NOTE(review): Session's positional arguments are not visible from here. The
# final "TPCH" is the same string runQuery passes to submitQuery. Confirm the
# meaning of "admin", "admin", 4135730 and True against Session's signature
# before changing any of them.
workbook = Session(xcalarApi, "admin", "admin", 4135730, True, "TPCH")
xcalarApi.setSession(workbook)

# Operators helper; runQuery and f use it to drop tables via op.dropTable.
op = Operators(xcalarApi)
# Retina (dataflow) helper; runQuery calls ret.getGraph and f calls ret.execute.
ret = Retina(xcalarApi)



In [54]:
# Table names that retina graphs read from directly. runQuery leaves any node
# source found in this list unchanged. Every other source (an intermediate
# table produced inside the same graph) gets the per-run tag appended.
sources = [
    # lineitem
    "lineitem_partkey#8",
    "lineitem_shipdate#21",
    "lineitem_suppkey#45",
    "lineitem_orderkey#50",
    # orders
    "orders_orderkey#46",
    # part
    "part_brandcontainer#3",
    "part_partkey#23",
    # supplier
    "supplier_suppkey#31",
    # nation
    "nation#27",
    # partsupp
    "partsupp_suppkey_partkey#60",
]

# In[23]:


def runQuery(retinaName, tag):
    """Run a retina's dataflow graph as a one-off query under a unique tag.

    The graph is fetched with ``ret.getGraph``. Its ``XcalarApiSynthesize`` and
    ``XcalarApiExport`` nodes are removed, and every remaining node gets its
    ``state`` set to ``"Dropped"``. ``tag`` is then appended to:

    * each node's ``dest``, except for ``XcalarApiAggregate`` nodes;
    * each ``source`` name that is not listed in ``sources``.

    This gives runs with different tags distinct table names. The rewritten
    graph is submitted as query ``"q" + tag``. The query is then deleted and
    ``op.dropTable`` is called with ``"*" + tag``.

    Args:
        retinaName: Name of the retina whose graph is rewritten and run.
        tag: Suffix appended to rewritten table names and to the query name.
    """
    # Import json explicitly instead of relying on it leaking in through the
    # wildcard ``from xcalar.external.LegacyApi... import *`` lines.
    import json

    graph = ret.getGraph(retinaName)

    # Walk a copy because removeNode() mutates graph.dag during iteration.
    for nodeName, node in graph.dag.copy().items():
        if node.operator["operation"] in ("XcalarApiSynthesize", "XcalarApiExport"):
            graph.removeNode(nodeName)
        else:
            node.operator["state"] = "Dropped"

    # No nodes are added or removed in this pass, so no copy is needed.
    for _, node in graph.dag.items():
        args = node.operator["args"]

        if node.operator["operation"] != "XcalarApiAggregate":
            args["dest"] += tag

        source = args["source"]
        if isinstance(source, list):
            # Update the existing list object in place rather than rebinding
            # args["source"].
            source[:] = [name if name in sources else name + tag for name in source]
        elif source not in sources:
            args["source"] = source + tag

    query = graph.toQueryDict()

    queryName = "q" + tag
    # NOTE(review): "TPCH" duplicates the last argument given to Session in the
    # connector cell; keep the two in sync.
    xcalarApi.submitQuery(json.dumps(query), "TPCH", queryName)
    xcalarApi.deleteQuery(queryName)
    # "*" + tag presumably matches every table name ending in the tag.
    op.dropTable("*" + tag)

In [57]:
from multiprocessing import Pool
import time

def f(x, retinaName, start, runAsQuery):
    """Execute a single run of ``retinaName`` and print its timing.

    This is the worker that ``runRetina`` dispatches through ``Pool.starmap``.

    Args:
        x: Per-run tag string, used as the runQuery tag or as the suffix of
            the temporary result table.
        retinaName: Name of the retina to run.
        start: ``time.time()`` timestamp taken by the caller before the pool
            was created; latency is measured from it.
        runAsQuery: If true, run through ``runQuery``. Otherwise execute the
            retina into a ``"temp-" + x`` table and then drop that table.

    Returns:
        Seconds between ``start`` and the end of this run.
    """
    runBegan = time.time()

    if not runAsQuery:
        scratchTable = "temp-" + x
        ret.execute(retinaName, [], scratchTable, scratchTable)
        op.dropTable(scratchTable)
    else:
        runQuery(retinaName, x)

    latency = time.time() - start
    dataflowSeconds = time.time() - runBegan
    print("Dataflow time: {0:.2f}, Latency: {1:.2f}".format(dataflowSeconds, latency))

    return latency

def runRetina(retinaName, runs=1, pool_count=1, runAsQuery=True):
    """Run ``retinaName`` ``runs`` times on a process pool and print timings.

    Run ``i`` is dispatched to ``f`` with tag ``"tt-<i>"``. ``f`` prints each
    run's dataflow time and latency. Latency is measured from a timestamp taken
    before the pool is created, so it includes pool start-up. Afterwards the
    average latency and the total wall time are printed.

    Args:
        retinaName: Name of the retina to run.
        runs: Number of runs; must be at least 1.
        pool_count: Number of worker processes in the pool.
        runAsQuery: Forwarded to ``f``; selects runQuery vs. ``ret.execute``.

    Raises:
        ValueError: If ``runs`` is less than 1, since there is nothing to
            average.
    """
    if runs < 1:
        raise ValueError("runs must be at least 1, got {}".format(runs))

    start = time.time()
    args = [("tt-" + str(i), retinaName, start, runAsQuery) for i in range(runs)]

    # The context manager terminates the workers once starmap has returned every
    # result; without it each call leaked a pool of worker processes.
    with Pool(pool_count) as p:
        times = p.starmap(f, args)

    print("Avg Latency: {0:.2f}. Total Time: {1:.2f}".format(sum(times) / len(times), time.time() - start))

In [61]:
queries = ["q09", "q14", "q17", "q21"]

# q21 is executed directly through ret.execute (runAsQuery=False); every other
# retina goes through runQuery.
for q in queries:
    print("\nRunning: " + q)
    runRetina(q, runAsQuery=(q != "q21"))



Running: q09




Dataflow time: 29.85, Latency: 29.88
Avg Latency: 29.88. Total Time: 29.88

Running: q14




Dataflow time: 2.64, Latency: 2.66
Avg Latency: 2.66. Total Time: 2.66

Running: q17




Dataflow time: 2.40, Latency: 2.42
Avg Latency: 2.42. Total Time: 2.42

Running: q21




Dataflow time: 64.80, Latency: 64.82
Avg Latency: 64.82. Total Time: 64.82


In [41]:
runRetina("q09_1", runs=1, pool_count=1, runAsQuery=True)



Dataflow time: 3.12, Latency: 3.14
Avg Latency: 3.14. Total Time: 3.14
