In [1]:
import pandas as pd

In [2]:
import sqlalchemy

In [3]:
sqlalchemy.__version__

'1.4.5'

In [4]:
# Connection URI of the dataset: the SQLite database file Chinook.sqlite

connection_uri = "sqlite:///Chinook.sqlite"
# Alternative (disabled): the same database reached through PostgreSQL
#connection_uri = "postgresql://user:password@localhost:5432/Chinook"

In [5]:
# Create the SQLAlchemy engine (db_engine) from the connection URI
db_engine = sqlalchemy.create_engine(connection_uri)

## ETL
### 1. Extract step

In [6]:
# List the names of the tables in the database.
#
# SQLAlchemy v1.3 way (kept for reference only):
# table_names = db_engine.table_names()
# table_names

# SQLAlchemy v1.4 way: ask an inspector created from the engine
inspector = sqlalchemy.inspect(db_engine)
table_names = inspector.get_table_names()
table_names

['Album',
 'Artist',
 'Customer',
 'Disaster_ETL2',
 'Employee',
 'Genre',
 'Invoice',
 'InvoiceLine',
 'MediaType',
 'Playlist',
 'PlaylistTrack',
 'Track']

In [7]:
# Open a connection on the engine: con (it is closed again after the load step)
con = db_engine.connect()

In [8]:
# Run a query that selects every column of every row in the Invoice table
qr_invoice = con.execute("SELECT * FROM Invoice")

# Show the type of the result object returned by execute()
type(qr_invoice)

sqlalchemy.engine.cursor.LegacyCursorResult

In [9]:
qr_invoice.keys()

RMKeyView(['InvoiceId', 'CustomerId', 'InvoiceDate', 'BillingAddress', 'BillingCity', 'BillingState', 'BillingCountry', 'BillingPostalCode', 'Total'])

In [10]:
# Materialise the query result as a DataFrame. The column names are handed to
# the constructor so that a result with zero rows still produces a correctly
# labelled (empty) frame instead of failing with a length-mismatch error when
# the columns are assigned afterwards.
df = pd.DataFrame(qr_invoice.fetchall(), columns=list(qr_invoice.keys()))

In [11]:
df

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86
...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,,Portugal,,8.91
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,,Finland,00530,13.86


### 2. Transform step

##### 2.1. Calculating the average invoice total per billing country.

In [12]:
# Mean invoice total for each billing country (the country becomes the index)
df_g = (
    df
    .groupby(['BillingCountry'])[['Total']]
    .mean()
)
df_g

Unnamed: 0_level_0,Total
BillingCountry,Unnamed: 1_level_1
Argentina,5.374286
Australia,5.374286
Austria,6.088571
Belgium,5.374286
Brazil,5.431429
Canada,5.427857
Chile,6.66
Czech Republic,6.445714
Denmark,5.374286
Finland,5.945714


In [13]:
# Turn BillingCountry back into a regular column and label the mean "Average"
df_g = df_g.reset_index().rename(columns={"Total": "Average"})
df_g.head(5)

Unnamed: 0,BillingCountry,Average
0,Argentina,5.374286
1,Australia,5.374286
2,Austria,6.088571
3,Belgium,5.374286
4,Brazil,5.431429


In [14]:
# Attach each country's average to every invoice row (left join on the country)
df = df.merge(df_g, how="left", on="BillingCountry")
df.head(5)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Average
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96,5.66
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912


##### 2.2. Filling null values: `BillingState` with "None" and `BillingPostalCode` with "99999".

In [15]:
# Show the number of missing values in each column
display('The Number of Null for Each Features', df.isnull().sum())

'The Number of Null for Each Features'

InvoiceId              0
CustomerId             0
InvoiceDate            0
BillingAddress         0
BillingCity            0
BillingState         202
BillingCountry         0
BillingPostalCode     28
Total                  0
Average                0
dtype: int64

