# `snopy` - JSON data ingestion from a local file via an internal stage

## Imports

In [1]:
import os
from snopy import snopy_connect
from snowflake.connector.errors import ProgrammingError

## Credentials

In [2]:
# Pull the Snowflake credentials from environment variables so no secret is stored in the notebook.
# A missing variable raises KeyError here, before any connection is attempted.
sf_username, sf_password, sf_account = (
    os.environ[env_var]
    for env_var in ('SNOWFLAKE_USER', 'SNOWFLAKE_PASSWORD', 'SNOWFLAKE_ACCOUNT')
)

## Connecting to your Snowflake account

In [3]:
# Open the Snowflake session; every following cell talks to Snowflake through `sc`.
sc = snopy_connect(username=sf_username, password=sf_password, account=sf_account)

## Environment setup

In [4]:
# Choose the compute warehouse and role for this session.
# NOTE(review): ACCOUNTADMIN is much broader than this demo needs; prefer a narrower role outside a sandbox.
sc.set_environment(warehouse='COMPUTE_WH', role='ACCOUNTADMIN')

# Dedicated database/schema for the demo.
# WARNING: or_replace=True issues CREATE OR REPLACE, so an existing SNOPY database
# (or INGESTION_SCHEMA schema) and everything in it is replaced.
DATABASE_NAME = 'SNOPY'
SCHEMA_NAME = 'INGESTION_SCHEMA'

sc.database.create(DATABASE_NAME, or_replace=True, silent=True)
sc.database.use(DATABASE_NAME, silent=True)
sc.schema.create(SCHEMA_NAME, or_replace=True, silent=True)
sc.schema.use(SCHEMA_NAME, silent=True)

In [5]:
# Sanity check: the session should now report the role/database/schema/warehouse set above.
current_environment = sc.get_environment()
current_environment

{'role': 'ACCOUNTADMIN',
 'database': 'SNOPY',
 'schema': 'INGESTION_SCHEMA',
 'warehouse': 'COMPUTE_WH'}

## Creating file format

In [6]:
# Keyword arguments beyond the named ones are forwarded as extra options and
# appended to the end of the generated statement (the `statement` key in the
# output ends with `TRIM_SPACE = TRUE`).
extra_format_options = {'TRIM_SPACE': 'TRUE'}

results_ff = sc.file_format.create('my_json_format', 'JSON', or_replace=True, **extra_format_options)
results_ff

{'results': [('File format MY_JSON_FORMAT successfully created.',)],
 'description': [ResultMetadata(name='status', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=True)],
 'statement': 'CREATE OR REPLACE FILE FORMAT my_json_format TYPE = JSON TRIM_SPACE = TRUE'}

In [7]:
# Raw SQL can still be sent through `sc.execute`; like the helper calls, it
# returns a dict whose `results` key holds the fetched rows.
show_file_formats = sc.execute('SHOW FILE FORMATS;')
show_file_formats['results']

[(datetime.datetime(2022, 9, 15, 8, 33, 17, 331000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>),
  'MY_JSON_FORMAT',
  'SNOPY',
  'INGESTION_SCHEMA',
  'JSON',
  'ACCOUNTADMIN',
  '',
  '{"TYPE":"JSON","FILE_EXTENSION":null,"DATE_FORMAT":"AUTO","TIME_FORMAT":"AUTO","TIMESTAMP_FORMAT":"AUTO","BINARY_FORMAT":"HEX","TRIM_SPACE":true,"NULL_IF":[],"COMPRESSION":"AUTO","ENABLE_OCTAL":false,"ALLOW_DUPLICATE":false,"STRIP_OUTER_ARRAY":false,"STRIP_NULL_VALUES":false,"IGNORE_UTF8_ERRORS":false,"REPLACE_INVALID_CHARACTERS":false,"SKIP_BYTE_ORDER_MARK":true}')]

## Creating a file format with an invalid option (`ESCAPE` is not applicable to the JSON file format type, see [docs](https://docs.snowflake.com/en/sql-reference/sql/create-file-format.html))

In [8]:
# Deliberately pass an option that Snowflake rejects for JSON file formats (ESCAPE)
# to show that the server-side error surfaces as a ProgrammingError.
try:
    sc.file_format.create('my_invalid_json_format', 'JSON', or_replace=True, ESCAPE='"\\"')
except ProgrammingError as pe:
    print('Something went wrong!')
    print(pe)
else:
    # If Snowflake ever accepts this, the cell no longer demonstrates a failure; say so explicitly.
    print('Unexpectedly succeeded: ESCAPE was accepted for a JSON file format.')

Something went wrong!
002135 (42601): SQL compilation error:
Option ESCAPE is not valid for file format type JSON.


## Dropping file format

In [9]:
# Shown for completeness but intentionally left commented out: the stage created in
# the next cell uses `my_json_format` as its file format, so dropping it here would
# pull it out from under that step. Run this only once the format is no longer needed.
# sc.file_format.drop('my_json_format', if_exists=True)

## Creating internal stage

