## Working with Snowpark in Snowpark Python Workspace

This notebook provides an introduction to using the Snowpark library within the Snowpark Python Workspace in Keboola. Snowpark allows you to build Python transformations that process data directly in Snowflake without moving the data out of it. This guide covers the basics of setting up Snowpark, executing SQL statements, and performing data transformations within Snowflake.

### Key Features of Snowpark:

- The Snowpark API provides intuitive programming constructs for building SQL statements.
- Benefit from intelligent code completion and type checking when using native language constructs.
- Snowpark supports pushdown for all operations, including Snowflake UDFs.
- No need for a separate cluster outside of Snowflake for computations; all computations are performed within Snowflake.

#### Additional Resources:
- [Snowpark Overview](https://www.snowflake.com/snowpark/)
- [Snowpark Python Developer Guide](https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html)

---

### Snowpark in Keboola

Snowpark is available as a new type of Python workspace and transformation in Keboola. It’s intended to provide a seamless experience for data processing and transformations within Snowflake. Below, we will guide you through setting up Snowpark, writing Snowpark code, and saving results back to Keboola Storage.

A Keboola Snowpark Python Workspace creates a dedicated Snowflake schema behind the scenes, with read permissions to the entire project Storage.

**Note:** To use Snowpark in Keboola, you need to enable the Snowpark feature in your project settings (or contact our support team) and run this notebook in a Snowpark Python Workspace.


In [None]:
# Import necessary libraries
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from keboola.component import CommonInterface
import pandas as pd
import json
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)

# Setting up connection parameters from variables stored in the Workspace's environment
ci = CommonInterface()
connection_parameters = ci.configuration.workspace_credentials

# Create a Snowpark session
session = Session.builder.configs(connection_parameters).create()

logging.info("Snowpark session created successfully.")
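
If you want to log which connection the session was built from, it is safer not to print secrets. The snippet below is a minimal sketch of a redaction helper. It assumes the credentials behave like a plain dictionary; the key names in `SENSITIVE_KEYS` and the sample values are hypothetical and may differ from what your workspace actually provides.

```python
# Hypothetical helper: hide secret values before logging connection parameters.
# The key names below are assumptions; adjust them to the keys your workspace uses.
SENSITIVE_KEYS = {"password", "private_key", "token"}


def redact(params: dict) -> dict:
    """Return a copy of params with the values of sensitive keys masked."""
    return {
        key: ("***" if key.lower() in SENSITIVE_KEYS else value)
        for key, value in params.items()
    }


# Made-up sample values, for illustration only
sample = {"account": "my_account", "user": "my_user", "password": "secret"}
print(redact(sample))
```

In the notebook, you could pass `redact(connection_parameters)` to `logging.info` instead of logging the raw dictionary.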


---
### Listing Tables in Snowflake

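In this section, we'll use Snowpark to list the tables visible to the session. Note that `INFORMATION_SCHEMA.TABLES` covers every schema in the current database, not just a single schema.


In [None]:
# List all tables in the current database that the session can see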
tables = session.sql("SELECT * FROM INFORMATION_SCHEMA.TABLES").collect()

# Convert the list of tables to a DataFrame for better readability
tables_df = pd.DataFrame(tables)

# Display the tables
display(tables_df)
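
To narrow the listing to a single schema, the query can filter on the `TABLE_SCHEMA` column. The following is a minimal sketch that only builds the SQL string; the helper name `tables_in_schema_query` and the schema `in.c-my-bucket` are hypothetical.

```python
def tables_in_schema_query(schema: str) -> str:
    """Build a query that lists the tables of one schema in the current database."""
    # Double any single quotes so the schema name is a valid SQL string literal
    escaped = schema.replace("'", "''")
    return (
        "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE "
        "FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{escaped}' "
        "ORDER BY TABLE_NAME"
    )


# Hypothetical schema name, for illustration only
query = tables_in_schema_query("in.c-my-bucket")
print(query)
```

In the notebook, the resulting string would be passed to `session.sql(query)` in the same way as the query above.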


---
### Loading Data from a Snowflake Table

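Next, we’ll load data from a specific table into a DataFrame for processing. Replace 'your_table_name' with the actual table name you want to load. Keep in mind that `to_pandas()` pulls the whole table into the notebook's memory, so start with a reasonably small table.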


In [None]:
# Load data from a specific table
table_name = 'your_table_name'  # Example: '"in.component-name"."table-name"' (lowercase schema and table names must be wrapped in double quotes)
df = session.table(table_name).to_pandas()

# Display the first few rows of the DataFrame
display(df.head())
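
Writing the nested quotes by hand is easy to get wrong. As a minimal sketch, the two hypothetical helpers below build the quoted name from a schema and a table name, doubling any embedded double quotes as Snowflake expects for quoted identifiers.

```python
def quote_identifier(name: str) -> str:
    """Wrap a Snowflake identifier in double quotes, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def qualified_table_name(schema: str, table: str) -> str:
    """Join a quoted schema name and a quoted table name with a dot."""
    return f"{quote_identifier(schema)}.{quote_identifier(table)}"


table_name = qualified_table_name("in.component-name", "table-name")
print(table_name)  # "in.component-name"."table-name"
```

The resulting string can be passed to `session.table(table_name)` as in the cell above.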


---
### Data Transformation

Perform data transformations using Snowpark. Here, we'll demonstrate some common transformations like filtering, selecting columns, and creating new columns.

**Note:** Make sure to replace the placeholder column names with the actual column names from your dataset.


In [None]:
# Example transformation: Filter rows, select columns, and create a new column
# Replace 'column_name', 'column1', 'column2', etc. with your actual column names

# Filter rows where 'column_name' is greater than 100
filtered_df = session.table(table_name).filter(col("column_name") > 100)

# Select specific columns 'column1' and 'column2'
selected_df = filtered_df.select(col("column1"), col("column2"))

# Create a new column 'new_column' as the sum of 'column1' and 'column2'
transformed_df = selected_df.withColumn("new_column", col("column1") + col("column2"))

# Convert the transformed Snowpark DataFrame to a Pandas DataFrame
result_df = transformed_df.to_pandas()

# Display the transformed DataFrame
display(result_df.head())


---
### Saving Results to Keboola Storage

In Snowpark Python Workspaces, you don't use the `out/tables/` folder and CSV files to load results into Storage. Instead, you write your outputs directly into the connected Snowflake workspace.

We'll save the transformed data back to Snowflake as a new table. After writing the data to Snowflake, you can configure the Table Output Mapping in your transformation accordingly.


In [None]:
# Define the name of the output table in Snowflake
output_table_name = 'your_output_table_name'  # Replace with your desired table name

# Save the transformed DataFrame as a new table in Snowflake
# Overwrite mode is used here; you can change it to 'append' if needed
transformed_df.write.mode("overwrite").save_as_table(output_table_name)

logging.info(f"Transformed data saved to Snowflake table: {output_table_name}")
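
A typo in the write mode would only surface once the write runs. The sketch below wraps the same `write.mode(...).save_as_table(...)` call in a hypothetical helper, `save_table`, that checks the mode first. It deliberately allows only the two modes mentioned in this notebook; this restriction is a choice made for the example.

```python
# The two write modes mentioned in this notebook
ALLOWED_MODES = ("overwrite", "append")


def save_table(df, table_name: str, mode: str = "overwrite") -> str:
    """Validate the arguments, then write df to table_name and return the name.

    df is expected to be a Snowpark DataFrame, i.e. any object exposing
    df.write.mode(...).save_as_table(...).
    """
    if mode not in ALLOWED_MODES:
        raise ValueError(f"Unsupported mode {mode!r}; expected one of {ALLOWED_MODES}")
    if not table_name:
        raise ValueError("table_name must not be empty")
    df.write.mode(mode).save_as_table(table_name)
    return table_name


# Usage in the notebook (requires the Snowpark DataFrame from the cells above):
# save_table(transformed_df, output_table_name, mode="overwrite")
```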


---
## Full Transformation Example

It is expected that your script will create new objects (tables) in the linked Snowflake workspace *as shown in the cells above*.

You will then use a standard Table Output Mapping from your Snowpark Python Transformation. 

In our example, we create a new table named __PACKAGE_NAME_DISTINCT__.

## Optional: Dynamic output mapping
Alternatively, instead of defining the Table Output Mapping in your Python Snowpark Transformation, you can produce a manifest file for the created objects; the output mapping then does not have to be specified.
Please note that this might reduce the transparency of what the transformation produces, as the outputs won't be immediately visible in the output mapping metadata.

```Python
from keboola.component import CommonInterface

ci = CommonInterface()

# Set the destination table ID; with this manifest, the output mapping in the UI can be omitted
result_table_id = "out.c-my-new-bucket.PACKAGE_NAME_DISTINCT"
out_table_def = ci.create_out_table_definition("PACKAGE_NAME_DISTINCT", primary_key=[], destination=result_table_id)
ci.write_manifest(out_table_def)
```
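
Because the destination is a plain string, a malformed ID is easy to miss. The following sketch checks the ID before the manifest is written. It assumes that table IDs always consist of three dot-separated parts (stage, bucket, table), as in the example above; `TableId` and `parse_table_id` are hypothetical names.

```python
from typing import NamedTuple


class TableId(NamedTuple):
    stage: str
    bucket: str
    table: str


def parse_table_id(table_id: str) -> TableId:
    """Split an ID such as 'out.c-my-new-bucket.PACKAGE_NAME_DISTINCT' into its parts."""
    parts = table_id.split(".")
    # Assumption: exactly three non-empty parts separated by dots
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected 'stage.bucket.table', got {table_id!r}")
    stage, bucket, table = parts
    if stage not in ("in", "out"):
        raise ValueError(f"Unknown stage {stage!r} in {table_id!r}")
    return TableId(stage, bucket, table)


parsed = parse_table_id("out.c-my-new-bucket.PACKAGE_NAME_DISTINCT")
print(parsed.bucket)  # c-my-new-bucket
```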

___

The following example is a full transformation, executable in your environment, that produces a new table in a bucket named __snowpark-demo__.

```Python
# ===== Imports =====
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
from keboola.component import CommonInterface

# ===== Initiation =====

# Setting up connection parameters from variables stored in the Workspace's environment
ci = CommonInterface()
connection_parameters = ci.configuration.workspace_credentials

# Initiate the session
session = Session.builder.configs(connection_parameters).create()
print("New Snowpark session created in a dedicated Snowflake workspace named", session.get_current_schema())

# ===== Main code - reading object from Snowflake and producing new table =====

# Define the object
df = session.table("INFORMATION_SCHEMA.PACKAGES").filter(col("LANGUAGE") == 'python').select(col("PACKAGE_NAME")).distinct()

# Write the object as a new table 
df.write.mode("overwrite").save_as_table("PACKAGE_NAME_DISTINCT")

# ===== Define dynamic output mapping =====

# Set the destination table ID; with this manifest, the output mapping in the UI can be omitted
result_table_id = "out.c-snowpark-demo.PACKAGE_NAME_DISTINCT"
out_table_def = ci.create_out_table_definition("PACKAGE_NAME_DISTINCT", primary_key=[], destination=result_table_id)
ci.write_manifest(out_table_def)
```