# Using Snowpark to implement ZSTD dictionary decompression
Snowflake has compress and decompress function that support ZSTD, but it does not support a user-supplied dictionary file. This code demonstrates:
1. Connecting to Snowflake using the Snowpark for Python connector.
2. Creating fake data using the Faker library and creating a ZSTD dictionary file. 
3. Uploading that faked & compressed to a Snowflake table.
4. Upload our dictionary file to a stage and create a Python UDF to decompress data using that file.
5. Calling that Python UDF using ordinary SQL


## 1. Connect to Snowflake using the Snowpark for Python connector

In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import PandasSeries
import streamlit as st
import pandas as pd

from faker import Faker
import pyzstd

from cachetools import cached
import sys
import os

import json 
with open(".env/creds.json") as f:
   connection_parameters = json.load(f)

session = Session.builder.configs(connection_parameters).create()

# test if we have a connection
session.sql("select current_warehouse() wh, current_database() db, current_schema() schema, current_version() v").show()


2023-02-27 10:54:50.187 INFO    snowflake.connector.connection: Snowflake Connector for Python Version: 2.7.12, Python Version: 3.8.15, Platform: macOS-10.16-x86_64-i386-64bit
2023-02-27 10:54:50.187 INFO    snowflake.connector.connection: This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
2023-02-27 10:54:50.188 INFO    snowflake.connector.connection: Setting use_openssl_only mode to False
2023-02-27 10:54:51.077 INFO    snowflake.snowpark.session: Snowpark Session information: 
"version" : 1.1.0,
"python.version" : 3.8.15,
"python.connector.version" : 2.7.12,
"python.connector.session.id" : 647878641524766,
"os.name" : Darwin

2023-02-27 10:54:51.086 INFO    snowflake.connector.cursor: query: [select current_warehouse() wh, current_database() db, current_schema() schema, c...]
2023-02-27 10:54:51.16

-------------------------------------------------------
|"WH"    |"DB"                     |"SCHEMA"  |"V"    |
-------------------------------------------------------
|XSMALL  |COMMODITIES_FORECASTING  |PUBLIC    |7.6.3  |
-------------------------------------------------------



## 2. Create synthetic data and create a ZSTD dictionary.

### Create 10,000 fake address records, encoded as bytes. 

In [2]:
fake = Faker()
fake_addresses = [bytes(fake.address(), 'utf-8') for i in range(10000)]
fake_addresses

