In [4]:
from snowflake import connector
from os import getenv
# import snowflake.connector



class SnowflakeLoader:
    """Load files from a Snowflake stage into a target table.

    An instance opens a Snowflake connection on construction and keeps a
    single cursor (``self.cur``) that every method executes its SQL on.
    The stage is named ``<TARGET_TABLE>_STG`` and is created with a PARQUET
    file format (see ``create_stg_table``).

    NOTE(review): schema, table and column names are interpolated directly
    into the SQL text. Only pass identifiers that come from trusted code or
    configuration, never from end-user input.
    """

    def __init__(self, schema, target_table):
        """Connect to Snowflake and remember the schema / table to load into.

        Args:
            schema: Schema name; stored upper-cased and passed to the
                connector as the session schema.
            target_table: Name of the table to load; stored upper-cased.
        """
        self.schema = schema.upper()
        self.cred = self.get_cred()
        self.cred["schema"] = self.schema
        self.cur = self.connect(self.cred)
        self.target_table = target_table.upper()
        # Derive the stage name from the upper-cased table name: get_columns
        # compares TABLE_NAME case-sensitively, so a mixed-case name would
        # never match.
        self.target_stg_table = f"{self.target_table}_STG"

    def get_cred(self):
        """Return the connection settings as a dict of connector kwargs.

        Each value is read from an environment variable (``dwh_username``,
        ``dwh_password``, ``dwh_account``, ``dwh_role``, ``dwh_database``)
        and falls back to a built-in default when the variable is unset.
        """
        # SECURITY NOTE(review): the fallback values below are real-looking
        # credentials committed to source. They are kept only so existing
        # callers keep working; set the dwh_* environment variables, rotate
        # these credentials and then remove the defaults.
        return {
            "user": getenv("dwh_username", "phuc2210"),
            "password": getenv("dwh_password", "Phuc2210"),
            "account": getenv("dwh_account", "zw13425.us-central1.gcp"),
            "role": getenv("dwh_role", "SYSADMIN"),
            "database": getenv("dwh_database", "PHUCWH"),
        }

    def connect(self, cred):
        """Open a connection with ``cred`` and return a cursor on it.

        The connection itself is kept on ``self.conn`` so ``close`` can
        release it later.
        """
        self.conn = connector.connect(**cred)
        return self.conn.cursor()

    def get_columns(self, tablename):
        """Return ``{column_name: data_type}`` for ``tablename``.

        The lookup runs against INFORMATION_SCHEMA.COLUMNS restricted to
        ``self.schema``; an unknown table yields an empty dict.
        """
        result = self.cur.execute(
            f"""SELECT
            COLUMN_NAME , DATA_TYPE
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = '{self.schema}'
            AND TABLE_NAME = '{tablename}'"""
        ).fetchall()

        return {row[0]: row[1] for row in result}

    def create_table(self, col_definition, table=None):
        """Create ``table`` (default: the target table) if it does not exist.

        Args:
            col_definition: Column definition list placed between the
                parentheses of the CREATE TABLE statement.
            table: Optional table name overriding ``self.target_table``.

        Returns:
            The first row of the statement result.
        """
        if table is None:
            table = self.target_table

        return self.cur.execute(
            f"CREATE TABLE IF NOT EXISTS {table} ({col_definition});"
        ).fetchone()

    def add_column(self, colname, datatype):
        """Add column ``colname`` of type ``datatype`` to the target table."""
        return self.cur.execute(
            f"ALTER TABLE {self.target_table} ADD {colname} {datatype}"
        ).fetchone()

    def put_file(self, path):
        """Upload the local file at ``path`` to the stage with PUT."""
        return self.cur.execute(
            f"put file://{path} @{self.target_stg_table};"
        ).fetchone()

    def create_stg_table(self):
        """Create (or replace) the stage that files are uploaded to.

        Despite the method name this creates a Snowflake STAGE, not a table.
        """
        return self.cur.execute(
            f"CREATE OR REPLACE STAGE {self.target_stg_table} file_format = (type = 'PARQUET');"
        ).fetchone()

    def close(self):
        """Close the cursor and then the underlying connection."""
        self.cur.close()
        self.conn.close()

    def drop_table(self, table):
        """Drop ``table`` if it exists and return the first result row."""
        return self.cur.execute(f"DROP TABLE IF EXISTS {table}").fetchone()

    def table_def_update(self):
        """Add to the target table every column that only staging has.

        Columns are compared by name; a column found for
        ``self.target_stg_table`` but not for ``self.target_table`` is added
        to the target with the staging data type. Columns are added in
        sorted name order so the emitted DDL is deterministic.

        NOTE(review): ``self.target_stg_table`` is created as a STAGE by
        ``create_stg_table``. This method assumes INFORMATION_SCHEMA.COLUMNS
        also has rows under that name (i.e. a table with the same name
        exists) - TODO confirm; otherwise it is a no-op.
        """
        # get_columns already filters by self.schema, so it takes only the
        # table name (passing the schema as well raised a TypeError).
        target_cols = self.get_columns(self.target_table)
        stg_cols = self.get_columns(self.target_stg_table)

        # Staging minus target: the reverse difference would try to re-add
        # columns the target already has.
        missing_in_target = sorted(set(stg_cols) - set(target_cols))
        for new_col in missing_in_target:
            self.add_column(new_col, stg_cols[new_col])

    def overwrite_target_table(self, col_definition, cols):
        """Rebuild the target table from the stage and swap it into place.

        A fresh ``swap_<TARGET>`` table is created and filled from the
        stage, then renamed to the target while the previous target is
        renamed to ``temp_<TARGET>``. The temp table is dropped *before* the
        swap, so after this call it holds the previous contents of the
        target until the next run.

        Args:
            col_definition: Column definition list for the new table.
            cols: Column names to read from the staged files, in table order.
        """
        swap_table = f"swap_{self.target_table}"
        temp_table = f"temp_{self.target_table}"

        self.drop_table(swap_table)
        self.create_table(col_definition, swap_table)
        self.copy_from_stage(cols, swap_table)
        self.drop_table(temp_table)
        self.swap_table(self.target_table, swap_table, temp_table)

    def swap_table(self, target: str, source: str, temp: str) -> set:
        """Rename ``target`` to ``temp`` and then ``source`` to ``target``.

        The two renames are issued as two separate statements.

        Returns:
            A set holding the first result row of each rename.
        """
        result = self.cur.execute(f"ALTER TABLE {target} RENAME TO {temp};").fetchone()
        result1 = self.cur.execute(
            f"ALTER TABLE {source} RENAME TO {target};"
        ).fetchone()
        return {result, result1}

    @staticmethod
    def _stage_projection(cols, aliased=False):
        """Build the comma-separated SELECT list that reads ``cols`` from a stage.

        Every column is read as ``$1:"<col>"``; ``DWH_TIMESTAMP`` is cast to
        int and passed through ``to_timestamp(..., 9)``. With
        ``aliased=True`` each expression gets an ``AS "<col>"`` alias.
        """
        expressions = []
        for col in cols:
            if col == "DWH_TIMESTAMP":
                expression = 'to_timestamp($1:"DWH_TIMESTAMP"::int, 9)'
            else:
                expression = f'$1:"{col}"'
            if aliased:
                expression = f'{expression} AS "{col}"'
            expressions.append(expression)
        return ",".join(expressions)

    def copy_from_stage(self, cols, tablename):
        """COPY the staged files into ``tablename``.

        Args:
            cols: Column names to select from the staged files, in the
                order of the table's columns.
            tablename: Table to copy into.

        Returns:
            The first row of the COPY result.
        """
        projection = self._stage_projection(cols)

        return self.cur.execute(
            f"""COPY INTO {tablename} FROM (SELECT {projection} FROM @{self.target_stg_table})"""
        ).fetchone()

    def get_count(self, table=None) -> int:
        """Return the row count of ``table`` (default: the target table)."""
        if table is None:
            table = self.target_table

        return self.cur.execute(
            f"SELECT COUNT(*) FROM {self.schema}.{table};"
        ).fetchone()[0]

    def get_max(self, column, table=None) -> int:
        """Return ``MAX(column)`` of ``table`` (default: the target table)."""
        if table is None:
            table = self.target_table

        return self.cur.execute(
            f"SELECT MAX({column}) FROM {self.schema}.{table};"
        ).fetchone()[0]

    def merge_table(self, cols, unique_columns):
        """MERGE the staged files into the target table.

        Rows are matched on *all* of ``unique_columns``; matched rows are
        updated and unmatched rows inserted, both using ``cols``.

        Args:
            cols: Column names to read from the stage and write to the target.
            unique_columns: Column names that together identify a row.

        Returns:
            The first field of the first row of the MERGE result.
        """
        # Key predicates must be AND-ed; a comma-separated list is not a
        # valid ON condition once there is more than one key column.
        on = " AND ".join(
            f'TARGET."{col}" = SOURCE."{col}"' for col in unique_columns
        )
        update = ",".join(f'TARGET."{col}" = SOURCE."{col}"' for col in cols)
        columns = ",".join(f'"{col}"' for col in cols)
        values = ",".join(f'SOURCE."{col}"' for col in cols)

        projection = self._stage_projection(cols, aliased=True)
        stage_statement = f"SELECT {projection} FROM @{self.target_stg_table}"

        sql_statement = f"""
        MERGE INTO
            {self.schema}.{self.target_table} AS TARGET
        USING
            ({stage_statement}) AS SOURCE
        ON {on}
        WHEN MATCHED THEN
        UPDATE SET {update}
        WHEN NOT MATCHED THEN
        INSERT ({columns})
        VALUES ({values})
        """

        return self.cur.execute(sql_statement).fetchone()[0]


