In [1]:
!pip install pandas

You should consider upgrading via the '/home/drapaiton/.cache/pypoetry/virtualenvs/tiendapago-examen-ea-xYsG0OIB-py3.9/bin/python -m pip install --upgrade pip' command.[0m


In [3]:
import pandas as pd
from pandas import DataFrame as DaFe

# --- Input files (paths are relative to the notebook's working directory) ---
CUSTOMERS_FILE_PATH = "Customers.csv"
TRANSACTIONS_FILE_PATH = "Transactions.csv"
DISTRIBUTOR_FILE_PATH = "Distributor.csv"

# --- Column names referenced by the pipeline and the metric helpers ---
CUSTOMER_ID_COLUMN = "CustomerID"
DISTRIBUTOR_ID_COLUMN = "DistributionCenterID"
TRANSACTION_AMOUNT_COLUMN = "TransactionAmount"
DATE_COLUMN = "Date"

# --- strptime-style pattern used to parse DATE_COLUMN (month/day/year hour:minute) ---
DATE_FORMAT = "%m/%d/%Y %H:%M"

In [562]:
"""
1. Crear procesos de integración de datos que extraigan de los archivos insumos y generen
las transformaciones necesarias para un modelo de Business Intelligence y Data
Warehouse.

Empresa: Bytelian SA de CV
"""

def integrate_bi_input_files() -> DaFe:
    """Load the three input CSV files and merge them into a single DataFrame.

    Customers are joined with their transactions on ``CUSTOMER_ID_COLUMN`` and the
    result is joined with the distributors on ``DISTRIBUTOR_ID_COLUMN``. Rows that
    lack either identifier are then removed and ``DATE_COLUMN`` is parsed using
    ``DATE_FORMAT``.

    Everything is loaded in memory at once; as the files grow this strategy should
    be replaced (chunk iteration, date iteration, per-client isolation, etc.).

    Returns:
        The merged DataFrame with ``DATE_COLUMN`` converted to datetimes.

    Raises:
        ValueError: if one of the joins produces an empty frame, or if
            ``DATE_COLUMN`` holds a value that does not match ``DATE_FORMAT``.
    """
    # Errors from reading the files (e.g. a missing file) propagate unchanged.
    customers_df = pd.read_csv(CUSTOMERS_FILE_PATH)
    transactions_df = pd.read_csv(TRANSACTIONS_FILE_PATH)
    distributor_df = pd.read_csv(DISTRIBUTOR_FILE_PATH)

    df = customers_df.join(transactions_df.set_index(CUSTOMER_ID_COLUMN), on=CUSTOMER_ID_COLUMN)
    if df.empty:
        files_tried_to_join = [CUSTOMERS_FILE_PATH, TRANSACTIONS_FILE_PATH]
        raise ValueError(f"these files couldn't be merged {files_tried_to_join}")

    df = df.join(distributor_df.set_index(DISTRIBUTOR_ID_COLUMN), on=DISTRIBUTOR_ID_COLUMN)
    if df.empty:
        files_tried_to_join = [[CUSTOMERS_FILE_PATH, TRANSACTIONS_FILE_PATH], DISTRIBUTOR_FILE_PATH]
        raise ValueError(f"these files couldn't be merged {files_tried_to_join}")

    # Filter out rows with a missing identifier (bad data in the original files).
    # This must be a row-wise dropna on the frame: assigning `column.dropna()` back
    # to the column re-aligns on the index and restores the NaN values, so it
    # filters nothing.
    df = df.dropna(subset=[CUSTOMER_ID_COLUMN, DISTRIBUTOR_ID_COLUMN])

    # Parse strictly: silently keeping unparsed strings would only postpone the
    # failure to the first `.dt` access further down the notebook.
    try:
        parsed_dates = pd.to_datetime(df[DATE_COLUMN], format=DATE_FORMAT)
    except (ValueError, TypeError) as exc:
        raise ValueError(f"couldn't parse {DATE_COLUMN=}") from exc

    # `assign` builds a new frame, avoiding chained-assignment warnings on the
    # filtered frame.
    return df.assign(**{DATE_COLUMN: parsed_dates})

def dag_bytelian_transactions_pipeline():
    """Entry point of the Bytelian transactions pipeline.

    Currently a thin wrapper: it delegates to ``integrate_bi_input_files`` and
    returns that function's result unchanged.
    """
    integrated = integrate_bi_input_files()
    return integrated

# Run the pipeline and persist the integrated dataset as a CSV file.
(
    dag_bytelian_transactions_pipeline()
    .to_csv('pipeline_results.csv')
)

In [555]:
"""
2. Previo a un análisis de los datos, crear un modelo de Business Intelligence
a nivel analítico para el área comercial.
"""

'\n2. Previo a un análisis de los datos, crear un modelo de Business Intelligence\na nivel analítico para el área comercial.\n'

In [528]:
"""
3. Calcular 2 métricas (monto colocado y número transacciones) donde:
a. Monto Colocado es la suma de la transacción (transaction amount).
b. Número de transacciones es el conteo de las transacciones totales.
"""

def calculate_sum_count_metrics(
    df: DaFe,
    output_sum_column,
    output_count_column,
    parent_column=CUSTOMER_ID_COLUMN,
    children_column=TRANSACTION_AMOUNT_COLUMN,
):
    """Aggregate ``children_column`` per ``parent_column`` as a sum and a count.

    Args:
        df: frame containing both ``parent_column`` and ``children_column``.
        output_sum_column: label given to the column holding the per-group sum.
        output_count_column: label given to the column holding the per-group count.
        parent_column: column to group by.
        children_column: column whose values are summed and counted.

    Returns:
        A new DataFrame with exactly three columns, in this order:
        ``parent_column``, ``output_sum_column``, ``output_count_column``.
    """
    relevant = df.loc[:, [parent_column, children_column]]
    aggregated = relevant.groupby(parent_column).agg(["sum", "count"])

    # reset_index turns the group key back into a regular column; the resulting
    # two-level column labels are then replaced by the three flat output labels.
    metrics = aggregated.reset_index()
    metrics.columns = [parent_column, output_sum_column, output_count_column]
    return metrics

# Build the integrated dataset, compute both metrics per customer and export
# the resulting table to an Excel workbook.
full_df = dag_bytelian_transactions_pipeline()
(
    calculate_sum_count_metrics(
        full_df,
        output_sum_column="Monto Colocado",
        output_count_column="Número de transacciones",
    )
    .to_excel('two_metrics.xlsx')
)

In [None]:
"""
4. Proponer y justificar 3 métricas, que creas son importantes para la toma de decisiones del
gerente Hugo Montoya.

"""

In [561]:
"""
5. Generar un tablero de control que muestre los principales indicadores de una forma
amigable para que el gerente pueda tomar decisiones de una manera eficaz y sencilla en
diferentes puntos del tiempo.

NOTA: Nuestra empresa necesita de un tablero de control que muestre los
principales indicadores (5) para la correcta toma de decisiones del
gerente comercial Hugo Montoya.
"""

# Rebuild the integrated dataset for the dashboard indicators defined below.
full_df = dag_bytelian_transactions_pipeline()
# The bare string that follows is evaluated and discarded; it serves as a note
# explaining why the indicators are computed per day rather than per month.
"""i would rather prefer to iterate thru months,
but as dataset size is micro, daily is more explanatory"""
def customer_with_higher_transaction_amount_sum_per_day(my_df: DaFe) -> DaFe:
    """Total the transaction amount for every (day, customer) pair.

    The time-of-day part of ``DATE_COLUMN`` is discarded so that all transactions
    of one calendar day fall into the same group. The input frame is not modified.

    Note that this returns the totals for every customer and day; it does not by
    itself select the customer with the highest total.

    Args:
        my_df: frame holding ``DATE_COLUMN`` (datetime values, as ``.dt`` is used),
            ``CUSTOMER_ID_COLUMN`` and ``TRANSACTION_AMOUNT_COLUMN``.

    Returns:
        A DataFrame with one row per (day, customer) pair and the summed
        transaction amount, indexed from 0.
    """
    # Derive a new frame whose date column keeps only the calendar date.
    daily = my_df.assign(**{DATE_COLUMN: my_df[DATE_COLUMN].dt.date})

    group_keys = [DATE_COLUMN, CUSTOMER_ID_COLUMN]
    selected = daily[[*group_keys, TRANSACTION_AMOUNT_COLUMN]]
    totals = selected.groupby(group_keys, as_index=False).sum()
    return totals.reset_index(drop=True)

customer_with_higher_transaction_amount_sum_per_day(full_df)\
    .drop(columns=CUSTOMER_ID_COLUMN)\
    .describe()\