In [1]:
import snowflake.snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v
import json 
import pandas as pd
from IPython.display import display
import os
import time
from datetime import datetime
date_format_str = '%Y-%m-%d %H:%M:%S'

import jnius_config
import phonenumbers



In [2]:
start_time = pd.to_datetime(datetime.now(), format=date_format_str)

In [3]:
#Add the Snowpipe Streaming JAVA SDK Files to classpath
jnius_config.set_classpath(os.getcwd(),'snowflake-ingest-sdk-1.1.0.jar')
jnius_config.add_classpath(os.getcwd(),'slf4j-api-1.7.21.jar')

In [4]:
from jnius import autoclass

In [5]:
InsertValidationResponse_class = autoclass("net.snowflake.ingest.streaming.InsertValidationResponse")
OpenChannelRequest_class = autoclass("net.snowflake.ingest.streaming.OpenChannelRequest")
SnowflakeStreamingIngestChannel_class = autoclass("net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel")
SnowflakeStreamingIngestClient_class = autoclass("net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient")
OpenChannelRequestBuilder_class = autoclass("net.snowflake.ingest.streaming.OpenChannelRequest$OpenChannelRequestBuilder")
OnErrorOption_class = autoclass("net.snowflake.ingest.streaming.OpenChannelRequest$OnErrorOption")
SnowflakeStreamingIngestClientFactory_class = autoclass("net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory")

In [6]:
print(OpenChannelRequest_class)

<class 'jnius.reflect.net.snowflake.ingest.streaming.OpenChannelRequest'>


In [7]:
Properties_class = autoclass("java.util.Properties")
FileInputStream_class = autoclass("java.io.FileInputStream")
InputStream_class = autoclass("java.io.InputStream")

In [8]:
#Read Snowflake credentials from config file
with open('snowflake_env_login_creds.json') as f:
    data = json.load(f)
#Fetch Snowflake Database credentials    
    USERNAME = data['user']
    URL = data['url']
    SF_ACCOUNT = data['account']
    PRIVATE_KEY = data['private_key']
    PORT = data['port']
    HOST = data['host']
    SCHEMA = data['schema']
    SCHEME = data['scheme']
    DATABASE = data['database']
    TABLE = data['table']
    CONNECTION_STRING = data['connection_string']
    ROLE = data['role']
    WAREHOUSE = data['warehouse']
#Fetch MYSQL Database credentials    
    MYSQL_URL = data['mysql_url']
    MYSQL_USERNAME = data['mysql_user']
    MYSQL_PWD = data['mysql_pwd']
    MYSQL_DATABASE = data['mysql_database']

In [9]:
import mysql.connector as connection

try:
    mydb = connection.connect(host=MYSQL_URL, database=MYSQL_DATABASE,user=MYSQL_USERNAME, passwd=MYSQL_PWD,use_pure=True)
    query = "Select * from people_100k;"
    result_dataFrame = pd.read_sql(query,mydb)
    print(result_dataFrame.head(5))
    
    mydb.close() #close the connection
    
except Exception as e:
    mydb.close()
    print(str(e))

  result_dataFrame = pd.read_sql(query,mydb)


  ROW_INDEX          USER_ID FIRST_NAME LAST_NAME     SEX  \
0         1  e09c4f4cbfEFaFd       Dawn   Trevino    Male   
1         2  D781D28b845Ab9D       Dale  Mcknight    Male   
2         3  eda7EcaF87b2D80    Herbert      Bean  Female   
3         4  E75ACea5D7AeC3e      Karen   Everett  Female   
4         5  9C4Df1246ddf543     Angela      Shea    Male   

                        EMAIL                  PHONE DATE_OF_BIRTH  \
0     clintongood@example.org           360-423-5286    1972-01-17   
1  clairebradshaw@example.org             9062423229    1931-01-31   
2    johnnybooker@example.org  001-149-154-0679x1617    2018-02-10   
3           wkhan@example.org     870.294.7563x20939    1938-06-14   
4  reginaldgarner@example.com           242.442.2978    1971-11-22   

                       JOB_TITLE  
0        Teacher, primary school  
1  Development worker, community  
2              Ceramics designer  
3     Civil engineer, consulting  
4      Health and safety adviser  


In [10]:
try:
#Populate connection string for streaming channel to snowflake    
    props = Properties_class()
    props.put("user", USERNAME)
    props.put("url", URL)
    props.put("account", SF_ACCOUNT)
    props.put("private_key", PRIVATE_KEY)
    props.put("port", PORT)
    props.put("host", HOST)
    props.put("schema", SCHEMA)
    props.put("scheme", SCHEME)
    props.put("database", DATABASE)
    props.put("table", TABLE)
    props.put("connect_string", CONNECTION_STRING)
    props.put("role", ROLE)
    props.put("warehouse", WAREHOUSE)    
    
#Create a secure connection to snowflake   
    client = SnowflakeStreamingIngestClientFactory_class.builder("CLIENT_FOR_SQLDWLOAD").setProperties(props).build()

#Create the streaming channel to snowflake    
    request1 = OpenChannelRequest_class.builder("DWLOAD_CHANNEL").setDBName(props.getProperty("database")).setSchemaName(props.getProperty("schema")).setTableName(props.getProperty("table")).setOnErrorOption(OnErrorOption_class.CONTINUE).build()

#Open a streaming ingest channel from the given client
    channel1 = client.openChannel(request1)
    print("Is Streaming Channel Valid - {}".format(channel1.isValid()))
    print("Is Streaming Ingest Channel Open - {}".format(channel1.isClosed()))
    #print(channel1.isClosed())
    #print(channel1.isValid())    
    
