### Import packages
#### Create connection to Database

In [1]:
!pip install sqlalchemy
!pip install pandas
from sqlalchemy import create_engine
import pandas as pd

# Open a connection to the Chinook SQLite database.

# SQLAlchemy URL pointing at the chinook.db SQLite file (relative path)
db_connection_string = 'sqlite:///chinook.db'

# Build the engine first, then open the connection that the
# following cells pass to pandas
db_engine = create_engine(db_connection_string)
db_conn = db_engine.connect()



#### Read table from database

In [None]:
# Approach 1: pandas.read_sql_table loads every column of the 'customers' table

table_name = 'customers'
df1 = pd.read_sql_table(table_name=table_name, con=db_conn)

# Preview the last five rows (5 is the default for tail)
df1.tail()

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
54,55,Mark,Taylor,,421 Bourke Street,Sidney,NSW,Australia,2010.0,+61 (02) 9332 3633,,mark.taylor@yahoo.au,4
55,56,Diego,Gutiérrez,,307 Macacha Güemes,Buenos Aires,,Argentina,1106.0,+54 (0)11 4311 4333,,diego.gutierrez@yahoo.ar,4
56,57,Luis,Rojas,,"Calle Lira, 198",Santiago,,Chile,,+56 (0)2 635 4444,,luisrojas@yahoo.cl,5
57,58,Manoj,Pareek,,"12,Community Centre",Delhi,,India,110017.0,+91 0124 39883988,,manoj.pareek@rediff.com,3
58,59,Puja,Srivastava,,"3,Raj Bhavan Road",Bangalore,,India,560001.0,+91 080 22289999,,puja_srivastava@yahoo.in,3


In [3]:
# Approach 2: Use Pandas.read_sql_query to read only selected columns

table_name = 'customers'

# Columns to fetch. Extend this list (e.g. with 'Phone', 'Email',
# 'SupportRepId') to pull more columns; the query below follows it.
columns = ['CustomerId', 'FirstName', 'LastName']

# Build the SELECT statement from the variables above so the column list,
# the table name and the executed query can no longer drift apart.
# The identifiers are notebook constants, not user input.
select_sql = f"SELECT {', '.join(columns)} FROM {table_name}"

df2 = pd.read_sql_query(sql=select_sql, con=db_conn)
df2.tail(5)

Unnamed: 0,CustomerId,FirstName,LastName
54,55,Mark,Taylor
55,56,Diego,Gutiérrez
56,57,Luis,Rojas
57,58,Manoj,Pareek
58,59,Puja,Srivastava


In [4]:
# List the tables that SQLAlchemy's inspector reports for the engine

from sqlalchemy import inspect

inspector = inspect(db_engine)
table_names = inspector.get_table_names()
print(table_names)

['albums', 'artists', 'customers', 'employees', 'genres', 'invoice_items', 'invoices', 'media_types', 'playlist_track', 'playlists', 'tracks']


#### Config-Driven Ingestion (hard-coded folder)

In [5]:
import os

# Destination directory for the exported CSV files
folder_path = "destination/chinook"

# Tables to export, in the order they are processed
all_tables = [
    'customers',
    'albums',
    'artists',
    'employees',
    'genres',
    'invoice_items',
    'invoices',
    'media_types',
    'playlist_track',
    'playlists',
    'tracks',
]

# Create the destination directory up front; no error if it already exists
os.makedirs(folder_path, exist_ok=True)

# Export every listed table to "<folder_path>/<table_name>.csv"
for table_name in all_tables:
    print(f'Extracting {table_name} ...')
    csv_path = f'{folder_path}/{table_name}.csv'
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(csv_path, index=False)
    print('Completed!\n')


Extracting customers ...
Completed!

Extracting albums ...
Completed!

Extracting artists ...
Completed!

Extracting employees ...
Completed!

Extracting genres ...
Completed!

Extracting invoice_items ...
Completed!

Extracting invoices ...
Completed!

Extracting media_types ...
Completed!

Extracting playlist_track ...
Completed!

Extracting playlists ...
Completed!

Extracting tracks ...
Completed!



#### Config-Driven Ingestion using YAML

In [7]:
!pip install pyyaml
import yaml
import io
from datetime import datetime

# Parse the YAML configuration file into a Python object.
# safe_load is used, so only plain YAML data types are constructed.
config_file = 'config.yml'
with open(config_file) as config_stream:
    config = yaml.safe_load(config_stream)

