In [23]:
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

# Credentials come from the environment instead of being hard-coded; a
# missing variable raises KeyError. The same PGUID/PGPASS pair is used for
# both the SQL Server source and the PostgreSQL target connection.
pwd, uid = os.environ['PGPASS'], os.environ['PGUID']

# SQL Server (source) connection settings used to build the extract URL.
driver = "ODBC Driver 17 for SQL Server"
# NOTE(review): blank here, presumably redacted; set the host and database
# name before running.
server = database = ""

# Extract data from SQL Server
def extract():
    src_conn = None
    try:
        # Create an SQLAlchemy engine for SQL Server
        src_engine = create_engine(f'mssql+pyodbc://{uid}:{pwd}@{server}/{database}?driver={driver}')
        src_conn = src_engine.connect()

        # Use SQLAlchemy text() for the query
        query = text("""
            SELECT s.name as schema_name, t.name as table_name
            FROM sys.tables t
            JOIN sys.schemas s ON t.schema_id = s.schema_id
            WHERE s.name = 'Production' AND t.name IN ('Product', 'ProductCategory')
        """)
        src_tables = src_conn.execute(query).fetchall()

        if not src_tables:
            print("No tables found or returned by the query.")
            return

        for schema_name, table_name in src_tables:
            full_table_name = f"{schema_name}.{table_name}"
            print(f"Processing table: {full_table_name}")
            df = pd.read_sql_query(f'select * FROM {full_table_name}', src_conn)
            print(f"Data extracted for {full_table_name}, number of rows: {len(df)}")
            load(df, full_table_name)

    except Exception as e:
        print("Data extract error: " + str(e))
    finally:
        if src_conn:
            src_conn.close()

# Load data to PostgreSQL
def load(df, tbl, engine=None):
    """Write *df* to a PostgreSQL staging table, replacing any existing one.

    The staging table is named ``stg_`` followed by *tbl* with dots turned
    into underscores, e.g. ``Production.Product`` ->
    ``stg_Production_Product``. NOTE(review): the name is mixed-case; if
    pandas quotes it on creation, PostgreSQL queries must double-quote it too.

    Errors are printed rather than raised, so one failing table does not
    abort the caller's loop over tables.

    Args:
        df: DataFrame to write.
        tbl: Source table name in ``schema.table`` form.
        engine: Optional connectable for the target database. When omitted,
            a temporary engine for the local ``AdventureWorks`` PostgreSQL
            database is created and disposed after the write.
    """
    # Computed before anything that can fail so the error handler below can
    # always name the table (previously an engine-creation failure raised
    # UnboundLocalError inside the handler).
    formatted_table_name = 'stg_' + tbl.replace('.', '_')
    owns_engine = engine is None
    try:
        if owns_engine:
            # Percent-encode credentials so '@', ':', '/' etc. cannot break the URL.
            engine = create_engine(
                f"postgresql://{quote(uid, safe='')}:{quote(pwd, safe='')}"
                "@localhost:5432/AdventureWorks"
            )
        print(f'Importing rows 0 to {len(df)}... for table {formatted_table_name}')
        df.to_sql(formatted_table_name, engine, if_exists='replace', index=False)
        print(f"Data imported successfully into {formatted_table_name}")
    except Exception as e:
        print("Data load error for table", formatted_table_name, ":", str(e))
    finally:
        # Only dispose engines this call created; a caller-supplied one is theirs.
        if owns_engine and engine is not None:
            engine.dispose()


# Run the pipeline when this cell executes. extract() already reports its
# own errors; this guard only reports anything that escapes it.
try:
    extract()
except Exception as err:
    print(f"Error while extracting data: {err}")

Processing table: Production.Product
Data extracted for Production.Product, number of rows: 504
Data load error for table Production.Product : No module named 'psycopg2'
Processing table: Production.ProductCategory
Data extracted for Production.ProductCategory, number of rows: 4
Data load error for table Production.ProductCategory : No module named 'psycopg2'
