# Working with Snowflake

In [90]:
## Packages
import swat
import sys
import os
import pandas as pd
import numpy as np
import json


## Options
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_colwidth', None)

## My custom connection package for CAS
try:
    from casauth import CASAuth
    print('Imported personal custom CAS auth package')
except:
    print('casauth package not available')

    
## View versions of packages
print(f'Python version:{sys.version.split("|")[0]}')
print(f'swat version:{swat.__version__}')
print(f'pandas version:{pd.__version__}')
print(f'numpy version:{np.__version__}')

Imported personal custom CAS auth package
Python version:3.11.5 
swat version:1.13.3
pandas version:2.1.4
numpy version:1.26.4


## Connect to CAS
My personal CAS connection information.

In [91]:
###################################################
## My Personal connection using a custom package ##
###################################################
path = os.getenv('CAS_CREDENTIALS')
pem_file = os.getenv('CAS_CLIENT_SSL_CA_LIST')

conn = CASAuth(path, ssl_ca_list = pem_file)

################################
## General connection syntax  ##
################################
# conn = swat.CAS(host, port, username, password)

##########################################
## SAS Viya for Learners 3.5 connection ##
##########################################
# hostValue = os.environ.get('CASHOST')
# portValue = os.environ.get('CASPORT')
# passwordToken=os.environ.get('SAS_VIYA_TOKEN')
# conn = swat.CAS(hostname=hostValue, port=portValue, password=passwordToken)

CAS Connection created


Current version of SAS Viya.

In [92]:
conn.about()['About']['Viya Version']

NOTE: Grid node action status report: 5 nodes, 9 total actions executed.


'Stable 2024.01'

## Connect to Snowflake
Connect to my trial Snowflake environment. I'm using a simple username and password to connect to Snowflake and have stored all of my information in a JSON file. **Please follow any company guidelines to connect to your production Snowflake environment.**

My JSON file template:

{

    "server"  : "<your account>.snowflakecomputing.com",
    "userName": "user-name",
    "password": "password"
}