# Function to extract a table and save as CSV
def extract_table(table_name, con, folder_path):
    """
    Dump one database table to a date-stamped CSV file.

    The file is written as ``<folder_path>/<YYYYMMDD>_<table_name>.csv``,
    where the date comes from ``datetime.now()`` at the time of the call.
    The DataFrame index is not written. The destination folder is created
    when it does not exist yet.

    Args:
            table_name (str): Name of the table to extract
            con (SQLAlchemy Connection): Database connection to read from
            folder_path (str): Folder in which the CSV file is saved

    """
    # Create the destination folder (and any missing parents) if needed
    os.makedirs(folder_path, exist_ok=True)
    print(f'Extracting {table_name} ...')

    # Load the complete table through the supplied connection
    table_frame = pd.read_sql_table(table_name=table_name, con=con)

    # Date stamp (YYYYMMDD) used as the file-name prefix
    date_prefix = datetime.now().strftime('%Y%m%d')
    output_path = f'{folder_path}/{date_prefix}_{table_name}.csv'

    table_frame.to_csv(output_path, index=False)



# Function to get a database connection
def get_connection(db_type, host):
    """
    Returns a database connection based on db_type and host.

    Args:
            db_type (str): Type of the database; 'sqlite' and 'Oracle' are
                the only supported values
            host (str): Database host, or for sqlite the database file name
                without the '.db' extension

    Returns:
            SQLAlchemy Connection object

    Raises:
            ValueError: If db_type is not one of the supported values

    """
    if db_type == 'sqlite':
        db_connection_string = f'sqlite:///{host}.db'
    elif db_type == 'Oracle':
        # NOTE(review): SQLAlchemy dialect names are normally lowercase
        # ('oracle'), and this URL carries no credentials or service name.
        # Confirm this URL actually resolves before relying on this branch.
        db_connection_string = f'Oracle://{host}:1234'
    else:
        # Fail here with a clear message instead of returning None and
        # letting the caller crash later on the first use of the connection.
        raise ValueError(
            f"Unsupported db_type {db_type!r}; expected 'sqlite' or 'Oracle'"
        )

    db_engine = create_engine(url=db_connection_string)
    return db_engine.connect()

# Everything this cell needs lives under the 'source' key of the config
source_config = config.get('source')

# Open the connection described by the 'database' entry
db_conn = get_connection(**source_config.get('database'))

# Export each table listed under the 'table' entry
for table_name in source_config.get('table'):
    extract_table(table_name=table_name, con=db_conn, folder_path='destination/config_driven')

# Show which database settings were used
print(source_config.get('database'))


Extracting albums ...
Extracting artists ...
Extracting customers ...
Extracting employees ...
Extracting genres ...
Extracting invoice_items ...
Extracting invoices ...
Extracting media_types ...
Extracting playlist_track ...
Extracting playlists ...
Extracting tracks ...
{'host': 'chinook', 'db_type': 'sqlite'}


### Metadata-Driven Ingestion

In [8]:
# Ask SQLite's catalog (sqlite_master) for the tables whose names do not
# match the 'sqlite_%' pattern
metadata_sql = """
SELECT name 
FROM sqlite_master 
WHERE type='table' AND name NOT LIKE 'sqlite_%'
"""

# Run the catalog query; the result has a single 'name' column
table_df = pd.read_sql_query(sql=metadata_sql, con=db_conn)

# Plain Python list of table names to iterate over
names = table_df['name'].tolist()

print("Tables discovered for metadata-driven ingestion:")
print(names)

# Define a reusable function to extract a table and save as CSV
def extract_table(table_name, con, folder_path):
    """
    Extracts a table from the database and saves it as a CSV file.

    The whole table is read through the supplied connection and written to
    ``<folder_path>/<table_name>.csv`` without the DataFrame index. The
    destination folder is created if it does not exist yet.

    Args:
            table_name (str): Name of the table to extract
            con (SQLAlchemy Connection): Database connection to read from
            folder_path (str): Folder path to save the CSV

    """

    # Ensure the destination folder exists
    os.makedirs(folder_path, exist_ok=True)

    print(f'Extracting {table_name} ...')

    # Read through the connection passed by the caller rather than the
    # notebook-level db_conn global, so the function honours its `con`
    # argument and works with any connection.
    df = pd.read_sql_table(table_name=table_name, con=con)

    # Save the DataFrame as a CSV file
    df.to_csv(f'{folder_path}/{table_name}.csv', index=False)

    print('Completed!\n')

# Export each discovered table into the metadata-driven destination folder
for name in names:
    extract_table(name, db_conn, 'destination/metadata')

Tables discovered for metadata-driven ingestion:
['albums', 'artists', 'customers', 'employees', 'genres', 'invoices', 'invoice_items', 'media_types', 'playlists', 'playlist_track', 'tracks']
Extracting albums ...
Completed!

Extracting artists ...
Completed!

Extracting customers ...
Completed!

Extracting employees ...
Completed!

Extracting genres ...
Completed!

Extracting invoices ...
Completed!

Extracting invoice_items ...
Completed!

Extracting media_types ...
Completed!

Extracting playlists ...
Completed!

Extracting playlist_track ...
Completed!

Extracting tracks ...
Completed!

