In [1]:
import os
import glob
import csv
import pandas as pd
import shutil
import snowflake.connector
import configparser
import fnmatch
from configparser import ConfigParser
import os

In [5]:



class Metaframework:
    
    def __init__(self,path,stage = 'internal'):
        self.path = path
        self.stage = stage
        parser = ConfigParser()
        parser.read(f'{self.path}/SNOWFLAKE_CREDS.cfg')
        account = parser.get('my_api','account')
        user = parser.get('my_api','user')
        password = parser.get('my_api','password')
        database = parser.get('my_api','database')
        schema = parser.get('my_api','schema')
        role = parser.get('my_api','role')
        warehouse = parser.get('my_api','warehouse')
       
        conn = snowflake.connector.connect( user = user ,password = password,account = account ,warehouse = warehouse ,
                                           database = database,schema = schema,role = role)
        self.curs = conn.cursor()
        
        self.curs.execute(f'CREATE STAGE IF NOT EXISTS {self.stage};')
        print(f'{self.stage} has been created')
        
        
    def create_table(self,target_table,file_path,delimiter=',',replace=False):
        file_format = f"CREATE OR REPLACE FILE FORMAT my_csv_format TYPE = 'CSV' FIELD_DELIMITER = '{delimiter}'  PARSE_HEADER = TRUE;"
        self.curs.execute(file_format)
        statement = ' OR REPLACE TABLE' if replace else "TABLE IF NOT EXISTS "
        query = f"""
                    CREATE {statement} {target_table}
                    USING TEMPLATE (
                           SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                           WITHIN GROUP (ORDER BY ORDER_ID)
                           FROM TABLE(
                               INFER_SCHEMA(
                                     LOCATION=>'{file_path}',
                                     FILE_FORMAT=>'my_csv_format')));
                """
        print(query)
        self.curs.execute(query)
        print(f'\n{target_table.upper()} has been created\n')
        
        
    def ingest_csv_file(self, csv_file_path, table_name, skip_header = 1, field_delimiter = ',', target_operation = 'append', *args):
         
         # Put file to stage 
        import os
        full_path = os.path.join(self.path,csv_file_path) 
        put_file = f"PUT 'file://{os.path.join(self.path,csv_file_path)}'' @{os.path.join(self.stage,csv_file_path)} AUTO_COMPRESS = FALSE OVERWRITE = TRUE"
        
        #creating table with path and delimiter from metadata
        self.create_table(table_name,stage_path,delimiter=field_delimiter)
        
        # append or overwrite
        if target_operation.casefold() == 'overwrite':
            print(f"{table_name.upper()} is truncated")
            truncate_query = f"TRUNCATE TABLE {table_name.upper()}"  
            curs.execute(truncate_query)
        
        
        # Copy CSV file data into Snowflake table
        copy_query = f"""
                            COPY INTO {table_name.upper()} 
                            FROM '@{database}.{schema}.{stage_path}'
                            FILE_FORMAT = (
                                    TYPE = CSV   
                                    SKIP_HEADER = {skip_header}
                                    FIELD_DELIMITER = '{field_delimiter}')
                            on_error = continue;"""
        print(copy_query)
        self.curs.execute(copy_query)

        
    def main(self):
        csv_file_path = 'inbound/'
        archive_folder = 'archive/'
        missing_folder = 'missing/'
        failed = 'failed/'

        df = pd.read_csv('mapping/metadata1.csv',header = 0)

        for file in os.listdir(self.csv_file_path):
            matching = False
            for ind in df.index:
                if fnmatch.fnmatch(file, df['src_file_prefix'][ind]+'*.csv'):
                    if df['is_active'][ind] == 'Y':
                        delim = ','
                        if df['src_file_delim'][ind] != ',':
                            delim = df['src_file_delim'][ind]
                        ingest_csv_file(stage,self.csv_file_path+file,df['tgt_table'][ind],field_delimiter = delim)
        #                 print(f"ingest_csv_file({stage},{csv_file_path+file},{df['tgt_table'][ind]})")
                        shutil.move(csv_file_path+file,'archive/')

                    else:
                        print(f'Skipping CSV file {file} for table {df["tgt_table"][ind]} due to inactive flag.')
                        # Move the skipped file to the missing mapping folder
                        shutil.move(csv_file_path + file, failed)
                        print(f'CSV file "{file}" moved to missing mapping folder "{failed}".')
                    matching = True
                    break
            if not matching:
                    print(f'\n No file name pattern found for CSV file "{file}" . Skipping the file.\n')
                    shutil.move(csv_file_path + file, missing_folder)
                    print(f'\nCSV file "{file}" moved to missing mapping folder {missing_folder}".\n')