In [16]:
 # Replace missing BillingState with the string "None" and missing BillingPostalCode with "99999"
 df= df.fillna({"BillingState": "None", "BillingPostalCode":"99999"})

In [17]:
display('The Number of Null for Each Features after transformed: ', df.isnull().sum())

'The Number of Null for Each Features after transformed: '

InvoiceId            0
CustomerId           0
InvoiceDate          0
BillingAddress       0
BillingCity          0
BillingState         0
BillingCountry       0
BillingPostalCode    0
Total                0
Average              0
dtype: int64

In [18]:
df

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Average
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96,5.660000
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912
...,...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96,5.747912
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94,5.427857
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,,Portugal,99999,8.91,5.517143
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,,Finland,00530,13.86,5.945714


### 3. Load step
* Finally, load the transformed data into the SQL database.

In [19]:
con

<sqlalchemy.engine.base.Connection at 0x1d18db24408>

In [20]:
# Name of the destination table for the transformed invoices
sqlite_table = "New_Invoice"
# Write the DataFrame through the open connection. if_exists='fail' makes
# to_sql raise when the table already exists, so re-running this cell against
# the same database will error — NOTE(review): confirm that is intended.
df.to_sql(sqlite_table, con, if_exists='fail')

In [21]:
con.close()

### 4. Creating functions for ETL

##### 4.1. Creating functions for Extract: 
Function names:
- **extract_database**
- **extract_table_to_pandas**

In [22]:
# Function to extract for taken engine and connection from database
def extract_database(path):
    '''
    Build a SQLAlchemy engine for a database and open a connection on it.

    Input:
        path: Connection URI of the database (handed to create_engine)

    Output:
        return
        db_engine : The engine created for the database
        db_connect: A connection opened on that engine
    '''
    engine = sqlalchemy.create_engine(path)
    return engine, engine.connect()

In [23]:
# Function to extract table to a pandas DataFrame

def extract_table_to_pandas(tablename, db_connect):
    '''
    Read every row of a table into a pandas DataFrame.

    Input:
        tablename : Name of the table to be extracted. It is interpolated
                    directly into the SQL text, so it must come from a
                    trusted source (never from user input).
        db_connect: Object exposing ``execute(query)``; the result it returns
                    must provide ``fetchall()`` and ``keys()``.
    Output:
        return
        df        : DataFrame with one column per result key. When the table
                    has no rows the frame is empty but still carries the
                    column names.
    '''
    query = "SELECT * FROM {}".format(tablename)
    result = db_connect.execute(query)

    rows = result.fetchall()
    column_names = list(result.keys())

    # Pass the names to the constructor rather than assigning df.columns
    # afterwards: a frame built from zero rows has zero columns, and the
    # later assignment would fail with a length mismatch.
    return pd.DataFrame(rows, columns=column_names)

##### 4.2. Creating functions for Transform: 
Function name:
- **transform_avg_billing**
- **transform_fill_null**

In [24]:
# The transformation function of Average 
def transform_avg_billing(data):
    '''
    Add an "Average" column holding the mean "Total" of each "BillingCountry".
    Input:
        data : The dataframe to transform; it is not modified
    Output:
        return a new dataframe: the input rows left-joined with the
        per-country mean of "Total", stored in the "Average" column
    '''
    # One row per country with its mean Total, relabelled as Average
    country_avg = (
        data
        .groupby(['BillingCountry'])[['Total']]
        .mean()
        .reset_index()
        .rename(columns={"Total": "Average"})
    )

    # Left join keeps every input row and its original order
    return data.merge(country_avg, how="left", on="BillingCountry")

In [25]:
# The transformation function of filling in the missing values
def transform_fill_null(data):
    '''
    Replace missing values in the billing columns with fixed placeholders:
        "BillingState"      -> "None"
        "BillingPostalCode" -> "99999"
    Input:
        data : The dataframe to transform; it is not modified
    Output:
        return a new dataframe with the placeholders filled in
    '''
    # Column name -> placeholder used where the value is missing
    placeholders = {"BillingState": "None", "BillingPostalCode": "99999"}
    return data.fillna(placeholders)

