## Imports, Creating a GPU Cluster, and Attaching a Dask Client to It

In [17]:
from blazingsql import BlazingContext
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

# Start a local Dask cluster backed by this machine's GPUs (LocalCUDACluster),
# serving its dashboard at 0.0.0.0:8001.
# NOTE(review): binding 0.0.0.0 exposes the dashboard on every network interface.
cluster = LocalCUDACluster(
    dashboard_address='0.0.0.0:8001',
)

# Connect a Dask client to that cluster; the BlazingContext below is built on it.
client = Client(cluster)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34953 instead
  http_address["port"], self.http_server.port


## Creating a BlazingContext with the Dask Client

In [18]:
# Build a BlazingContext on top of the Dask client created above, so that
# queries are dispatched to that cluster.
bc = BlazingContext(
    dask_client=client,
)


BlazingContext ready


In [19]:
# Register every CSV file under /root/code as a single table.
bc.create_table(
    'table1',
    '/root/code/*.csv',
)

# Keep only the rows whose column `a` equals 1.
q = '''
SELECT * FROM table1
WHERE a=1
'''

# bc.sql returns a lazy dask_cudf DataFrame here; nothing is computed until .compute().
res = bc.sql(q)
res

Unnamed: 0_level_0,a,b,c,d
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,int64,float64,float64,float64
,...,...,...,...


In [20]:
# Confirm that bc.sql handed back a lazy dask_cudf DataFrame rather than materialized data.
type(res)

dask_cudf.core.DataFrame

In [21]:
# Execute the lazy query and collect the result into a single in-memory frame.
res.compute()

Unnamed: 0,a,b,c,d
0,1,0.728617,-0.846392,0.176448
1,1,0.194267,0.574008,0.308305
2,1,-0.020245,-1.046001,0.267910
3,1,1.585304,-2.608947,0.716433
4,1,1.584202,1.665679,-0.866326
...,...,...,...,...
9997294,1,0.698527,-1.141175,2.048563
9997295,1,0.637495,-0.753099,-0.170067
9997296,1,0.035239,0.102899,-1.461216
9997297,1,0.283887,-0.484700,1.994155


## Tuning the BlazingContext

In [22]:
# Engine tuning options passed to BlazingContext via `config_options`.
# Keys must match the option names exactly, or the setting is not applied.
config = {
    'JOIN_PARTITION_SIZE_THRESHOLD': 585640000,
    'MAX_JOIN_SCATTER_MEM_OVERHEAD': 500000000,
    'MAX_NUM_ORDER_BY_PARTITIONS_PER_NODE': 8,
    'NUM_BYTES_PER_ORDER_BY_PARTITION': 400000000,
    'TABLE_SCAN_KERNEL_NUM_THREADS': 2,
    'MAX_CONCAT_CACHE_BYTE_SIZE': 400000000,
    'ORDER_BY_SAMPLES_RATIO': 0.1,
    'BLAZING_DEVICE_MEM_RESOURCE_CONSUMPTION_THRESHOLD': 0.95,
    # Previously spelled "FLOW_CONTROL_BYTES_THRESHOLD'" (stray quote inside the key),
    # which could never match the option name. Byte count given as an int like the others.
    'FLOW_CONTROL_BYTES_THRESHOLD': 600000000,
}

bc = BlazingContext(dask_client=client, config_options=config)

BlazingContext ready


## Returning Futures (Non-Blocking Queries)

In [23]:
# Create a BlazingContext on the existing Dask client. Calling Client(cluster)
# again would open a second, never-closed client connection to the same cluster.
bc = BlazingContext(dask_client=client)

bc.create_table('table0', '/root/code/*.csv')

# Same filter as before, against the new table (plain string: no placeholders needed).
q = '''
    SELECT * FROM table0
    WHERE a=1
'''

# With return_futures=True, bc.sql is non-blocking: it immediately returns a
# list of Dask futures instead of a DataFrame.
res = bc.sql(q, return_futures=True)
res

BlazingContext ready


[<Future: pending, key: executeGraph-3f299cec-be23-44c5-9186-bf966ac8f272>]