a = Metaframework('D:\snowpark')

a.put_file('inbound/data_85.csv')

a.create_table('TARGET_1','@internal/inbound/data_85.csv')

In [6]:
a = Metaframework('D:\snowpark','internal_4')
#  csv_file_path, table_name, skip_header = 1, field_delimiter = ',', target_operation = 'append', *args):


In [78]:
class Metaframework:
    
    def __init__(self,path,stage = None):
        self.path = path
        
        parser = ConfigParser()
        parser.read(f'{os.path.join(self.path,"SNOWFLAKE_CREDS.cfg")}')
        account = parser.get('my_api','account')
        user = parser.get('my_api','user')
        password = parser.get('my_api','password')
        database = parser.get('my_api','database')
        schema = parser.get('my_api','schema')
        role = parser.get('my_api','role')
        warehouse = parser.get('my_api','warehouse')
       
        conn = snowflake.connector.connect( user = user ,password = password,account = account ,warehouse = warehouse ,
                                           database = database,schema = schema,role = role)
        self.curs = conn.cursor()
        
        if stage is None:
            self.stage = 'internal'
            self.curs.execute(f'CREATE OR REPLACE STAGE {self.stage}')
        else:
            self.stage = stage
            self.curs.execute(f'CREATE STAGE IF NOT EXISTS {self.stage};')
        
        print(f'{self.stage} has been created')
        
        
    def create_table(self,target_table,file_path,delimiter=',',replace=False):
        file_format = f"CREATE OR REPLACE FILE FORMAT my_csv_format TYPE = 'CSV' FIELD_DELIMITER = '{delimiter}'  PARSE_HEADER = TRUE;"
        self.curs.execute(file_format)
        statement = ' OR REPLACE TABLE' if replace else "TABLE IF NOT EXISTS "
        query = f"""
                    CREATE {statement} {target_table.upper()}
                    USING TEMPLATE (
                           SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                           WITHIN GROUP (ORDER BY ORDER_ID)
                           FROM TABLE(
                               INFER_SCHEMA(
                                     LOCATION=>'@{self.stage}/{file_path}',
                                     FILE_FORMAT=>'my_csv_format')));
                """
        print(query)
        self.curs.execute(query)
        print(f'\n{target_table.upper()} has been created\n')
        
        
    def ingest_csv_file(self, file, table_name, skip_header = 1, field_delimiter = ',', target_operation = 'append', *args):
         
         # Put file to stage 
        import os
        full_path = os.path.join(self.path,'inbound',file)
        put_file = f"PUT file://{full_path} @{self.stage} AUTO_COMPRESS = FALSE OVERWRITE = TRUE"
        print(put_file)
        self.curs.execute(put_file)
        
        #creating table with path and delimiter from metadata
        print(f'self.create_table({table_name},{file},delimiter = "{field_delimiter}")')
        self.create_table(table_name,file,delimiter=field_delimiter)
        
        # append or overwrite
        if target_operation.casefold() == 'overwrite':
            print(f"{table_name.upper()} is truncated")
            truncate_query = f"TRUNCATE TABLE {table_name.upper()}"  
            curs.execute(truncate_query)
        
        
        # Copy CSV file data into Snowflake table
        copy_query = f"""
                            COPY INTO {table_name.upper()} 
                            FROM '@{database}.{schema}.{self.stage}/{file}'
                            FILE_FORMAT = (
                                    TYPE = CSV   
                                    SKIP_HEADER = {skip_header}
                                    FIELD_DELIMITER = '{field_delimiter}')
                            on_error = continue;"""
        print(copy_query)
        self.curs.execute(copy_query)

        
    def main(self):
        
        csv_folder = os.path.join(self.path,'inbound')
        archive_folder = os.path.join(self.path,'archive')
        missing_folder = os.path.join(self.path,'missing')
        failed_folder = os.path.join(self.path,'failed')

        df = pd.read_csv('mapping/metadata1.csv',header = 0)

        for file in os.listdir(os.path.join(csv_folder)):
            matching = False
            for ind in df.index:
                if fnmatch.fnmatch(file, df['src_file_prefix'][ind]+'*.csv'):
                    if df['is_active'][ind] == 'Y':
                        delim = df['src_file_delim'][ind]
                        
                        print(f"self.ingest_csv_file({file}),{df['tgt_table'][ind]},field_delimiter = delim)")
                        self.ingest_csv_file(file,df['tgt_table'][ind],field_delimiter = delim)
                        shutil.move(os.path.join(csv_folder, file),archive_folder)

                    else:
                        print(f'Skipping CSV file {file} for table {df["tgt_table"][ind]} due to inactive flag.')
                        # Move the skipped file to the missing mapping folder
                        shutil.move(os.path.join(csv_folder, file), failed_folder)
                        print(f'CSV file "{file}" moved to missing mapping folder "{failed_folder}".')
                    matching = True
                    break
            if not matching:
                    print(f'\n No file name pattern found for CSV file "{file}" . Skipping the file.\n')
                    shutil.move(os.path.join(csv_folder, file), missing_folder)
                    print(f'\nCSV file "{file}" moved to missing mapping folder {missing_folder}".\n')
        #           
        self.curs.close()

