## Prepare training table 

#### import the library

In [1]:
import os
import numpy as np
import gpudb
import collections

### define input type

In [2]:
host_ip="p4.rewreu.org"

input_type = """
    {
       "type": "record",
       "name": "input_type",
       "fields": [{"name":"y","type":"float"},
       ]
    }  """.replace(' ', '').replace('\n', '')
for i in range(10):
    l="""{"name":"x"""+str(i)+"""","type":"float"},""" if i<10-1 else """{"name":"x"""+str(i)+"""","type":"float"}"""
    input_type=input_type[:-2]+l+input_type[-2:]
    

### Create type

In [3]:
h_db = gpudb.GPUdb(encoding='BINARY', host=host_ip, port="9191")
response = h_db.create_type(type_definition=input_type, label="X", properties={})

### Create table

In [4]:
if h_db.has_table(table_name="X")['table_exists']:
    h_db.clear_table(table_name="X")
response = h_db.create_table(table_name="X", type_id=response['type_id'],
                             options={"collection_name": "LinearReg"})

### Injest data

In [5]:
data_pack_size=10
Input_table="X"
x=np.random.random([1000,10])
Y=x.sum(axis=1)+np.random.random([1000])*3
i = 0
encoded_obj_list = []
# for data in 
for data, label in zip(x, Y):
    datum = collections.OrderedDict()
    datum["y"]=label
    for k in range(10):
        datum["x"+str(k)]=data[k]
    encoded_obj_list.append(h_db.encode_datum(input_type, datum))
    i = i + 1
    if i % data_pack_size == 0:
        response = h_db.insert_records(table_name=Input_table, data=encoded_obj_list, list_encoding='binary',
                                       options={})  # the list_encoding and options can be omitted
        encoded_obj_list = []
        pct=float(i)/float(x.shape[0])*100.0
        if pct==int(pct) and pct%20 ==0:
            print(str(pct)+" % loaded")
if encoded_obj_list != []:
    response = h_db.insert_records(table_name=Input_table, data=encoded_obj_list, list_encoding='binary', options={})

response = h_db.insert_records(table_name=Input_table, data=encoded_obj_list,
                               list_encoding='binary', options={})  # the list_encoding and options can be omitted
print("%s table has %d number of record inserted"%(Input_table,i*data_pack_size))

20.0 % loaded
40.0 % loaded
60.0 % loaded
80.0 % loaded
100.0 % loaded
X table has 10000 number of record inserted


In [6]:
%%writefile trainingReg.py
import gpudb
import collections
import numpy as np
import pandas as pd
from datetime import datetime
import os
from sklearn.linear_model import LinearRegression
import random
from kineticaIO import kineticaIO
import pickle
from kinetica_proc import ProcData

# modify according to environment
HOSTIP = '127.0.0.1'  # trainning on the head node, no need to change
in_table_name = "X"
train_data_size = 10000  # random sampling data, for Kmeans clustering, there is no batch feed, so to avoid crashing, randomly 


# choose a smaller subset of data to train. 

