### Import packages
#### Create connection to Database

In [1]:
from sqlalchemy import create_engine
import pandas as pd

# Connect to the database
db_connection_string = 'sqlite:///chinook.db'
db_engine = create_engine(url=db_connection_string)
db_conn = db_engine.connect()

#### Read table from database

In [2]:
# HINTS
# Load data into DataFrame
# user Pandas to read data from a table into a DataFrame

In [3]:
# # Approach 1: Use Pandas.read_sql_table to read all columns from 'customers' table
table_name = 'customers'
df = pd.read_sql_table(table_name=table_name, con=db_conn)
df.tail(5)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
54,55,Mark,Taylor,,421 Bourke Street,Sidney,NSW,Australia,2010.0,+61 (02) 9332 3633,,mark.taylor@yahoo.au,4
55,56,Diego,Gutiérrez,,307 Macacha Güemes,Buenos Aires,,Argentina,1106.0,+54 (0)11 4311 4333,,diego.gutierrez@yahoo.ar,4
56,57,Luis,Rojas,,"Calle Lira, 198",Santiago,,Chile,,+56 (0)2 635 4444,,luisrojas@yahoo.cl,5
57,58,Manoj,Pareek,,"12,Community Centre",Delhi,,India,110017.0,+91 0124 39883988,,manoj.pareek@rediff.com,3
58,59,Puja,Srivastava,,"3,Raj Bhavan Road",Bangalore,,India,560001.0,+91 080 22289999,,puja_srivastava@yahoo.in,3


In [4]:
# # Approach 2: Use Pandas.read_sql_query to read these columns
# table_name = 'customers'
# columns = ['CustomerId', 'FirstName', 'LastName', 'Phone', 'Email', 'SupportRepId']
# df = pd.read_sql_query()

In [None]:
import os

folder_path = 'destination/chinook'

all_tables = [
    'albums',
    'artists',
    'customers',
    'employees',
    'genres',
    'invoice_items',
    'invoices',
    'media_types',
    'playlist_track',
    'playlists',
    'tracks',
    'tracks_asd'
]

for table_name in all_tables:
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(f'{folder_path}/{table_name}.csv')
    print('Completed!\n')

#### Config-Driven Ingestion

In [None]:
# HINTS
# Read configs stored in the 'config.yml' file

# Read yaml file
# Package: yaml (pip install pyyaml)
# Function: load / safe_load
# Print it after loading

import yaml
import io

config_file = 'config.yml'

f = open(config_file, 'r')

config = yaml.safe_load(f)
config

{'source': {'database': {'host': 'chinook', 'db_type': 'sqlite'},
  'table': ['albums',
   'artists',
   'customers',
   'employees',
   'genres',
   'invoice_items',
   'invoices',
   'media_types',
   'playlist_track',
   'playlists',
   'tracks']}}

In [None]:
# Use loop function to read tables within config.source.table
# Export output into CSV
# Name Convention: '<date>__<table_name>.csv'
# Path: destination/config_driven/
# note: use os.makedirs() if path is not exists

def extract_table(table_name, con, folder_path):
    os.makedirs(folder_path)
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(f'{folder_path}/{table_name}.csv')
    print('Completed!\n')


def get_connection(db_type, host):
    if db_type == 'sqlite':
        db_connection_string = f'sqlite:///{host}.db'
        db_engine = create_engine(url=db_connection_string)
        return db_engine.connect()
    elif db_type == 'Oracle':
        db_connection_string = 'Oracle://{host}:1234'
        return db_engine.connect()

db_conn = get_connection(**config.get('source').get('database'))

extract_table(table_name='albums', con=db_conn, folder_path='destination/config_driven')

### Metadata-Driven Ingestion

In [16]:
# # HINTS
# # Read metadata from the database inlcuding tables / columns
# sqlite_metadata_table = 'sqlite_master'
# sqlite_metadata_condition = "type = 'table'"
# metadata_sql = f""" select 1"""
# print(metadata_sql)
# table_df = pd.read_sql_query(metadata_sql)
# print(table_df)

metadata_sql = """select name from sqlite_master where 1=1 and type = 'table'"""
table_df = pd.read_sql_query(metadata_sql, con=db_conn)
table_df

Unnamed: 0,name
0,albums
1,sqlite_sequence
2,artists
3,customers
4,employees
5,genres
6,invoices
7,invoice_items
8,media_types
9,playlists


In [None]:
metadata_sql = """select name from sqlite_master where 1=1 and type = 'table'"""
table_df = pd.read_sql_query(metadata_sql, con=db_conn)

names = [tb for tb in list(table_df['name']) if tb not in ('sqlite_stat1', 'sqlite_sequence')]
names

['albums',
 'artists',
 'customers',
 'employees',
 'genres',
 'invoices',
 'invoice_items',
 'media_types',
 'playlists',
 'playlist_track',
 'tracks']

In [25]:
metadata_sql = """select name from sqlite_master where 1=1 and type = 'table' 	and name not like 'sqlite_%'"""
table_df = pd.read_sql_query(metadata_sql, con=db_conn)

names = list(table_df['name'])

import os 



# loop for each table from the DataFrame
# read and extract table
# save to path: chinook/metadata_driven/
# note: use os.makedirs() if path is not exists
def extract_table(table_name, con, folder_path):
    os.makedirs(folder_path, exist_ok=True)
    print(f'Extracting {table_name} ...')
    df = pd.read_sql_table(table_name=table_name, con=db_conn)
    df.to_csv(f'{folder_path}/{table_name}.csv', index=False)
    print('Completed!\n')

for name in names:
    extract_table(table_name=name, con=db_conn, folder_path='destination/metadata')

Extracting albums ...
Completed!

Extracting artists ...
Completed!

Extracting customers ...
Completed!

Extracting employees ...
Completed!

Extracting genres ...
Completed!

Extracting invoices ...


Completed!

Extracting invoice_items ...
Completed!

Extracting media_types ...
Completed!

Extracting playlists ...
Completed!

Extracting playlist_track ...
Completed!

Extracting tracks ...
Completed!

