# Sample Project: SQLite Connector UDF

In this tutorial you will create a Connector UDF that reads data from an external source and imports it into Xcalar. This is similar to the Parser UDF examples, but instead of a source file such as a CSV file, a Connector UDF takes a config file that abstracts away the connection details and points to a database or other resource that contains the data.
<HTML>
<br>
<div style="position: relative;
    padding: 10px 10px 10px 100px;
    border: 1px solid #BFBFBF;
    box-shadow: 5px 5px 5px #aaaaaa;">
    <img src="xi-questionmark_yellow.png" 
         style="position: absolute; top: 15px; left: 10px; width:50px; height:50px" />
    <font style="font-size:20px">
What is a connector UDF?</font>
<br>Connector UDFs import data from remote data sources such as database servers, REST APIs or custom data APIs. To learn more about UDFs refer to
[this link](https://www.xcalar.com/documentation/help/XD/1.3.1/Content/C_AdvancedTasks/B_UDFUnderstand.htm?Highlight=MAP%20UDFs/).
    
</div>
<br>
</HTML>


<HTML>
<br>
<div style="position: relative;
    padding: 10px 10px 10px 100px;
    border: 1px solid #BFBFBF;
    box-shadow: 5px 5px 5px #aaaaaa;">
    <img src="xi-questionmark_yellow.png" 
         style="position: absolute;top: 20px;left: 10px;width:50px ;height:50px" />
    <font style="font-size:20px">
What is a config file?</font>
<br>Like parser UDFs, connector UDFs take two arguments: a file name and an I/O stream to that file.
Unlike parser UDFs, the data source of a connector UDF might be a database or a set of web resources rather than an actual data source file. A best practice is to point the connector UDF at a configuration file in place of a data source file. This configuration file abstracts out the connection and can store parameters such as database connection strings, user credentials and filter criteria.
</div>
<br>
</HTML>
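Because Xcalar hands the UDF a binary stream of the config file rather than a database handle, the UDF's first job is to decode and parse that stream. The sketch below simulates this outside Xcalar with `io.BytesIO`; the helper name `read_config` and the JSON layout of the config are assumptions for illustration, not part of the Xcalar API.

```python
import codecs
import io
import json

def read_config(inStream):
    """Hypothetical helper: decode a binary config stream and parse it as JSON."""
    # Xcalar streams files in binary mode, so wrap the stream in a UTF-8 reader
    utf8Stream = codecs.getreader("utf-8")(inStream)
    return json.loads(utf8Stream.read())

# Simulate the binary stream Xcalar would pass for a small config file
fake_stream = io.BytesIO(b'{"database file": "/tmp/stocks.sqlite", "stocks": ["GGX", "ABC"]}')
config = read_config(fake_stream)
print(config["database file"], config["stocks"])
```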


### SQLite

SQLite is a lightweight, open source relational database engine that is embedded in the client program rather than run as a separate database server. This serverless architecture makes SQLite a good choice for rapid prototyping. This chapter will teach you how to implement a SQLite connector for Xcalar. 
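Because SQLite runs inside the calling process, Python's built-in `sqlite3` module is all you need: connecting to a path opens or creates the database file, with no server to install or start. A minimal sketch, using a throwaway temporary directory (an assumption for the example) rather than the tutorial's `/tmp/stocks.sqlite`:

```python
import os
import sqlite3
import tempfile

# A throwaway directory so the example does not touch /tmp/stocks.sqlite
db_dir = tempfile.mkdtemp()
db_path = os.path.join(db_dir, "example.sqlite")

# No server process: the sqlite3 library reads and writes the file directly
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")
conn.commit()
conn.close()

print(os.path.exists(db_path), sqlite3.sqlite_version)
```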


First, we need to create a SQLite database file at a location accessible to Xcalar, and create a `stocks` table with the schema below:
```sql
CREATE TABLE stocks (
security TEXT,
date TEXT,
asked NUMBER,
bid NUMBER,
sale NUMBER,
volume NUMBER
);
```
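SQLite has no `NUMBER` storage class of its own. Under SQLite's type-affinity rules, a declared type such as `NUMBER` that matches none of the INTEGER, TEXT, BLOB or REAL patterns gets NUMERIC affinity, so numeric-looking text is converted to an integer or real value when it is inserted. The sketch below checks this with `typeof()` on an in-memory database; the sample values are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (security TEXT, date TEXT, asked NUMBER, "
             "bid NUMBER, sale NUMBER, volume NUMBER)")
# Insert every value as a quoted string, as a string-built INSERT would
conn.execute("INSERT INTO stocks VALUES "
             "('GGX', '05.25.2018', '425.5', '426.5', '427.25', '10407')")
row = conn.execute("SELECT typeof(security), typeof(date), typeof(asked), "
                   "typeof(volume) FROM stocks").fetchone()
print(row)  # ('text', 'text', 'real', 'integer')
conn.close()
```

This is why the numeric columns come back as numbers even though they are declared with a non-standard type name.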

The following code block creates a SQLite file and the `stocks` table, then inserts one random record per security per day for 10,000 days (70,000 records in total). Highlight the cell and run it.

```python
import sqlite3
import random
from datetime import datetime, timedelta

def genRandomMarketData(dayCount):
    # Fake stock names
    securities = ["GGX", "ABC", "ZZM", "XEW", "FFG", "UYT", "RTF"]
    # Location of the SQLite database file (created if it does not exist)
    sqlite_file = '/tmp/stocks.sqlite'
    query = """
        CREATE TABLE stocks (
        security TEXT,
        date TEXT,
        asked NUMBER,
        bid NUMBER,
        sale NUMBER,
        volume NUMBER
        );
        """
    conn = sqlite3.connect(sqlite_file)
    c = conn.cursor()
    # Recreate the table so the cell can be re-run safely
    c.execute("DROP TABLE IF EXISTS stocks;")
    c.execute(query)
    insert = ("INSERT INTO stocks(security, date, asked, bid, sale, volume) "
              "VALUES (?, ?, ?, ?, ?, ?)")
    # One row per security per day, going back dayCount days from today
    for rec in range(1, dayCount + 1):
        date = datetime.today() - timedelta(days=rec)
        for security in securities:
            ask = random.uniform(40, 500)
            bid = ask + random.uniform(1, 3)
            lastSale = ask + random.uniform(-3, +3)
            volume = random.randint(5000, 15000)
            # Parameterized query: sqlite3 handles quoting and value types
            c.execute(insert, (security, date.strftime('%m.%d.%Y'),
                               ask, bid, lastSale, volume))
    # Commit changes and close the connection to the database file
    conn.commit()
    conn.close()
    print('Data has been generated!')

genRandomMarketData(10000)
```

```
Data has been generated!
```
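Note that `dayCount` counts days, not rows: the generator writes one row per security per day, so `genRandomMarketData(10000)` produces 7 × 10,000 = 70,000 rows. The scaled-down sketch below confirms that arithmetic on an in-memory database and also shows `executemany`, which sends all rows through one prepared INSERT; the 3-day size and the fixed row values are assumptions for the example.

```python
import sqlite3
from datetime import datetime, timedelta

securities = ["GGX", "ABC", "ZZM", "XEW", "FFG", "UYT", "RTF"]
dayCount = 3  # small sample instead of 10000

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (security TEXT, date TEXT, asked NUMBER, "
             "bid NUMBER, sale NUMBER, volume NUMBER)")
rows = []
for rec in range(1, dayCount + 1):
    date = (datetime.today() - timedelta(days=rec)).strftime('%m.%d.%Y')
    for security in securities:
        rows.append((security, date, 100.0, 101.0, 100.5, 10000))
# executemany runs the same parameterized INSERT once per tuple
conn.executemany("INSERT INTO stocks VALUES (?, ?, ?, ?, ?, ?)", rows)
count = conn.execute("SELECT COUNT(*) FROM stocks").fetchone()[0]
print(count)  # 21 = 3 days x 7 securities
conn.close()
```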


### Check that the database was properly created

To check that your database was created, run the code below to connect to your SQLite database and pull 10 records.

```python
import sqlite3

sqlite_file = '/tmp/stocks.sqlite'    # name of the SQLite database file
conn = sqlite3.connect(sqlite_file)
# Pull the first 10 rows to confirm the table was populated
cursor = conn.execute("SELECT * FROM stocks LIMIT 10")
for row in cursor:
    print(row)
conn.close()
```

```
('GGX', '05.25.2018', 425.66889640438916, 426.8253079673379, 427.2704239296686, 10407)
('ABC', '05.25.2018', 426.25197563290044, 428.77873543312677, 424.7202662094225, 12164)
('ZZM', '05.25.2018', 73.45585614006947, 75.35799138192672, 75.25071873562945, 12720)
('XEW', '05.25.2018', 290.58311531628135, 293.36638981870686, 288.96322349468323, 6049)
('FFG', '05.25.2018', 210.2101149953876, 212.1829126146943, 213.10110184037498, 8797)
('UYT', '05.25.2018', 216.92426586651501, 219.7458985286148, 219.0758685489486, 14973)
('RTF', '05.25.2018', 219.69802689012806, 221.07247188947375, 222.31194217384063, 13573)
('GGX', '05.24.2018', 419.8566644707482, 421.5340377856436, 420.7113040940483, 12611)
('ABC', '05.24.2018', 248.10383365272918, 250.18304053343934, 246.34317939913421, 13141)
('ZZM', '05.24.2018', 299.28948401456546, 301.692765720327, 300.65308302922773, 11276)
```
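The `date` column stores text in `MM.DD.YYYY` form, so `ORDER BY date` sorts by month first, not chronologically. If you need date order, one option (a sketch, not something this tutorial requires) is to rebuild a `YYYY-MM-DD` key with SQLite's `substr()` inside the query; the three sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (security TEXT, date TEXT)")
conn.executemany("INSERT INTO stocks VALUES (?, ?)",
                 [("GGX", "05.25.2018"), ("GGX", "12.31.2017"), ("GGX", "01.02.2018")])

# Plain text ordering compares the month digits first
text_order = [r[0] for r in conn.execute("SELECT date FROM stocks ORDER BY date")]

# Rebuild YYYY-MM-DD from MM.DD.YYYY (substr is 1-indexed) for chronological order
iso_key = "substr(date, 7, 4) || '-' || substr(date, 1, 2) || '-' || substr(date, 4, 2)"
date_order = [r[0] for r in conn.execute("SELECT date FROM stocks ORDER BY " + iso_key)]

print(text_order)  # ['01.02.2018', '05.25.2018', '12.31.2017']
print(date_order)  # ['12.31.2017', '01.02.2018', '05.25.2018']
conn.close()
```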


### Implementing the Config File

Let’s build our config file. A config file usually stores connection and query parameters such as the database name, host, type and query. To keep this example simple, our config file will contain only the location of the SQLite database file and the list of stocks to import.

The following code block creates a SQLite config file and writes it to the `/tmp/` folder. In a real implementation you may want to keep config files in a more suitable directory.

```python
import os

# To connect to a SQLite file you need the location of the database file.
# We also add query parameters to the config file: "stocks" lists the
# securities the UDF should query.
config_data = """{
  "database file" : "/tmp/stocks.sqlite", 
  "stocks":["GGX" ,"ABC" , "ZZM" , "XEW" , "FFG" ,"UYT","RTF"] 
}"""

file_path = "/tmp/sqlite.config"
with open(file_path, "w") as config_file:
    config_file.write(config_data)

# Check that the config file was created
print("Created {} file: {} bytes".format(file_path, os.stat(file_path).st_size))
```

```
Created /tmp/sqlite.config file: 110 bytes
```
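The config string above is JSON that also happens to be a valid Python literal, so either `json.loads` or `ast.literal_eval` can parse it. The two diverge as soon as a config uses JSON's `true`, `false` or `null`, which is why `json.loads` is the safer parser for a JSON config file. A small comparison; the `"read only"` key is hypothetical.

```python
import ast
import json

config_text = '{"database file": "/tmp/stocks.sqlite", "read only": true}'

# json.loads understands JSON literals such as true/false/null
parsed = json.loads(config_text)
print(parsed)

# ast.literal_eval only accepts Python literals, so `true` is rejected
try:
    ast.literal_eval(config_text)
    literal_eval_ok = True
except ValueError as err:
    literal_eval_ok = False
    print("literal_eval failed:", err)
```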


### Implementing the Connector UDF

Remember that the *fullPath* parameter is the location of the config file, and *inStream* reads from the config file, not from the database file. In the code below we read the config, open a connection to the SQLite file and retrieve the data.

The code below iterates over each row returned by the query and converts it into a record for the new Xcalar table.
The **record = {}** variable is a dictionary with column names as keys and column values as values.
Each **record** represents a single row containing all the columns to be inserted into the new Xcalar table.
Each record is returned with Python's `yield` statement, which makes the UDF a generator that hands Xcalar one record at a time.

```python
# Query the stocks table from a SQLite database
# and return rows in Xcalar import format
def sqlite3_import_udf(fullPath, inStream):  # <== fullPath/inStream refer to the config file
    import sqlite3
    import codecs
    import json                              # the config file is JSON
    Utf8Reader = codecs.getreader("utf-8")   # Xcalar opens and streams files in binary mode,
    utf8Stream = Utf8Reader(inStream)        # so we need a codec to decode it as UTF-8
    config = json.loads(utf8Stream.read())   # read the config file into a dictionary
    conn = sqlite3.connect(config["database file"])
    stocks = config["stocks"]
    # Build "WHERE security IN (?, ?, ...)" with one placeholder per stock,
    # so sqlite3 quotes the values instead of string concatenation
    placeholders = ", ".join("?" for _ in stocks)
    try:
        cursor = conn.execute(
            "SELECT * FROM stocks WHERE security IN ({})".format(placeholders), stocks)
        headers = [description[0] for description in cursor.description]
        for row in cursor:
            record = {}
            for column_indx, val in enumerate(row):
                record[headers[column_indx]] = val  # column name -> column value
            yield record                            # yield the dictionary, not the raw tuple
    finally:
        conn.close()
```
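The UDF builds each record from `cursor.description`, which the `sqlite3` module fills with one 7-item tuple per result column whose first item is the column name. A standalone sketch of that pattern; the helper name `rows_as_records` and the one-row table are hypothetical.

```python
import sqlite3

def rows_as_records(conn, sql, params=()):
    """Hypothetical helper: yield each result row as a {column name: value} dict."""
    cursor = conn.execute(sql, params)
    headers = [description[0] for description in cursor.description]
    for row in cursor:
        yield dict(zip(headers, row))

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks (security TEXT, volume NUMBER)")
conn.execute("INSERT INTO stocks VALUES ('GGX', 10407)")
records = list(rows_as_records(conn, "SELECT * FROM stocks"))
print(records)  # [{'security': 'GGX', 'volume': 10407}]
conn.close()
```

`dict(zip(headers, row))` is equivalent to the UDF's index-based loop; either form produces one dictionary per row.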


### Adding this UDF to Xcalar

<HTML>
    <br>
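Adding a UDF to Xcalar has already been covered in <a href="./1%20-%20Import%20UDF%20Simple%20Parser.ipynb" target="_self">Import UDF Simple Parser</a>. As a reminder, you first connect this notebook to your Xcalar workbook, then use a code snippet to generate an Import UDF template that you fill in with your code.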
<br>
</HTML>

### Connecting Jupyter Notebook to Xcalar

Before using this UDF, you need to connect this notebook to your Xcalar session.

1.   Click the <b>CODE SNIPPETS</b> dropdown menu in the top right corner of Jupyter.
2.   Select <b>Connect to Xcalar Workbook</b>.
3.   Run the code cell containing the generated code to connect Jupyter to your current workbook.

Please highlight the following cell and <b>Run</b>.

```python
# Xcalar Notebook Connector
#
# Connects this Jupyter Notebook to the Xcalar Workbook <wb-1>
#
# To use any data from your Xcalar Workbook, run this snippet before other 
# Xcalar Snippets in your workbook. 
#
# A best practice is not to edit this cell.
#
# If you wish to use this Jupyter Notebook with a different Xcalar Workbook 
# delete this cell and click CODE SNIPPETS --> Connect to Xcalar Workbook.

%matplotlib inline

# Importing third-party modules to facilitate data work. 
import pandas as pd
import matplotlib.pyplot as plt

# Importing Xcalar packages and modules. 
# For more information, search and post questions on discourse.xcalar.com
from xcalar.compute.api.XcalarApi import XcalarApi
from xcalar.compute.api.Session import Session
from xcalar.compute.api.WorkItem import WorkItem
from xcalar.compute.api.ResultSet import ResultSet

# Create a XcalarApi object
xcalarApi = XcalarApi()
# Connect to current workbook that you are in
workbook = Session(xcalarApi, "xdpadmin", "xdpadmin", 4399150, True, "TutorialNotebooks-HelloUDF-Full")
xcalarApi.setSession(workbook)
```

Next, generate the Import UDF template:

1.   Click the <b>CODE SNIPPETS</b> dropdown again.
2.   This time select <b>Create Import UDF</b>.

After creating the Import UDF template, add your code to it. If you follow these steps, you should end up with the code snippet below.


```python
# Xcalar Import UDF Template
#
# This is a function definition for a Python UDF to import external data source
# file <Default Shared Root:/tmp/sqlite.config>
#
# Module name: <simple_connector>
# Function name: <sqlite3_import_udf>
#
# REQUIREMENTS: Import UDF functions take two arguments...
#   fullPath: The file path to the data source file being imported.
#   inStream: A binary stream of the data source file.
#
#   Your Import UDF function must be a generator, a Python function which
#   processes and returns a stream of data.
#
# To create an import UDF, modify the function definition immediately below this
# comment, as necessary.
#
# To test your UDF, run this cell. (Hit <control> + <enter>.)
#
# To apply it to your dataset, click the "Apply UDF on Dataset Panel" button.
#
#
# NOTE: Use discipline before replacing this module. Consider whether the import of older
# data source files using this UDF will be affected by this change. If so, versioning this
# module may be appropriate.
#
# Best practice is to name helper functions by starting with __. Such
# functions will be considered private functions and will not be directly
# invokable from Xcalar tools.

# Function definition for your Import UDF.
def sqlite3_import_udf(fullPath, inStream):  # <== fullPath/inStream refer to the config file
    import sqlite3
    import codecs
    import json                              # the config file is JSON
    Utf8Reader = codecs.getreader("utf-8")   # Xcalar opens and streams files in binary mode,
    utf8Stream = Utf8Reader(inStream)        # so we need a codec to decode it as UTF-8
    # Read the config file and load it into a dictionary
    config = json.loads(utf8Stream.read())
    conn = sqlite3.connect(config["database file"])
    stocks = config["stocks"]
    # One "?" placeholder per stock: sqlite3 quotes the values for us
    placeholders = ", ".join("?" for _ in stocks)
    try:
        cursor = conn.execute(
            "SELECT * FROM stocks WHERE security IN ({})".format(placeholders), stocks)
        headers = [description[0] for description in cursor.description]
        for row in cursor:
            record = {}
            for column_indx, val in enumerate(row):
                record[headers[column_indx]] = val  # column name -> column value
            yield record
    finally:
        conn.close()

### WARNING DO NOT EDIT CODE BELOW THIS LINE ###
from xcalar.compute.api.Dataset import *
from xcalar.compute.coretypes.DataFormatEnums.ttypes import DfFormatTypeT
from xcalar.compute.api.Udf import Udf
from xcalar.compute.coretypes.LibApisCommon.ttypes import XcalarApiException
import random

def uploadUDF():
    import inspect
    sourceCode = "".join(inspect.getsourcelines(sqlite3_import_udf)[0])
    try:
        Udf(xcalarApi).add("simple_connector", sourceCode)
    except XcalarApiException as e:
        if e.status == StatusT.StatusUdfModuleAlreadyExists:
            Udf(xcalarApi).update("simple_connector", sourceCode)

def testImportUDF():
    from IPython.core.display import display, HTML
    userName = "temp"
    tempDatasetName = userName + "." + str(random.randint(10000,99999)) + "jupyterDS" + str(random.randint(10000,99999))
    dataset = UdfDataset(xcalarApi,
        "Default Shared Root",
        "/tmp/sqlite.config",
        tempDatasetName,
        "simple_connector:sqlite3_import_udf")

    dataset.load()

    resultSet = ResultSet(xcalarApi, datasetName=dataset.name, maxRecords=100)

    NUMROWS = 100
    rowN = 0
    numCols = 0
    headers = []
    data = []
    for row in resultSet:
        if rowN >= NUMROWS:
            break
        newRow = [""] * numCols
        for key in row:
            idx = headers.index(key) if key in headers else -1
            if idx > -1:
                newRow[idx] = row[key]
            else:
                numCols += 1
                newRow.append(row[key])
                headers.append(key)
        data.append(newRow)
        rowN += 1
    data = [row + [""] * (numCols - len(row)) for row in data]

    print("The following should look like a proper table with headings.")
    display(HTML(
            '<table><tr><th>{}</th></tr><tr>{}</tr></table>'.format(
            '</th><th>'.join(headers),
            '</tr><tr>'.join('<td>{}</td>'.format('</td><td>'.join(str(_) for _ in row)) for row in data)
            )))

    dataset.delete()
    print("End of UDF")

# Test import UDF on file
uploadUDF()
testImportUDF()
```

```
The following should look like a proper table with headings.
```

| security | date | asked | bid | sale | volume |
|---|---|---|---|---|---|
| GGX | 05.25.2018 | 425.6688964043892 | 426.8253079673379 | 427.2704239296686 | 10407 |
| ABC | 05.25.2018 | 426.2519756329005 | 428.7787354331268 | 424.7202662094225 | 12164 |
| ZZM | 05.25.2018 | 73.45585614006947 | 75.35799138192672 | 75.25071873562945 | 12720 |
| XEW | 05.25.2018 | 290.5831153162813 | 293.36638981870686 | 288.96322349468323 | 6049 |
| FFG | 05.25.2018 | 210.2101149953876 | 212.1829126146943 | 213.10110184037495 | 8797 |
| UYT | 05.25.2018 | 216.924265866515 | 219.7458985286148 | 219.0758685489486 | 14973 |
| RTF | 05.25.2018 | 219.6980268901281 | 221.07247188947372 | 222.31194217384063 | 13573 |
| GGX | 05.24.2018 | 419.8566644707482 | 421.5340377856436 | 420.7113040940483 | 12611 |
| ABC | 05.24.2018 | 248.1038336527292 | 250.1830405334393 | 246.3431793991342 | 13141 |
| ZZM | 05.24.2018 | 299.28948401456546 | 301.692765720327 | 300.65308302922773 | 11276 |

```
End of UDF
```
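Because an import UDF is a plain Python generator, you can also exercise it without Xcalar by handing it an in-memory binary stream in place of the one Xcalar provides. The helper `run_import_udf_locally` and the toy UDF below are hypothetical illustrations; to try the real UDF, pass `sqlite3_import_udf` and a config dictionary with the `"database file"` and `"stocks"` keys.

```python
import io
import json

def run_import_udf_locally(udf, config, fullPath="/tmp/sqlite.config"):
    """Hypothetical test harness: call an import UDF with a fake config stream."""
    # Serialize the config as it would sit on disk, then wrap it in a
    # binary stream, since Xcalar passes inStream in binary mode
    inStream = io.BytesIO(json.dumps(config).encode("utf-8"))
    return list(udf(fullPath, inStream))

def toy_udf(fullPath, inStream):
    # Toy stand-in for a connector UDF: one record per configured stock
    config = json.loads(inStream.read().decode("utf-8"))
    for stock in config["stocks"]:
        yield {"security": stock}

records = run_import_udf_locally(toy_udf, {"stocks": ["GGX", "ABC"]})
print(records)  # [{'security': 'GGX'}, {'security': 'ABC'}]
```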


## Creating a Table from your Data
<HTML>
    <br>
As covered in <a href="./1%20-%20Import%20UDF%20Simple%20Parser.ipynb" target="_self">Import UDF Simple Parser</a>, use your UDF to create a Xcalar table from the SQLite source.
<br>
</HTML>


1. Click the Datasets icon in the XD menu.
2. In the <b>Import Data Source</b> form select 'Default Shared Root' for <b>Data Target</b> and '/tmp/sqlite.config' for <b>Data Source Path</b>.
3. Click <b>NEXT</b>.
4. On the next page, change the <b>Format</b> to 'Custom Format'. The form then shows fields for selecting a UDF.
5. Select the module you created ('simple_connector'), and 'sqlite3_import_udf' in <b>Function</b>.
6. Click <b>CREATE DATASET</b>.
7. The next page shows a preview of your table. Select all columns and click <b>CREATE TABLE</b>.

<html>
 Next: <a href="./4%20-%20Map%20UDF%20Example.ipynb" target="_self">4 - Map UDF Example</a><br>
 Back to <a href="./0%20-%20Introduction.ipynb" target="_self">Introduction</a><br>
</html>