# Import Apache Sedona

In [None]:
from sedona.spark import *

# Define Sedona context if not defined yet

In [None]:
# Obtain the session from the Sedona builder (getOrCreate), then create the
# Sedona context from it.
config = SedonaContext.builder().getOrCreate()
sedona = SedonaContext.create(config)

# Handle to the SparkContext exposed by the Sedona context.
sc = sedona.sparkContext

# Define Snowflake Data Source Class Name

In [None]:
# Data source name given to `.format()` when reading from or writing to Snowflake.
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Set Snowflake connection and context options.

The full list of possible configuration options can be found [here](https://docs.snowflake.com/en/user-guide/spark-connector-use#label-spark-options).

In [None]:
# Connection options
# URL format: <account_identifier>.snowflakecomputing.com
snowflake_url = "<SNOWFLAKE_URL>"
username = "<SNOWFLAKE_USERNAME>"
password = "<SNOWFLAKE_PASSWORD>"

# Context options
database = "<SNOWFLAKE_DB_NAME>"
schema = "<DB_SCHEMA>"

In [None]:
# Connector options built from the connection and context values set above;
# passed via `.options(**sfOptions)` on each read and write.
sfOptions = {
    "sfUrl": snowflake_url,
    "sfUser": username,
    "sfPassword": password,
    "sfDatabase": database,
    "sfSchema": schema,
}

# Read entire table from snowflake into a Sedona Dataframe
A `dbtable` option with the table name as its value can be passed to the sedona context object to read the entire table contents into the dataframe. This is equivalent to running a `SELECT * FROM table` query.

In [None]:
# Source table name in Snowflake.
src_table_name = "city_tbl_geom"

# Load the table named by the `dbtable` option into a dataframe.
df = (
    sedona.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("dbtable", src_table_name)
    .load()
)

# Read a particular query result from snowflake into a Sedona Dataframe
A `query` option with the desired query as value can be passed to the sedona context object to read the result of the query into the dataframe. 

By default, query and predicate pushdown is enabled for this execution.
If you wish to disable pushdown, an `autopushdown` option with value `off` can be passed to the sedona context object.

In [None]:
# Custom query to run; it selects GEOM and CITY_NAME for the Seattle row(s)
# of the source table.
query = (
    "SELECT GEOM, CITY_NAME FROM "
    + src_table_name
    + " WHERE CITY_NAME = 'Seattle'"
)

# Load the result of the query (rather than a whole table) into a dataframe.
df_query = (
    sedona.read.format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", query)
    .load()
)

# Create Sedona Geometry type column
Snowflake serializes geometries as GeoJSON, and hence the dataframe will be populated with GeoJSON strings in the geometry column.
The GeoJSON strings can be converted to Sedona Geometry types using the `ST_GeomFromGeoJSON` function provided by Sedona.

In [None]:
# Convert the GEOM column with ST_GeomFromGeoJSON and keep CITY_NAME alongside it.
df_city = df.selectExpr(
    "ST_GeomFromGeoJSON(GEOM) as geom",
    "CITY_NAME",
)
df_seattle = df_query.selectExpr(
    "ST_GeomFromGeoJSON(GEOM) as geom",
    "CITY_NAME",
)

# Register temp views so subsequent SQL queries can refer to the data by name.
df_city.createOrReplaceTempView("city_table")
df_seattle.createOrReplaceTempView("seattle_table")


# Operate on geospatial data using Sedona
Now, any desired processing can be performed on the loaded geospatial data with Sedona's extensive [vector function catalog](https://docs.wherobots.services/1.1.0/references/wherobots-compute/vector-data/Overview/)

In [None]:
# WKT point used as the New York reference location.
new_york_point = 'POINT (-74.00 40.71)'

# One-row dataframe holding that point as a geometry column `new_york`,
# registered as a temp view so it can be joined in SQL.
df_ny = sedona.sql(
    "SELECT ST_GeomFromWKT('" + new_york_point + "') as new_york"
)
df_ny.createOrReplaceTempView("new_york_table")

# Combine city_table with new_york_table and compute ST_DistanceSphere between
# each city geometry and the New York point; the geometry is emitted as GeoJSON.
df_dist_from_ny = sedona.sql(
    "SELECT ST_AsGeoJson(geom) as geom, "
    "ST_DistanceSphere(geom, new_york) as dist_from_ny, "
    "CITY_NAME from city_table, new_york_table"
)

# Write computed data back to Snowflake

A Sedona Dataframe can be written back to Snowflake using the dataframe's `write` interface.
As with reading, the `sfOptions` map must be passed with the necessary connection and context options.

Provide a `dbtable` option to specify the name of the destination table.

Also, provide a [save mode](https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/SaveMode.html) parameter to specify how collisions with an existing table, if any, are handled.

In [None]:
# Destination table name in Snowflake.
destination_table = "distance_from_ny"

# "append": append data to the table if the table already exists with some data.
# Other possible values are: errorifexists, ignore, overwrite.
save_mode = "append"

# Write the computed dataframe to the destination table.
(
    df_dist_from_ny.write
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("dbtable", destination_table)
    .mode(saveMode=save_mode)
    .save()
)