For more information about connecting to a Snowflake database:
- [SAS Viya Best practices with Snowflake Data](https://video.sas.com/detail/video/6312274491112/sas-viya-best-practices-with-snowflake-data)
- [Documentation - Snowflake Data Connector](https://go.documentation.sas.com/doc/en/pgmsascdc/default/casref/p183rli8obtde3n10y9bzbrpwnsh.htm)

In [94]:
## Get my Snowflake connection information from my JSON file
my_json_file = open(os.getenv('CAS_CREDENTIALS') + '\snowflake_creds.json')
snow_creds = json.load(my_json_file)

## Create a caslib to Snowflake using my specified connection information
cr = conn.addcaslib(name = 'my_snow_db',
                    datasource = dict(
                        srctype = 'snowflake',
                        server = snow_creds['account_url'],     
                        userName = snow_creds['userName'],
                        password = snow_creds['password'],
                        database = "SNOWFLAKE_SAMPLE_DATA", 
                        schema = "TPCH_SF10"
                    )
              )

ERROR: Duplicate Caslib
ERROR: Could not add caslib 'my_snow_db'. Make sure that the caslib does not already exist and that you have permissions to add caslibs to Cloud Analytic Services.
ERROR: The action stopped due to errors.


View available tables in Snowflake.

In [95]:
conn.fileInfo(caslib = 'my_snow_db')

Unnamed: 0,Catalog,Schema,Name,Type,Description
0,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,CUSTOMER,TABLE,Customer data as defined by TPC-H
1,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,LINEITEM,TABLE,Lineitem data as defined by TPC-H
2,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,NATION,TABLE,Nation data as defined by TPC-H
3,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,ORDERS,TABLE,Orders data as defined by TPC-H
4,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,PART,TABLE,Part data as defined by TPC-H
5,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,PARTSUPP,TABLE,Partsupp data as defined by TPC-H
6,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,REGION,TABLE,Region data as defined by TPC-H
7,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,SUPPLIER,TABLE,Supplier data as defined by TPC-H


View available in-memory CAS tables in the **my_snow_db** caslib (should be none since we haven't loaded anything into memory on the CAS server).

In [96]:
conn.tableInfo(caslib = 'my_snow_db')

NOTE: No tables are available in caslib my_snow_db of Cloud Analytic Services.


## Executing SQL Queries with Snowflake

Load the [fedSQL action set](https://go.documentation.sas.com/doc/en/pgmsascdc/default/caspg/cas-fedsql-TblOfActions.htm?fromDefault=).

In [97]:
conn.loadActionSet('fedSQL')

NOTE: Added action set 'fedSQL'.


### Writing FedSQL queries to Snowflake

Execute a simple SQL query through the execDirect action. The method option prints a brief description of the FedSQL query plan. Notice that SAS will pass the ANSI SQL query to Snowflake for execution when possible.

In [98]:
totalRows = '''
    SELECT count(*)
    FROM my_snow_db.part
'''

conn.execDirect(query = totalRows, method = True)

 
Methods for full query plan
----------------------------
        Agg 
          SeqScan from my_snow_db.PART 
 
Offloaded SQL statement
------------------------
 
        select COUNT ( * )  as "COUNT" from "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF10"."PART"
 
NOTE: The SQL statement was fully offloaded to the underlying data source via full pass-through


Unnamed: 0,COUNT
0,2000000.0


Execute a FedSQL group by query.

In [99]:
group_p_mfgr = '''
    SELECT P_MFGR, count(*)
    FROM MY_SNOW_DB.PART
    GROUP BY P_MFGR
'''
conn.execDirect(query = group_p_mfgr, method = True)

 
Methods for full query plan
----------------------------
        Agg 
          Sort 
            SeqScan from my_snow_db.PART 
 
Offloaded SQL statement
------------------------
 
        select "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF10"."PART"."P_MFGR", COUNT ( * )  as "COUNT" from "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF10"."PART" group by "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF10"."PART"."P_MFGR"
 
NOTE: The SQL statement was fully offloaded to the underlying data source via full pass-through


Unnamed: 0,P_MFGR,COUNT
0,Manufacturer#4,400154.0
1,Manufacturer#1,399506.0
2,Manufacturer#2,399091.0
3,Manufacturer#3,400964.0
4,Manufacturer#5,400285.0


### Writing native Snowflake SQL (Explicit pass-through)

You can use SAS explicit pass through to execute native Snowflake SQL, pushing all of the processing into Snowflake, and only returning the smaller results back.

I'll first run the fileInfo action again to view the available tables in the database. You will see the actual database table name (SNOWFLAKE_SAMPLE_DATA), the schema (TPCH_SF10), and all of the available tables.

In [87]:
conn.fileInfo(caslib = 'my_snow_db')

Unnamed: 0,Catalog,Schema,Name,Type,Description
0,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,CUSTOMER,TABLE,Customer data as defined by TPC-H
1,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,LINEITEM,TABLE,Lineitem data as defined by TPC-H
2,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,NATION,TABLE,Nation data as defined by TPC-H
3,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,ORDERS,TABLE,Orders data as defined by TPC-H
4,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,PART,TABLE,Part data as defined by TPC-H
5,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,PARTSUPP,TABLE,Partsupp data as defined by TPC-H
6,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,REGION,TABLE,Region data as defined by TPC-H
7,SNOWFLAKE_SAMPLE_DATA,TPCH_SF10,SUPPLIER,TABLE,Supplier data as defined by TPC-H


What if a coworker uses Snowflake SQL, while you work within SAS Viya and the Python SWAT package. Your coworker has written a query for you and sent it your way to summarize the data. You want to run it through Viya to utilize the results for another process. What can you do? What if we just run the native Snowflake query? Let's try it.

On the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/sample-data-tpch) there is an example query. It answers the business question "The Pricing Summary Report Query provides a summary pricing report for all line items shipped as of a given date. The date is within 60-120 days of the greatest ship date contained in the database.". I'll run the query.

In [88]:
snowflakeSQL = '''
select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       sum(l_extendedprice) as sum_base_price,
       sum(l_extendedprice * (1-l_discount)) as sum_disc_price,
       sum(l_extendedprice * (1-l_discount) * (1+l_tax)) as sum_charge,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
 from
       SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.LINEITEM
 where
       l_shipdate <= dateadd(day, -90, to_date('1998-12-01'))
 group by
       l_returnflag,
       l_linestatus
 order by
       l_returnflag,
       l_linestatus;
'''

conn.execDirect(query = snowflakeSQL, method = True)

ERROR: Table "SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.LINEITEM" does not exist or cannot be accessed
ERROR: The action stopped due to errors.


Use explicit pass through to execute the native Snowflake SQL query.

In [89]:
snowflakeSQL = '''
SELECT * FROM CONNECTION TO MY_SNOW_DB
(select
       l_returnflag,
       l_linestatus,
       sum(l_quantity) as sum_qty,
       sum(l_extendedprice) as sum_base_price,
       sum(l_extendedprice * (1-l_discount)) as sum_disc_price,
       sum(l_extendedprice * (1-l_discount) * (1+l_tax)) as sum_charge,
       avg(l_quantity) as avg_qty,
       avg(l_extendedprice) as avg_price,
       avg(l_discount) as avg_disc,
       count(*) as count_order
 from
       SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.LINEITEM
 where
       l_shipdate <= dateadd(day, -90, to_date('1998-12-01'))
 group by
       l_returnflag,
       l_linestatus
 order by
       l_returnflag,
       l_linestatus);
'''

conn.execDirect(query = snowflakeSQL, 
                method = True)

 
Methods for full query plan
----------------------------
        SeqScan from my_snow_db.__fedsql_cep_1__ 
 
Methods for stage 1
--------------------
        FedSQL did not generate a plan. Entire query can be pushed to driver.
 


Unnamed: 0,L_RETURNFLAG,L_LINESTATUS,SUM_QTY,SUM_BASE_PRICE,SUM_DISC_PRICE,SUM_CHARGE,AVG_QTY,AVG_PRICE,AVG_DISC,COUNT_ORDER
0,A,F,377518399.0,566065700000.0,537759100000.0,559276700000.0,25.500975,38237.151009,0.050007,14804077.0
1,N,F,9851614.0,14767440000.0,14028810000.0,14590490000.0,25.522448,38257.81066,0.049973,385998.0
2,N,O,743124873.0,1114302000000.0,1058581000000.0,1100937000000.0,25.498076,38233.902923,0.050001,29144351.0
3,R,F,377732830.0,566431100000.0,538110900000.0,559634800000.0,25.508385,38251.219274,0.049997,14808183.0


### Forcing the SQL to run in Snowflake

Execute another fedSQL query. Notice that again SAS temporarily loads the entire table into memory to process the query. Moving data from Snowflake back to SAS again can be time consuming for large data.

In [None]:
myQuery = '''
    SELECT *
    FROM MY_SNOW_DB.PART
    LIMIT 10
'''

conn.execDirect(query = myQuery, method = True)

In [None]:
myQuery = '''
    SELECT *
    FROM MY_SNOW_DB.PART
    LIMIT 10
'''

conn.execDirect(query = myQuery, 
                method = True,
                cntl={'requireFullPassThrough':True})

In [None]:
myQuery = '''
    SELECT * FROM CONNECTION TO MY_SNOW_DB
        (SELECT * exclude(P_COMMENT, P_NAME)
         FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.PART
         LIMIT 10)
'''

conn.execDirect(query = myQuery, 
                method = True)

## Terminate the CAS session

In [None]:
conn.terminate()

# Additional Resources

- [Getting Started with Python Integration to SAS® Viya® - Index](https://blogs.sas.com/content/sgf/2020/06/19/getting-started-with-python-integration-to-sas-viya-index/)
- [FedSQL Action Set: Syntax](https://go.documentation.sas.com/doc/en/pgmsascdc/default/caspg/cas-fedsql-TblOfActions.htm?fromDefault=)
- [SAS Viya Best practices with Snowflake Data](https://video.sas.com/detail/video/6312274491112/sas-viya-best-practices-with-snowflake-data)
- [Documentation - Snowflake Data Connector](https://go.documentation.sas.com/doc/en/pgmsascdc/default/casref/p183rli8obtde3n10y9bzbrpwnsh.htm)
- [FedSQL Implicit Pass-Through Facility for CAS](https://communities.sas.com/t5/SAS-Communities-Library/FedSQL-Implicit-Pass-Through-Facility-for-CAS/ta-p/459556)
- [Proc FedSQL and Multi-node load to CAS](https://communities.sas.com/t5/SAS-Communities-Library/Proc-FedSQL-and-Multi-node-load-to-CAS/ta-p/824773)