# Using Snowpark to implement ZSTD dictionary decompression
Snowflake has compress and decompress function that support ZSTD, but it does not support a user-supplied dictionary file. This code demonstrates:
1. Connecting to Snowflake using the Snowpark for Python connector.
2. Creating fake data using the Faker library and creating a ZSTD dictionary file. 
3. Uploading that faked & compressed to a Snowflake table.
4. Upload our dictionary file to a stage and create a Python UDF to decompress data using that file.
5. Calling that Python UDF using ordinary SQL


## 1. Connect to Snowflake using the Snowpark for Python connector

In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import PandasSeries
import streamlit as st
import pandas as pd

from faker import Faker
import pyzstd

from cachetools import cached
import sys
import os

import json 
with open(".env/creds.json") as f:
   connection_parameters = json.load(f)

session = Session.builder.configs(connection_parameters).create()

# test if we have a connection
session.sql("select current_warehouse() wh, current_database() db, current_schema() schema, current_version() v").show()


2023-02-27 12:30:39.685 INFO    snowflake.connector.connection: Snowflake Connector for Python Version: 2.7.12, Python Version: 3.8.15, Platform: macOS-10.16-x86_64-i386-64bit
2023-02-27 12:30:39.686 INFO    snowflake.connector.connection: This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2023-02-27 12:30:39.686 INFO    snowflake.connector.connection: Setting use_openssl_only mode to False
2023-02-27 12:30:40.722 INFO    snowflake.snowpark.session: Snowpark Session information: 
"version" : 1.1.0,
"python.version" : 3.8.15,
"python.connector.version" : 2.7.12,
"python.connector.session.id" : 647878641524774,
"os.name" : Darwin

2023-02-27 12:30:40.728 INFO    snowflake.connector.cursor: query: [select current_warehouse() wh, current_database() db, current_schema() schema, c...]
2023-02-27 12:30:40.91

-------------------------------------------------------
|"WH"    |"DB"                     |"SCHEMA"  |"V"    |
-------------------------------------------------------
|XSMALL  |COMMODITIES_FORECASTING  |PUBLIC    |7.6.3  |
-------------------------------------------------------



## 2. Create synthetic data and create a ZSTD dictionary.

### Create 10,000 fake address records, encoded as bytes. 

In [2]:
fake = Faker()
fake_addresses = [bytes(fake.address(), 'utf-8') for i in range(10000)]
fake_addresses[:10]

[b'Unit 0108 Box 7551\nDPO AP 91592',
 b'47788 Morris Stream Suite 478\nEast Danieltown, IA 32108',
 b'619 Brandy Union Suite 977\nNorth Sherryton, AK 52339',
 b'Unit 9457 Box 0274\nDPO AP 16416',
 b'2344 Amber Circles\nWest Connie, IA 68497',
 b'61131 Patterson Parkway\nWest Patrickfort, MT 48632',
 b'27795 Johnson Stream Suite 993\nNorth Stephenmouth, NE 49642',
 b'9609 Sarah Camp Apt. 285\nWest Christineside, ME 08302',
 b'481 Steven Bridge\nEast Lauraborough, PW 03394',
 b'7981 Pugh Mountain\nAmychester, MP 33537']

### Create ZSTD dictionary file, compress fake addresses, and write compressed addresses to a Snowflake table.

In [3]:
# create dictionary
zstd_dict = pyzstd.train_dict(fake_addresses, 100*1024)

# write our dictionary to a file
with open("zstd_address_dictionary.d", "wb") as f:
    f.write(zstd_dict.dict_content)

In [4]:
compressed_fake_addresses = [pyzstd.compress(i, zstd_dict=zstd_dict) for i in fake_addresses]
compressed_fake_addresses[:10]

