# Helper Methods

In [42]:
from IPython.core.magic import register_cell_magic


@register_cell_magic
def write_and_run(line, cell):
    """Write the cell body to one or more files, then execute it in the notebook.

    Usage::

        %%write_and_run path/to/file.py        # overwrite the file(s)
        %%write_and_run path/to/file.py -a     # append to the file(s)

    ``line`` is the magic's argument string: whitespace-separated file paths
    plus an optional ``-a`` flag (accepted in any position) that selects
    append mode. Each file receives the cell text followed by a newline.
    """
    import os

    tokens = line.split()
    # Detect the flag by membership and strip every occurrence, instead of
    # assuming it is the last token (which used to pop a file path).
    mode = "a" if "-a" in tokens else "w"
    paths = [token for token in tokens if token != "-a"]

    for path in paths:
        directory = os.path.dirname(path)
        if directory:
            # e.g. create `scripts/` on a fresh checkout instead of failing.
            os.makedirs(directory, exist_ok=True)
        with open(path, mode) as f:
            f.write(f"{cell}\n")

    get_ipython().run_cell(cell)

# Dependencies Installation

In [43]:
!pip install -r requirements.txt



## Connect to our DB server
The first thing we should do is connect to our (test) database.

**PS**: Do not do this in production

In [44]:
SCRIPT_FILENAME = 'scripts/simple_etl.py'
%store SCRIPT_FILENAME

Stored 'SCRIPT_FILENAME' (str)


In [45]:
%%write_and_run $SCRIPT_FILENAME
import os

import pandas as pd

from dotenv import dotenv_values
from sqlalchemy import create_engine, inspect


Read the environment variables defined in the `.env` file; if none can be loaded from it, read them from the process environment instead.

In [46]:
%%write_and_run $SCRIPT_FILENAME -a

CONFIG = dotenv_values('.env')
if not CONFIG:
    CONFIG = os.environ

connection_uri = "postgresql+psycopg2://{}:{}@{}:{}".format(
    CONFIG["POSTGRES_USER"],
    CONFIG["POSTGRES_PASSWORD"],
    CONFIG['POSTGRES_HOST'],
    CONFIG["POSTGRES_PORT"],
)

Create a connection to the DB.

In [47]:
%%write_and_run $SCRIPT_FILENAME -a

engine = create_engine(connection_uri, pool_pre_ping=True)
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fa7ca5e34f0>

# Extract the data from the hosting service

In [48]:
%%write_and_run $SCRIPT_FILENAME -a
# Extract
dataset = "https://gist.githubusercontent.com/mmphego/5b6fc4d6dc3c8fba4fce9d994a2fe16b/raw/ab5df0e76812e13df5b31e466a5fb787fac0599a/wine_quality.csv"

Load the 'Wine Quality' dataset and create a DataFrame.

In [84]:
%%write_and_run $SCRIPT_FILENAME -a

df = pd.read_csv(dataset)

It is always good to make sure the data looks correct.

In [50]:
# First five rows of the raw extract.
df.head(n=5)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,winecolor
0,7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6,white
1,6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6,white
2,8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6,white
3,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6,white
4,7.2,0.23,0.32,8.5,0.058,47.0,186.0,0.9956,3.19,0.4,9.9,6,white


Simple table summary containing basic statistical metrics (count, mean, std, min, max, and percentiles)

In [51]:
# Count, mean, std, min/max and quartiles of every numeric column.
df.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
count,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0
mean,7.215307,0.339666,0.318633,5.443235,0.056034,30.525319,115.744574,0.994697,3.218501,0.531268,10.491801,5.818378
std,1.296434,0.164636,0.145318,4.757804,0.035034,17.7494,56.521855,0.002999,0.160787,0.148806,1.192712,0.873255
min,3.8,0.08,0.0,0.6,0.009,1.0,6.0,0.98711,2.72,0.22,8.0,3.0
25%,6.4,0.23,0.25,1.8,0.038,17.0,77.0,0.99234,3.11,0.43,9.5,5.0
50%,7.0,0.29,0.31,3.0,0.047,29.0,118.0,0.99489,3.21,0.51,10.3,6.0
75%,7.7,0.4,0.39,8.1,0.065,41.0,156.0,0.99699,3.32,0.6,11.3,6.0
max,15.9,1.58,1.66,65.8,0.611,289.0,440.0,1.03898,4.01,2.0,14.9,9.0


Notice that column 12 (`winecolor`) has the `object` dtype (strings); our future ML model will not understand this.
Therefore we will need to transform it into a numeric representation.

In [52]:
# Column dtypes and non-null counts, printed to stdout.
df.info(buf=None)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6497 entries, 0 to 6496
Data columns (total 13 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   fixed acidity         6497 non-null   float64
 1   volatile acidity      6497 non-null   float64
 2   citric acid           6497 non-null   float64
 3   residual sugar        6497 non-null   float64
 4   chlorides             6497 non-null   float64
 5   free sulfur dioxide   6497 non-null   float64
 6   total sulfur dioxide  6497 non-null   float64
 7   density               6497 non-null   float64
 8   pH                    6497 non-null   float64
 9   sulphates             6497 non-null   float64
 10  alcohol               6497 non-null   float64
 11  quality               6497 non-null   int64  
 12  winecolor             6497 non-null   object 
dtypes: float64(11), int64(1), object(1)
memory usage: 660.0+ KB


## Load the raw data into a production system

Once we have our data in a dataframe, we can then save it into our DB for future use

In [53]:
%%write_and_run $SCRIPT_FILENAME -a
# load to DB
table_name = 'wine_quality_raw_dataset'
df.to_sql(table_name, engine, if_exists='replace')

Check whether the table was created.

In [54]:
# Report whether the table is listed by the SQLAlchemy inspector; both
# branches quote the name the same way.
status = "exists!" if table_name in inspect(engine).get_table_names() else "does not exist!"
f"{table_name!r} {status}"

"'wine_quality_raw_dataset' exists!"

# Transform it into a usable format

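Now we one-hot encode the wine color: the `str` column `winecolor` is replaced by numeric indicator columns (`winecolor_red`, `winecolor_white`).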

In [95]:
%%write_and_run $SCRIPT_FILENAME -a
# transformation
df_transform = df.copy()
winecolor_encoded = pd.get_dummies(df_transform['winecolor'], prefix='winecolor')
df_transform[winecolor_encoded.columns.to_list()] = winecolor_encoded
df_transform.drop('winecolor', axis=1, inplace=True)

In [96]:
# apply normalization techniques: z-score each column independently,
# (x - mean) / std, using pandas' default sample standard deviation.
# NOTE(review): this also rescales the `quality` column and the one-hot
# `winecolor_*` indicators. This cell is not recorded by `%%write_and_run`,
# so the exported script skips this step. Confirm both are intended.
df_transform = df_transform.apply(lambda series: (series - series.mean()) / series.std())

In [99]:
# After normalization every column should show mean ~0 and std 1.
df_transform.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,winecolor_red,winecolor_white
count,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0,6497.0
mean,2.099803e-16,-2.44977e-16,3.499672e-17,3.499672e-17,-3.499672e-17,-8.749179e-17,0.0,-3.51717e-15,2.720995e-15,2.099803e-16,-8.399212e-16,-2.82161e-16,-3.499672e-17,1.749836e-16
std,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
min,-2.634386,-1.577208,-2.192664,-1.017956,-1.342536,-1.663455,-1.941631,-2.529997,-3.100376,-2.091774,-2.089189,-3.227439,-0.5713226,-1.750055
25%,-0.6288845,-0.66611,-0.4722972,-0.7657389,-0.514759,-0.7620156,-0.68548,-0.7858922,-0.6748102,-0.6805395,-0.8315512,-0.9371575,-0.5713226,0.5713226
50%,-0.1660764,-0.3016707,-0.05940918,-0.5135217,-0.2578628,-0.08593639,0.039904,0.06448391,-0.05287017,-0.1429263,-0.1608107,0.207983,-0.5713226,0.5713226
75%,0.3738663,0.366468,0.4911081,0.5584015,0.2559297,0.5901428,0.71221,0.7647937,0.6312639,0.4618885,0.6776148,0.207983,-0.5713226,0.5713226
max,6.69891,7.533774,9.23057,12.68585,15.84097,14.56245,5.736815,14.76765,4.92265,9.870119,3.695947,3.643405,1.750055,0.5713226


In [100]:
# First five rows of the transformed frame.
df_transform.iloc[:5]

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,winecolor_red,winecolor_white
0,-0.166076,-0.42315,0.284664,3.206682,-0.314951,0.815503,0.959902,2.102052,-1.358944,-0.546136,-1.418449,0.207983,-0.571323,0.571323
1,-0.706019,-0.240931,0.147035,-0.807775,-0.200775,-0.931035,0.287595,-0.232314,0.506876,-0.27733,-0.831551,0.207983,-0.571323,0.571323
2,0.682405,-0.362411,0.559923,0.306184,-0.172231,-0.029596,-0.331634,0.134515,0.2581,-0.613338,-0.328496,0.207983,-0.571323,0.571323
3,-0.011807,-0.66611,0.009405,0.642474,0.056121,0.928182,1.242978,0.301255,-0.177258,-0.882144,-0.496181,0.207983,-0.571323,0.571323
4,-0.011807,-0.66611,0.009405,0.642474,0.056121,0.928182,1.242978,0.301255,-0.177258,-0.882144,-0.496181,0.207983,-0.571323,0.571323


# Load transformed data into a production system

Once our data transformation is complete, we can save our dataframe in an SQL table

In [59]:
%%write_and_run $SCRIPT_FILENAME -a
# load
table_name = table_name.replace('raw', 'clean')

In [60]:
%%write_and_run $SCRIPT_FILENAME -a
df.to_sql(table_name, engine, if_exists='replace')

Check whether the table was created.

In [61]:
# Report whether the clean table is listed by the SQLAlchemy inspector; both
# branches quote the name the same way.
status = "exists!" if table_name in inspect(engine).get_table_names() else "does not exist!"
f"{table_name!r} {status}"

"'wine_quality_clean_dataset' exists!"

# Read table from SQL
Read the tables back from the database as a sanity check.

## Raw Dataset

In [62]:
# Read the raw table back; the extra `index` column is the DataFrame index
# that `to_sql` stored.
pd.read_sql(sql="SELECT * FROM wine_quality_raw_dataset", con=engine)

Unnamed: 0,index,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,winecolor
0,0,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6,white
1,1,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6,white
2,2,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6,white
3,3,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6,white
4,4,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6,white
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6492,6492,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5,red
6493,6493,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,0.76,11.2,6,red
6494,6494,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6,red
6495,6495,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5,red


## Cleaned Dataset

In [104]:
# Read the clean table back to confirm what was actually stored.
pd.read_sql(sql="SELECT * FROM wine_quality_clean_dataset", con=engine)

Unnamed: 0,index,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,winecolor
0,0,-0.166076,-0.423150,0.284664,3.206682,-0.314951,0.815503,0.959902,2.102052,-1.358944,-0.546136,-1.418449,0.207983,-0.571323
1,1,-0.706019,-0.240931,0.147035,-0.807775,-0.200775,-0.931035,0.287595,-0.232314,0.506876,-0.277330,-0.831551,0.207983,-0.571323
2,2,0.682405,-0.362411,0.559923,0.306184,-0.172231,-0.029596,-0.331634,0.134515,0.258100,-0.613338,-0.328496,0.207983,-0.571323
3,3,-0.011807,-0.666110,0.009405,0.642474,0.056121,0.928182,1.242978,0.301255,-0.177258,-0.882144,-0.496181,0.207983,-0.571323
4,4,-0.011807,-0.666110,0.009405,0.642474,0.056121,0.928182,1.242978,0.301255,-0.177258,-0.882144,-0.496181,0.207983,-0.571323
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6492,6492,-0.783154,1.581266,-1.642146,-0.723703,0.969530,0.083083,-1.269324,0.067819,1.439786,0.327485,0.006874,-0.937157,1.750055
6493,6493,-1.014558,1.277566,-1.504517,-0.681666,0.170298,0.477463,-1.145479,0.141185,1.875144,1.537115,0.593772,0.207983,1.750055
6494,6494,-0.706019,1.034607,-1.298073,-0.660648,0.569914,-0.085936,-1.340094,0.347943,1.253204,1.469913,0.426087,0.207983,1.750055
6495,6495,-1.014558,1.854595,-1.366888,-0.723703,0.541370,0.083083,-1.269324,0.257903,2.186114,1.201107,-0.244653,-0.937157,1.750055


In [103]:
# Echo the SQLAlchemy engine created earlier in the notebook (shows its repr).
engine


# Done!

In [64]:
%%writefile dags/simple_etl_dag.py
import os

from functools import wraps

import pandas as pd

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

from dotenv import dotenv_values
from sqlalchemy import create_engine, inspect


# Default arguments handed to the DAG (applied to its tasks).
args = dict(owner="me myself and I", start_date=days_ago(1))

# schedule_interval=None: the DAG has no schedule and runs only when triggered.
dag = DAG(dag_id="simple_etl_dag", default_args=args, schedule_interval=None)

DATASET_URL = (
    "https://gist.githubusercontent.com/mmphego/5b6fc4d6dc3c8fba4fce9d994a2fe16b/raw/"
    "ab5df0e76812e13df5b31e466a5fb787fac0599a/wine_quality.csv"
)


# Prefer a local `.env` file; if nothing is loaded from it, use the process
# environment. NOTE(review): ".env" is a relative path, so what is loaded
# depends on the scheduler/worker working directory. Confirm.
CONFIG = dotenv_values(".env") or os.environ


def logger(fn):
    """Decorate ``fn`` so each call prints a start line and a completion line.

    Both lines carry a UTC timestamp: the start line when the call begins, the
    completion line when ``fn`` returns. The wrapped function's return value
    is passed through unchanged, and ``functools.wraps`` keeps ``fn``'s name
    and docstring on the wrapper. If ``fn`` raises, no completion line is
    printed and the exception propagates.
    """
    from datetime import datetime, timezone

    @wraps(fn)
    def inner(*args, **kwargs):
        called_at = datetime.now(timezone.utc)
        print(f">>> Running {fn.__name__!r} function. Logged at {called_at}")
        to_execute = fn(*args, **kwargs)
        # Take a fresh timestamp so the completion line reports when `fn`
        # finished, not when it started.
        finished_at = datetime.now(timezone.utc)
        print(f">>> Function: {fn.__name__!r} executed. Logged at {finished_at}")
        return to_execute

    return inner


@logger
def connect_db():
    """Create a SQLAlchemy engine for the configured PostgreSQL server.

    Reads POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST and POSTGRES_PORT
    from the module-level ``CONFIG`` mapping (a missing key raises KeyError).
    A probe connection is opened and closed immediately, so an unreachable
    server fails here rather than later in the pipeline.

    Returns:
        The engine, created with ``pool_pre_ping=True``.
    """
    from urllib.parse import quote

    print("Connecting to DB")
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a password cannot break the URI.
    connection_uri = "postgresql+psycopg2://{}:{}@{}:{}".format(
        quote(CONFIG["POSTGRES_USER"], safe=""),
        quote(CONFIG["POSTGRES_PASSWORD"], safe=""),
        CONFIG["POSTGRES_HOST"],
        CONFIG["POSTGRES_PORT"],
    )

    engine = create_engine(connection_uri, pool_pre_ping=True)
    # Close the probe connection instead of leaking it.
    with engine.connect():
        pass
    return engine


@logger
def extract(dataset_url):
    """Download the CSV at ``dataset_url`` and return it as a DataFrame."""
    print(f"Reading dataset from {dataset_url}")
    return pd.read_csv(dataset_url)


@logger
def transform(df):
    """Return a copy of ``df`` with ``winecolor`` encoded as an integer.

    ``"white"`` maps to 0 and every other value (including missing values)
    maps to 1. The input frame is not modified, so the caller's raw data stays
    intact for loading into the raw table.
    """
    # transformation
    print("Transforming data")

    transformed = df.copy()
    # Vectorized form of `0 if x == "white" else 1`.
    transformed["winecolor"] = (transformed["winecolor"] != "white").astype("int64")
    return transformed


@logger
def check_table_exists(table_name, engine):
    """Print whether ``table_name`` exists in the database behind ``engine``.

    Returns:
        bool: True if the SQLAlchemy inspector lists the table.
    """
    exists = table_name in inspect(engine).get_table_names()
    if exists:
        print(f"{table_name!r} exists in the DB!")
    else:
        print(f"{table_name!r} does not exist in the DB!")
    return exists


@logger
def load_to_db(df, table_name, engine):
    """Write ``df`` to ``table_name``, replacing an existing table, then report on it.

    The DataFrame index is written as well (pandas ``to_sql`` default).
    """
    print(f"Loading dataframe to DB on table: {table_name}")
    df.to_sql(name=table_name, con=engine, if_exists="replace")
    check_table_exists(table_name, engine)


@logger
def etl():
    """Run the pipeline: connect, extract, load raw, transform, load clean.

    The raw table is written before ``transform`` runs, so it always receives
    the untouched extract even if a transformation modifies its input.
    """
    db_engine = connect_db()
    try:
        raw_df = extract(DATASET_URL)
        raw_table_name = "wine_quality_raw_dataset"
        load_to_db(raw_df, raw_table_name, db_engine)

        clean_df = transform(raw_df)
        clean_table_name = "wine_quality_clean_dataset"
        load_to_db(clean_df, clean_table_name, db_engine)
    finally:
        # Release the engine's pooled connections once the task is done.
        db_engine.dispose()


# Single task: the whole ETL runs inside one PythonOperator attached to `dag`.
run_etl_task = PythonOperator(
    task_id="run_etl_task",
    python_callable=etl,
    dag=dag,
)


Overwriting dags/simple_etl_dag.py
