In [None]:
from snowflake.snowpark import Session
import snowflake.snowpark.functions as f
from snowflake.snowpark.functions import col,lit,min,max,avg,stddev,median,count,builtin,parse_json,udf
from snowflake.snowpark import DataFrame
from credenciales import connection_parameters

import os
from dotenv import load_dotenv,find_dotenv
load_dotenv(find_dotenv())

<a id="snowpark_architecture"></a>
### 1.1 Arquitectura Snowpark 

<img src="./images/SnowparkFlowOverviewPython.png" alt="Snowpark Flow Overview" style="width:80%;display:block;margin-left:5%;" />

<a id="snowpark_conn"></a>
## 2. Conexión Snowflake

In [None]:
# Open a Snowpark session with the parameters imported from `credenciales` and report its context.
sesion = Session.builder.configs(connection_parameters).create()
if sesion is not None:
    print('Conexión OK')
    # get_current_*() can return None when that context object is not set (e.g. no
    # default warehouse). f-strings print "None" in that case, whereas
    # `'WAREHOUSE: ' + None` would raise TypeError and abort the cell.
    contexto = (
        ('ACCOUNT', sesion.get_current_account()),
        ('WAREHOUSE', sesion.get_current_warehouse()),
        ('DATABASE', sesion.get_current_database()),
        ('SCHEMA', sesion.get_current_schema()),
        ('ROLE', sesion.get_current_role()),
    )
    for etiqueta, valor in contexto:
        print(f'{etiqueta}: {valor}')
else:
    # Defensive fallback only: SessionBuilder.create() is documented to raise on a
    # failed connection rather than return None.
    print("Error de conexión!")

<a id="snowpark_dataframe"></a>
## 3. Snowpark DataFrame

<img src="./images/AnatomyOfADataFrame.png" alt="Snowpark DataFrame" style="width:70%;display:block;margin-left:10%;" />

<a id="snowpark_load"></a>
## 4. Cargar Data Estructurada

In [None]:
# Create (if missing) the stage named by the `stage` environment variable, with a directory table enabled.
q1 = f"CREATE STAGE IF NOT EXISTS {os.environ['stage']} DIRECTORY = ( ENABLE = true )"

# (Re)create a CSV file format literally named CSV. A raw string keeps the Snowflake
# escape sequences literal without doubling every backslash:
# '\n' = newline, '\042' = double quote (octal), '\134' = backslash (octal).
q2 = r"""
            CREATE or replace FILE FORMAT CSV
            TYPE='CSV'
            COMPRESSION='AUTO'
            FIELD_DELIMITER=','
            RECORD_DELIMITER='\n'
            SKIP_HEADER=1
            FIELD_OPTIONALLY_ENCLOSED_BY='\042'
            TRIM_SPACE=FALSE
            ERROR_ON_COLUMN_COUNT_MISMATCH=FALSE
            ESCAPE='NONE'
            ESCAPE_UNENCLOSED_FIELD='\134'
            DATE_FORMAT='AUTO'
            TIMESTAMP_FORMAT='AUTO'
            NULL_IF=('')
            COMMENT='CSV File Format SNOWPARK'
            """

# DataFrame.show() executes the statement and prints its status row. It returns
# None, so call it directly; wrapping it in print() would also print "None".
resp1 = sesion.sql(q1)
resp1.show()
resp2 = sesion.sql(q2)
resp2.show()



In [None]:
# Upload the local orders.csv (resolved against the current working directory) to the
# stage. auto_compress=False keeps the staged copy uncompressed.
ruta_csv = os.path.join(os.getcwd(), 'orders.csv')
destino_stage = f"@{os.environ['stage']}"
put_resultado = sesion.file.put(ruta_csv, destino_stage, auto_compress=False)
put_primero = put_resultado[0]
print(f"{put_primero.status}: size: {put_primero.target_size}")


In [None]:
# Load the staged file(s) into the table named by `tabla1`, parsing them with the CSV file format.
# NOTE(review): assumes that table already exists with columns matching orders.csv;
# no CREATE TABLE for it appears in the visible cells.
q3 = f"COPY INTO {os.environ['tabla1']} FROM @{os.environ['stage']} file_format=CSV"
resp3 = sesion.sql(q3)
# Session.sql() only builds a lazy DataFrame. Without an action such as collect()
# the COPY statement is never sent to Snowflake. Print the returned load-status rows.
for fila_copy in resp3.collect():
    print(fila_copy)


In [None]:
# Lazy reference to the loaded table; show(5) runs a query and prints its first 5 rows.
df = sesion.table(os.environ['tabla1'])
df.show(5)

<a id="snowpark_select1"></a>
## 4.1 Funciones de DataFrame

In [None]:
# Column selection: project only IP_ADDRESS and CITY, then print 5 rows.
columnas = [col("IP_ADDRESS"), col("CITY")]
df.select(columnas).show(5)

In [None]:
# Add a constant column named "bool" holding False on every row.
# new_df keeps the resulting DataFrame. show() prints rows but returns None, so chaining
# it into the assignment (as before) left new_df bound to None.
new_df = df.with_column("bool", lit(False))
new_df.show(5)