[b'(\xb5/\xfd#e-\x80e\x1f\xad\x00\x00\xe3\x00\x02\xf8{_$\xbb\xedv\x16\x03\xfc\x80\xbd\xa4B(\x84k\x01',
 b'(\xb5/\xfd#e-\x80e7\xf5\x00\x00\xd3\xc0\x01n\x87\xe9\xfdq\xef\xd6\x06\xfc}\x1cB\xa8A\x92\xb4*\xa4ul\x18mJ\x94\xed\x02\x1d',
 b'(\xb5/\xfd#e-\x80e4\xfd\x00\x00\x03\x01\x03\x9f\x89\xfb2\x1ayVL^t\xfcq\x04\xfc\x8a(\x92\x8d1\xbd\x8f\xf6\x80Q\xab\x16\x98\x1c',
 b'(\xb5/\xfd#e-\x80e\x1f\xa5\x00\x00\xa3\x80\x01|\xca\x87\xd6\xab\x01\x03\xfc\x08\xb1V\xdeR\xe1-\x02\x8a',
 b'(\xb5/\xfd#e-\x80e(\xed\x00\x00#A\x03\xfd\xea\\\x98M\xa9\x80\x0c\x1e\x98RM\x0c\x03\xfcX\xf6\x06\xe7k\x8aT\x10\xa0Ol',
 b'(\xb5/\xfd#e-\x80e2\xcd\x00\x00\xa3\x80\x01\x98\xec\xba#\xef8\x04\xfc\xccW\x88\xaf(\x135\xee4\x91\tv\xa1(',
 b'(\xb5/\xfd#e-\x80e;\xf5\x00\x00\xe3\x00\x02\xa8\xfcZ\x1f\x9d\xff\xef\xc6\x05\xfcA\x83\xf2\x9e1\xc2\xc5\x02\xc1i\xa8Z\xfa\xf2&\xa1E',
 b'(\xb5/\xfd#e-\x80e5\xf5\x00\x00\xe3\x00\x02h\xe9V\x1f\xeb\xf8\xc6\x1f\x06\xfc\xb4\xcc\x06\xc9\x1d\x04\xb0\x7f\x9f\x19\xf91\xbd,<Q\x01',
 b'(\xb5/\xfd#e-\x80e-\x

In [5]:
# write our fake addresses to a Snowflake table
sp_df = session.create_dataframe(compressed_fake_addresses, schema=["comressed_address"])
sp_df.write.mode('overwrite').save_as_table("compressed_addresses")

2023-02-27 12:30:42.570 INFO    snowflake.connector.cursor: query: [SELECT NULL :: BINARY AS "COMRESSED_ADDRESS"]
2023-02-27 12:30:42.758 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:42.761 INFO    snowflake.connector.cursor: query: [CREATE  OR  REPLACE  SCOPED TEMPORARY  TABLE SNOWPARK_TEMP_TABLE_4BSYBF7MW5("COM...]
2023-02-27 12:30:43.009 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:43.109 INFO    snowflake.connector.cursor: query: [INSERT  INTO SNOWPARK_TEMP_TABLE_4BSYBF7MW5("COMRESSED_ADDRESS") VALUES (?)]
2023-02-27 12:30:45.636 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:45.640 INFO    snowflake.connector.cursor: query: [CREATE  OR  REPLACE    TABLE  compressed_addresses AS  SELECT  *  FROM ( SELECT ...]
2023-02-27 12:30:46.852 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:46.853 INFO    snowflake.connector.cursor: query: [DROP  TABLE  If  EXISTS SNOWPARK_TEMP_TABLE

## Deploy decompression UDF for our data using the dictionary

### Upload dictionary and create UDF

In [6]:
# create a stage to hold our dictionary file and UDF
session.sql("""create stage if not exists zstd_decompress""").collect()

2023-02-27 12:30:46.999 INFO    snowflake.connector.cursor: query: [create stage if not exists zstd_decompress]
2023-02-27 12:30:47.189 INFO    snowflake.connector.cursor: query execution done


[Row(status='ZSTD_DECOMPRESS already exists, statement succeeded.')]

In [7]:
# write our dictionary file to our stage
# this is our dictionary for our Faker data. you'll want to use your own dictionary
session.file.put("zstd_address_dictionary.d", "@zstd_decompress", overwrite=True)

2023-02-27 12:30:47.220 INFO    snowflake.connector.cursor: query: [PUT 'file://zstd_address_dictionary.d' '@zstd_decompress'  parallel = 4 source_c...]
2023-02-27 12:30:47.390 INFO    snowflake.connector.cursor: query execution done


[PutResult(source='zstd_address_dictionary.d', target='zstd_address_dictionary.d.gz', source_size=102400, target_size=44928, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

### Create a UDF
Our UDF loads our dictionary file from the stage and uses the pyzstd library to decompress a binary field and return a string. 

We use a pandas_udf with inputs and outputs of a Pandas series. This is a [Vectorized UDF](https://docs.snowflake.com/en/developer-guide/snowpark/python/creating-udfs#label-snowpark-python-udf-vectorized) which enables better performance. 

In [8]:
@cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = file.read()
                     return m

@pandas_udf(name="address_decompress", 
     is_permanent=True,
     stage_location="@zstd_decompress",
     imports=["@zstd_decompress/zstd_address_dictionary.d"], 
     packages=["pyzstd", "cachetools"],
     replace=True
     )
def address_decompress(input_df:PandasSeries[bytes])-> PandasSeries[str]:
    d = read_file("zstd_address_dictionary.d") 
    zstd_dict = pyzstd.ZstdDict(d)
    output_series = input_df.apply(lambda x: pyzstd.decompress(x, zstd_dict=zstd_dict).decode('utf-8'))
    return output_series 

2023-02-27 12:30:48.153 INFO    snowflake.connector.cursor: query: [ls '@zstd_decompress']
2023-02-27 12:30:48.285 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:48.286 INFO    snowflake.connector.cursor: query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01aa9db6-0000-fe81-00...]
2023-02-27 12:30:48.635 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:48.637 INFO    snowflake.connector.cursor: query: [SELECT  *  FROM information_schema.packages]
2023-02-27 12:30:48.730 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:48.733 INFO    snowflake.connector.cursor: query: [SELECT "PACKAGE_NAME", array_agg("VERSION") AS "ARRAY_AGG(VERSION)" FROM ( SELEC...]
2023-02-27 12:30:49.525 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:30:49.532 INFO    snowflake.connector.cursor: query: [PUT 'file:///tmp/placeholder/udf_py_143628847.zip' '@zstd_decompress/address_dec...]
2023-02-27 12

## Call our Python UDF using normal SQL

In [9]:
session.sql("select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS) from compressed_addresses").show()

2023-02-27 12:31:02.153 INFO    snowflake.connector.cursor: query: [select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS) from compressed_...]
2023-02-27 12:31:03.029 INFO    snowflake.connector.cursor: query execution done
2023-02-27 12:31:03.031 INFO    snowflake.connector.cursor: query: [SELECT  *  FROM (select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS)...]
2023-02-27 12:31:04.494 INFO    snowflake.connector.cursor: query execution done


------------------------------------------------------------------------------------------------
|"COMRESSED_ADDRESS"                                 |"ADDRESS_DECOMPRESS(COMRESSED_ADDRESS)"  |
------------------------------------------------------------------------------------------------
|bytearray(b'(\xb5/\xfd#e-\x80e\x1f\xad\x00\x00\...  |Unit 0108 Box 7551                       |
|                                                    |DPO AP 91592                             |
|bytearray(b'(\xb5/\xfd#e-\x80e2\xfd\x00\x00#\x0...  |4336 Graham Wall Apt. 212                |
|                                                    |New Donnaburgh, LA 63045                 |
|bytearray(b'(\xb5/\xfd#e-\x80e/\xdd\x00\x00\xb3...  |522 Pennington Squares                   |
|                                                    |Port Katiefurt, MN 43940                 |
|bytearray(b'(\xb5/\xfd#e-\x80e9\xf5\x00\x00\xe3...  |6776 Andre Corner Suite 616              |
|                             