---
aliases:
- /2025/08/26/SparkSnowflake
badges: true
categories:
- spark
- snowflake
date: '2025-08-26'
description: Details on how to run Apache Spark in Snowflake.
output-file: 2025-08-26-sparkinSnowflake.html
title: Running Apache Spark in Snowflake
toc: true
---

# Running Apache Spark in Snowflake

Dataset: [New York State baby names (health.data.ny.gov)](https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv)

![Running Apache Spark in Snowflake](./images/image-sparksnowflake.png)

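This notebook connects to Snowflake and starts a Snowpark Connect session. It then runs ordinary PySpark code against the dataset above: building DataFrames, loading a CSV from a stage, filtering, sorting, and writing the results back to a table and to the stage.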



In [5]:
# Snowflake Connection Setup for Cursor/Jupyter
#
# Resolves Snowflake connection parameters and opens a Snowpark session.
# Parameters come from the first source that works:
#   1. SNOWFLAKE_CONFIG from an importable `snowflake_config` module;
#   2. snowflake_config.py found in the working directory or up to two parent
#      directories (that directory is prepended to sys.path before importing);
#   3. SNOWFLAKE_* environment variables (loaded from .env by load_dotenv),
#      falling back to the hard-coded defaults below. This path uses key-pair
#      (SNOWFLAKE_JWT) authentication; the key file location can be overridden
#      with SNOWFLAKE_PRIVATE_KEY_PATH.
import os
import sys

from dotenv import load_dotenv
from snowflake.snowpark import Session
# from snowflake import snowpark_connect  # Not available in standard installations

# Load environment variables from .env file; the fallback configuration below
# reads them, so .env can override the defaults without editing this cell.
load_dotenv()

# Try to import snowflake_config from different possible locations
try:
    from snowflake_config import SNOWFLAKE_CONFIG
    connection_parameters = SNOWFLAKE_CONFIG
except ImportError:
    # If direct import fails, try adding parent directories to path
    current_dir = os.getcwd()
    parent_dirs = [
        current_dir,
        os.path.dirname(current_dir),
        os.path.dirname(os.path.dirname(current_dir)),
    ]

    config_found = False
    for dir_path in parent_dirs:
        config_path = os.path.join(dir_path, 'snowflake_config.py')
        if os.path.exists(config_path):
            sys.path.insert(0, dir_path)
            try:
                from snowflake_config import SNOWFLAKE_CONFIG
                connection_parameters = SNOWFLAKE_CONFIG
                config_found = True
                print(f"✅ Found snowflake_config.py in: {dir_path}")
                break
            except ImportError:
                continue

    if not config_found:
        print("❌ Could not find snowflake_config.py")
        # Fallback to environment/default configuration with key-pair auth
        from cryptography.hazmat.primitives import serialization

        def load_private_key_fallback(private_key_path):
            """Load an unencrypted PEM private key and return it as DER bytes.

            Args:
                private_key_path: Path to the PEM file; a leading ``~`` is expanded.

            Returns:
                The key serialized as unencrypted PKCS#8 DER bytes, or ``None``
                if the file is missing or cannot be parsed (the reason is printed).
            """
            try:
                expanded_path = os.path.expanduser(private_key_path)
                if not os.path.exists(expanded_path):
                    print(f"❌ Private key file not found: {expanded_path}")
                    return None

                with open(expanded_path, 'rb') as key_file:
                    private_key = serialization.load_pem_private_key(
                        key_file.read(),
                        password=None,  # the key file is expected to be unencrypted
                    )

                return private_key.private_bytes(
                    encoding=serialization.Encoding.DER,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption(),
                )
            except Exception as e:
                # Best effort: report and let the caller decide how to proceed.
                print(f"❌ Error loading private key: {e}")
                return None

        private_key_bytes = load_private_key_fallback(
            os.getenv('SNOWFLAKE_PRIVATE_KEY_PATH', '~/.ssh/mlops_hol_rsa_private_key.pem')
        )
        if private_key_bytes is None:
            # JWT authentication cannot succeed without a key; flag it here
            # rather than leaving only the server's generic connection error.
            print("⚠️ No private key loaded; JWT authentication will fail.")

        connection_parameters = {
            'account': os.getenv('SNOWFLAKE_ACCOUNT', 'SFSEEUROPE-MCASTRO_AWS1_USWEST2'),
            'user': os.getenv('SNOWFLAKE_USER', 'mlops_user'),
            'role': os.getenv('SNOWFLAKE_ROLE', 'aicollege'),
            'warehouse': os.getenv('SNOWFLAKE_WAREHOUSE', 'aicollege'),
            'database': os.getenv('SNOWFLAKE_DATABASE', 'aicollege'),
            'schema': os.getenv('SNOWFLAKE_SCHEMA', 'PUBLIC'),
            'authenticator': 'SNOWFLAKE_JWT',
            'private_key': private_key_bytes,
        }

# Defined up front so `session` exists (as None) even if the connection fails.
session = None
try:
    # Create Snowpark session
    session = Session.builder.configs(connection_parameters).create()

    # Note: We'll use Snowpark DataFrames instead of Spark DataFrames
    # Snowpark provides similar functionality to Spark for data processing

    print("✅ Connected to Snowflake successfully!")
    print(f"Current database: {session.get_current_database()}")
    print(f"Current schema: {session.get_current_schema()}")

except Exception as e:
    print(f"❌ Connection failed: {str(e)}")
    print("Please check your Snowflake credentials in snowflake_config.py or .env file")


❌ Connection failed: 250001 (08001): Failed to connect to DB: SFSEEUROPE-MCASTRO_AWS1_USWEST2.snowflakecomputing.com:443. JWT token is invalid. [715fe50c-9611-443c-a239-b7f2b520c157]
Please check your Snowflake credentials in snowflake_config.py or .env file


In [None]:
# Set up the env for Java libraries and enable the Spark Connect Mode
import os
import traceback

# When running inside a conda environment, point Java at that environment.
# Outside conda, CONDA_PREFIX is unset; keep any existing JAVA_HOME instead of
# failing with a KeyError.
conda_prefix = os.environ.get("CONDA_PREFIX")
if conda_prefix:
    os.environ['JAVA_HOME'] = conda_prefix
    os.environ['JAVA_LD_LIBRARY_PATH'] = os.path.join(conda_prefix, 'lib', 'server')
os.environ["SPARK_LOCAL_HOSTNAME"] = "127.0.0.1"
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"

from snowflake import snowpark_connect
from snowflake.snowpark.context import get_active_session

# Reuse the already-active Snowpark session (in a Snowflake notebook this is the
# notebook's session; locally it is presumably the one created earlier — verify)
# and start Snowpark Connect on top of it.
session = get_active_session()
snowpark_connect.start_session(snowpark_session = session)


# Here is your normal pyspark code. You can of course have them in other Python Cells
spark = snowpark_connect.get_session()
df = spark.sql("show schemas").limit(10)
df.show()

In [None]:
# Build a one-row DataFrame by hand. The DDL-style schema string pairs each
# name in `columns` with its Spark SQL type.
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]
column_types = ["int", "STRING", "STRING", "STRING", "int"]
ddl_schema = ", ".join(f"{name} {dtype}" for name, dtype in zip(columns, column_types))

df1 = spark.createDataFrame(data, schema=ddl_schema)
# display(df1) only exists in Databricks notebooks (richer rendering);
# show() is the portable Apache Spark DataFrame API equivalent.
df1.show()



In [None]:
# Load the CSV file from the @aicollege.public.setup stage into a DataFrame.
csv_stage_path = "@aicollege.public.setup/row.csv"
df_csv = spark.read.csv(
    csv_stage_path,
    header=True,       # first line holds the column names
    inferSchema=True,  # derive column types from the data instead of all-strings
    sep=",",
)
# display(df_csv)  # Databricks-only alternative to show()
df_csv.show()


In [None]:
# Compare the inferred CSV schema with the hand-built one before combining them.
for frame in (df_csv, df1):
    frame.printSchema()

In [None]:
# The CSV header says "First Name"; rename it to match df1's First_Name column.
df_csv = df_csv.withColumnRenamed(existing="First Name", new="First_Name")
df_csv.printSchema()

In [None]:
# Append the CSV rows to the hand-built row. unionByName matches columns by
# name (hence the First_Name rename above) rather than by position, so a CSV
# with a different column order cannot silently shift values into the wrong
# columns.
df = df1.unionByName(df_csv)
df.show()

In [None]:
# Keep only the rows whose Count exceeds 50.
over_fifty = df["Count"] > 50
df.filter(over_fifty).show()

In [None]:
# The same predicate via where(), the SQL-flavoured alias of filter().
over_fifty = df["Count"] > 50
df.where(over_fifty).show()



In [None]:
from pyspark.sql.functions import desc

# Project name and count, highest counts first.
names_by_count = df.select("First_Name", "Count").orderBy(desc("Count"))
names_by_count.show()

In [None]:
# Rows from 2009 with Sex "F" and a Count above 100, highest counts first.
is_2009 = df["Year"] == 2009
is_popular = df["Count"] > 100
is_female = df["Sex"] == "F"

subsetDF = (
    df.filter(is_2009 & is_popular & is_female)
    .select("First_Name", "County", "Count")
    .orderBy(desc("Count"))
)
subsetDF.show()

In [None]:
# Persist the combined DataFrame as a table, replacing any previous contents.
target_table = "AICOLLEGE.PUBLIC.MYFIRSTSPARK"
df.write.mode("overwrite").saveAsTable(target_table)

In [None]:
# Export the same rows as JSON under the myfirstspark path of the stage.
json_output_path = "@aicollege.public.setup/myfirstspark"
(
    df.write
    .format("json")
    .mode("overwrite")
    .save(json_output_path)
)

In [None]:
#spark.read.format("json").json(f"@aicollege.public.setup/myfirstspark")



In [None]:
# selectExpr takes SQL expression strings, so columns can be transformed inline.
projections = ["Count", "upper(County) as big_name"]
df.selectExpr(*projections).show()



In [None]:
from pyspark.sql.functions import expr

# expr() turns a SQL snippet into a Column that can be mixed with plain names.
little_name = expr("lower(County) as little_name")
df.select("Count", little_name).show()

In [None]:
# Read the saved table back with plain Spark SQL.
readback_query = "SELECT * FROM AICOLLEGE.PUBLIC.MYFIRSTSPARK"
spark.sql(readback_query).show()