##### 4.3. Creating functions for Loading: 

In [26]:
# Function to loading a pandas dataframe into table in database
def loading_to_sql(data, connect, sqlite_table='New_Table', if_exists='fail'):
    '''
    Write a dataframe into a database table, then close the connection.

    Input:
        data        : The dataframe to load (its to_sql method is used)
        connect     : Open engine connection to write through; it is always
                      closed before this function returns or raises
        sqlite_table: Name of the table to load into
        if_exists   : Passed to to_sql; what to do when the table already
                      exists ('fail', 'replace' or 'append'). Defaults to
                      'fail', the behaviour this function always had.
    Output:
        return the string 'Loading is Done' on success
    '''
    try:
        data.to_sql(sqlite_table, connect, if_exists=if_exists)
    finally:
        # Release the connection even when the write fails (for example when
        # the table already exists and if_exists is 'fail').
        connect.close()
    return 'Loading is Done'

##### 4.4. Testing functions of ETL

In [27]:
# Database used for the end-to-end test of the ETL functions.
# NOTE(review): absolute, machine-specific path — point it at the local copy.
path = "sqlite:///D:/01_DS_Document/Data_Sample/Chinook_SQLite/Chinook_Test.sqlite"

# Extracting
extract_db = extract_database(path)
engine, connect = extract_db

tablename = 'Invoice'
# Query through the open connection, which is what extract_table_to_pandas
# documents as its second argument, rather than through the engine.
extract = extract_table_to_pandas(tablename, connect)

# Transformation
transform = transform_avg_billing(extract)
transform = transform_fill_null(transform)

# Loading: sqlite_table is the table name set in the earlier load step;
# loading_to_sql closes the connection when it is done.
data = transform
loading_to_sql(data, connect, sqlite_table)

'Loading is Done'

In [28]:
# Print results
transform

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total,Average
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-Straße 34,Stuttgart,,Germany,70174,1.98,5.588571
1,2,4,2009-01-02 00:00:00,Ullevålsveien 14,Oslo,,Norway,0171,3.96,5.660000
2,3,8,2009-01-03 00:00:00,Grétrystraat 63,Brussels,,Belgium,1000,5.94,5.374286
3,4,14,2009-01-06 00:00:00,8210 111 ST NW,Edmonton,AB,Canada,T6G 2C7,8.91,5.427857
4,5,23,2009-01-11 00:00:00,69 Salem Street,Boston,MA,USA,2113,13.86,5.747912
...,...,...,...,...,...,...,...,...,...,...
407,408,25,2013-12-05 00:00:00,319 N. Frances Street,Madison,WI,USA,53703,3.96,5.747912
408,409,29,2013-12-06 00:00:00,796 Dundas Street West,Toronto,ON,Canada,M6J 1V1,5.94,5.427857
409,410,35,2013-12-09 00:00:00,"Rua dos Campeões Europeus de Viena, 4350",Porto,,Portugal,99999,8.91,5.517143
410,411,44,2013-12-14 00:00:00,Porthaninkatu 9,Helsinki,,Finland,00530,13.86,5.945714


In [29]:
transform.isnull().sum()

InvoiceId            0
CustomerId           0
InvoiceDate          0
BillingAddress       0
BillingCity          0
BillingState         0
BillingCountry       0
BillingPostalCode    0
Total                0
Average              0
dtype: int64

A **`finally`** block helps with **always closing** the **connection to the DB**, even when an error occurs.

In [30]:
# Close the connection to the DB. `con` was already closed once after the
# load step earlier in this notebook.
# NOTE(review): no try/finally actually wraps the ETL cells, so this line is
# not reached automatically if an earlier cell raises.

con.close()