<b> This notebook shows the code for an ETL pipeline built with Mage and deployed on a GCP Compute Engine instance. The pipeline performs the following steps: </b>
* The raw CSV file is extracted from Google Cloud Storage.
* Transformation logic is applied.
* Finally, the data is loaded into BigQuery.

<h2> Extract </h2>

In [None]:
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Download the electric-guitars CSV over HTTP and parse it into a DataFrame.

    Args:
        args: Unused positional arguments passed by the pipeline runner.
        kwargs: Unused keyword arguments passed by the pipeline runner.

    Returns:
        pandas.DataFrame built from the comma-separated response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within the timeout.
    """
    url = 'https://storage.googleapis.com/electric-guitars-bucket-kolcak/electric-guitars.csv'
    # Bound the wait so a stalled connection cannot hang the pipeline run.
    response = requests.get(url, timeout=60)
    # Fail loudly on an error status instead of parsing an error page as CSV.
    response.raise_for_status()

    return pd.read_csv(io.StringIO(response.text), sep=',')


@test
def test_output(output, *args) -> None:
    """
    Check that the data loader block produced an output.

    Args:
        output: The value returned by the block under test.
        args: Unused additional outputs.
    """
    failure_message = 'The output is undefined'
    assert output is not None, failure_message


<h2> Transform </h2>

In [None]:
import warnings
# Silence every warning category from here on; note that this also hides
# pandas deprecation/future warnings raised by the transformations below.
warnings.filterwarnings("ignore")

import pandas as pd
import numpy as np

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

def create_guitar_metadata_df(df):
    """
    Function to create guitar_metadata dataframe.

    Args:
        df: Frame containing at least 'guitar_id', 'Link' and 'Scrape Date'.

    Returns:
        Tuple of (metadata dataframe, input dataframe without the 'Link'
        and 'Scrape Date' columns). The metadata id reuses the guitar id.
    """
    scrape_dates = pd.to_datetime(df['Scrape Date'])

    metadata_df = df[['guitar_id', 'Link']].rename(columns={'Link': 'scrape_url'})
    metadata_df = metadata_df.assign(metadata_id=df['guitar_id'],
                                     scrape_date=scrape_dates)

    remaining_df = df.drop(columns=['Link', 'Scrape Date'])
    column_order = ['metadata_id', 'scrape_url', 'scrape_date', 'guitar_id']

    return metadata_df[column_order], remaining_df

def create_guitar_body_df(df):
    """
    Function to create guitar_body dataframe.

    Every distinct combination of body attributes (missing values included)
    becomes one row; 'guitar_id' holds the list of guitars sharing it and
    'body_id' is the row position after grouping.

    Returns:
        Tuple of (body dataframe, input dataframe without the body columns).
    """
    renamed_columns = {
        'Colors': 'guitar_color',
        'Body Material': 'body_material',
        'Bridge': 'bridge',
        'Type': 'body_type',
    }
    source_columns = list(renamed_columns)
    group_keys = list(renamed_columns.values())

    body_df = df[['guitar_id'] + source_columns].rename(columns=renamed_columns)
    # dropna=False keeps combinations that contain missing attributes.
    body_df = body_df.groupby(group_keys, dropna=False, as_index=False) \
                     .agg({'guitar_id': list}) \
                     .reset_index(drop=True)
    body_df = body_df.assign(body_id=body_df.index)

    remaining_df = df.drop(columns=source_columns)

    return body_df[['body_id', 'guitar_id'] + group_keys], remaining_df

def create_guitar_neck_df(df):
    """
    Function to create guitar_neck dataframe.

    Rows are the distinct combinations of the raw neck attributes (missing
    values included). After grouping, 'scale_size' and 'nut_width' are
    converted to floats from the first two characters of their text.

    Returns:
        Tuple of (neck dataframe, input dataframe without the neck columns).
    """
    renamed_columns = {
        'Neck Joint': 'neck_joint',
        'Neck Material': 'neck_material',
        'Scale Size': 'scale_size',
        'Shape': 'shape',
        'Nut': 'nut',
        'Nut Width': 'nut_width',
    }
    source_columns = list(renamed_columns)
    group_keys = list(renamed_columns.values())

    neck_df = df[['guitar_id'] + source_columns].rename(columns=renamed_columns)
    neck_df = neck_df.groupby(group_keys, dropna=False, as_index=False) \
                     .agg({'guitar_id': list}) \
                     .reset_index(drop=True)

    # NOTE(review): only the first two characters are parsed, so a value such
    # as "25.5..." would become 25.0 - confirm this truncation is intended.
    parsed_scale_size = neck_df['scale_size'].str[:2].astype(float)
    # The placeholder text without a number is mapped to 42mm before parsing.
    parsed_nut_width = neck_df['nut_width'].replace('mm (\'\')', '42mm').str[:2].astype(float)

    neck_df = neck_df.assign(neck_id=neck_df.index,
                             scale_size=parsed_scale_size,
                             nut_width=parsed_nut_width)

    remaining_df = df.drop(columns=source_columns)

    return neck_df[['neck_id'] + group_keys + ['guitar_id']], remaining_df

def create_guitar_fretboard_df(df):
    """
    Function to create guitar_fretboard dataframe.

    Missing thickness/decoration values are filled with the most frequent
    value of their column. The 'Frets' text is split into a leading
    two-character fret count ('fret_number') and the remainder from the
    fourth character on ('frets'); the radius is the number before the
    first double quote.

    Returns:
        Tuple of (fretboard dataframe, input dataframe without the
        fretboard columns).
    """
    renamed_columns = {
        'Fretboard': 'fretboard_material',
        'Decoration': 'fretboard_decoration',
        'Thickness': 'fretboard_thickness',
        'Frets': 'frets',
        'Fretboard Radius': 'fretboard_radius',
    }
    source_columns = list(renamed_columns)

    fretboard_df = df[['guitar_id'] + source_columns].rename(columns=renamed_columns)

    # Impute gaps with the most common value of each column.
    for column in ('fretboard_thickness', 'fretboard_decoration'):
        most_common = fretboard_df[column].value_counts().index[0]
        fretboard_df[column] = fretboard_df[column].fillna(most_common)

    frets_text = fretboard_df['frets']
    radius_text = fretboard_df['fretboard_radius'].str.split("\"", n=1, expand=True)[0]
    fretboard_df = fretboard_df.assign(fret_number=frets_text.str[:2].astype(int),
                                       frets=frets_text.str[3:],
                                       fretboard_radius=radius_text.astype(float))

    group_keys = ['fretboard_material', 'fretboard_decoration', 'fretboard_thickness',
                  'frets', 'fret_number', 'fretboard_radius']
    fretboard_df = fretboard_df.groupby(group_keys, dropna=False, as_index=False) \
                               .agg({'guitar_id': list}) \
                               .reset_index(drop=True)
    fretboard_df = fretboard_df.assign(fretboard_id=fretboard_df.index)

    remaining_df = df.drop(columns=source_columns)

    return fretboard_df[['fretboard_id'] + group_keys + ['guitar_id']], remaining_df


def create_guitar_electronics_df(df):
    """
    Function to create guitar_electronics dataframe.

    Every distinct combination of electronics attributes (missing values
    included) becomes one row; 'guitar_id' holds the list of guitars sharing
    it and 'electronics_id' is the row position after grouping.

    Returns:
        Tuple of (electronics dataframe, input dataframe without the
        electronics columns).
    """
    renamed_columns = {
        'Configuration': 'pickup_configuration',
        'Tuners': 'tuners',
        'Switch': 'switch',
        'Knobs': 'knobs',
        'Pickup Mods': 'pickup_mods',
        'Volume Controls': 'volume_controls',
        'Tone Controls': 'tone_controls',
        'Bridge Pickup': 'bridge_pickup',
        'Neck Pickup': 'neck_pickup',
        'Middle Pickup': 'middle_pickup',
    }
    source_columns = list(renamed_columns)
    group_keys = list(renamed_columns.values())

    electronics_df = df[['guitar_id'] + source_columns].rename(columns=renamed_columns)
    electronics_df = electronics_df.groupby(group_keys, dropna=False, as_index=False) \
                                   .agg({'guitar_id': list}) \
                                   .reset_index(drop=True)
    electronics_df = electronics_df.assign(electronics_id=electronics_df.index)

    remaining_df = df.drop(columns=source_columns)

    return electronics_df[['electronics_id'] + group_keys + ['guitar_id']], remaining_df

def create_guitar_info_df(df):
    """
    Function to create guitar_info dataframe.

    Every distinct combination of brand, series, year, country, string count
    and left-handed flag becomes one row; 'guitar_id' holds the list of
    guitars sharing it and 'info_id' is the row position after grouping.
    'left_handed_version' is True only where the raw value equals 'Yes'.

    Returns:
        Tuple of (info dataframe, input dataframe without the info columns).
    """
    group_keys = ['brand', 'series', 'year', 'made_in', 'strings', 'left_handed_version']

    # dropna=False matches the other dimension builders: without it any guitar
    # with a missing info attribute is dropped here and then silently lost in
    # the inner merge that assembles the electric_guitars table.
    info_df = df[['guitar_id','Brand','Series','Year','Made in','Strings','Left-Handed Version']] \
                .rename(columns={'Brand':'brand','Series':'series','Year':'year','Made in':'made_in','Strings':'strings',
                                'Left-Handed Version':'left_handed_version'}) \
                .groupby(group_keys, dropna=False, as_index=False) \
                .agg({'guitar_id': list}) \
                .reset_index(drop=True) \
                .pipe(lambda df_i: df_i.assign(
                                    # A missing value compares unequal to 'Yes' and so becomes False.
                                    left_handed_version = df_i['left_handed_version'] == 'Yes',
                                    info_id = df_i.index
                ))

    return info_df[['info_id','brand','series','year','made_in','strings','left_handed_version','guitar_id']], \
            df.drop(['Brand','Series','Year','Made in','Strings','Left-Handed Version'],axis=1)

def create_guitar_strengths_and_weaknesses_df(df):
    """
    Function to create guitar_strengths_and_weaknesses dataframe.

    The raw flag columns are renamed, missing flags are replaced by a
    per-column default, and every distinct flag combination becomes one row
    with the list of guitars sharing it.

    Returns:
        Tuple of (strengths/weaknesses dataframe, input dataframe without
        the flag columns).
    """
    # NOTE(review): the two "No ..." source columns are renamed to "has_..."
    # without inverting their values - confirm against the raw data.
    renamed_columns = {
        'No Locking Tuners': 'has_locking_tuners',
        'Tremolo': 'has_tremolo',
        'Top Brand Pickups': 'has_top_brand_pickups',
        'Expensive Wood': 'has_expensive_wood',
        'Neck-Through Build': 'has_neck_through_build',
        'No High-Quality Nut': 'has_high_quality_nut',
    }
    # Value substituted for a missing flag, per output column.
    missing_defaults = {
        'has_locking_tuners': True,
        'has_tremolo': False,
        'has_top_brand_pickups': False,
        'has_expensive_wood': False,
        'has_neck_through_build': False,
        'has_high_quality_nut': False,
    }
    source_columns = list(renamed_columns)
    flag_columns = list(missing_defaults)

    st_wk_df = df[['guitar_id'] + source_columns].rename(columns=renamed_columns)
    for flag_name, default_value in missing_defaults.items():
        st_wk_df[flag_name] = st_wk_df[flag_name].fillna(default_value)

    st_wk_df = st_wk_df.groupby(flag_columns, as_index=False) \
                       .agg({'guitar_id': list}) \
                       .reset_index(drop=True)
    st_wk_df = st_wk_df.assign(st_and_wk_id=st_wk_df.index)

    remaining_df = df.drop(columns=source_columns)

    return st_wk_df[['st_and_wk_id'] + flag_columns + ['guitar_id']], remaining_df

def create_electric_guitars_df(df,metadata_df,body_df,neck_df,fretboard_df,electronics_df,info_df,st_wk_df):
    """
    Function to create electric_guitars df.

    Builds the central table holding one row per guitar with its model name,
    numeric price, score and the id of its row in every dimension table.

    Args:
        df: Remaining guitar frame with 'guitar_id', 'Model', 'Price', 'Score'.
        metadata_df, body_df, neck_df, fretboard_df, electronics_df, info_df,
        st_wk_df: Dimension frames, each with its own id column and a
            'guitar_id' column (a scalar or a list of guitar ids per row).

    Returns:
        Tuple of eight dataframes: the electric_guitars table followed by the
        seven dimension frames, in the argument order, with 'guitar_id' removed.
    """
    electric_guitars_df = df[['guitar_id','Model','Price','Score']] \
                            .rename(columns={'Model':'model_name','Price':'price','Score':'guitar_score'})

    # regex=False makes "$" a literal on every pandas version; as a regular
    # expression it is the end-of-string anchor and would strip nothing.
    price_text = electric_guitars_df['price'].str.replace("$", "", regex=False) \
                                             .str.replace(",", "", regex=False)
    electric_guitars_df = electric_guitars_df.assign(price=price_text.astype(float))

    dimension_dfs = (metadata_df, body_df, neck_df, fretboard_df, electronics_df, info_df, st_wk_df)

    # explode() turns each list of guitar ids into one row per guitar so the
    # dimension id can be joined back. merge() uses its default join type, so
    # a guitar missing from any dimension is absent from the result.
    for dimension_df in dimension_dfs:
        electric_guitars_df = electric_guitars_df.merge(dimension_df.explode('guitar_id'), on='guitar_id')

    electric_guitars_df = electric_guitars_df[['guitar_id','body_id','neck_id','fretboard_id','electronics_id',
                                               'info_id','st_and_wk_id','metadata_id','model_name','price',
                                               'guitar_score']]

    # The guitar_id link now lives in the central table only.
    return (electric_guitars_df,
            *(dimension_df.drop("guitar_id", axis=1) for dimension_df in dimension_dfs))

@transformer
def transform(df, *args, **kwargs):
    """
    Split the raw guitar frame into a central table plus seven dimension tables.

    Each guitar gets a 'guitar_id' taken from the frame's index; the helper
    functions then peel off one group of columns at a time, and the final
    step links every guitar to its row in each dimension table.

    Args:
        df: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Dict mapping table name to that table converted with
        DataFrame.to_dict(orient="dict").
    """
    # assign() returns a new frame, so the upstream block's DataFrame is not
    # mutated by adding the id column.
    df = df.assign(guitar_id=df.index)

    metadata_df, df = create_guitar_metadata_df(df)
    body_df, df = create_guitar_body_df(df)
    neck_df, df = create_guitar_neck_df(df)
    fretboard_df, df = create_guitar_fretboard_df(df)
    electronics_df, df = create_guitar_electronics_df(df)
    info_df, df = create_guitar_info_df(df)
    st_wk_df, df = create_guitar_strengths_and_weaknesses_df(df)

    tables = create_electric_guitars_df(df, metadata_df, body_df, neck_df, fretboard_df,
                                        electronics_df, info_df, st_wk_df)

    # Names are listed in the same order as the tuple returned above.
    table_names = (
        "electric_guitars",
        "guitar_metadata",
        "guitar_body",
        "guitar_neck",
        "guitar_fretboard",
        "guitar_electronics",
        "guitar_info",
        "guitar_strengths_and_weaknesses",
    )

    return {
        table_name: table_df.to_dict(orient="dict")
        for table_name, table_df in zip(table_names, tables)
    }


@test
def test_output(output, *args) -> None:
    """
    Check that the transformer block produced an output.

    Args:
        output: The value returned by the block under test.
        args: Unused additional outputs.
    """
    failure_message = 'The output is undefined'
    assert output is not None, failure_message


<h2> Load </h2>

In [None]:
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.bigquery import BigQuery
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(data, **kwargs) -> None:
    """
    Export every table in `data` to BigQuery, replacing any existing table.

    Args:
        data: Dict mapping a table name to content accepted by the pandas
            DataFrame constructor.
        kwargs: Unused keyword arguments passed by the pipeline runner.
    """
    config_path = path.join(get_repo_path(),'io_config.yaml')
    config_profile = "default"
    # The loader depends only on the config path and profile, so it is built
    # once instead of once per exported table.
    config_loader = ConfigFileLoader(config_path, config_profile)

    for table_name, table_content in data.items():
        df = DataFrame(table_content)
        table_id = f"steady-copilot-413818.electric_guitars_dataset.{table_name}"
        BigQuery.with_config(config_loader).export(
            df,table_id,if_exists='replace'
        )
