# Data pipeline tool: Mage AI

**Cassandra to MySQL**

**DataLake to DataWarehouse**

While implementing and using this tool we ran into several challenges, mainly handling large volumes of data with the limited resources of the server that hosts Mage. To address them, we decided not to process all the data at once and looked for ways to partition it by region and by index. We implemented an automated approach that processes small batches of data and loads them progressively into MySQL.

Part of the final code deployed in Mage is shown further down. It relies on global variables that keep a count of the data already loaded and hold the most suitable partition size for each case. This lets us load and process the data without trouble and keep a record of how much has been loaded, which also fits the incremental load that arrives through the APIs.

**Challenges encountered**

* Limited server resources: our server did not have enough resources to process large amounts of data efficiently.

* Processing all data at once: loading and processing all the data simultaneously caused performance problems and exhausted the server's resources.

**Solutions implemented**

* Partitioning data by region and index: instead of processing all the data at once, we adopted a partitioning approach. The data was split by region and sliced by index ranges, which makes incremental loading and efficient processing easier.

* Progressive loading and processing: instead of loading all the data in one go, we implemented a mechanism that loads and processes small batches automatically and progressively. This avoided the performance problems and made the most of the available resources.

* Global variables for tracking and tuning: the final code uses global variables to count the data already loaded and to set the partition size according to the amount of data and the available resources. This gives tighter control over the loading and processing steps.
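
The partition-and-advance bookkeeping described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the deployed code: `next_window` is a hypothetical helper, and the in-memory `state` dictionary stands in for Mage's global variables.

```python
def next_window(start, partition_size, total_rows):
    """Return the (start, end) bounds of the next chunk, clamped to the data size."""
    end = min(start + partition_size, total_rows)
    return start, end


# Stand-in for the pipeline's global variables (hypothetical storage)
state = {"indice_inicio": 0, "particion": 100}
rows = list(range(250))  # pretend source table with 250 rows

chunk_sizes = []
while state["indice_inicio"] < len(rows):
    start, end = next_window(state["indice_inicio"], state["particion"], len(rows))
    chunk = rows[start:end]        # process only this slice
    chunk_sizes.append(len(chunk))
    state["indice_inicio"] = end   # persist the offset for the next run
```

In the pipeline each loop iteration corresponds to one scheduled run, so only one slice is held in memory at a time.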

## stores_google

In [None]:
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

import pandas as pd

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

from mage_ai.data_preparation.variable_manager import set_global_variable

@data_loader
def load_data(*args, **kwargs):
    """
    Template code for loading data from any source.

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    categories_searched = ['Restaurant',
                        'restaurant',
                        'Bar',
                        'bar',
                        'Deli',
                        'Grocery',
                        'Coffee',
                        'Bakery',
                        'Sandwich']

    dictionary = {'gmap_id':[],
                'name': [],
                'address': [],
                'latitude': [],
                'longitude': [],
                'category':[],
                'misc':[]}



    for category_searched in categories_searched:
        # Build the query for this category
        query = f"""SELECT gmap_id, name, address, latitude, longitude, category, misc
                    FROM stores
                    WHERE category CONTAINS '{category_searched}'
                    """
        statement = SimpleStatement(query, fetch_size=500)
        answer = session.execute(statement, timeout=None)

        for row in answer:
            dictionary['gmap_id'].append(row['gmap_id'])
            dictionary['name'].append(row['name'])
            dictionary['address'].append(row['address'])
            dictionary['latitude'].append(row['latitude'])
            dictionary['longitude'].append(row['longitude'])
            # Convert the SortedSet to a comma-separated string
            category_str = ', '.join(row['category'])
            dictionary['category'].append(category_str)
            dictionary['misc'].append(row['misc'])


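    # A store that matches several searched categories is returned once per query, so keep one row per gmap_id
    stores = pd.DataFrame(dictionary).drop_duplicates(subset='gmap_id').reset_index(drop=True)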

    return stores


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
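
Because the loader runs one query per searched category, a store tagged with more than one of those categories can come back several times. The sketch below shows one way to keep a single row per `gmap_id` while collecting results. `collect_unique` and the sample rows are made up for illustration and are not part of the deployed block.

```python
def collect_unique(result_sets, key="gmap_id"):
    """Merge several result sets, keeping the first row seen for each key."""
    seen = set()
    unique_rows = []
    for rows in result_sets:
        for row in rows:
            if row[key] in seen:
                continue  # already collected from an earlier category query
            seen.add(row[key])
            unique_rows.append(row)
    return unique_rows


# Made-up rows standing in for two category queries
restaurant_rows = [{"gmap_id": "a1", "name": "Taco Place"},
                   {"gmap_id": "b2", "name": "Corner Bar & Grill"}]
bar_rows = [{"gmap_id": "b2", "name": "Corner Bar & Grill"},
            {"gmap_id": "c3", "name": "Night Owl"}]

stores = collect_unique([restaurant_rows, bar_rows])
```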


In [None]:
# Filter by state
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

import pandas as pd

from geopy.geocoders import Nominatim

from mage_ai.data_preparation.variable_manager import set_global_variable

@transformer
def transform(stores, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Read the global load variables
    indice_inicio = kwargs['indice_inicio']
    particion = kwargs['particion']  # 100

    indice_fin = indice_inicio + particion

    # Take only the rows of the current partition
    stores = stores[indice_inicio:indice_fin]


    # Instantiate the Nominatim geocoder
    geolocator = Nominatim(user_agent="GetLoc")

    # States to keep
    estados_deseados = ['Florida', 'New York', 'California', 'Nevada', 'Texas']

    # Filter the rows by state, collecting matches in a list
    filas_filtradas = []

    for index, row in stores.iterrows():
        latitude = row['latitude']
        longitude = row['longitude']
        if pd.notnull(latitude) and pd.notnull(longitude):
            location = geolocator.reverse(f"{latitude}, {longitude}")
            if location is not None and 'address' in location.raw and 'state' in location.raw['address']:
                estado = location.raw['address']['state']
                if estado in estados_deseados:
                    row['state'] = estado
                    filas_filtradas.append(row)

    # Build the result in one step; DataFrame.append was removed in pandas 2.0
    stores_f = pd.DataFrame(filas_filtradas, columns=list(stores.columns) + ['state'])

    indice_fin = min(indice_fin, indice_inicio + len(stores))

    set_global_variable('stores_google', 'indice_inicio', indice_fin)

    return stores_f


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
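
The state filter above makes one reverse-geocoding request per row, so repeated coordinates cost repeated lookups. The sketch below shows the same filtering logic with a cached lookup. It is an illustration only: `fake_reverse` is a hypothetical stand-in that returns a Nominatim-like dictionary from a hard-coded table, so the example runs without network access.

```python
from functools import lru_cache

WANTED_STATES = {"Florida", "New York", "California", "Nevada", "Texas"}


def fake_reverse(lat, lon):
    """Hypothetical stand-in for a reverse geocoder; returns a dict or None."""
    table = {(25.76, -80.19): "Florida",
             (36.17, -115.14): "Nevada",
             (41.88, -87.63): "Illinois"}
    state = table.get((lat, lon))
    return {"address": {"state": state}} if state else None


@lru_cache(maxsize=None)
def state_for(lat, lon):
    """Look up the state once per distinct coordinate pair."""
    raw = fake_reverse(lat, lon)
    if raw is None:
        return None
    return raw.get("address", {}).get("state")


def filter_by_state(rows):
    """Keep rows whose coordinates resolve to a wanted state, adding a 'state' field."""
    kept = []
    for row in rows:
        lat, lon = row.get("latitude"), row.get("longitude")
        if lat is None or lon is None:
            continue  # cannot geocode without both coordinates
        state = state_for(lat, lon)
        if state in WANTED_STATES:
            kept.append({**row, "state": state})
    return kept


rows = [{"gmap_id": "a1", "latitude": 25.76, "longitude": -80.19},
        {"gmap_id": "b2", "latitude": 41.88, "longitude": -87.63},
        {"gmap_id": "c3", "latitude": None, "longitude": -115.14},
        {"gmap_id": "d4", "latitude": 36.17, "longitude": -115.14}]
filtered = filter_by_state(rows)
```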


In [None]:
#Tabla_locales
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

import pandas as pd

import mysql.connector

mydb = mysql.connector.connect(
  host="162.241.60.182",
  user="leudadoc_henry",
  password="tenboj-dEckow-nokka8",
  database="leudadoc_Henry"
)

cursor = mydb.cursor()
cursor.execute("USE leudadoc_Henry;")

@custom
def transform_custom(stores_f, *args, **kwargs):
    """
    Args:
        data: The output from the upstream parent block (if applicable)
        args: The output from any additional upstream blocks

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    stores = stores_f

    def check_delivery(x):
        service_options = x.get("misc")

        if service_options and "Delivery" in service_options:
            return "Y"
        else:
            return "N"

    stores['delivery'] = stores.apply(check_delivery, axis=1)

    locales = stores[['gmap_id', 'name', 'address', 'latitude', 'longitude', 'state', 'delivery']]

    # Look up the state IDs
    sql = "SELECT * FROM estados"
    cursor.execute(sql)
    data = cursor.fetchall()
    estados = pd.DataFrame(data, columns=["stateID", "state"])

    locales = pd.merge(locales, estados, on='state', how='inner')
    locales = locales.drop('state', axis=1)
    locales = locales[['gmap_id', 'stateID', 'name', 'address', 'latitude', 'longitude', 'delivery']]

    # Insert into the locales table (upsert keyed on storeID)
    # VALUES(col) reuses the value supplied for the insert, so each row needs only seven parameters
    values = [(row['gmap_id'], row['stateID'], row['name'], row['address'], row['latitude'], row['longitude'], row['delivery']) for _, row in locales.iterrows()]
    insert_query = "INSERT INTO locales (storeID, stateID, name, address, latitude, longitude, delivery) VALUES (%s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE stateID=VALUES(stateID), name=VALUES(name), address=VALUES(address), latitude=VALUES(latitude), longitude=VALUES(longitude), delivery=VALUES(delivery);"
    cursor.executemany(insert_query, values)
    mydb.commit()

    print('inserted values:', values)

    cursor.close()
    mydb.close()

    return stores



@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
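
The upsert used for the `locales` table follows a regular shape, so the statement can be generated from the column list. The sketch below is a hypothetical helper (`build_upsert` is not part of the deployed code) that builds an `INSERT ... ON DUPLICATE KEY UPDATE` statement using MySQL's `VALUES(col)` form. It assumes the table and column names come from trusted code, never from user input, since they are interpolated directly into the SQL text.

```python
def build_upsert(table, key_column, columns):
    """Build a parameterized MySQL upsert for one key column plus data columns."""
    all_columns = [key_column] + list(columns)
    placeholders = ", ".join(["%s"] * len(all_columns))
    updates = ", ".join(f"{col}=VALUES({col})" for col in columns)
    return (f"INSERT INTO {table} ({', '.join(all_columns)}) "
            f"VALUES ({placeholders}) "
            f"ON DUPLICATE KEY UPDATE {updates}")


query = build_upsert("locales", "storeID", ["stateID", "name"])
```

Each row passed to `executemany` then needs exactly one parameter per listed column.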


In [None]:
#Tabla_categorias
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

import pandas as pd

import mysql.connector

mydb = mysql.connector.connect(
  host="162.241.60.182",
  user="leudadoc_henry",
  password="tenboj-dEckow-nokka8",
  database="leudadoc_Henry"
)

cursor = mydb.cursor()
cursor.execute("USE leudadoc_Henry;")


@custom
def transform_custom(stores, *args, **kwargs):
    """
    Args:
        data: The output from the upstream parent block (if applicable)
        args: The output from any additional upstream blocks

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
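    # Split the comma-separated categories into one lowercase value per row
    categories = stores['category'].str.lower().str.split(", ").explode()
    # Drop the word "restaurant" when it follows another word, e.g. "mexican restaurant" -> "mexican"
    pattern = r'(\w+\s+)restaurant'
    categories = categories.str.replace(pattern, r'\1', regex=True).str.strip()
    # Deduplicate after normalizing, since different raw values can collapse to the same category
    df_category = pd.DataFrame({'category': categories.unique()})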

    cursor.execute("SELECT * FROM categorias;")
    resp = cursor.fetchall()

    mysql_category = pd.DataFrame(resp, columns = ['categoryID', 'category']) # Category table already loaded in MySQL

    no_match = df_category.merge(mysql_category, on='category', how='left', indicator=True)
    no_match = no_match[no_match['_merge'] == 'left_only']
    no_match = no_match.drop('_merge', axis=1)
    new_category = no_match[["category"]]

    # Insert into the categorias table
    values = [(row['category'], ) for _, row in new_category.iterrows()]
    insert_query = "INSERT INTO categorias (category) VALUES (%s);"
    cursor.executemany(insert_query, values)
    mydb.commit()

    print('inserted values:', values)

    cursor.close()
    mydb.close()

    return stores


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
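
The category clean-up in this block lowercases the comma-separated string, splits it, and drops the word "restaurant" when it follows another word. The same steps can be sketched with the standard `re` module. `normalize_categories` is a hypothetical helper written for illustration, using the same pattern as the block above.

```python
import re

# Same pattern as the block above: a word plus whitespace, followed by "restaurant"
PATTERN = re.compile(r'(\w+\s+)restaurant')


def normalize_categories(raw):
    """Split a comma-separated category string into unique, normalized names."""
    result = []
    for part in raw.lower().split(", "):
        cleaned = PATTERN.sub(r'\1', part).strip()
        if cleaned not in result:  # keep first occurrence, preserve order
            result.append(cleaned)
    return result


categories = normalize_categories("Mexican restaurant, Restaurant, Bar, Mexican")
```

Note that "Mexican restaurant" and "Mexican" collapse to the same value, which is why duplicates have to be removed after normalizing rather than before.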


In [None]:
#Tabla_intermedia_categoria_local
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

import pandas as pd

import mysql.connector

mydb = mysql.connector.connect(
  host="162.241.60.182",
  user="leudadoc_henry",
  password="tenboj-dEckow-nokka8",
  database="leudadoc_Henry"
)

cursor = mydb.cursor()
cursor.execute("USE leudadoc_Henry;")

@custom
def transform_custom(stores, *args, **kwargs):
    """
    Args:
        data: The output from the upstream parent block (if applicable)
        args: The output from any additional upstream blocks

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    # Build one row per (store, category) pair, using the same normalization as the categorias table
    df_category_store = stores.assign(category = stores['category'].str.lower().str.split(", ")).explode('category')
    # Drop the word "restaurant" when it follows another word, e.g. "mexican restaurant" -> "mexican"
    pattern = r'(\w+\s+)restaurant'
    df_category_store['category'] = df_category_store['category'].str.replace(pattern, r'\1', regex=True).str.strip()
    df_category_store = df_category_store[['gmap_id', 'category']]

    cursor.execute("SELECT * FROM categorias;")
    resp = cursor.fetchall()

    mysql_category = pd.DataFrame(resp, columns = ['categoryID', 'category']) # Category table already loaded in MySQL

    df_category_store = df_category_store.merge(mysql_category, how='inner', on='category')
    # Normalization can map several raw categories to the same ID, so keep each pair once
    df_category_store = df_category_store[['gmap_id', 'categoryID']].drop_duplicates()

    # Insert into the categoria_local link table
    values = [(row['categoryID'], row['gmap_id']) for _, row in df_category_store.iterrows()]
    insert_query = "INSERT INTO categoria_local (categoryID, storeID) VALUES (%s, %s);"
    cursor.executemany(insert_query, values)
    mydb.commit()

    print('inserted values:', values)

    cursor.close()
    mydb.close()

    return stores


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
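
The link table pairs each store with the IDs of its categories, which amounts to an inner join between the exploded categories and the category lookup. A plain-Python sketch of that join follows. `link_rows`, the sample stores, and the ID mapping are all made up for illustration.

```python
def link_rows(stores, category_ids):
    """Return sorted, unique (categoryID, storeID) pairs for known categories."""
    pairs = set()
    for store in stores:
        for name in store["category"].lower().split(", "):
            category_id = category_ids.get(name.strip())
            if category_id is not None:  # unknown categories are dropped, like an inner join
                pairs.add((category_id, store["gmap_id"]))
    return sorted(pairs)


# Made-up inputs: two stores and a category lookup as it might come from MySQL
stores = [{"gmap_id": "a1", "category": "Bar, Bakery"},
          {"gmap_id": "b2", "category": "Bar, Unknown"}]
category_ids = {"bar": 1, "bakery": 2}

pairs = link_rows(stores, category_ids)
```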

## reviews_california

In [None]:
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

import pandas as pd

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from mage_ai.data_preparation.variable_manager import set_global_variable

@data_loader
def load_data(*args, **kwargs):

    # Read the global load variables
    indice_inicio = kwargs['indice_inicio']
    tamaño_particion = kwargs['tamaño_particion']  # 5000

    # Compute the end index of this partition
    indice_fin = indice_inicio + tamaño_particion

    statement = SimpleStatement("SELECT * FROM reviews WHERE state = 'California';", fetch_size = 500)
    resp = session.execute(statement, timeout=None)

    dictionary = {'gmap_id':[], 'state': [], 'user_id':[], 'name': [], 'time':[], 'rating': [], 'text':[], 'resp': []}

    for i, row in enumerate(resp):
        # Stop reading once the end of the partition is reached
        if i >= indice_fin:
            break
        # Skip rows that belong to earlier partitions
        if i < indice_inicio:
            continue
        dictionary['gmap_id'].append(row['gmap_id'])
        dictionary['state'].append(row['state'])
        dictionary['user_id'].append(row['user_id'])
        dictionary['name'].append(row['name'])
        dictionary['time'].append(row['time'])
        dictionary['rating'].append(row['rating'])
        dictionary['text'].append(row['text'])
        dictionary['resp'].append(row['resp'])

    reviews_california = pd.DataFrame(dictionary)

    indice_fin = min(indice_fin, indice_inicio + len(reviews_california))

    set_global_variable('prueba_cassandra_to_sql', 'indice_inicio', indice_fin)

    return reviews_california

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
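
Reading one window out of a paged result set can also be expressed with `itertools.islice`, which stops consuming the iterator as soon as the window is full. This is a sketch with a generator standing in for the Cassandra result set; `read_window` is a hypothetical helper. Rows before the window are still fetched and discarded, so the cost of reaching a window grows with its offset.

```python
from itertools import islice


def read_window(rows, start, size):
    """Return rows[start:start + size] from any iterator without reading past the window."""
    return list(islice(rows, start, start + size))


# Generator standing in for a paged query result with 12 rows
source = ({"id": i} for i in range(12))

window = read_window(source, 5, 5)
next_start = 5 + len(window)  # offset to store for the next run
```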



In [None]:
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

import pandas as pd

@transformer
def transform(reviews_california, *args, **kwargs):
    """
    Template code for a transformer block.

    Add more parameters to this function if this block has multiple parent blocks.
    There should be one parameter for each output variable from each parent block.

    Args:
        data: The output from the upstream parent block
        args: The output from any additional upstream blocks (if applicable)

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
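    # Drop reviews without text; copy so the assignment below does not modify a slice
    reviews_california = reviews_california.dropna(subset=['text']).copy()

    # Convert the epoch-millisecond timestamp to a datetime
    reviews_california['time'] = pd.to_datetime(reviews_california['time'], unit='ms')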

    return reviews_california

@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
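
The two cleaning steps in this transformer, dropping reviews without text and converting epoch milliseconds to a datetime, can be illustrated without pandas. The sketch below is a hypothetical equivalent on plain dictionaries. Unlike `pd.to_datetime(..., unit='ms')`, which yields timezone-naive values, it returns timezone-aware UTC datetimes.

```python
from datetime import datetime, timezone


def clean_reviews(rows):
    """Drop rows without text and convert 'time' from epoch milliseconds to datetime."""
    cleaned = []
    for row in rows:
        if row.get("text") is None:
            continue  # same effect as dropna(subset=['text'])
        when = datetime.fromtimestamp(row["time"] / 1000, tz=timezone.utc)
        cleaned.append({**row, "time": when})
    return cleaned


# Made-up sample: 1609459200000 ms is 2021-01-01 00:00:00 UTC
rows = [{"text": "Great", "time": 1609459200000},
        {"text": None, "time": 1609459260000}]
cleaned = clean_reviews(rows)
```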

In [None]:
if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, dict_factory

cluster = Cluster(['186.87.6.161'], port=9042, protocol_version=5)  # Server IP and the standard Cassandra port, 9042
session = cluster.connect('henry')
session.row_factory = dict_factory

import pandas as pd

import mysql.connector

mydb = mysql.connector.connect(
  host="162.241.60.182",
  user="leudadoc_henry",
  password="tenboj-dEckow-nokka8",
  database="leudadoc_Henry"
)

cursor = mydb.cursor()
cursor.execute("USE leudadoc_Henry;")

@custom
def transform_custom(reviews_california, *args, **kwargs):
    """
    Args:
        data: The output from the upstream parent block (if applicable)
        args: The output from any additional upstream blocks

    Returns:
        Anything (e.g. data frame, dictionary, array, int, str, etc.)
    """
    reviews_california['time'] = pd.to_datetime(reviews_california['time'], unit='ms')

    # Insert into the clientes table (upsert keyed on clientID)
    # VALUES(name) reuses the name supplied for the insert, so each row needs only two parameters
    insert_query = "INSERT INTO clientes (clientID, name) VALUES (%s, %s) ON DUPLICATE KEY UPDATE name=VALUES(name)"
    values = [(row['user_id'], row['name']) for _, row in reviews_california.iterrows()]
    cursor.executemany(insert_query, values)
    mydb.commit()

    # Insert into the reviews table
    insert_query = "INSERT INTO reviews (clientID, storeID, date, rating, text) VALUES (%s, %s, %s, %s, %s)"
    values = [(row['user_id'], row['gmap_id'], row['time'], row['rating'], row['text']) for _, row in reviews_california.iterrows()]
    cursor.executemany(insert_query, values)
    mydb.commit()

    # ETL for the comentarios table
    # Fetch the IDs of the rows just inserted, naming the columns to match reviews_california
    registros_nuevos = len(reviews_california)
    sql = "SELECT review_id, clientID, date FROM reviews ORDER BY review_id DESC LIMIT %s"
    cursor.execute(sql, (registros_nuevos,))
    data = cursor.fetchall()
    df_review_2 = pd.DataFrame(data, columns=['Review_id', 'user_id', 'time'])

    reviews_final_2 = reviews_california.dropna(subset=['resp'])
    reviews_final_2 = reviews_final_2[reviews_final_2['resp'] != 'None']

    reviews_final_2 = reviews_final_2[['user_id', 'resp', 'time']]
    merged = reviews_final_2.merge(df_review_2, on=['user_id', 'time'], how='inner')

    # Copy so the column assignments below do not modify a slice of merged
    df_comentarios = merged[['Review_id','resp']].copy()
    import ast
    # Parse each response string into a dictionary
    df_comentarios['resp'] = df_comentarios['resp'].apply(ast.literal_eval)

    df_comentarios['time'] = df_comentarios['resp'].apply(lambda x: x['time'])
    df_comentarios['text'] = df_comentarios['resp'].apply(lambda x: x['text'])

    df_comentarios = df_comentarios.drop('resp', axis=1)

    # Convert the epoch-millisecond timestamp to a datetime
    df_comentarios['time'] = pd.to_datetime(df_comentarios['time'], unit='ms')

    # Insert into the comentarios table
    if len(df_comentarios) > 0:
        insert_query = "INSERT INTO comentarios (reviewID, text, date) VALUES (%s, %s, %s)"
        # The response timestamp lives in the 'time' column of df_comentarios
        values = [(row['Review_id'], row['text'], row['time']) for _, row in df_comentarios.iterrows()]
        # executemany also handles a single row, so no special case is needed
        cursor.executemany(insert_query, values)
        mydb.commit()
    else:
        print("No data to insert into comentarios")

    cursor.close()
    mydb.close()

    return reviews_california


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
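
The owner responses arrive as strings that hold a Python-style dictionary with `time` and `text` keys, which is why the block parses them with `ast.literal_eval`. A self-contained sketch of that parsing step follows. `parse_response` and the sample string are hypothetical, and the sketch assumes the timestamp is in epoch milliseconds, as in the rest of the pipeline.

```python
import ast
from datetime import datetime, timezone


def parse_response(resp):
    """Parse a stored owner response into its text and a UTC datetime, or None if absent."""
    if resp is None or resp == "None":
        return None  # no response recorded for this review
    data = ast.literal_eval(resp) if isinstance(resp, str) else resp
    when = datetime.fromtimestamp(data["time"] / 1000, tz=timezone.utc)
    return {"text": data["text"], "date": when}


# Made-up sample: 1609459200000 ms is 2021-01-01 00:00:00 UTC
parsed = parse_response("{'time': 1609459200000, 'text': 'Thanks for visiting!'}")
```

`ast.literal_eval` only evaluates literals, so it is a safer choice than `eval` for strings that come from external data.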