except Exception as err:
    print(f"Exception: {err}")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


Is Streaming Channel Valid - True
Is Streaming Ingest Channel Open - False


In [11]:
HashMap_class = autoclass("java.util.HashMap")
ArrayList_class = autoclass("java.util.ArrayList")
String_class = autoclass("java.lang.String")

In [12]:
try:
    bulk_rows = ArrayList_class()
    
    for index, row in result_dataFrame.iterrows():    
        java_maprow = HashMap_class()
        java_maprow.put("customer_id",row["USER_ID"])
        java_maprow.put("email",row["EMAIL"])
        java_maprow.put("phone",row["PHONE"])
        java_maprow.put("date_of_birth",row["DATE_OF_BIRTH"])
        bulk_rows.add(java_maprow)
    

except Exception as err:
    print(f"Exception: {err}")

In [13]:
#print(bulk_rows[1])

In [14]:
stream_insert_start_time = pd.to_datetime(datetime.now(), format=date_format_str)

In [15]:
try:
    
    if(channel1.isClosed()):
        print("Channel is closed. Insert to Snowflake Table failed")
    else:
        #response = channel1.insertRow(row, "0");
        response = channel1.insertRows(bulk_rows, "0");

        if(response.hasErrors()):
            print(response.hasErrors())
            print(response.toString())  #If there are errors then print to output            

except Exception as err:
    print(f"Exception: {err}") 
finally:
    channel1.close().get() #Close channel to snowflake
    print("Channel Closed = {}".format(channel1.isClosed()))

Channel Closed = True


In [16]:
end_time = pd.to_datetime(datetime.now(), format=date_format_str)
# Get the interval between two datetimes as timedelta object
ingest_diff = end_time - stream_insert_start_time
pipeline_diff = end_time - start_time
print('Total time to ingest {} records in seconds:{}'.format(len(result_dataFrame),ingest_diff.total_seconds()))
print('Total pipeline execution time in seconds:{}'.format(pipeline_diff.total_seconds()))

Total time to ingest 500000 records in seconds:10.692488
Total pipeline execution time in seconds:50.369858


In [17]:
#print(channel1.isClosed())
#print(channel1.isValid())

In [18]:
offsetTokenFromSnowflake = channel1.getLatestCommittedOffsetToken()
print(int(offsetTokenFromSnowflake))

0


In [19]:
#Build Snowpark code to validate that records have been inserted into the table

In [44]:
z = phonenumbers.parse("001-399-445-1778x8552", "US")
print(z.national_number)
print(phonenumbers.format_number(z, phonenumbers.PhoneNumberFormat.to_string))

13994451778
0013994451778 ext. 8552


In [43]:
#print(type(z.national_number))

In [22]:
#Build Snowpark Connection
# Specify connection parameters
connection_parameters = {
    "account": SF_ACCOUNT,
    "user": USERNAME,
    "password": MYSQL_PWD,
    "role": ROLE,
    "warehouse": WAREHOUSE,
    "database": DATABASE,
    "schema": SCHEMA,
}

In [23]:
# Create Snowpark session
snowpark_session = Session.builder.configs(connection_parameters).create()

In [24]:
snowpark_df = snowpark_session.table("SNOWPIPE_STREAMING.DEV.sf_customer_raw")

In [25]:
print("Total records in Customer RAW table - {}".format(snowpark_df.count()))

Total records in Customer RAW table - 500000


In [26]:
#Transform phone no. column and format DOB
from snowflake.snowpark.functions import udf
snowpark_session.add_packages("numpy", "pandas", "phonenumbers")

The version of package phonenumbers in the local environment is 8.13.11, which does not fit the criteria for the requirement phonenumbers. Your UDF might not work when the package version is different between the server and your local environment


In [34]:
'''
def parse_phone_no(x: str) -> str:
    try:
        return str(phonenumbers.parse(x,"US").national_number) 
    except Exception as err:
        return "0000000000"     #setting a default value on error
    
##################################################################
## Register UDF in Snowflake
### Add packages and data types
from snowflake.snowpark.types import IntegerType, StringType
### Upload UDF to Snowflake
snowpark_session.udf.register(
    func = parse_phone_no
  , return_type = StringType()
  , input_types = [StringType()]
  , is_permanent = True
  , name = 'parse_phone_no'
  , replace = True
  , stage_location = '@UDF_STAGE'
)
'''

<snowflake.snowpark.udf.UserDefinedFunction at 0x7fe5c5685c40>

In [28]:
tf_start_time = pd.to_datetime(datetime.now(), format=date_format_str)

In [35]:
snowpark_session.sql('''
                    insert into dev.sf_customer_gold
                    (select customer_id,
                        email,
                        parse_phone_no(phone),
                        to_date(date_of_birth)
                        from dev.sf_customer_raw);''').collect()

[Row(number of rows inserted=500000)]

In [30]:
snowpark_df = snowpark_session.table("SNOWPIPE_STREAMING.DEV.sf_customer_gold")

In [31]:
print("Transformed GOLD table [sf_customer_gold] created. Total records - {}".format(snowpark_df.count()))

Transformed GOLD table [sf_customer_gold] created. Total records - 500000


In [33]:
tf_end_time = pd.to_datetime(datetime.now(), format=date_format_str)
# Get the interval between two datetimes as timedelta object
tf_diff = tf_end_time - tf_start_time
print('Total transformation execution time in seconds:{}'.format(tf_diff.total_seconds()))

Total transformation execution time in seconds:13.925515
