## omega|ml - snowflake plugin

This plugin enables working with snowflake data sources directly in omega|ml

* store & retrieve connections to snowflake for dynamic queries using SQL (dynamic: at runtime)
* store & retrieve views to snowflake (storing connection & static SQL)
* copy data from snowflake to omega|ml for further processing

Installation

1. install dependencies: `pip install -U snowflake-sqlalchemy==1.2.1 cffi==1.14`
2. install the plugin using getgist: `getgist -y omegaml omx_snowflake.py`
3. load the plugin: `import omx_snowflake`
4. register the plugin: `om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)`

Details see below

Usage

`om.datasets.put('snowflake://user:password@account', 'omega-dataset-name', sql='select ...', copy=True)`

details see `help(omx_snowflake)`

Version history

- 0.1.0 - initial version (without support for copying data)
- 0.1.1 - support copying of data
- 0.1.2 - provide more robustness in parallel inserts on copy 

In [5]:
# install dependencies
!pip install -U snowflake-sqlalchemy==1.2.1 'cffi<1.14'
from sqlalchemy.dialects import registry
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

In [2]:
# install the plugin
!pip install -q getgist
!rm -f *snowflake.py && getgist -y omegaml omx_snowflake.py

  Fetching https://api.github.com/users/omegaml/gists[0m
  Reading https://gist.githubusercontent.com/omegaml/8979e42667803c5a938e7bdbe31bfb85/raw/1371141c2ba53a3a963ad3a7c1c58f53cf9878e5/omx_snowflake.py[0m
  Saving omx_snowflake.py[0m
[32m  Done![0m


In [1]:
# load the plugin
import omegaml as om
import omx_snowflake
from omx_snowflake import SnowflakeDataBackend
om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)

snowflake plugin 0.1.3: to install execute the following line of code
> om.datasets.register_backend(SnowflakeDataBackend.KIND, SnowflakeDataBackend)


OmegaStore(bucket=omegaml, prefix=data/)

In [4]:
# get more information
help(omx_snowflake)

Help on module omx_snowflake:

NAME
    omx_snowflake

CLASSES
    omegaml.backends.basedata.BaseDataBackend(builtins.object)
        SnowflakeDataBackend
    
    class SnowflakeDataBackend(omegaml.backends.basedata.BaseDataBackend)
     |  Snowflake plugin for omegaml
     |  
     |  Installation:
     |      copy/paste above into a cell, execute, then run this to register      
     |      
     |      Alternatively install getgist
     |      
     |      !pip install getgist
     |      !getgist 
     |      
     |  Pre-Requisites:
     |      make sure you have the following packages installed
     |      
     |      !pip install -U snowflake-sqlalchemy==1.2.1
     |      
     |  Usage:
     |      # define your snowflake connection
     |      snowflake_constr = f'snowflake://{user}:{password}@{account}/'
     |      
     |      # store in any of three ways
     |      
     |      # -- just the connection
     |      om.datasets.put(snowflake_constr, 'mysnowflake')
     | 

In [9]:
secrets = om.datasets.get('secrets')[0]

In [12]:
# build connection string
from getpass import getpass
#user = input('snowflake user name> ')
#password = getpass('snowflake password> ')
#account = input('snowflake account (remove .snowflake.com)> ')
snowflake_cxstr = 'snowflake://{user}:{password}@{account}/'.format(**secrets)

In [13]:
# store just the connection
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake')
om.datasets.get('mysnowflake')

<sqlalchemy.engine.base.Connection at 0x7ffb755cbe80>

In [14]:
# store a connection reference with sql 
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake', 
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')
om.datasets.get('mysnowflake')

Unnamed: 0,COUNT(*)
0,6001215


In [5]:
# query the connection with a specific sql, returning a pandas dataframe
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 'mysnowflake')
om.datasets.get('mysnowflake', 
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem')

Unnamed: 0,COUNT(*)
0,6001215


In [6]:
# copy the dataset to a native omegaml dataset
om.datasets.put(snowflake_cxstr, 
                'mysnowflake', 
                sql='select count(*) from snowflake_sample_data.tpch_sf1.lineitem',
                copy=True)
om.datasets.get('mysnowflake')

1rows [00:00,  7.71rows/s]


Unnamed: 0,COUNT(*)
0,6001215


In [7]:
# copy the dataset to a native omegaml dataset
om.datasets.drop('mysnowflake', force=True)
om.datasets.put(snowflake_cxstr, 
                'mysnowflake', 
                sql='select * from snowflake_sample_data.tpch_sf1.lineitem limit 100000',
                parse_dates=['l_shipdate', 'l_receiptdate', 'l_commitdate'],
                chunksize=50000,
                append=False,
                copy=True)
len(om.datasets.getl('mysnowflake'))

100000rows [00:09, 14153.82rows/s]


100000