### Import packages
#### Create connection to Database

In [1]:
from sqlalchemy import create_engine
import pandas as pd

# Connect to the database
db_connection_string = 'sqlite:///chinook.db'
db_engine = create_engine(url=db_connection_string)
db_conn = db_engine.connect()

#### Read table from database

In [None]:
# HINTS
# Load data into DataFrame
# user Pandas to read data from a table into a DataFrame

In [None]:
# # Approach 1: Use Pandas.read_sql_table to read all columns from 'customers' table
table_name = 'customers'
df = pd.read_sql_table(table_name=table_name, con=db_conn)
df.tail(5)

In [18]:
# # Approach 2: Use Pandas.read_sql_query to read these columns
# table_name = 'customers'
# columns = ['CustomerId', 'FirstName', 'LastName', 'Phone', 'Email', 'SupportRepId']
df = pd.read_sql_query(sql='select CustomerId, FirstName, LastName, Phone, Email from customers', con=db_conn)
df.tail(5)

Unnamed: 0,CustomerId,FirstName,LastName,Phone,Email
54,55,Mark,Taylor,+61 (02) 9332 3633,mark.taylor@yahoo.au
55,56,Diego,Gutiérrez,+54 (0)11 4311 4333,diego.gutierrez@yahoo.ar
56,57,Luis,Rojas,+56 (0)2 635 4444,luisrojas@yahoo.cl
57,58,Manoj,Pareek,+91 0124 39883988,manoj.pareek@rediff.com
58,59,Puja,Srivastava,+91 080 22289999,puja_srivastava@yahoo.in


In [6]:
df.to_csv('customers.csv')

In [7]:
tables = ["customers", "albums"]

for t in tables:
    print(t)
    pd.read_sql_table(t, con=db_conn).head()
    

customers
albums


#### Homework 23 Nov

In [None]:
from sqlalchemy import inspect
from pathlib import Path

inspector = inspect(db_engine)
table_names = inspector.get_table_names()
print(table_names)

destination = Path("destination")
destination.mkdir(exist_ok=True)

for table in table_names:
    df = pd.read_sql_table(table, con=db_conn)
    df.to_csv(destination / f"{table}.csv")

In [None]:
pip install pyyaml

In [None]:
import os
import yaml
import io
config_file = 'config.yml'
f = open(config_file, 'r')
config = yaml.safe_load(f)
config

def extract_table(table_name, con, folder_path):
    os.makedirs(folder_path)
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(f'{folder_path}/{table_name}.csv')
    print('Completed!\n')

def get_connection(db_type, host):
    if db_type == 'sqlite':
        db_connection_string = f'sqlite:///{host}.db'
        db_engine = create_engine(url=db_connection_string)
        return db_engine.connect()
    elif db_type == 'Oracle':
        db_connection_string = 'Oracle://{host}:1234'
        return db_engine.connect()
db_conn = get_connection(**config.get('source').get('database'))
extract_table(table_name='albums', con=db_conn, folder_path='destination/config_driven')

In [None]:
print(config.get('source').get('database'))

In [21]:
import os

import yaml
import io
config_file = 'config.yml'
f = open(config_file, 'r')
config = yaml.safe_load(f)
config

def extract_table(table_name, con, folder_path):
    os.makedirs(folder_path, exist_ok=True)
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=con)
    df.to_csv(f'{folder_path}/20251127_{table_name}.csv',index=False)
    print('Completed!\n')

def get_connection(db_type, host):
    if db_type == 'sqlite':
        db_connection_string = f'sqlite:///{host}.db'
        db_engine = create_engine(url=db_connection_string)
        return db_engine.connect()
    elif db_type == 'Oracle':
        db_connection_string = 'Oracle://{host}:1234'
        return db_engine.connect()
db_conn = get_connection(**config.get('source').get('database'))

for table_name in config.get('source').get('table'):
    extract_table(table_name=table_name, con=db_conn, folder_path='destination/config_driven')

Extracting albums ...
Completed!

Extracting artists ...
Completed!

Extracting customers ...
Completed!

Extracting employees ...
Completed!

Extracting genres ...
Completed!

Extracting invoice_items ...
Completed!

Extracting invoices ...
Completed!

Extracting media_types ...
Completed!

Extracting playlist_track ...
Completed!

Extracting playlists ...
Completed!

Extracting tracks ...
Completed!



#### Config-Driven Ingestion

In [None]:
metadata_sql = """select name from sqlite_master where 1=1 and type = 'table'"""
table_df = pd.read_sql_query(metadata_sql, con=db_conn)
names = [tb for tb in list(table_df['name']) if tb not in ('sqlite_stat1', 'sqlite_sequence')]
import os 


# loop for each table from the DataFrame
# read and extract table
# save to path: chinook/metadata_driven/
# note: use os.makedirs() if path is not exists
def extract_table(table_name, con, folder_path):
    os.makedirs(folder_path, exist_ok=True)
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(f'{folder_path}/{table_name}.csv', index=False)
    print('Completed!\n')
for name in names:
    extract_table(table_name=name, con=db_conn, folder_path='destination/metadata')

