### Variables

In [None]:
import os

# Connection settings for the target PostgreSQL database. Each value may be
# supplied through an environment variable (DB_NAME, DB_USER, DB_PASSWORD,
# DB_HOST, DB_PORT, DB_SCHEMA) so credentials need not be written into the
# notebook; the literals below are the fallbacks when a variable is unset.
DB_NAME = os.environ.get('DB_NAME', 'db_demo_rh')
DB_USER = os.environ.get('DB_USER', '')
DB_PASSWORD = os.environ.get('DB_PASSWORD', '')
DB_HOST = os.environ.get('DB_HOST', '')
DB_PORT = os.environ.get('DB_PORT', '5432')
SCHEMA = os.environ.get('DB_SCHEMA', '')

# Directory holding one ``<table>.csv`` file per table. The import code joins
# it to the table name by plain string concatenation, so it must end with '/'.
S3_BUCKET = os.environ.get('S3_BUCKET', '/Volumes/path/to/volume/')
# NOTE(review): not referenced by the functions in this notebook — confirm it
# is still needed.
TEMP_DIR = os.environ.get('TEMP_DIR', '/Volumes/path/to/volume/tmp/')

### Functions

In [None]:
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, inspect
import pandas as pd
import os
import zipfile
import binascii
import io
%pip install openpyxl xlrd

def get_tables():
    """Return the names of the tables in ``SCHEMA`` of the configured database.

    A fresh engine is created for the lookup and always disposed afterwards,
    even when inspection raises, so its connection pool is not left open.

    Returns:
        list of table-name strings as reported by the SQLAlchemy inspector.
    """
    engine = get_engine()
    try:
        return inspect(engine).get_table_names(schema=SCHEMA)
    finally:
        engine.dispose()


def get_table_columns(table_name):
    """Return the column names of ``table_name`` in ``SCHEMA``.

    Args:
        table_name: Name of the table to inspect.

    Returns:
        list of column-name strings, in the order the SQLAlchemy inspector
        reports them.

    The engine used for the lookup is disposed even if inspection raises;
    this function is called once per table during a bulk import, so a leak
    here would accumulate.
    """
    engine = get_engine()
    try:
        return [
            column['name']
            for column in inspect(engine).get_columns(table_name, schema=SCHEMA)
        ]
    finally:
        engine.dispose()


def get_engine():
    """Create a SQLAlchemy engine for the configured PostgreSQL database.

    The URL is built from the module-level ``DB_*`` settings using the
    psycopg2 driver. The user name and password are percent-encoded so that
    characters such as ``@``, ``:`` or ``/`` inside them are not mistaken for
    URL delimiters.

    Returns:
        A new ``Engine``; callers are responsible for calling ``dispose()``.
    """
    from urllib.parse import quote

    # safe='' encodes '/' too; %XX escapes are decoded by SQLAlchemy's URL parser.
    user = quote(DB_USER, safe='')
    password = quote(DB_PASSWORD, safe='')
    connection_string = (
        f"postgresql+psycopg2://{user}:{password}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    )
    return create_engine(connection_string)


def load_csv_to_table(df, table_name):
    """Append the rows of ``df`` to ``table_name`` in ``SCHEMA``.

    Args:
        df: DataFrame to write. Its index is not written (``index=False``).
        table_name: Target table. Rows are appended (``if_exists='append'``).

    The engine is disposed after the write, whether or not it succeeds, so
    repeated calls do not leave connection pools open. The confirmation
    message is printed only after a successful write.
    """
    engine = get_engine()
    try:
        df.to_sql(table_name, engine, schema=SCHEMA, if_exists='append', index=False)
    finally:
        engine.dispose()
    print(f"Records inserted to the table: {table_name}")



def decode_and_extract_zip(binary_data):
    """Unpack an in-memory ZIP archive into a mapping of member name to text.

    Args:
        binary_data: Raw bytes of a ZIP archive.

    Returns:
        dict mapping each archive member's filename to its contents decoded
        as UTF-8. If a name appears more than once, the last entry wins.

    Raises:
        zipfile.BadZipFile: if ``binary_data`` is not a valid ZIP archive.
        UnicodeDecodeError: if a member's contents are not valid UTF-8.
    """
    archive_stream = io.BytesIO(binary_data)
    with zipfile.ZipFile(archive_stream) as archive:
        return {
            member.filename: archive.read(member).decode('utf-8')
            for member in archive.infolist()
        }


def _read_headerless_table(binary_data):
    """Parse headerless tabular file contents (Excel or CSV) into a DataFrame.

    The input files are named ``<table>.csv``, but the notebook also installs
    openpyxl/xlrd, so they may hold Excel workbooks. ``pd.read_excel`` rejects
    plain-text CSV, so dispatch on the leading bytes instead of the file name.
    """
    excel_signatures = (
        b'PK\x03\x04',                        # .xlsx (Office Open XML is a ZIP container)
        b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1',  # legacy .xls (OLE2 compound document)
    )
    buffer = io.BytesIO(binary_data)
    if binary_data.startswith(excel_signatures):
        return pd.read_excel(buffer, header=None)
    return pd.read_csv(buffer, header=None)


def bulk_import():
    """Append each table's data file from ``S3_BUCKET`` into the database.

    For every table in ``SCHEMA``:
      1. Read the file ``{S3_BUCKET}{table}.csv`` (no header row expected).
      2. Rename its columns positionally to the table's columns.
      3. Append the rows via ``load_csv_to_table``.

    A missing file is reported and skipped. Any other failure is printed and
    the remaining tables are still processed.
    """
    for table in get_tables():
        binary_path = f"{S3_BUCKET}{table}.csv"
        try:
            with open(binary_path, 'rb') as binary_file:
                binary_data = binary_file.read()

            df = _read_headerless_table(binary_data)

            # Positional mapping: the i-th file column feeds the i-th table column.
            columns = get_table_columns(table)
            if len(columns) != df.shape[1]:
                raise ValueError(
                    f"file has {df.shape[1]} columns but table {table} has {len(columns)}"
                )
            df.columns = columns

            load_csv_to_table(df, table)
        except FileNotFoundError:
            print(f"File {binary_path} not found, skipped")
        except Exception as e:
            # Best-effort batch: report this table's failure and continue.
            print(f"Error when processing the file {binary_path}: {e}")



### Execute

In [None]:
# Run the import when executed as a notebook/script, but not when this module
# is merely imported (e.g. to reuse the helper functions).
if __name__ == "__main__":
    bulk_import()