In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql.types import *
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

### Downloading snowflake jdbc driver

    - Create a directory for the snowflake jar files
    - Define the drivers to be downloaded
    - Identify the latest version of the driver
    - Download the driver

In [2]:
%%bash
SFC_DIR=/home/ec2-user/snowflake
[ ! -d "$SFC_DIR" ] && mkdir $SFC_DIR 
cd $SFC_DIR
PRODUCTS='snowflake-jdbc spark-snowflake_2.11'
for PRODUCT in $PRODUCTS
do
   wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/maven-metadata.xml" 2> /dev/null
   VERSION=$(grep latest maven-metadata.xml | awk -F">" '{ print $2 }' | awk -F"<" '{ print $1 }')
   DRIVER=$PRODUCT-$VERSION.jar
   if [[ ! -e $DRIVER ]]
   then
      rm $PRODUCT* 2>/dev/null
      wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/$VERSION/$DRIVER" 2> /dev/null
   fi
   [ -e maven-metadata.xml ] && rm maven-metadata.xml
done

In [3]:
!ls -lrt /home/ec2-user/snowflake

total 30472
-rw-rw-r-- 1 ec2-user ec2-user 30260258 Jul 14 17:26 snowflake-jdbc-3.12.9.jar
-rw-rw-r-- 1 ec2-user ec2-user   938318 Jul 14 21:45 spark-snowflake_2.11-2.8.1-spark_2.4.jar


In [4]:
sfc_jars=!ls -d /home/ec2-user/snowflake/*.jar

In [5]:
":".join(classpath_jars())+":"+":".join(sfc_jars)

'/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-aws-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-common-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-s3-1.11.613.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-auth-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sagemaker-1.11.613.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/hadoop-annotations-2.8.1.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-sts-1.11.613.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/sagemaker_pyspark/jars/aws-java-sdk-kms-1.11.613.jar:/home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages/

In [6]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/ec2-user/snowflake/spark-snowflake_2.11-2.8.1-spark_2.4.jar,/home/ec2-user/snowflake/snowflake-jdbc-3.12.9.jar pyspark-shell'

In [7]:
conf = (SparkConf()
        #.set("spark.driver.extraClassPath", (":".join(classpath_jars())+":"+":".join(sfc_jars)))
        .setMaster('local')
        .setAppName('local-spark-test'))
sc=SparkContext.getOrCreate(conf=conf)
sc.stop()

sc=SparkContext(conf=conf)
print (sc)

spark = SQLContext(sc)
print (spark)

<SparkContext master=local appName=local-spark-test>
<pyspark.sql.context.SQLContext object at 0x7fc54c273a90>


In [8]:
sfOptions = {
  "sfURL" : "https://oq51261.eu-west-1.snowflakecomputing.com/",
  "sfAccount" : "oq51261",
  "sfUser" : "sagemaker",
  "sfPassword" : "IRISPW",
  "sfDatabase" : "ML_IRIS",
  "sfSchema" : "PUBLIC",
  "sfWarehouse" : "SAGEMAKER_WH",
}

In [9]:
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
  .options(**sfOptions) \
  .option("query",\
          """select * from IRIS""").load()

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [13]:
# convert to pandas dataframe
iris_df = df.toPandas()
iris_df.to_csv('iris_spark.csv')

# Uplaod data to S3

In [14]:
import sagemaker

session = sagemaker.session.Session()
s3_bucket = 'snowflake-getting-started'
prefix = 'iris/data'

print ('uploading data to s3')
s3_data_path = session.upload_data(path='iris_spark.csv', 
                                   bucket=s3_bucket, 
                                   key_prefix=prefix)
print ('data uploaded to -', s3_data_path)

uploading data to s3
data uploaded to - s3://snowflake-getting-started/iris/data/iris_spark.csv