In [15]:
# # HINTS
# # Read configs stored in the 'config.yml' file

# # Read yaml file
# # Package: yaml (pip install pyyaml)
# # Function: load / safe_load
# # Print it after loading

# import yaml
# import json

# config_file = 'config.yml'
import yaml

# # Step 1 — Load the YAML file
with open("config.yml", "r") as f:
    config = yaml.safe_load(f)

print(config)

{'source': {'database': {'host': 'chinook', 'db_type': 'sqlite'}, 'table': ['albums', 'artists', 'customers', 'employees', 'genres', 'invoice_items', 'invoices', 'media_types', 'playlist_track', 'playlists', 'tracks']}}


In [None]:
# Use loop function to read tables within config.source.table
# Export output into CSV
# Name Convention: '<date>__<table_name>.csv'
# Path: chinook/config_driven/
# note: use os.makedirs() if path is not exists

import os
import datetime

# # Step 2: get table list from config:
tables = config["source"]["table"] #recco: tables = config.get("source", {}).get("table", []) để error proof
# print(tables)

# # Step 3: Create output folder
path = "destination/config_driven"
os.makedirs(path, exist_ok=True)

# # Step 4: naming logic
date = datetime.date.today().strftime("%Y-%m-%d")

# # Step 5: Loop & Ingest
for table in tables: 
    print("Extracting " + table)
    df = pd.read_sql_table(table_name=table, con=db_conn)

    output_file = path + "/" + date + "__" + table + ".csv"
    df.to_csv(output_file, index=False) # để pandas ko tự đánh index cho rows

    print("Saved " + output_file +"\n")


Extracting albums
Saved destination/config_driven/2025-12-15__albums.csv

Extracting artists
Saved destination/config_driven/2025-12-15__artists.csv

Extracting customers
Saved destination/config_driven/2025-12-15__customers.csv

Extracting employees
Saved destination/config_driven/2025-12-15__employees.csv

Extracting genres
Saved destination/config_driven/2025-12-15__genres.csv

Extracting invoice_items
Saved destination/config_driven/2025-12-15__invoice_items.csv

Extracting invoices
Saved destination/config_driven/2025-12-15__invoices.csv

Extracting media_types
Saved destination/config_driven/2025-12-15__media_types.csv

Extracting playlist_track
Saved destination/config_driven/2025-12-15__playlist_track.csv

Extracting playlists
Saved destination/config_driven/2025-12-15__playlists.csv

Extracting tracks
Saved destination/config_driven/2025-12-15__tracks.csv



In [None]:
# # HINTS
# # Read metadata from the database inlcuding tables / columns
# sqlite_metadata_table = 'sqlite_master'
# sqlite_metadata_condition = "type = 'table'"
# metadata_sql = f""" select 1"""
# print(metadata_sql)
# table_df = pd.read_sql_query(metadata_sql)
# print(table_df)

# # STEP 1: Setting metadata query
metadata_sql = """SELECT name
FROM sqlite_master 
WHERE type = 'table' AND name NOT LIKE 'sqlite%' """
# print(metadata_sql)

# # STEP 2: Get metadata to DataFrame (df))
table_df = pd.read_sql_query(metadata_sql, con=db_conn)
print(table_df)


              name
0           albums
1          artists
2        customers
3        employees
4           genres
5         invoices
6    invoice_items
7      media_types
8        playlists
9   playlist_track
10          tracks


In [23]:
# loop for each table from the DataFrame
# read and extract table
# save to path: chinook/metadata_driven/
# note: use os.makedirs() if path is not exists

import os
import datetime

# # Step 3: Create output folder
path = "destination/metadata_driven"
os.makedirs(path, exist_ok=True) #exist_ok=True là để ensure ko bị lỗi exist

# # Step 4: naming logic
date = datetime.date.today().strftime("%Y-%m-%d")

# # Step 5: Loop & Ingest
for table in table_df["name"]: 
    print("Extracting " + table)
    df = pd.read_sql_table(table_name=table, con=db_conn)

    output_file = path + "/" + date + "__" + table + ".csv"
    df.to_csv(output_file, index=False) # để pandas ko tự thêm cột index cho rows

    print("Saved " + output_file +"\n")

Extracting albums
Saved destination/metadata_driven/2025-12-15__albums.csv

Extracting artists
Saved destination/metadata_driven/2025-12-15__artists.csv

Extracting customers
Saved destination/metadata_driven/2025-12-15__customers.csv

Extracting employees
Saved destination/metadata_driven/2025-12-15__employees.csv

Extracting genres
Saved destination/metadata_driven/2025-12-15__genres.csv

Extracting invoices
Saved destination/metadata_driven/2025-12-15__invoices.csv

Extracting invoice_items
Saved destination/metadata_driven/2025-12-15__invoice_items.csv

Extracting media_types
Saved destination/metadata_driven/2025-12-15__media_types.csv

Extracting playlists
Saved destination/metadata_driven/2025-12-15__playlists.csv

Extracting playlist_track
Saved destination/metadata_driven/2025-12-15__playlist_track.csv

Extracting tracks
Saved destination/metadata_driven/2025-12-15__tracks.csv

