# ETL with soft dependencies 

This notebook tries to replicate a real-world application:

1. We load new data (in this case, a "new" pokemon).
2. We run some queries in parallel, because they do not depend on each other.
3. We run a final query once all of its sources have been refreshed.

## First, we create the functions

1. Fetch the info for a pokemon id
2. Look up the last loaded id and add the next pokemon
3. Some simple rankings
4. A final ranking that combines the results of (3)

In [1]:
import requests
import pandas as pd
import os
from google.cloud import bigquery

# Shared BigQuery client used by every function in this notebook; created with
# no arguments, so project/credentials come from the library's default lookup.
client = bigquery.Client()


# NOTE(review): this module-level `i` does not appear to be read anywhere in
# this notebook (the functions below use their own parameter/local `i`);
# confirm before removing.
i = 200

def get_pokemon_data(i):
    url = f'https://pokeapi.co/api/v2/pokemon-species/{i}'
    x = requests.get(url)
    aux = x.json()['flavor_text_entries']
    
    aux = x.json()['flavor_text_entries']
    text = [x['flavor_text'].replace('\n',' ') for x in aux]
    lang = [x['language']['name'] for x in aux]
    version = [x['version']['name'] for x in aux]
    df = pd.DataFrame({'text':text,'lang':lang,'version':version}).query('lang=="en"')
    df['name'] = x.json()['name']
    df['pokemon_id'] = i 
    
    pokedex = df[['pokemon_id','text','lang','version']]
    names = df[['pokemon_id','name']].drop_duplicates()
    
    url = f'https://pokeapi.co/api/v2/pokemon/{i}'
    x = requests.get(url)
    
    stats = pd.DataFrame([(x ['base_stat'], x['stat']['name'])for  x in x.json()['stats']])
    stats= stats.set_index([1]).transpose()
    stats['pokemon_id'] = i
    stats.columns = ['hp','attack','defense','specialAttack','specialDefense','speed','pokemon_id']
    
    
    sprites = pd.DataFrame((x.json()['sprites']['front_default'], i+1)).transpose()
    
    sprites.columns = ['sprite_link','pokemon_id']
    
    sprites = sprites[['pokemon_id','sprite_link']]

    return names, pokedex, stats, sprites

def get_games_order(game):
    """Look up a game version on the PokeAPI and return it as a one-row frame.

    Args:
        game: Version name (or id) as used in the PokeAPI ``/version`` endpoint.

    Returns:
        DataFrame with a single row and columns ``id`` and ``version``.
    """
    payload = requests.get(f'https://pokeapi.co/api/v2/version/{game}').json()
    # Built as a 2-row column and then transposed into one row.
    row = pd.DataFrame((payload['id'], payload['name'])).transpose()
    row.columns = ['id','version']
    return row

def table_upload_append(client,df,table):
    """Append ``df`` to the BigQuery table ``table`` and wait for the load.

    Args:
        client: BigQuery client used to start the load job.
        df: DataFrame whose rows are appended.
        table: Destination table id, e.g. ``'examples.names'``.
    """
    append_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    # Start the load job (API request) and block until it has completed.
    client.load_table_from_dataframe(df, table, job_config=append_config).result()


def refresh_one_pokemon():
    """Load the next pokemon into BigQuery and rebuild ``examples.games``.

    The next id is ``max(pokemon_id) + 1`` read from ``examples.names``. It
    falls back to 1 when that query fails (e.g. the table does not exist yet
    on the first run) or returns NULL (the table exists but is empty).

    The new pokemon's names, stats, sprites and pokedex rows are appended to
    their tables. Then ``examples.games`` is overwritten with the PokeAPI
    id/name of every distinct version found in ``examples.pokedex``; this
    step is skipped if there are no versions yet.

    Returns:
        The ``names`` DataFrame of the pokemon that was added.
    """
    table = 'examples.names'

    sql = f'''
        SELECT max(pokemon_id) i FROM {table}
        '''
    try:
        last_id = client.query(sql).result().to_dataframe().i.values[0]
    except Exception:
        # Best effort, as before: most likely the table does not exist yet.
        # Unlike the previous bare `except:`, this no longer swallows
        # KeyboardInterrupt / SystemExit.
        last_id = None
    # MAX() over an empty table is NULL; without this check it became NaN + 1
    # and the pokemon URL ended in "/nan".
    i = 1 if pd.isna(last_id) else int(last_id) + 1

    names, pokedex, stats, sprites = get_pokemon_data(i)

    table_upload_append(client,names,'examples.names')
    table_upload_append(client,stats,'examples.stats')
    table_upload_append(client,sprites,'examples.sprites')
    table_upload_append(client,pokedex,'examples.pokedex')

    sql = '''
        SELECT distinct version  FROM examples.pokedex
        '''
    versions = client.query(sql).result().to_dataframe().version

    if versions.empty:
        # pd.concat([]) raises ValueError; there is nothing to rebuild yet.
        return names

    games = pd.concat([get_games_order(x) for x in versions])

    job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_TRUNCATE",
        )

    job = client.load_table_from_dataframe(
        games , 'examples.games', job_config=job_config
    )  # Make an API request.
    job.result()  # Wait for the job to complete.

    return names