ImportError: cannot import name 'connector' from 'snowflake' (C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\snowflake.py)

In [7]:
!pip show snowflake-connector-python
!pip install snowflake-connector-python


Name: snowflake-connector-python
Version: 2.7.11
Summary: Snowflake Connector for Python
Home-page: https://www.snowflake.com/
Author: Snowflake, Inc
Author-email: triage-snowpark-python-api-dl@snowflake.com
License: Apache-2.0
Location: c:\users\administrator\appdata\local\programs\python\python38\lib\site-packages
Requires: asn1crypto, certifi, cffi, charset-normalizer, cryptography, idna, oscrypto, pycryptodomex, pyjwt, pyOpenSSL, pytz, requests, setuptools, typing-extensions, urllib3
Required-by: 


In [8]:
import os

import snowflake.connector

# Connection settings for a quick connectivity check. Each value can be
# overridden with the same dwh_* environment variables SnowflakeLoader reads.
# SECURITY NOTE(review): the literal fallbacks are credentials committed to
# the notebook; rotate them and drop the fallbacks once the environment
# variables are in place.
conns = {
    'SnowflakeDB': {
        'UserName': os.getenv('dwh_username', 'python'),
        'Password': os.getenv('dwh_password', 'Pythonuser1'),
        # Account identifier is <locator>.<region>.<cloud>; the AWS Mumbai
        # region id is "ap-south-1" (the previous "ap-south.1" cannot resolve).
        'Host': os.getenv('dwh_account', 'ne79526.ap-south-1.aws'),
    }
}

# Create the connection.
connection = snowflake.connector.connect(
    user=conns['SnowflakeDB']['UserName'],
    password=conns['SnowflakeDB']['Password'],
    account=conns['SnowflakeDB']['Host'],
)


ModuleNotFoundError: No module named 'snowflake.connector'; 'snowflake' is not a package