# What is this notebook for?

Data engineers and data scientists often have a need to import datasets from external sources. 

With Snowflake's [external network access feature](https://docs.snowflake.com/en/developer-guide/external-network-access/external-network-access-overview) and the [ability to use 3rd party Python packages](https://medium.com/snowflake/snowflake-cli-tutorial-upload-and-use-non-snowflake-anaconda-channel-packages-in-snowflake-888eea8a9742) it is now possible to do that in Snowpark. This notebook is demo purposes only and shouldn't be used as-is in a production environment. Adapt to code to your needs and makes sure you have proper data governance in place (roles and data access).

In [1]:
# Import python packages
import json
from snowflake.snowpark import Session
    
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSessionException

try:
    # get the context if this is running within Snowflake (e.g. on a Snowflake Notebook)
    session = get_active_session()
except SnowparkSessionException:
    # otherwise, load creds from a file
    snowflake_connection_cfg = json.loads(open('.creds/creds-sample.json').read())

# Creating Snowpark Session
    session = Session.builder.configs(snowflake_connection_cfg).create()

session.custom_package_usage_config = {"enabled": True}

print('Role:     ', session.get_current_role())
print('Warehouse:', session.get_current_warehouse())
print('Database: ', session.get_current_database())
print('Schema:   ', session.get_current_schema())



Parameter custom_package_usage_config is experimental since 1.6.0. Do not use it in production. 


Role:      "DATASCIENTIST"
Warehouse: "DS_XS"
Database:  "SNOWPARK_PLAYGROUND"
Schema:    "CUSTOM_PACKAGES"


# Setup:
- Database
- External network access
- Logging (event table), just in case things go south

In [2]:
session.use_role('datascientist')
session.sql('create schema if not exists snowpark_playground.hug_datasets')

<snowflake.snowpark.dataframe.DataFrame at 0x7f9b98648310>

In [3]:
sql_text= """CREATE OR REPLACE NETWORK RULE huggingface_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('huggingface.co', 'cdn-lfs-us-1.huggingface.co', 'cdn-lfs.huggingface.co', 's3.amazonaws.com');"""

session.sql(sql_text)  

<snowflake.snowpark.dataframe.DataFrame at 0x7fdf3a76b2b0>

In [4]:
sql_text= """CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION huggingface_access_integration
  ALLOWED_NETWORK_RULES = (huggingface_network_rule)
  ENABLED = TRUE;"""

session.sql(sql_text)  

<snowflake.snowpark.dataframe.DataFrame at 0x7fdf3a76b280>

In [5]:
# enable this if logging is required

# session.sql('grant all on schema snowpark_playground.hug_datasets to role accountadmin;')
# session.use_warehouse('public_xs')
# session.use_role('accountadmin')
# session.sql('CREATE TABLE IF NOT EXISTS snowpark_playground.hug_datasets.my_events;')
# session.sql('ALTER ACCOUNT SET EVENT_TABLE = snowpark_playground.hug_datasets.my_events;')
# session.sql('ALTER ACCOUNT SET LOG_LEVEL = DEBUG')
# session.sql('grant all on table snowpark_playground.hug_datasets.my_events to role datascientist')
# session.use_role('datascientist')

session.sql('SHOW PARAMETERS LIKE \'event_table\' IN ACCOUNT').show()


-------------------------------------------------------------------------------------------------------------------------------------
|"key"        |"value"                                     |"default"  |"level"  |"description"                            |"type"  |
-------------------------------------------------------------------------------------------------------------------------------------
|EVENT_TABLE  |snowpark_playground.hug_datasets.my_events  |           |ACCOUNT  |Event destination for the given target.  |STRING  |
-------------------------------------------------------------------------------------------------------------------------------------



In [6]:
session.use_role('datascientist')

# How this works
- Patching the datasets library:
    - download source bundle: https://files.pythonhosted.org/packages/ff/d5/d0fffd6afdf24c062e3c289f0b13f7636f074005c1e76e322633e8c5508c/datasets-2.18.0.tar.gz
    - take out the src/datasets
    - patch it:
        - remove all usages of os.umask() (**currently not available on Snowpark**)
        - removed the import of arrow-hotfix (not available but also not needed for arrow vers. > 14.0)
    - zip i.e. create datasets.zip
    - upload the .zip to a stage (using the UI, SnowSQL or Snow CLI): snow snowpark package upload -f datasets.zip -s packages -c packages --overwrite
- Write the SP
    - it has to have an import clause for the package(s) on the stage: IMPORTS = ('@snowpark_playground.custom_packages.packages/datasets.zip')
    - 


In [None]:
# Create a Snowpark session
session = Session.builder().create()

# Define the stored procedure code
procedure_code = """
def my_function():
    # Your code here
    return "Hello from Snowpark stored procedure!"

result = my_function()
return result
"""


In [20]:
from snowflake.snowpark.functions import sproc

@sproc(packages=['snowflake-snowpark-python', 'pathlib', 'filelock', 'numpy', 'pyarrow', 'dill', 'pandas', 'requests', 'tqdm', 'multiprocess', 'fsspec', 'aiohttp', 'huggingface_hub', 'packaging', 'pyyaml','python-xxhash'],
       external_access_integrations=['huggingface_access_integration'], imports=['@snowpark_playground.custom_packages.packages/datasets.zip'],
       name="get_data", is_permanent=True, stage_location="@snowpark_playground.custom_packages.packages",
       replace=True)