def set_top_all():
    """Rebuild ``examples.top_all``: the 50 pokemon with the highest stat total.

    The total is ``attack + specialAttack + defense + specialDefense + speed``.
    NOTE(review): ``hp`` is not part of the sum; confirm that is intentional.
    The destination (under the ``GCP_PROJECT`` project) is overwritten
    (``WRITE_TRUNCATE``). The call blocks until the query job finishes.
    """
    destination = f"{os.environ['GCP_PROJECT']}.examples.top_all"
    sql = '''SELECT  pokemon_id, name, 
    row_number() over(order by attack + `specialAttack`+ defense + `specialDefense` + speed desc) ranking
    FROM examples.stats 
    left join examples.names using(pokemon_id)
    order by attack + `specialAttack`+ defense + `specialDefense` + speed desc
    limit 50
    '''
    config = bigquery.QueryJobConfig(
        destination=destination,
        write_disposition="WRITE_TRUNCATE",
    )
    # The location must match the dataset(s) read by the query and the
    # destination table.
    client.query(sql, location="US", job_config=config).result()


def set_top_offensive():
    """Rebuild ``examples.top_offense``: the 50 pokemon with the best offense.

    Offense is ``attack + specialAttack``. The ``ranking`` column and the
    ``ORDER BY ... LIMIT 50`` now use the same expression, as the sibling
    ranking functions do. Previously the LIMIT selected rows by
    ``defense + attack + specialAttack`` while the ranking ignored defense.
    Because the window is computed before LIMIT, the table could then hold
    rankings above 50 and miss some of the true top-50 attackers.

    The destination (under the ``GCP_PROJECT`` project) is overwritten
    (``WRITE_TRUNCATE``). The call blocks until the query job finishes.
    """
    job_config = bigquery.QueryJobConfig(
            write_disposition="WRITE_TRUNCATE",
            destination = f'''{os.environ['GCP_PROJECT']}.examples.top_offense'''
    )

    sql = '''SELECT  pokemon_id, name, row_number() over(order by attack + `specialAttack` desc) ranking
    FROM examples.stats 
    left join examples.names using(pokemon_id)
    order by attack + `specialAttack` desc
    limit 50
    '''

    # Start the query, passing in the extra configuration.
    query_job = client.query(
        sql,
        # Location must match that of the dataset(s) referenced in the query
        # and of the destination table.
        location="US",
        job_config=job_config,
    )  # API request - starts the query

    query_job.result()


def set_top_defensive():
    """Rebuild ``examples.top_defensive``: the 50 pokemon with the best defense.

    Defense is ``defense + specialDefense``. The destination (under the
    ``GCP_PROJECT`` project) is overwritten (``WRITE_TRUNCATE``). The call
    blocks until the query job finishes.
    """
    destination = f"{os.environ['GCP_PROJECT']}.examples.top_defensive"
    sql = '''SELECT  pokemon_id, name, row_number() over(order by defense + `specialDefense` desc) ranking
    FROM examples.stats 
    left join examples.names using(pokemon_id)
    order by defense + `specialDefense` desc
    limit 50
    '''
    config = bigquery.QueryJobConfig(
        destination=destination,
        write_disposition="WRITE_TRUNCATE",
    )
    # The location must match the dataset(s) read by the query and the
    # destination table.
    client.query(sql, location="US", job_config=config).result()


def set_top_fast():
    """Rebuild ``examples.top_fast``: the 50 pokemon with the highest speed.

    The destination (under the ``GCP_PROJECT`` project) is overwritten
    (``WRITE_TRUNCATE``). The call blocks until the query job finishes.
    """
    destination = f"{os.environ['GCP_PROJECT']}.examples.top_fast"
    sql = '''SELECT  pokemon_id, name, row_number() over(order by speed desc) ranking
    FROM examples.stats 
    left join examples.names using(pokemon_id)
    order by speed desc
    limit 50
    '''
    config = bigquery.QueryJobConfig(
        destination=destination,
        write_disposition="WRITE_TRUNCATE",
    )
    # The location must match the dataset(s) read by the query and the
    # destination table.
    client.query(sql, location="US", job_config=config).result()