## Single-GPU Queries


In [24]:
from blazingsql import BlazingContext

# Create a BlazingContext on the existing Dask client instead of opening a
# second Client on the same cluster.
bc = BlazingContext(dask_client=client)

# One table per CSV file. This single mapping drives both table creation and
# the query loop, so the two cannot drift apart.
tables = {
    'table0': '/root/code/0.csv',
    'table1': '/root/code/1.csv',
}
for table_name, path in tables.items():
    bc.create_table(table_name, path)

res = []
for table_name in tables:
    q = f'''
        SELECT * FROM {table_name}
        WHERE a=1
    '''
    # single_gpu=True runs each query on a single GPU. return_futures=True makes
    # the call non-blocking, so the queries can be in flight at the same time.
    res += bc.sql(q, single_gpu=True, return_futures=True)
res

BlazingContext ready


[<Future: finished, type: builtins.tuple, key: executeGraph-b277d8a1-60a6-4341-953d-99987fc81d32>,
 <Future: pending, key: executeGraph-a66f049f-b131-4225-9521-e5e357ba31b5>]

In [25]:
import dask
def get_element(query_partid):
    """Take one query partition out of the current Dask worker's store.

    Meant to run on a Dask worker (it calls ``get_worker()``). It looks up
    ``query_partid`` in that worker's ``query_parts`` mapping, removes the entry,
    and returns the stored object, so the same id cannot be fetched twice.
    """
    query_parts = dask.distributed.get_worker().query_parts
    partition = query_parts[query_partid]
    # Drop the worker-side reference once the partition is handed back.
    del query_parts[query_partid]
    return partition

def convert_to_df(futures, client=None):
    """Turn futures from ``bc.sql(..., return_futures=True)`` into a dask DataFrame.

    Each gathered future result is a ``(query_partids, meta, worker_id)`` tuple.
    For every partition id, ``get_element`` is submitted to the worker that holds
    that partition. The resulting futures are combined with
    ``dask.dataframe.from_delayed``.

    Parameters
    ----------
    futures : list
        Futures returned by ``bc.sql(..., return_futures=True)``.
    client : dask.distributed.Client, optional
        Client used to gather results and submit tasks. Defaults to ``bc.dask_client``.

    Returns
    -------
    A lazy dask DataFrame with one partition per query partition id, using the
    ``meta`` of the last gathered result.

    Raises
    ------
    ValueError
        If ``futures`` gathers to no results (there would be no ``meta`` to use).
    """
    # Explicit import: don't rely on another library having loaded dask.dataframe.
    import dask.dataframe as dd

    if client is None:
        client = bc.dask_client

    results = client.gather(futures)
    if not results:
        raise ValueError("convert_to_df() needs at least one query future")

    part_futures = []
    meta = None
    for query_partids, part_meta, worker_id in results:
        meta = part_meta
        for query_partid in query_partids:
            # Pin each task to the worker that holds the partition.
            # pure=False: get_element mutates worker state. Also, with the default
            # pure=True, equal ids on two different workers would collapse onto
            # one task key.
            part_futures.append(
                client.submit(get_element, query_partid, workers=[worker_id], pure=False)
            )
    return dd.from_delayed(part_futures, meta=meta)

# Stitch the single-GPU query futures back together as one lazy dask DataFrame.
df = convert_to_df(futures=res)
df

Unnamed: 0_level_0,a,b,c,d
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,int64,float64,float64,float64
,...,...,...,...
,...,...,...,...


In [26]:
# Execute both partitions and collect them into a single in-memory frame.
df.compute()

Unnamed: 0,a,b,c,d
0,1,0.728617,-0.846392,0.176448
1,1,0.194267,0.574008,0.308305
2,1,-0.020245,-1.046001,0.267910
3,1,1.585304,-2.608947,0.716433
4,1,1.584202,1.665679,-0.866326
...,...,...,...,...
4997438,1,0.698527,-1.141175,2.048563
4997439,1,0.637495,-0.753099,-0.170067
4997440,1,0.035239,0.102899,-1.461216
4997441,1,0.283887,-0.484700,1.994155
