### Main Imports

In [2]:
import os, sys, time

# Spark SQL
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *


### Create a new Spark Session 

In [3]:
# Maven coordinates (group:artifact:version) for the Snowflake JDBC driver
# and the Snowflake Spark connector, collapsed into one comma-separated string.
snowflake_packages = [
    "net.snowflake:snowflake-jdbc:3.8.7",
    "net.snowflake:spark-snowflake_2.11:2.5.1-spark_2.4",
]
extra_jars = ",".join(snowflake_packages)
extra_jars

'net.snowflake:snowflake-jdbc:3.8.7,net.snowflake:spark-snowflake_2.11:2.5.1-spark_2.4'

In [4]:
# Build a local Spark session for the Snowflake experiments.
# `extra_jars` (defined in the previous cell) is passed as
# `spark.jars.packages` so Spark resolves the Snowflake JDBC driver and Spark
# connector. Without it, the `net.snowflake.*` JVM classes and the Snowflake
# data source used later in this notebook are not on the classpath.
# NOTE: getOrCreate() reuses an already-running session. Classpath and
# driver settings therefore only take effect on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("Spark Snowflake Experiments")
    .master("local[*]")
    .config("spark.driver.memory", "4G")
    .config("spark.driver.maxResultSize", "2G")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer.max", "500m")
    .config("spark.jars.packages", extra_jars)
    # Extra Maven repository searched when resolving the packages above;
    # fetched over HTTPS rather than plain HTTP.
    .config("spark.jars.repositories", "https://repo.spring.io/plugins-release")
    .getOrCreate()
)

spark.version

'2.4.0'

### Load Snowflake Connection Settings

In [5]:
from dotenv import load_dotenv
from pathlib import Path 

# Load all setting from the environment file
env_path = Path('.') / '.env-dev'
load_dotenv(dotenv_path=env_path, verbose=True, )

sfOptions = {
    "sfURL" : f"{os.environ['SNOWSQL_ACCOUNT']}.snowflakecomputing.com",
    "sfUser" : os.environ['SNOWSQL_USER'],
    "sfPassword" : os.environ['SNOWSQL_PWD'],
    "sfDatabase" : os.environ['SNOWSQL_DATABASE'],
    "sfSchema" : os.environ['SNOWSQL_SCHEMA'],
    "sfWarehouse" : os.environ['SNOWSQL_WAREHOUSE'],
    "authenticator": os.environ['SNOWSQL_AUTH'],
}


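### Enabling Pushdown in the Session

With pushdown enabled, the Snowflake connector translates supported Spark operations into SQL that runs inside Snowflake, instead of pulling the raw rows into Spark first.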

In [6]:
sc = spark.sparkContext

# JVM-side helper class shipped with the Snowflake Spark connector.
snowflake_utils = sc._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils
# JVM SparkSession obtained through the JVM builder's getOrCreate().
jvm_session = sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate()

# Turn on query pushdown for that session. To switch it off again, call
# snowflake_utils.disablePushdownSession(jvm_session) instead.
snowflake_utils.enablePushdownSession(jvm_session)



### Run a Query on Snowflake

In [7]:
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Two-row sanity-check query executed on Snowflake through the connector;
# the result comes back as a Spark DataFrame.
sample_query = "select 10 as sample_amount union all select 25 as sample_amount"

df = (
    spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", sample_query)
    .load()
)

df.show()


+-------------+
|SAMPLE_AMOUNT|
+-------------+
|           10|
|           25|
+-------------+