In [None]:
# Sorting: ascending by SHIPPING_ZIPCODE; show() without an argument prints Snowpark's default row count.
orden_zip = col("SHIPPING_ZIPCODE").asc()
df.sort(orden_zip).show()

In [None]:
# Functional aggregates: take 5 rows and materialize them with cache_result(), so the
# following cells reuse exactly these rows instead of re-evaluating limit(5).
# NOTE(review): limit() without a sort returns arbitrary rows, so which 5 rows are
# picked may differ between runs.
cinco_registros = df.limit(5).cache_result()

print("Nuestra data:")
cinco_registros.show()

In [None]:
# Average of TOTAL_TRNX_AMOUNT over the 5 cached rows.
promedio_monto = avg(col("TOTAL_TRNX_AMOUNT"))
cinco_registros.agg(promedio_monto).show()

In [None]:
# Average, standard deviation and maximum of TOTAL_TRNX_AMOUNT (no minimum is computed here).
# `max` is Snowpark's aggregate function imported at the top, not Python's builtin.
monto = col("TOTAL_TRNX_AMOUNT")
metricas = [avg(monto), stddev(monto), max(monto)]
cinco_registros.agg(metricas).show()

In [None]:
# Rename and cast: compute three aggregates, then cast and alias them by position ($1, $2, $3).
from snowflake.snowpark.types import DecimalType, DoubleType

# A dict passed to agg() maps each column to a single function, so it cannot express
# both the average and the stddev of TOTAL_TRNX_AMOUNT. (column, function) tuples can,
# which keeps "Promedio de Transaccion" an average of the transaction amount rather
# than of JOIN_KEY. Result columns follow the list order.
# NOTE(review): DECIMAL(10,5) holds at most 99999.99999; a larger stddev would overflow the cast.
cinco_registros.agg(
    [
        ("TOTAL_TRNX_AMOUNT", "avg"),
        ("TOTAL_TRNX_AMOUNT", "stddev"),
        ("SHIPPING_ZIPCODE", "max"),
    ]
).select(
    col("$1").cast(DecimalType(38, 4)).alias("Promedio de Transaccion"),
    col("$2").cast(DecimalType(10, 5)).alias("Desviación Estandard"),
    col("$3").cast(DoubleType()).alias("Max ZIP Code"),
).show()


In [None]:
# Grouping: per SHIPPING_ZIPCODE, the row count and the average TOTAL_TRNX_AMOUNT (5 groups printed).
agregados_zip = [
    count("*").alias("Cantidad"),
    avg(col("TOTAL_TRNX_AMOUNT")).alias("Promedio"),
]
por_zip = df.group_by(col("SHIPPING_ZIPCODE")).agg(agregados_zip)
por_zip.show(5)
                  

<a id="snowpark_semi"></a>
## 5. Data Semi-Estructurada 

In [None]:
# Sample semi-structured document: a JSON object whose "castaways" key holds an array
# of 7 objects (id 6 has no "hut-mate" field).
json_array_string = """{
"castaways" : [
  { "id" : 1, "Name" : "Willy Gilligan", "title" : "Little Buddy", "hut-mate" : 2 },
  { "id" : 2, "Name" : "Jonas Grumby", "title" : "Skipper", "hut-mate" : 1  },
  { "id" : 3, "Name" : "Thurston Howell, III", "title" : "The Millionaire", "hut-mate" : 4 },
  { "id" : 4, "Name" : "Lovey Howell", "title" : "His Wife", "hut-mate" : 3 },
  { "id" : 5, "Name" : "Ginger Grant", "title" : "The Movie Star", "hut-mate" : 7 },
  { "id" : 6, "Name" : "Roy Hinkley", "title" : "The Professor" },
  { "id" : 7, "Name" : "Mary Ann Summers", "title" : "The Rest", "hut-mate" : 5 }        
 ]
}"""

In [None]:
# One-row DataFrame whose only column, CASTAWAYS_MAP_WITH_ARRAY, is the parsed JSON
# document. The single "" row just gives select() a row to project over.
documento_json = parse_json(lit(json_array_string)).alias("CASTAWAYS_MAP_WITH_ARRAY")
castaways_df = sesion.create_dataframe([""]).select(documento_json)

castaways_df.show()

In [None]:
# Print every field of the DataFrame's schema, one per line.
campos_castaways = castaways_df.schema.fields
for campo in campos_castaways:
    print(campo)

In [None]:
# Traversing nested data: index the parsed column by key, using Python dict-like notation.
castaways_array = col("CASTAWAYS_MAP_WITH_ARRAY")["castaways"]
castaways_df.select(
    castaways_array.alias("Castaways Array from Python list notation")
).show()

In [None]:
from snowflake.snowpark.types import StringType

# Element access on the nested array: element 0 (alias "Castaways sub 0"), element 1
# (alias "Castaways sub 1"), and element 1's "title" field cast to a string.
arreglo_castaways = col("CASTAWAYS_MAP_WITH_ARRAY")["castaways"]
(
    castaways_df.select(
        arreglo_castaways[0].alias("Castaways sub 0"),
        arreglo_castaways[1].alias("Castaways sub 1"),
        arreglo_castaways[1]["title"].cast(StringType()).alias("Title of castaways sub 1"),
    )
    .show()
)

