In [2]:
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

import pandas as pd

# import matplotlib.pyplot as plt

# %matplotlib inline
import datetime as dt
import numpy as np
import seaborn as sns

#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop


# lets import some tranformations functions
from snowflake.snowpark.functions import udf, col, lit, translate, is_null, iff



In [4]:
from snowflake.snowpark import version
print(version.VERSION)
#session.close()
session = Session.builder.configs(snowflake_conn_prop).create()
session.sql("use role accountadmin").collect()
session.sql("create database if not exists  {}".format(snowflake_conn_prop['database'])).collect()
session.sql("use database {}".format(snowflake_conn_prop['database'])).collect()
session.sql("create schema if not exists {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("use schema {}".format(snowflake_conn_prop['schema'])).collect()
session.sql("create warehouse if not exists {} with \
                WAREHOUSE_SIZE = XSMALL \
                AUTO_SUSPEND = 120 \
                AUTO_RESUME = TRUE".format(snowflake_conn_prop['warehouse'])).collect()
session.sql("use warehouse {}".format(snowflake_conn_prop['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

(0, 8, 0)
[Row(CURRENT_WAREHOUSE()='SP_QS_WH', CURRENT_DATABASE()='TEST_DB', CURRENT_SCHEMA()='DVD_FROSTYFRIDAYS_SPARK')]


In [6]:
session.get_current_database(), session.get_fully_qualified_current_schema(),
session.

('"TEST_DB"', '"TEST_DB"."DVD_FROSTYFRIDAYS_SPARK"')

In [10]:
def create_stage(session, database, schema, name, additional_info):
    sql = f'create stage if not exists {database}.{schema}.{name} {additional_info}'
    session.sql(sql).collect()

In [76]:
url_stage = 's3://frostyfridaychallenges/challenge_1/'
create_stage(session,
             session.get_current_database(), 
             session.get_current_schema(),
             "SNOWPARK_FF_01",
             f"url='{url_stage}' file_format=(type=csv SKIP_HEADER =1)")

##### Single column import to explore data

In [83]:
user_schema = StructType([StructField("VALUE", StringType())])
df = session.read.options({"field_delimiter": "\0", "skip_header": 1}).schema(user_schema).csv("@SNOWPARK_FF_01")
df.show()

--------------------
|"VALUE"           |
--------------------
|right             |
|NULL              |
|totally_empty     |
|congratulations!  |
|it                |
|you               |
|have              |
|gotten            |
--------------------



##### Proper types and columns

In [30]:
#user_schema = StructType([StructField("metadata$filename", StringType())])


In [85]:
user_schema = StructType([StructField("WORD", StringType())])
df = session.read.options({"field_delimiter": "\0", "skip_header": 1})\
            .schema(user_schema).csv("@SNOWPARK_FF_01")


In [87]:
session.sql(f'drop table if exists {session.get_fully_qualified_current_schema()}.CH_01').collect()

[Row(status='CH_01 successfully dropped.')]

In [88]:
staged_files = session.sql("list @SNOWPARK_FF_01")

file_names_list = staged_files.select(col('"name"')).collect()


for file in file_names_list:
    print(file.asDict()['name'])
    df_lines = session.read.options({"field_delimiter": "\0", "skip_header": 1})\
                  .schema(user_schema).csv("@SNOWPARK_FF_01/" + file.asDict()['name'].replace(url_stage,''))
    df_lines.write.save_as_table([session.get_fully_qualified_current_schema(), 'CH_01'],
                                 mode='append'
                                )

s3://frostyfridaychallenges/challenge_1/1.csv
s3://frostyfridaychallenges/challenge_1/2.csv
s3://frostyfridaychallenges/challenge_1/3.csv


In [82]:
t = session.table([session.get_fully_qualified_current_schema(), 'CH_01'])
t.collect()

[Row(WORD='you'),
 Row(WORD='have'),
 Row(WORD='gotten'),
 Row(WORD='it'),
 Row(WORD='right'),
 Row(WORD='NULL'),
 Row(WORD='totally_empty'),
 Row(WORD='congratulations!')]