[b'2557 Stephen Ridges\nKathrynfurt, SD 74800',
 b'0338 Kelly Islands\nNorth Andrew, KS 98808',
 b'04763 Roberto Trafficway Suite 269\nMegantown, FL 94315',
 b'0027 Smith Pass\nEast Lisa, WV 67350',
 b'6647 Phillips Island\nNathanton, SC 84735',
 b'00585 Lindsey Valleys\nSotoshire, OR 23992',
 b'2678 Garner Meadows Apt. 342\nNew Michael, SC 53274',
 b'626 Brown Heights\nLuishaven, ME 01149',
 b'85021 Jamie Corners\nPetershire, CT 26178',
 b'6605 Ronald Key Suite 164\nSouth Mark, RI 89402',
 b'022 Burch Neck Suite 823\nNorth Ebonystad, MO 74608',
 b'28502 Patricia Point\nSouth Maryville, HI 82652',
 b'12267 Allen Creek Apt. 071\nToddport, MO 27194',
 b'06469 Vaughn Mountain Apt. 898\nPort Amandaton, FL 16869',
 b'1374 Christine Lock\nMarshallland, AR 41901',
 b'029 Romero Light Apt. 817\nSouth Victorberg, NY 23543',
 b'PSC 4026, Box 8156\nAPO AA 10692',
 b'Unit 0251 Box 4185\nDPO AE 57151',
 b'732 Michele Unions Suite 869\nWest Emily, TX 54291',
 b'0466 Graham Ramp\nLongport, TN 42408',

### Create ZSTD dictionary file, compress fake addresses, and write compressed addresses to a Snowflake table.

In [6]:
# create dictionary
zstd_dict = pyzstd.train_dict(fake_addresses, 100*1024)

# write our dictionary to a file
with open("zstd_address_dictionary.d", "wb") as f:
    f.write(zstd_dict.dict_content)

In [8]:
compressed_fake_addresses = [pyzstd.compress(i, zstd_dict=zstd_dict) for i in fake_addresses]
compressed_fake_addresses



[b'(\xb5/\xfd#7\xb1K\x11)\xe5\x00\x00\xa3\xc0\x01f\xae\xfd\x12bY\x0c\x05\xfcl\x81\x96+\xba\xa1t\x16(\xc3>\x83\xf0\x19\xd4Q',
 b'(\xb5/\xfd#7\xb1K\x11)\xad\x00\x00s\x00\x01n\xee\x9f\x16\x03\xfc\xb33Z\x00\xeff\x8b,\x16\x0c"\x1a',
 b'(\xb5/\xfd#7\xb1K\x116\r\x01\x00c\x01\x04{\xa9_$48\x9a\xe8\x19A-!N\x19\xad\x16\x03\xfc\xadb\x10\x9c\xeb\x90:\x12\x16Da\x1c',
 b'(\xb5/\xfd#7\xb1K\x11#\xc5\x00\x00\xc3\x00\x02\xb6\xd9\\*+&h\x16\x03\xfc\x12\x9a\nQ\x0e)D\x91\xe9sD',
 b'(\xb5/\xfd#7\xb1K\x11(\xc5\x00\x00\x93@\x01\x9b\xad\x9bV\xe6\x04\xfc\x7f\xed\x82I\xe0/\x130:\xdc\x02>\xf8\x04',
 b"(\xb5/\xfd#7\xb1K\x11)\xdd\x00\x00\xd3\x00\x02\xf8\x9fX',\xebk\x16\x04\xfc\xd2\xbdD\xb5\xd9H~\x08D'\xba\x91\xb3K",
 b'(\xb5/\xfd#7\xb1K\x112\xed\x00\x00\xc3\x00\x02\xda\x98[\x18\x94O3\x06\x05\xfc\xab\x1a\xf0c1\x0c\xfa\xd216\xdd\rS\x88:\r',
 b"(\xb5/\xfd#7\xb1K\x11%\xc5\x00\x00\xc3\x00\x02\xafwV!\x1a\x17\x12\xe4\x03\xfc\x08\x12\x82\xda\xa4_\xb7\xcdF'\x1a",
 b"(\xb5/\xfd#7\xb1K\x11(\xcd\x00\x00\xa3\x80\x01\xde\xc7,\x87\

In [9]:
# write our fake addresses to a Snowflake table
sp_df = session.create_dataframe(compressed_fake_addresses, schema=["comressed_address"])
sp_df.write.mode('overwrite').save_as_table("compressed_addresses")

2023-02-27 11:00:38.650 INFO    snowflake.connector.cursor: query: [SELECT NULL :: BINARY AS "COMRESSED_ADDRESS"]
2023-02-27 11:00:39.376 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:00:39.378 INFO    snowflake.connector.cursor: query: [CREATE  OR  REPLACE  SCOPED TEMPORARY  TABLE SNOWPARK_TEMP_TABLE_FWKSN45YIW("COM...]
2023-02-27 11:00:41.763 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:00:41.799 INFO    snowflake.connector.cursor: query: [INSERT  INTO SNOWPARK_TEMP_TABLE_FWKSN45YIW("COMRESSED_ADDRESS") VALUES (?)]
2023-02-27 11:00:46.317 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:00:46.320 INFO    snowflake.connector.cursor: query: [CREATE  OR  REPLACE    TABLE  compressed_addresses AS  SELECT  *  FROM ( SELECT ...]
2023-02-27 11:00:48.209 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:00:48.211 INFO    snowflake.connector.cursor: query: [DROP  TABLE  If  EXISTS SNOWPARK_TEMP_TABLE

## Deploy decompression UDF for our data using the dictionary

### Upload dictionary and create UDF

In [10]:
# create a stage to hold our dictionary file and UDF
session.sql("""create stage if not exists zstd_decompress""").collect()

2023-02-27 11:01:44.189 INFO    snowflake.connector.cursor: query: [create stage if not exists zstd_decompress]
2023-02-27 11:01:44.671 INFO    snowflake.connector.cursor: query execution done


[Row(status='ZSTD_DECOMPRESS already exists, statement succeeded.')]

In [11]:
# write our dictionary file to our stage
session.file.put("zstd_address_dictionary.d", "@zstd_decompress", overwrite=True)

2023-02-27 11:02:00.357 INFO    snowflake.connector.cursor: query: [PUT 'file://zstd_address_dictionary.d' '@zstd_decompress'  parallel = 4 source_c...]
2023-02-27 11:02:00.785 INFO    snowflake.connector.cursor: query execution done


[PutResult(source='zstd_address_dictionary.d', target='zstd_address_dictionary.d.gz', source_size=102400, target_size=44640, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

### Create a UDF
Our UDF loads our dictionary file from the stage and uses the pyzstd library to decompress a binary field and return a string. 

We use a pandas_udf with inputs and outputs of a Pandas series. This is a vectorized UDF which enables better performance. 

In [37]:
@cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = file.read()
                     return m

@pandas_udf(name="address_decompress", 
     is_permanent=True,
     stage_location="@zstd_decompress",
     imports=["@zstd_decompress/zstd_address_dictionary.d"], 
     packages=["pyzstd", "cachetools"],
     replace=True
     )
def address_decompress(input_df:PandasSeries[bytes])-> PandasSeries[str]:
    d = read_file("zstd_address_dictionary.d") 
    zstd_dict = pyzstd.ZstdDict(d)
    output_series = input_df.apply(lambda x: pyzstd.decompress(x, zstd_dict=zstd_dict).decode('utf-8'))
    return output_series 

2023-02-27 11:22:17.227 INFO    snowflake.connector.cursor: query: [ls '@zstd_decompress']
2023-02-27 11:22:17.627 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:22:17.630 INFO    snowflake.connector.cursor: query: [SELECT "name" FROM ( SELECT  *  FROM  TABLE ( RESULT_SCAN('01aa9d72-0000-fe81-00...]
2023-02-27 11:22:18.077 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:22:18.084 INFO    snowflake.connector.cursor: query: [SELECT  *  FROM information_schema.packages]
2023-02-27 11:22:18.192 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:22:18.194 INFO    snowflake.connector.cursor: query: [SELECT "PACKAGE_NAME", array_agg("VERSION") AS "ARRAY_AGG(VERSION)" FROM ( SELEC...]
2023-02-27 11:22:19.130 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:22:19.147 INFO    snowflake.connector.cursor: query: [PUT 'file:///tmp/placeholder/udf_py_1691318770.zip' '@zstd_decompress/address_de...]
2023-02-27 11

## Call our Python UDF using normal SQL

In [38]:
session.sql("select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS) from compressed_addresses").show()

2023-02-27 11:22:36.737 INFO    snowflake.connector.cursor: query: [select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS) from compressed_...]
2023-02-27 11:22:37.570 INFO    snowflake.connector.cursor: query execution done
2023-02-27 11:22:37.571 INFO    snowflake.connector.cursor: query: [SELECT  *  FROM (select COMRESSED_ADDRESS, address_decompress(COMRESSED_ADDRESS)...]
2023-02-27 11:22:39.068 INFO    snowflake.connector.cursor: query execution done


------------------------------------------------------------------------------------------------
|"COMRESSED_ADDRESS"                                 |"ADDRESS_DECOMPRESS(COMRESSED_ADDRESS)"  |
------------------------------------------------------------------------------------------------
|bytearray(b'(\xb5/\xfd#7\xb1K\x11)\xe5\x00\x00\...  |2557 Stephen Ridges                      |
|                                                    |Kathrynfurt, SD 74800                    |
|bytearray(b"(\xb5/\xfd#7\xb1K\x114\x05\x01\x00\...  |098 Williams Mall Apt. 092               |
|                                                    |Port Adrianbury, ID 86938                |
|bytearray(b'(\xb5/\xfd#7\xb1K\x11.\xc5\x00\x00\...  |287 John Center Suite 051                |
|                                                    |Robertstad, MI 40088                     |
|bytearray(b'(\xb5/\xfd#7\xb1K\x11&\xc5\x00\x00\...  |2684 Carter Ramp                         |
|                             