def final_ranking():
    """Rebuild ``examples.top_final`` by combining the four ranking tables.

    Only pokemon present in all of ``top_all``, ``top_defensive``,
    ``top_offense`` and ``top_fast`` survive the inner joins. They are ranked
    (``ranking_agg``) by the sum of their four rankings, lowest sum first.
    The destination (under the ``GCP_PROJECT`` project) is overwritten
    (``WRITE_TRUNCATE``). The call blocks until the query job finishes.
    """
    config = bigquery.QueryJobConfig(
        destination=f"{os.environ['GCP_PROJECT']}.examples.top_final",
        write_disposition="WRITE_TRUNCATE",
    )
    sql = '''
    SELECT pokemon_id, names.name, row_number() over(order by  t1.ranking + t2.ranking + t3.ranking + t4.ranking) ranking_agg
    FROM `examples.top_all` t1
    inner join `examples.top_defensive`t2 using(pokemon_id)
    inner join `examples.top_offense`t3 using(pokemon_id)
    inner join `examples.top_fast`t4 using(pokemon_id)
    inner join `examples.names` names using(pokemon_id)
    order by t1.ranking + t2.ranking + t3.ranking + t4.ranking    
    '''
    # The location must match the dataset(s) read by the query and the
    # destination table.
    client.query(sql, location="US", job_config=config).result()


The steps to build and run the DAG are:

1. Define the function that each node will run (a common task before defining any DAG; we did this in the previous cell).
2. Create the nodes (note that in this example we do not declare any dependency when creating the nodes).
3. Define the dependencies using `>` (that's the soft-dependency operator).
4. Create the list of nodes from all the ExecNodes available in the environment. If you do not want to use every node you created, build the list yourself as usual.
5. Create the DAG with the list of nodes.
6. Run the DAG.
7. Check the HTML output in an iframe.

In [2]:
from easydags import  ExecNode, DAG



# One ExecNode per step. Dependencies are deliberately not passed here;
# they are declared below with the soft-dependency operator `>`.
add_pokemon = ExecNode('add', exec_function = refresh_one_pokemon)
ranking_all = ExecNode('all', exec_function = set_top_all)
ranking_offensive = ExecNode('offensive', exec_function = set_top_offensive)
ranking_defensive = ExecNode('defensive', exec_function = set_top_defensive)
ranking_fast = ExecNode('fast', exec_function = set_top_fast)
ranking_final = ExecNode('final', exec_function = final_ranking)

# Each ranking depends on the new pokemon, and the final ranking depends on
# every ranking; the four rankings do not depend on one another.
add_pokemon > ranking_all > ranking_final
add_pokemon > ranking_offensive > ranking_final
add_pokemon > ranking_defensive > ranking_final
add_pokemon > ranking_fast > ranking_final

# Collect every ExecNode defined in the notebook's global namespace (scanning
# a snapshot copy). Build this list by hand instead if only some nodes
# should run.
nodes = [obj for obj in globals().copy().values() if isinstance(obj, ExecNode)]

dag = DAG(nodes, name='Real ETL example (using pokemon)', max_concurrency=3,
          debug=False, error_type_fatal=False)
dag.execute()

from IPython.display import IFrame
IFrame(src=f"{dag.name}_states_run.html", width='100%', height=600)


2023-06-15 11:10:09.721 | INFO     | easydags.node:execute:146 - Start executing add at 2023-06-15, 11:10:09
2023-06-15 11:10:41.848 | INFO     | easydags.node:execute:146 - Start executing offensive at 2023-06-15, 11:10:41
2023-06-15 11:10:41.849 | INFO     | easydags.node:execute:146 - Start executing all at 2023-06-15, 11:10:41
2023-06-15 11:10:41.851 | INFO     | easydags.node:execute:146 - Start executing fast at 2023-06-15, 11:10:41
2023-06-15 11:10:44.617 | INFO     | easydags.node:execute:146 - Start executing defensive at 2023-06-15, 11:10:44
2023-06-15 11:10:47.837 | INFO     | easydags.node:execute:146 - Start executing final at 2023-06-15, 11:10:47


drawing


# Final DAG

If you run this tutorial, you will get the DAG HTML yourself. Here is a PNG version so you can check it out without running the tutorial:

![Final DAG](https://raw.githubusercontent.com/magralo/easydags/main/resource_readme/dag_tut_data_eng.png)
              