def get_data(session: Session, dataset_name: str, table_name:str) -> str:
    import datasets
    import os
    import logging
    import shutil

    logging.basicConfig(level=logging.DEBUG)
    logging.warning("Watch out, I'm in the SPROC now!")

    cache_dir="/tmp/home/.cache"
    os.makedirs(cache_dir, exist_ok=True)
    os.environ['HOME'] = "/tmp/home"

    dset = datasets.load_dataset(dataset_name)
    dset.set_format("pandas")
    for dset_chunk in dset['train'].to_pandas(batch_size=10000, batched=True):
        # process dataframes
        session.write_pandas(dset_chunk, table_name, auto_create_table=True)
    shutil.rmtree('/tmp/home')
    return "Success!"


Package 'python-xxhash' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.


In [7]:
CREATE OR REPLACE PROCEDURE snowpark_playground.hug_datasets.huggingface_load(dataset_name STRING, table_name STRING)
RETURNS variant
LANGUAGE PYTHON
RUNTIME_VERSION = 3.10
HANDLER = 'get_data'
EXTERNAL_ACCESS_INTEGRATIONS = (huggingface_access_integration)
PACKAGES = ('snowflake-snowpark-python', 'pathlib', 'filelock', 'numpy', 'pyarrow', 'dill', 'pandas', 'requests', 'tqdm', 'multiprocess', 'fsspec', 'aiohttp', 'huggingface_hub', 'packaging', 'pyyaml','python-xxhash')
IMPORTS = ('@snowpark_playground.custom_packages.packages/datasets.zip')
AS
$$
import _snowflake
import os
import pathlib
import sys
import pandas
import logging
import shutil

logging.basicConfig(level=logging.DEBUG)
logging.warning("Watch out, I'm in the UDF now!")

cache_dir="/tmp/home/.cache"
os.makedirs(cache_dir, exist_ok=True)
os.environ['HOME'] = "/tmp/home"

# import datasets as ds

# session = requests.Session()
def get_data(session, dataset_name, table_name):
    import datasets

    dset = datasets.load_dataset(dataset_name)
    dset.set_format("pandas")
    for dset_chunk in dset['train'].to_pandas(batch_size=10000, batched=True):
        # process dataframes
        session.write_pandas(dset_chunk, table_name, auto_create_table=True)
    shutil.rmtree('/tmp/home')
    return "Success!"
$$;

SyntaxError: invalid syntax (726706977.py, line 1)

In [22]:
db = 'snowpark_playground'
stage = 'hug_datasets'

dataset_name='ai4privacy/pii-masking-300k'
table_name = 'ai4privacy_pii_masking_300k'.upper()

session.use_database(db)
session.use_schema(stage)

session.use_warehouse('ds_m_snowpark')
session.sql(f"drop table if exists {table_name}").collect()
session.call(f"{db}.{stage}.huggingface_load",dataset_name, table_name)

session.use_warehouse('ds_xs')

In [21]:
db = 'snowpark_playground'
stage = 'hug_datasets'

dataset_name='ai4privacy/pii-masking-300k'
table_name = 'ai4privacy_pii_masking_300k'.upper()

session.use_database(db)
session.use_schema(stage)

session.use_warehouse('ds_m_snowpark')
session.sql(f"drop table if exists {table_name}").collect()
session.call(f"{db}.{stage}.get_data",dataset_name, table_name)

session.use_warehouse('ds_xs')

SnowparkSQLException: (1304): 01b39e81-0205-1029-0002-3f4b003bedae: 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "_udf_code.py", line 34, in compute
  File "/var/folders/ds/ppnd5sh16wzb68t1n1zmgjl00000gn/T/ipykernel_76167/3817284023.py", line 20, in get_data
  File "/home/udf/9651816061/datasets.zip/datasets/load.py", line 2556, in load_dataset
    builder_instance = load_dataset_builder(
  File "/home/udf/9651816061/datasets.zip/datasets/load.py", line 2228, in load_dataset_builder
    dataset_module = dataset_module_factory(
  File "/home/udf/9651816061/datasets.zip/datasets/load.py", line 1879, in dataset_module_factory
    raise e1 from None
  File "/home/udf/9651816061/datasets.zip/datasets/load.py", line 1861, in dataset_module_factory
    ).get_module()
  File "/home/udf/9651816061/datasets.zip/datasets/load.py", line 1206, in get_module
    dataset_readme_path = cached_path(
  File "/home/udf/9651816061/datasets.zip/datasets/utils/file_utils.py", line 190, in cached_path
    output_path = get_from_cache(
  File "/home/udf/9651816061/datasets.zip/datasets/utils/file_utils.py", line 493, in get_from_cache
    os.makedirs(cache_dir, exist_ok=True)
  File "/usr/lib/python_udf/623a1518b72a6a0aec06c1753ec173f03064e933643f0dd79a2e0784916fabb5/lib/python3.10/os.py", line 215, in makedirs
    makedirs(head, exist_ok=exist_ok)
  File "/usr/lib/python_udf/623a1518b72a6a0aec06c1753ec173f03064e933643f0dd79a2e0784916fabb5/lib/python3.10/os.py", line 215, in makedirs
    makedirs(head, exist_ok=exist_ok)
  File "/usr/lib/python_udf/623a1518b72a6a0aec06c1753ec173f03064e933643f0dd79a2e0784916fabb5/lib/python3.10/os.py", line 215, in makedirs
    makedirs(head, exist_ok=exist_ok)
  File "/usr/lib/python_udf/623a1518b72a6a0aec06c1753ec173f03064e933643f0dd79a2e0784916fabb5/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
OSError: [Errno 30] Read-only file system: '/home/udf/.cache'
 in function GET_DATA with handler compute

In [None]:
ds = session.table(f"{db}.{stage}.{table_name}")
ds.show()