def run():
    proc_data = ProcData()
    h_db = gpudb.GPUdb(encoding='BINARY', host=HOSTIP, port=9191)
    mydf1 = pd.DataFrame()
    response = h_db.show_table(table_name=in_table_name, options={"get_sizes": "true"})
    size = response['sizes'][0]
    batch_size = 100
    for i in range(train_data_size // batch_size):
        offset = random.randint(0, size)
        response = h_db.get_records(table_name='X', offset=offset, limit=batch_size)
        res_decoded = gpudb.GPUdbRecord.decode_binary_data(response["type_schema"], response["records_binary"])
        mydf1 = mydf1.append(pd.DataFrame(res_decoded))
    y = mydf1["y"]
    x = mydf1.drop(['y'], axis=1)
    lr = LinearRegression()
    lr.fit(X=x, y=y)
    # save model to database
    s = pickle.dumps(lr)
    kio = kineticaIO(h_db)
    kio.Model2Kinetica(pbfile=s, ModelName="LinearReg_Model", Loss=-99, COLLECTION="MASTER")
    proc_data.complete()


if __name__ == "__main__":
    run()


Overwriting trainingReg.py


In [15]:
import collections
import time
import gpudb
from datetime import datetime, timedelta

host_ip = "p4.rewreu.org"  # dev machine
PORT = '9191'
proc_name = 'lr_train'
filelist = ['trainingReg.py', 'kineticaIO.py']


# Register UDF
def Register_UDF(h_db, proc_name=proc_name, filelist=filelist):
    files = {}
    for ifile in filelist:
        with open(ifile, 'rb') as f:
            files[ifile.split("/")[-1]] = f.read()
    if h_db.has_proc(proc_name)['proc_exists']:
        h_db.delete_proc(proc_name)
    print("Registering proc...")
    response = h_db.create_proc(proc_name, 'nondistributed', files, 'python', [filelist[0]], {})


# Execute UDF
def execute_UDF(h_db, proc_name=proc_name):
    print("Executing proc...")
    response = h_db.execute_proc(proc_name, {}, {}, [], {}, [], {})
    return response


if __name__ == "__main__":
    h_db = gpudb.GPUdb(encoding='BINARY', host=host_ip, port=PORT)
    Register_UDF(h_db)
    response = execute_UDF(h_db)
    start_time = datetime.now()
    if response['status_info']['status'] == 'OK':
        run_id = response['run_id']
        print('Proc was launched successfully with run_id: ' + run_id)
        while h_db.show_proc_status(run_id)['overall_statuses'][run_id] == 'running':
            time.sleep(1)
            print('process is running... ')
        final_proc_state = h_db.show_proc_status(run_id)['overall_statuses'][run_id]
        print("total running time is ", datetime.now() - start_time)
        print('Final Proc state: ' + final_proc_state)
        if final_proc_state == 'error':
            raise RuntimeError('proc error')
    else:
        print('Error launching proc; response: ')
        print(response)
        raise RuntimeError('proc error')


Registering proc...
Executing proc...
Proc was launched successfully with run_id: 6
process is running... 
process is running... 
process is running... 
process is running... 
process is running... 
process is running... 
process is running... 
('total running time is ', datetime.timedelta(0, 7, 241556))
Final Proc state: complete


## model table

In [7]:
%%bash 
./kisql/kisql -h p4.rewreu.org -sql \
"""
SELECT
    model_id, Data_Time_created
FROM TFmodel
where model = 'LinearReg_Model'
"""

Connection successful
Catalog [KINETICA]
Time 0.181
+----------------------------------------+------------------------------+
| model_id                               |            Data_Time_created |
+----------------------------------------+------------------------------+
| 49c835e4-8609-11e8-bc7f-984be16d139a   |   2018-07-12 19:25:05.000000 |
+----------------------------------------+------------------------------+
Rows read = 1
Exec time 0.055 Fetch time 0.025


## Write inference code to local file
<font color='red'>modify the model_id according to the table above</font>

In [8]:
%%writefile infer_Reg.py
import gpudb
import collections
import numpy as np
import pandas as pd
from datetime import datetime
import os
from sklearn.linear_model import LinearRegression
import random
from kineticaIO import kineticaIO
import pickle
from kinetica_proc import ProcData

Model_id="49c835e4-8609-11e8-bc7f-984be16d139a"
headnode='127.0.0.1' # trainning on the head node

cols=["x"+str(i) for i in range(10)]
cols.append("y")
def run():
    proc_data = ProcData()
    
    KIO=kineticaIO(gpudb.GPUdb(encoding = 'BINARY', host = headnode, port = 9191))
    picklebytes=KIO.SkModel_from_Kinetica(Model_id)
    model=pickle.loads(picklebytes)
    in_table, out_table = proc_data.input_data[0], proc_data.output_data[0]
    out_table.size = in_table.size
    for col in cols:
        out_table[col][:]=in_table[col][:]
    x=np.array(in_table[:][:]).T[:,1:11]
    predict=model.predict(x)
    out_table["predict"][:]=predict

    proc_data.complete()
if __name__=="__main__":
    run()

Overwriting infer_Reg.py


## Register and run the inference

In [9]:
import collections
import time
import gpudb
from datetime import datetime, timedelta
host_ip="p4.rewreu.org" # dev machine
PORT = '9191'
proc_name = 'LR_infer'
filelist=['infer_Reg.py','kineticaIO.py']
OUTPUT_TABLE="LR_output"

# Register UDF
def Register_UDF(h_db,proc_name=proc_name, filelist=filelist):
    files = {}
    for ifile in filelist:
        with open(ifile, 'rb') as f:
            files[ifile] = f.read()
    if h_db.has_proc(proc_name)['proc_exists']:
        h_db.delete_proc(proc_name)
    print("Registering proc...")
    response = h_db.create_proc(proc_name, 'distributed', files, 'python', [filelist[0]], {})
    print(response)

# Execute UDF
def execute_UDF(h_db,proc_name=proc_name):
    print("Executing proc...")
    response = h_db.execute_proc(proc_name, {}, {}, ["X"], {}, [OUTPUT_TABLE], {})
    return response

def create_table(h_db):
    res=h_db.show_table("X")
    type_schema=res["type_schemas"][0]
    type_schema=type_schema[:-2]+""",{"name":"predict","type":"float"}"""+type_schema[-2:]
    properties=res["properties"][0]
    
    response = h_db.create_type(type_definition = type_schema, label = OUTPUT_TABLE, properties=properties)
    if h_db.has_table(table_name = OUTPUT_TABLE)['table_exists']:
        h_db.clear_table(table_name = OUTPUT_TABLE)
    response=h_db.create_table(table_name = OUTPUT_TABLE, type_id = response['type_id'],
                               options={"collection_name": "LinearReg"})
if __name__ == "__main__":
    h_db = gpudb.GPUdb(encoding='BINARY', host=host_ip,port=PORT)
    create_table(h_db)
    Register_UDF(h_db)
    response=execute_UDF(h_db)
    start_time = datetime.now()
    if response['status_info']['status'] == 'OK':
        run_id = response['run_id']
        print('Proc was launched successfully with run_id: ' + run_id)
        while h_db.show_proc_status(run_id)['overall_statuses'][run_id] == 'running':
            time.sleep(3)
            print('process is running... ')
        final_proc_state = h_db.show_proc_status(run_id)['overall_statuses'][run_id]
        print("total running time is ",datetime.now() - start_time)
        print('Final Proc state: ' + final_proc_state)
        if final_proc_state=='error':
            raise RuntimeError('proc error')
    else:
        print('Error launching proc; response: ')
        print(response)
        raise RuntimeError('proc error')


Registering proc...
{'status_info': {u'status': u'OK', u'data_type': u'create_proc_response', u'message': u'', 'response_time': 0.0099}, u'proc_name': u'LR_infer'}
Executing proc...
Proc was launched successfully with run_id: 14
process is running... 
('total running time is ', datetime.timedelta(0, 3, 782004))
Final Proc state: complete


In [10]:
%%bash 
./kisql/kisql -h p4.rewreu.org -sql \
"""
SELECT
    y as real_value, predict, abs(y-predict) as error
FROM LR_output
Limit 10
"""

Connection successful
Catalog [KINETICA]
Time 0.176
+--------------+-------------+--------------+
|   real_value |     predict |        error |
+--------------+-------------+--------------+
|     5.672384 |   6.4574013 |    0.7850175 |
|     8.074842 |    7.074363 |    1.0004792 |
|    7.9338965 |    8.598011 |    0.6641145 |
|     6.284284 |   6.2104464 |   0.07383776 |
|     6.058305 |    6.992298 |   0.93399334 |
|     7.777841 |    8.113915 |   0.33607435 |
|     6.160608 |    6.385312 |   0.22470427 |
|      7.22046 |   6.9435635 |   0.27689648 |
|    6.3354597 |    6.201312 |   0.13414764 |
|      5.33734 |    4.432918 |    0.9044218 |
+--------------+-------------+--------------+
Rows read = 10
Exec time 0.05 Fetch time 0.019