c1 = Metaframework(r'D:\snowpark')
c1.main()

internal has been created
self.ingest_csv_file(data_85.csv),target_table1,field_delimiter = delim)
PUT file://D:\snowpark\inbound\data_85.csv @internal AUTO_COMPRESS = FALSE OVERWRITE = TRUE
self.create_table(target_table1,data_85.csv,delimiter = ",")

                    CREATE TABLE IF NOT EXISTS  TARGET_TABLE1
                    USING TEMPLATE (
                           SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
                           WITHIN GROUP (ORDER BY ORDER_ID)
                           FROM TABLE(
                               INFER_SCHEMA(
                                     LOCATION=>'@internal/data_85.csv',
                                     FILE_FORMAT=>'my_csv_format')));
                

TARGET_TABLE1 has been created


                            COPY INTO TARGET_TABLE1 
                            FROM '@DB1.SH1.internal/data_85.csv'
                            FILE_FORMAT = (
                                    TYPE = CSV   
                                  

In [47]:
parser = ConfigParser()
parser.read(os.path.join(r"D:\snowpark","SNOWFLAKE_CREDS.cfg"))
account = parser.get('my_api','account')
user = parser.get('my_api','user')
password = parser.get('my_api','password')
database = parser.get('my_api','database')
schema = parser.get('my_api','schema')
role = parser.get('my_api','role')
warehouse = parser.get('my_api','warehouse')

conn = snowflake.connector.connect( user = user ,password = password,account = account ,warehouse = warehouse ,
                                   database = database,schema = schema,role = role)
curs = conn.cursor()
file_path = 'D:' + '/' + 'snowpark' + '/' + 'inbound' + '/' + 'data_85.csv'
print(file_path)
curs.execute(f"PUT 'file://{file_path}' @internal/inbound/data_87.csv AUTO_COMPRESS = FALSE OVERWRITE = TRUE")

D:/snowpark/inbound/data_85.csv


<snowflake.connector.cursor.SnowflakeCursor at 0x1ed6637d280>

In [None]:
print('os.path.join(r"D:\snowpark","SNOWFLAKE_CREDS.cfg")}')
    

In [42]:
for i in os.listdir(r'D:\snowpark'):
    print(i)

.git
.ipynb_checkpoints
archive
failed
final.ipynb
final_revised.ipynb
inbound
logs
mapping
missing
SNOWFLAKE_CREDS.cfg


In [59]:
FILE = os.path.join(r"D:\snowpark",'inbound','sales_2023.csv')

In [60]:
curs.execute(f"PUT file://{FILE} @internal ")

<snowflake.connector.cursor.SnowflakeCursor at 0x1ed6637d280>

In [65]:
curs.execute(f'PUT file://D:\snowpark\inbound\data_85.csv @internal AUTO_COMPRESS = FALSE OVERWRITE = TRUE')

<snowflake.connector.cursor.SnowflakeCursor at 0x1ed6637d280>

In [63]:
FILE

'D:\\snowpark\\inbound\\sales_2023.csv'

In [None]:
FILE = FILE.replace('//','\')