In [None]:
# DataFrame FLATTEN: expand the "castaways" array into rows via a joined table function.
# Positional FLATTEN arguments, in order: input, path, outer, recursive, mode.
argumentos_flatten = (
    col("CASTAWAYS_MAP_WITH_ARRAY"),  # input: the column containing the data to flatten
    lit("castaways"),                 # path: the key of the value to be flattened
    lit(False),                       # outer: omit zero-length entities (no elements)
    lit(True),                        # recursive: also flatten values that are variants
    lit("BOTH"),                      # mode: flatten both objects (maps) and arrays
)
flattened_df = castaways_df.join_table_function("flatten", *argumentos_flatten)

for campo in flattened_df.schema.fields:
    print(campo)

In [None]:
# Show the KEY, INDEX and VALUE columns produced by FLATTEN.
columnas_flatten = ["KEY", "INDEX", "VALUE"]
flattened_df.select([col(nombre) for nombre in columnas_flatten]).show()

<a id="snowpark_save"></a>
## 6. Salvar Data en Snowflake

In [None]:
# Persist the 5 cached rows as TOP_5_TBL in the session's current database and schema.
desired_table_name = "TOP_5_TBL"
target_db_name = sesion.get_current_database()
target_schema_name = sesion.get_current_schema()
# Fully qualified target given as [database, schema, table].
context_list = [target_db_name, target_schema_name, desired_table_name]

# Empty any previous copy first (IF EXISTS makes this safe on the first run);
# show() executes the statement and prints its status.
truncate_table_string = f"TRUNCATE TABLE IF EXISTS {desired_table_name}"
sesion.sql(truncate_table_string).show()

cinco_registros.write.mode("OVERWRITE").save_as_table(context_list)


In [None]:
# Read the saved table back through SQL and print its first 3 rows.
select_star_query_string = f"SELECT * FROM {desired_table_name}"
print(f"Results of {select_star_query_string} (3 rows)")
resultado_select = sesion.sql(select_star_query_string)
resultado_select.show(3)

<a id="snowpark_udf"></a>
## 7. UDFs

In [None]:
from snowflake.snowpark.types import DoubleType
import math

curr_db = sesion.get_current_database()
curr_schema = sesion.get_current_schema()

# Non-permanent scalar UDF built from a lambda: DOUBLE radius -> DOUBLE circumference (2·π·r),
# named <current db>.<current schema>.LAMBDA_CIRCUMFERENCE. replace=True allows re-running
# the cell; no stage_location is needed because is_permanent is False.
nombre_udf_lambda = [curr_db, curr_schema, "LAMBDA_CIRCUMFERENCE"]
my_lambda_circum_udf = udf(
    func=lambda radius: 2.0 * math.pi * radius,
    return_type=DoubleType(),
    input_types=[DoubleType()],
    is_permanent=False,
    replace=True,
    session=sesion,
    name=nombre_udf_lambda,
)

In [None]:
# Test the UDF: apply it to radii 1..5 and show the results ordered by radius.
radios = sesion.create_dataframe(list(range(1, 6))).to_df("RADIUS")
con_circunferencia = radios.with_column("THE_CIRCUM", my_lambda_circum_udf("RADIUS"))
con_circunferencia.sort(col("RADIUS").asc()).show()

<a id="snowpark_udfreg"></a>
## 7.1 Registrar UDFs

In [None]:
from snowflake.snowpark.types import DoubleType
# Imported here as well so this cell does not silently depend on the previous UDF
# cell having been run: circumference() reads math.pi.
import math


def circumference(radius) -> float:
    """Return the circumference 2·π·radius of a circle with the given radius."""
    return 2.0 * radius * math.pi


# Register circumference() through the session's UDFRegistration. is_permanent=True
# requires a stage_location; here that is the same stage used for orders.csv.
# NOTE(review): despite the "_TEMP" suffix this UDF is registered as permanent. Either
# rename it or set is_permanent=False (dropping stage_location). The SQL test cell
# calls it by this exact name, so change both together.
circum_udf_temp = sesion.udf.register(
    func=circumference,
    name="CALCULATE_CIRCUMFERENCE_TEMP",
    return_type=DoubleType(),
    input_types=[DoubleType()],
    is_permanent=True,
    stage_location=os.environ['stage'],
    replace=True,  # avoid errors in case this cell is run more than once
    session=sesion,
)

In [None]:
# Test the registered UDF from plain SQL on a single literal value.
num = 10
select_text = f"SELECT {num} AS NUM, CALCULATE_CIRCUMFERENCE_TEMP({num}) AS CIRCUM"
print("Invoking our UDF via SQL:\n" + select_text)
sesion.sql(select_text).show()

<a id="snowpark_close"></a>
## Cerrar Sesión

In [None]:
# Close the Snowpark session opened in the connection cell; later cells cannot use `sesion` after this.
sesion.close()