In [10]:
# Internal stage with `my_json_format` as its default file format; the COPY INTO
# further down reads from it without restating a file format.
stage_result = sc.stage.create(
    stage_name='my_internal_stage',
    or_replace=True,
    file_format_name='my_json_format',
)
stage_result

{'results': [('Stage area MY_INTERNAL_STAGE successfully created.',)],
 'description': [ResultMetadata(name='status', type_code=2, display_size=None, internal_size=16777216, precision=None, scale=None, is_nullable=True)],
 'statement': 'CREATE OR REPLACE STAGE my_internal_stage FILE_FORMAT = my_json_format'}

## Putting data on the stage

In [11]:
# Upload the local JSON file into the internal stage (the listing below shows it
# stored gzip-compressed). The path after `file://` is relative; presumably it is
# resolved against the kernel's working directory, so verify that when running elsewhere.
put_options = dict(
    filepath='file://data/meteorites.json',
    internal_stage_name='@my_internal_stage',
    overwrite=True,
    silent=True,
)
sc.stage.put(**put_options)

## Listing files inside the stage

In [12]:
# Show the staged files, then the LIST statement snopy generated for them.
result_list_stage = sc.stage.list('@my_internal_stage')

for key in ('results', 'statement'):
    print(result_list_stage[key])

[('my_internal_stage/meteorites.json.gz', 34400, '299c996d4d6d59bb0da5f2ec44ca3b4d', 'Thu, 15 Sep 2022 15:33:23 GMT')]
LIST @my_internal_stage


## Creating a table by executing a raw query

In [13]:
# Target table for the COPY INTO below. Keep the column order in sync with the
# transformation SELECT, which fills the table positionally.
query_create_table = """
    CREATE OR REPLACE TABLE meteorites (
        name VARCHAR
        , id INT
        , nametype VARCHAR
        , recclass VARCHAR
        , mass INT
        , fall VARCHAR
        , year DATE
        , reclat FLOAT
        , reclong FLOAT
        , geolocation OBJECT
    );
"""

# snopy has no table helper yet, so the DDL goes through the generic `execute`.
create_table_result = sc.execute(query_create_table)
create_table_result['results']

[('Table METEORITES successfully created.',)]

## Loading data from the internal stage into a table

Because the data is in JSON format, the load needs a transformation step: without one, each whole record would land in a single VARIANT column instead of being split into the table's typed columns (for more, [see the documentation](https://docs.snowflake.com/en/sql-reference/data-types-semistructured.html)).

In [14]:
# Flatten each staged JSON record ($1) into typed columns.
# COPY INTO with a SELECT transformation loads columns by position, so this list
# must follow the column order of `meteorites`. Aliases mirror the target column
# names for readability; the year column is therefore aliased `year`, not `date`.
# NOTE(review): `mass` is cast to int (and stored as INT); if source masses carry
# fractional grams they lose their fractional part. Consider FLOAT for both.
transformation_statement = """
    SELECT
        $1:name::varchar AS name
        , $1:id::int AS id
        , $1:nametype::varchar AS nametype
        , $1:recclass::varchar AS recclass
        , $1:mass::int AS mass
        , $1:fall::varchar AS fall
        , $1:year::date AS year
        , $1:reclat::float AS reclat
        , $1:reclong::float AS reclong
        , $1:geolocation::object AS geolocation
    FROM @my_internal_stage
"""

sc.copy_into(
    table_name='meteorites',
    source_stage=transformation_statement,
    silent=True
)

## Querying data straight into a pandas DataFrame

In [16]:
# Load the table into pandas, adding the `type` field of the geolocation OBJECT
# (e.g. 'Point') as its own column, then preview the first rows.
geo_type_query = "SELECT *, geolocation['type']::varchar as geo_type FROM meteorites;"
data_meteorites = sc.query_pd(geo_type_query)
data_meteorites.head()

Unnamed: 0,NAME,ID,NAMETYPE,RECCLASS,MASS,FALL,YEAR,RECLAT,RECLONG,GEOLOCATION,GEO_TYPE
0,Aachen,1,Valid,L5,21,Fell,1880-01-01,50.775,6.08333,"{\n ""coordinates"": [\n 6.08333,\n 50.77...",Point
1,Aarhus,2,Valid,H6,720,Fell,1951-01-01,56.18333,10.23333,"{\n ""coordinates"": [\n 10.23333,\n 56.1...",Point
2,Abee,6,Valid,EH4,107000,Fell,1952-01-01,54.21667,-113.0,"{\n ""coordinates"": [\n -113,\n 54.21667...",Point
3,Acapulco,10,Valid,Acapulcoite,1914,Fell,1976-01-01,16.88333,-99.9,"{\n ""coordinates"": [\n -99.9,\n 16.8833...",Point
4,Achiras,370,Valid,L6,780,Fell,1902-01-01,-33.16667,-64.95,"{\n ""coordinates"": [\n -64.95,\n -33.16...",Point


## Closing the connection

In [17]:
# Close the Snowflake session opened at the top of the notebook.
sc.close_connection()