In [9]:
import pymongo
from pymongoarrow.monkey import patch_all
import polars as pl
from pymongoarrow.api import write, Schema
from pyarrow import float64, int64
import datetime
import time
import json
import uuid
import concurrent.futures

In [1]:
# Patch pymongo collections with PyMongoArrow helpers (the notebook calls
# `aggregate_polars_all` on collections below), then widen Polars' display so
# full floats, 20 rows and long strings are rendered.
# NOTE(review): `patch_all` is imported in the imports cell; run that cell first.
patch_all()

(
    pl.Config
    .set_fmt_float("full")
    .set_tbl_rows(20)
    .set_fmt_str_lengths(300)
)

NameError: name 'patch_all' is not defined

In [22]:
# --- Reconciliation status labels and source metadata ---
CONCILIATION_STATUS = "CONCILIATED"
NON_CONCILIATION_STATUS = "REMANENT"
EXTERNAL_SOURCE_NAME = "EXTERNAL_ARCHIVE.csv"

# --- Reconciliation keys ---
# Each (external column, internal field) pair is joined on, so both values must
# be equal for an external row to match an internal transaction.
# Disabled pairs, kept for reference:
#   ("codigo_aprobacion", "approval_code")
#   ("tipo_de_transaccion", "transaction_type")
#   ("estado_transaccion", "transaction_status_type")
RC_KEYS_JSON = [
    {"ext_col": ext_col, "int_col": int_col}
    for ext_col, int_col in (
        ("codigo_ksh", "transaction_code"),
        ("importe", "approved_transaction_amount"),
        ("fecha", "create_timestamp"),
        ("digitos_bin", "bin_code"),
        ("kind_card", "card_type"),
        ("ultimos4", "last_four_digit_code"),
    )
]
# Serialized form of the keys; get_expressions decodes it with json.loads.
RC_KEYS = json.dumps(RC_KEYS_JSON)

# Internal fields always fetched; the key fields above are added on top.
DEFAULT_INTERNAL_FIELDS = [
    "_id",
    "reference_transaction_code",
    "approval_code",
    "processor_type",
    "merchant_name",
    "processor_name",
    "transaction_code",
    "transaction_status_type",
    "transaction_type",
]
# Fields kept numeric (instead of being converted to strings) in projections/casts.
INTEGER_FIELDS = ["create_timestamp"]
DOUBLE_FIELDS = ["approved_transaction_amount"]

_UNIX_EPOCH_UTC = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def _utc_epoch_ms(*date_parts):
    """Return the UTC datetime built from ``date_parts`` as integer epoch milliseconds."""
    moment = datetime.datetime(*date_parts, tzinfo=datetime.timezone.utc)
    return (moment - _UNIX_EPOCH_UTC) // datetime.timedelta(milliseconds=1)


# --- Calculated constants ---
# NOTE(review): CONCILIATION_TIMESTAMP is 2024-04-04T00:00Z, which matches
# neither CONCILIATION_DATE ("2024-01-02") nor the query window below; confirm
# which value is intended.
CONCILIATION_TIMESTAMP = _utc_epoch_ms(2024, 4, 4)  # 1712188800000
CONCILIATION_DATE = "2024-01-02"
EXECUTION_TYPE = "AUTOMATIC"
EXECUTION_ID = str(uuid.uuid4())
EXECUTION_DATE = datetime.date.today().isoformat()
# Local wall-clock time as epoch milliseconds, truncated to whole seconds.
EXECUTION_TIMESTAMP = int(time.mktime(datetime.datetime.now().timetuple()) * 1000)

# --- Filters for querying internal data: UTC epoch ms, half-open [from, to) ---
TIMESTAMP_FROM = _utc_epoch_ms(2024, 5, 1, 0, 0)  # 1714521600000; 1536 approved records
TIMESTAMP_TO = _utc_epoch_ms(2024, 5, 1, 0, 30)  # 1714523400000
PROCESSOR_NAME = "Kushki Acquirer Processor"
COUNTRY_NAME = "Mexico"

# --- Database and collections ---
ODL_DB = "reconciliation_test"
ODL_COLL = "card_transaction"
EXTERNAL_COLL = "external_transaction"
RESULT_COLL = "reconciliation_transactions"
AGG_RESULT_COLL = "reconciliation_summary"
EXT_REMANENT_RECORDS = "external_remanent_records"
TMP_COLL = "tmp_rc_transactions"

In [13]:
import os

# Shared MongoDB client used by every helper below. The connection string is
# read from the MONGO_URI environment variable; when that variable is unset,
# pymongo receives host=None and falls back to its default host.
client = pymongo.MongoClient(host=os.getenv("MONGO_URI"))

In [14]:
# Session-wide accumulators for raw anti-join results. They start empty (no
# columns, no rows); get_internal_not_matching_records and
# get_external_not_matching_records concatenate onto them via `global`.
a_to_b_nm_df = pl.LazyFrame()
b_to_a_nm_df = pl.LazyFrame()

In [16]:
def get_expressions(rc_keys=None, default_internal_fields=None):
    """Build the rename map, join keys and internal field list for reconciliation.

    Args:
        rc_keys: Reconciliation key pairs, either as a JSON string (the format
            of the module constant ``RC_KEYS``) or as an already-decoded list of
            ``{"ext_col": ..., "int_col": ...}`` dicts. Defaults to ``RC_KEYS``.
        default_internal_fields: Internal fields to fetch in addition to the key
            columns. Defaults to ``DEFAULT_INTERNAL_FIELDS``. The list passed in
            is never modified.

    Returns:
        A tuple ``(rename_exp, join_exp, internal_fields)``:
        ``rename_exp`` maps each external column name to its internal name,
        ``join_exp`` lists the internal key column names in key order, and
        ``internal_fields`` is the default fields followed by every key column
        not already present (order preserved, no duplicates).
    """
    if rc_keys is None:
        rc_keys = RC_KEYS
    if default_internal_fields is None:
        default_internal_fields = DEFAULT_INTERNAL_FIELDS

    if isinstance(rc_keys, (str, bytes, bytearray)):
        key_pairs = json.loads(rc_keys)
    else:
        key_pairs = rc_keys

    # Work on a copy: appending to the module-level constant would make every
    # call grow DEFAULT_INTERNAL_FIELDS.
    internal_fields = list(default_internal_fields)
    rename_exp = {}
    join_exp = []

    for key in key_pairs:
        internal_name = key["int_col"]
        # External columns are renamed to the internal names so both frames
        # can be joined on identical column names.
        rename_exp[str(key["ext_col"])] = internal_name
        join_exp.append(internal_name)
        # Key columns may already be default fields (e.g. transaction_code).
        if internal_name not in internal_fields:
            internal_fields.append(internal_name)

    return rename_exp, join_exp, internal_fields


def get_internal_schemas(odl_fields):
    """Derive the Mongo projections and PyMongoArrow schema for ``odl_fields``.

    Fields in DOUBLE_FIELDS / INTEGER_FIELDS are projected as stored and typed
    float64 / int64 (DOUBLE_FIELDS takes precedence if a field is in both).
    Every other field is converted with ``$toString`` and typed ``str``.

    Returns:
        ``(project_exp, remanent_project_exp, schema)``: the ``$project`` spec
        for top-level documents, the equivalent spec reading from the
        ``transaction`` sub-document, and the field -> type mapping.
    """
    project_exp, remanent_project_exp, schema = {}, {}, {}

    for name in odl_fields:
        if name in DOUBLE_FIELDS or name in INTEGER_FIELDS:
            # Numeric fields keep their stored type.
            project_exp[name] = 1
            remanent_project_exp[name] = f"$transaction.{name}"
            schema[name] = float64() if name in DOUBLE_FIELDS else int64()
        else:
            project_exp[name] = {"$toString": f"${name}"}
            remanent_project_exp[name] = {"$toString": f"$transaction.{name}"}
            schema[name] = str

    return project_exp, remanent_project_exp, schema


def get_internal_df(project_exp, schema):
    """Load internal APPROVED card transactions for the configured window.

    Filters ``ODL_DB.ODL_COLL`` by PROCESSOR_NAME, COUNTRY_NAME, APPROVED status
    and ``TIMESTAMP_FROM <= create_timestamp < TIMESTAMP_TO``. Projects each
    document with ``project_exp`` and decodes it with ``schema``.

    Returns:
        A Polars LazyFrame over the fetched rows.
    """
    transactions = client[ODL_DB][ODL_COLL]

    match_stage = {
        "$match": {
            "processor_name": PROCESSOR_NAME,
            "country_name": COUNTRY_NAME,
            "transaction_status_type": {"$in": ["APPROVED"]},
            "create_timestamp": {"$gte": TIMESTAMP_FROM, "$lt": TIMESTAMP_TO},
        },
    }
    project_stage = {"$project": project_exp}

    fetched = transactions.aggregate_polars_all(
        [match_stage, project_stage], schema=Schema(schema)
    )
    return fetched.lazy()


def get_remanent_internal_df(project_exp, schema):
    """Load internal transactions left unmatched (remanent) by earlier runs.

    Reads ``ODL_DB.RESULT_COLL`` rows for PROCESSOR_NAME with APPROVED status
    and ``conciliation_status == NON_CONCILIATION_STATUS``. Joins each row back
    to its source document in ``ODL_COLL`` by ``_id`` (stored under
    ``transaction``). Projects the result with ``project_exp`` and decodes it
    with ``schema``.

    ``project_exp`` presumably should be the ``remanent_project_exp`` from
    get_internal_schemas, whose paths read ``$transaction.<field>``.

    Returns:
        A Polars LazyFrame over the fetched rows.
    """
    results_coll = client[ODL_DB][RESULT_COLL]
    pipeline = [
        {
            "$match": {
                "processor_name": PROCESSOR_NAME,
                "transaction_status_type": {"$in": ["APPROVED"]},
                # Shared constant instead of a duplicated "REMANENT" literal.
                "conciliation_status": NON_CONCILIATION_STATUS,
            }
        },
        {
            "$lookup": {
                # Follow ODL_COLL so this stays in sync with get_internal_df.
                "from": ODL_COLL,
                "localField": "_id",
                "foreignField": "_id",
                "as": "transaction",
            }
        },
        # Without preserveNullAndEmptyArrays, rows whose source transaction
        # was not found are dropped here.
        {"$unwind": {"path": "$transaction"}},
        {"$project": project_exp},
    ]

    return results_coll.aggregate_polars_all(pipeline, schema=Schema(schema)).lazy()


def get_external_df(rename_exp):
    """Load APPROVED external transactions from MongoDB as a LazyFrame.

    Reads ``ODL_DB.EXTERNAL_COLL`` for the configured time window. Projects
    each document into the external (Spanish-named) columns as strings and
    numbers the rows from 1. Renames columns with ``rename_exp`` and casts the
    DOUBLE_FIELDS / INTEGER_FIELDS key columns so they join with internal data.

    Args:
        rename_exp: Mapping of external column name -> internal column name, as
            returned by get_expressions. It is not modified.

    Returns:
        LazyFrame with the renamed columns plus a string ``external_row_number``.
    """
    ext_trx_coll = client[ODL_DB][EXTERNAL_COLL]
    # Copy instead of mutating the caller's shared mapping. with_row_index
    # creates a column named "index", which becomes external_row_number.
    column_mapping = {**rename_exp, "index": "external_row_number"}
    renamed_columns = set(column_mapping.values())

    # Cast so amounts/timestamps have the same dtype as the internal frame.
    with_column_exp = [pl.col("external_row_number").cast(str)]
    with_column_exp += [
        pl.col(field).cast(pl.Float64)
        for field in DOUBLE_FIELDS
        if field in renamed_columns
    ]
    with_column_exp += [
        pl.col(field).cast(pl.Int64)
        for field in INTEGER_FIELDS
        if field in renamed_columns
    ]

    # NOTE(review): several projected names (monto, fecha_transaccion, bin,
    # tipo_tarjeta, ultimos_digitos) differ from the ext_col names in
    # RC_KEYS_JSON (importe, fecha, digitos_bin, kind_card, ultimos4), so the
    # rename/casts below reference columns this projection does not produce.
    # "bin" also reads `tmp1_bin_code`. Confirm the intended external layout.
    pipeline = [
        {
            "$match": {
                "transaction_status_type": {"$in": ["APPROVED"]},
                "create_timestamp": {
                    "$gte": TIMESTAMP_FROM,
                    "$lt": TIMESTAMP_TO,
                },
            },
        },
        {
            "$project": {
                "_id": 1,
                "referencia": {"$toString": "$reference_transaction_code"},
                "monto": {"$toString": "$approved_transaction_amount"},
                "tipo_de_transaccion": {"$toString": "$transaction_type"},
                "codigo_aprobacion": {"$toString": "$approval_code"},
                "estado_transaccion": {"$toString": "$transaction_status_type"},
                "fecha_transaccion": {"$toString": "$create_timestamp"},
                "bin": {"$toString": "$tmp1_bin_code"},
                "tipo_tarjeta": {"$toString": "$card_type"},
                "ultimos_digitos": {"$toString": "$last_four_digit_code"},
                "codigo_ksh": {"$toString": "$transaction_code"},
                "processor_name": {"$toString": "$processor_name"},
                "country_name": {"$toString": "$country_name"},
                "processor_type": {"$toString": "$processor_type"},
                "ticket_code": {"$toString": "$ticket_code"},
            },
        },
    ]

    external_df = (
        ext_trx_coll.aggregate_polars_all(pipeline)
        .with_row_index(offset=1)
        .rename(column_mapping)
        .with_columns(with_column_exp)
        .lazy()
    )

    return external_df


def get_external_df_from_file(rename_exp, file_path):
    """Load an external reconciliation CSV as a LazyFrame.

    Every column is read as a string. Rows are numbered from 1, columns are
    renamed with ``rename_exp``, and DOUBLE_FIELDS / INTEGER_FIELDS key columns
    are cast so they join with the internal frame.

    Args:
        rename_exp: Mapping of external column name -> internal column name, as
            returned by get_expressions. It is not modified.
        file_path: Path or URL accepted by ``pl.read_csv``.

    Returns:
        LazyFrame with the renamed columns plus a string ``external_row_number``.
    """
    # Copy instead of mutating the caller's shared mapping. with_row_index
    # creates a column named "index", which becomes external_row_number.
    column_mapping = {**rename_exp, "index": "external_row_number"}
    renamed_columns = set(column_mapping.values())

    # Cast so amounts/timestamps have the same dtype as the internal frame.
    with_column_exp = [pl.col("external_row_number").cast(str)]
    with_column_exp += [
        pl.col(field).cast(pl.Float64)
        for field in DOUBLE_FIELDS
        if field in renamed_columns
    ]
    with_column_exp += [
        pl.col(field).cast(pl.Int64)
        for field in INTEGER_FIELDS
        if field in renamed_columns
    ]

    external_df = (
        # infer_schema_length=0 reads every column as a string.
        pl.read_csv(file_path, infer_schema_length=0)
        .with_row_index(offset=1)
        .rename(column_mapping)
        .with_columns(with_column_exp)
        .lazy()
    )

    return external_df


def get_external_df_from_s3(
    rename_exp, file_path="s3://finops-interchange-plus/external_one_day.csv"
):
    """Load the external reconciliation CSV from S3 as a LazyFrame.

    This is a thin wrapper around get_external_df_from_file, which applies the
    same reading, row numbering, renaming and casting.

    Args:
        rename_exp: Mapping of external column name -> internal column name.
        file_path: S3 URL of the CSV. It defaults to the object this loader
            always read, so existing calls behave the same.

    Returns:
        LazyFrame with the renamed columns plus a string ``external_row_number``.
    """
    return get_external_df_from_file(rename_exp, file_path)


def get_matching_records(internal_df, external_df, join_exp):
    """Inner-join internal and external records on the reconciliation keys.

    Each matched row is tagged with CONCILIATION_STATUS and the run metadata
    (conciliation/execution ids, dates, timestamps, the comma-joined key list
    and the external source name). The row is then narrowed to the persisted
    columns, with ``create_timestamp`` exposed as ``transaction_create_timestamp``.

    Args:
        internal_df: Internal transactions.
        external_df: External records already renamed to internal column names.
        join_exp: Internal column names to join on.

    Returns:
        LazyFrame of matched records.
    """
    print(join_exp)

    run_metadata = {
        "conciliation_status": CONCILIATION_STATUS,
        "conciliation_timestamp": CONCILIATION_TIMESTAMP,
        "conciliation_date": CONCILIATION_DATE,
        "execution_id": EXECUTION_ID,
        "execution_type": EXECUTION_TYPE,
        "execution_date": EXECUTION_DATE,
        "execution_timestamp": EXECUTION_TIMESTAMP,
        "conciliation_key_code": ",".join(join_exp),
        "conciliation_key_description": "Descripción",
        "external_source_name": EXTERNAL_SOURCE_NAME,
    }
    kept_columns = [
        "_id",
        "external_row_number",
        *run_metadata,
        "create_timestamp",
        "processor_name",
        "transaction_status_type",
        "approved_transaction_amount",
    ]

    # .lazy() keeps the pipeline lazy even if eager frames are passed in.
    matched = internal_df.join(external_df, on=join_exp, how="inner").lazy()

    return (
        matched.with_columns(
            [pl.lit(value).alias(name) for name, value in run_metadata.items()]
        )
        .select(kept_columns)
        .rename({"create_timestamp": "transaction_create_timestamp"})
    )


def get_internal_not_matching_records(left_df, right_df, join_exp):
    """Return internal transactions that have no external counterpart.

    Anti-joins ``left_df`` against ``right_df`` on ``join_exp`` and tags the
    survivors with NON_CONCILIATION_STATUS and the run metadata. The result
    keeps the persisted columns, with ``create_timestamp`` exposed as
    ``transaction_create_timestamp``.

    Side effect: the raw anti-join rows are appended to the module-level
    ``a_to_b_nm_df`` accumulator.

    Returns:
        LazyFrame of unmatched internal records.
    """
    global a_to_b_nm_df

    unmatched = left_df.join(right_df, on=join_exp, how="anti").lazy()
    # NOTE(review): the accumulator is initialised as a zero-column frame;
    # confirm this vertical concat collects cleanly on the Polars version in use.
    a_to_b_nm_df = pl.concat([a_to_b_nm_df, unmatched])

    run_metadata = {
        "conciliation_status": NON_CONCILIATION_STATUS,
        "conciliation_timestamp": CONCILIATION_TIMESTAMP,
        "conciliation_date": CONCILIATION_DATE,
        "execution_id": EXECUTION_ID,
        "execution_type": EXECUTION_TYPE,
        "execution_date": EXECUTION_DATE,
        "execution_timestamp": EXECUTION_TIMESTAMP,
    }
    kept_columns = [
        "_id",
        *run_metadata,
        "create_timestamp",
        "processor_name",
        "transaction_status_type",
        "approved_transaction_amount",
    ]

    return (
        unmatched.with_columns(
            [pl.lit(value).alias(name) for name, value in run_metadata.items()]
        )
        .select(kept_columns)
        .rename({"create_timestamp": "transaction_create_timestamp"})
    )


def get_external_not_matching_records(left_df, right_df, join_exp):
    """Return external records that have no internal counterpart.

    Anti-joins ``left_df`` against ``right_df`` on ``join_exp``. The survivors
    are tagged with NON_CONCILIATION_STATUS, the run metadata and
    EXTERNAL_SOURCE_NAME, and ``processor_name`` is overwritten with
    PROCESSOR_NAME. The result keeps the persisted columns, with
    ``create_timestamp`` exposed as ``transaction_create_timestamp``.

    Side effect: the raw anti-join rows are appended to the module-level
    ``b_to_a_nm_df`` accumulator.

    Returns:
        LazyFrame of unmatched external records.
    """
    global b_to_a_nm_df

    unmatched = left_df.join(right_df, on=join_exp, how="anti").lazy()
    # NOTE(review): the accumulator is initialised as a zero-column frame;
    # confirm this vertical concat collects cleanly on the Polars version in use.
    b_to_a_nm_df = pl.concat([b_to_a_nm_df, unmatched])

    run_metadata = {
        "conciliation_status": NON_CONCILIATION_STATUS,
        "conciliation_timestamp": CONCILIATION_TIMESTAMP,
        "conciliation_date": CONCILIATION_DATE,
        "execution_id": EXECUTION_ID,
        "execution_type": EXECUTION_TYPE,
        "execution_date": EXECUTION_DATE,
        "execution_timestamp": EXECUTION_TIMESTAMP,
        "external_source_name": EXTERNAL_SOURCE_NAME,
        "processor_name": PROCESSOR_NAME,
    }
    # NOTE(review): `_id` must exist on the external frame. The Mongo loader
    # projects it, but a CSV source only has it if the file does. Confirm.
    kept_columns = [
        "_id",
        "external_row_number",
        *run_metadata,
        "create_timestamp",
    ]

    return (
        unmatched.with_columns(
            [pl.lit(value).alias(name) for name, value in run_metadata.items()]
        )
        .select(kept_columns)
        .rename({"create_timestamp": "transaction_create_timestamp"})
    )


def write_to_mongo(dataframe, dest_collection):
    """Collect ``dataframe`` and write its rows into ``ODL_DB.dest_collection``.

    Uses PyMongoArrow's ``write`` for the insert.
    """
    destination = client[ODL_DB][dest_collection]
    materialized = dataframe.collect()
    write(destination, materialized)


def clean_tmp_collections():
    """Drop the temporary staging collection ``ODL_DB.TMP_COLL``."""
    client[ODL_DB].drop_collection(TMP_COLL)


def divide_timestamps_into_intervals(n, source_coll, dest_coll, ts_from=None, ts_to=None):
    """Split a millisecond time window into ``n`` contiguous integer intervals.

    Args:
        n: Number of intervals. Must be >= 1.
        source_coll: Collection name copied into every tuple.
        dest_coll: Collection name copied into every tuple.
        ts_from: Inclusive window start in epoch ms. Defaults to TIMESTAMP_FROM.
        ts_to: Exclusive window end in epoch ms. Defaults to TIMESTAMP_TO.

    Returns:
        A list of ``(start, end, source_coll, dest_coll)`` tuples. The first
        start is the window start, the last end is exactly the window end, and
        each end equals the next start, so the half-open intervals tile the
        window with no gaps.

    Raises:
        ValueError: If ``n`` is less than 1.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    start_ms = TIMESTAMP_FROM if ts_from is None else int(ts_from)
    end_ms = TIMESTAMP_TO if ts_to is None else int(ts_to)
    span = end_ms - start_ms

    # Exact integer bounds. Accumulating a float step and truncating with int()
    # could leave the last bound 1 ms short of the window end, and `$lt` would
    # then skip rows stamped in that final millisecond.
    bounds = [start_ms + (span * i) // n for i in range(n + 1)]
    return [(bounds[i], bounds[i + 1], source_coll, dest_coll) for i in range(n)]


def move_tmp_data(interval):
    """Merge one time slice of staged documents into the destination collection.

    Args:
        interval: A ``(start_ms, end_ms, source_coll, dest_coll)`` tuple, as
            built by divide_timestamps_into_intervals. Documents of
            ``source_coll`` whose ``transaction_create_timestamp`` is in
            ``[start_ms, end_ms)`` are merged into ``ODL_DB.dest_coll``.
    """
    start_ms, end_ms = interval[0], interval[1]
    source_coll, dest_coll = interval[2], interval[3]

    in_slice = {
        "$match": {
            "transaction_create_timestamp": {"$gte": start_ms, "$lt": end_ms}
        }
    }
    # Upsert on _id: existing documents get the staged fields merged in, and
    # new ones are inserted.
    merge_into_dest = {
        "$merge": {
            "into": {"db": ODL_DB, "coll": dest_coll},
            "on": "_id",
            "whenMatched": "merge",
            "whenNotMatched": "insert",
        }
    }

    client[ODL_DB][source_coll].aggregate([in_slice, merge_into_dest])


def move_tmp_data_to_final(source_coll, dest_coll):
    """Merge every staged document from ``source_coll`` into ``dest_coll``.

    The execution window is split into 5 slices by
    divide_timestamps_into_intervals, and move_tmp_data merges the slices
    concurrently. A final pass then merges every document whose
    ``transaction_create_timestamp`` falls in none of those slices, or is
    missing. This way, callers that drop ``source_coll`` afterwards do not lose
    rows dated outside the window.

    Raises:
        Any exception raised by a worker's aggregation; it is propagated
        instead of being silently discarded.
    """
    ivs = divide_timestamps_into_intervals(5, source_coll, dest_coll)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Consuming the results re-raises worker exceptions here. An
        # unconsumed map() iterator would swallow them, and the staging data
        # would then be dropped by the caller.
        list(executor.map(move_tmp_data, ivs))

    # Rows outside the window match none of the slices above. Remanent rows,
    # for example, are loaded without a time filter.
    covered = [
        {"transaction_create_timestamp": {"$gte": iv[0], "$lt": iv[1]}}
        for iv in ivs
    ]
    leftover_filter = {"$nor": covered} if covered else {}
    client[ODL_DB][source_coll].aggregate(
        [
            {"$match": leftover_filter},
            {
                "$merge": {
                    "into": {"db": ODL_DB, "coll": dest_coll},
                    "on": "_id",
                    "whenMatched": "merge",
                    "whenNotMatched": "insert",
                }
            },
        ]
    )


def save_aggregated_results(match_df: pl.LazyFrame, unmatch_df: pl.LazyFrame):
    """Insert one summary document for this execution into ``AGG_RESULT_COLL``.

    The matched and remanent frames are each reduced to the sum of
    ``approved_transaction_amount`` and the count of ``_id``. These totals are
    combined with the run metadata under ``_id = EXECUTION_ID``, printed, and
    inserted.
    """

    def _totals(frame):
        # One-row aggregate returned as {"approved_transaction_amount": ..., "_id": ...}.
        return (
            frame.select(pl.sum("approved_transaction_amount"), pl.count("_id"))
            .collect()
            .row(0, named=True)
        )

    matched = _totals(match_df)
    remanent = _totals(unmatch_df)

    summary = {
        "_id": EXECUTION_ID,
        "execution_type": EXECUTION_TYPE,
        "execution_timestamp": EXECUTION_TIMESTAMP,
        "execution_date": EXECUTION_DATE,
        "conciliation_timestamp": CONCILIATION_TIMESTAMP,
        "conciliation_date": CONCILIATION_DATE,
        "processor_name": PROCESSOR_NAME,
        "conciliated_transactions_number": matched["_id"],
        "remanent_transactions_number": remanent["_id"],
        "conciliated_amount": matched["approved_transaction_amount"],
        "remanent_amount": remanent["approved_transaction_amount"],
        "conciliation_currency": "MXN",
    }

    print(summary)

    client[ODL_DB][AGG_RESULT_COLL].insert_one(summary)


def persist_results(
    matching_df: pl.LazyFrame,
    unmatching_df: pl.LazyFrame,
    ext_unmatching_df: pl.LazyFrame,
):
    """Write every result frame to its final collection, then store the summary.

    Each frame goes through three steps:

    1. It is staged in ``TMP_COLL`` with ``write_to_mongo``.
    2. It is moved to its destination with ``move_tmp_data_to_final``.
    3. ``clean_tmp_collections`` is called, even when step 1 or 2 fails.

    Destinations:

    * ``matching_df`` and ``unmatching_df`` -> ``RESULT_COLL``
    * ``ext_unmatching_df`` -> ``EXT_REMANENT_RECORDS``

    Finally ``save_aggregated_results`` stores the totals of the matching and
    unmatching frames. It is not reached if any staging step raised.
    """
    staged_writes = (
        (matching_df, RESULT_COLL),
        (unmatching_df, RESULT_COLL),
        (ext_unmatching_df, EXT_REMANENT_RECORDS),
    )
    for frame, dest_coll in staged_writes:
        try:
            write_to_mongo(frame, TMP_COLL)
            move_tmp_data_to_final(TMP_COLL, dest_coll)
        finally:
            # Clean up even on failure, so partially staged documents are not
            # merged into the next destination or the next run. This assumes
            # clean_tmp_collections empties TMP_COLL — TODO confirm.
            clean_tmp_collections()

    save_aggregated_results(matching_df, unmatching_df)

In [28]:
# Build the base expressions used by the rest of the pipeline
rename_exp, join_exp, odl_fields = get_expressions()
int_prjt_exp, rmnt_int_prjt_exp, schema = get_internal_schemas(odl_fields)

# Get the internal (Kushki) data for the time window to conciliate.
# Remanent data is fetched separately with get_remanent_internal_df.
ksh_df = get_internal_df(int_prjt_exp, schema)
ksh_df.collect()

_id,reference_transaction_code,approval_code,processor_type,merchant_name,processor_name,transaction_code,transaction_status_type,transaction_type,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code
str,str,str,str,str,str,str,str,str,f64,i64,str,str,str
"""821714521600608331390""","""528d6584-9eda-4182-933c-d54d218cb228""","""847924""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521600608331390""","""APPROVED""","""SALE""",1,1714521600000,"""52885191""","""credit""","""3566"""
"""821714521600784557783""","""fa3fcf02-405e-4a9e-a6d5-280609e5c271""","""694055""","""aggregator_formal""","""WINPOT OA VIP""","""Kushki Acquirer Processor""","""821714521600784557783""","""APPROVED""","""SALE""",500,1714521600000,"""48151630""","""debit""","""9059"""
"""821714521601280264609""","""17e9ec37-ad2a-461b-b9df-d35f94d81218""","""001431""","""aggregator_formal""","""Justo MX""","""Kushki Acquirer Processor""","""821714521601280264609""","""APPROVED""","""SALE""",339,1714521601000,"""49156663""","""debit""","""0560"""
"""821714521601853677768""","""6987854e-1af7-464f-bff0-07d9e209a7dc""","""510139""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521601853677768""","""APPROVED""","""SALE""",399,1714521601000,"""53492613""","""debit""","""9181"""
"""821714521601967915232""","""c7bc0b7a-5936-491f-9c7f-5acc692cda0c""","""073396""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521601967915232""","""APPROVED""","""SALE""",1,1714521601000,"""42131661""","""debit""","""2003"""
"""d684b61e-7783-4fa5-bc28-f3e1821b7e8e""","""d684b61e-7783-4fa5-bc28-f3e1821b7e8e""","""694904""","""aggregator_formal""","""ASTORIA2""","""Kushki Acquirer Processor""","""111714521602554397""","""APPROVED""","""SALE""",300,1714521602554,"""41523139""","""debit""","""9126"""
"""9e2aecc1-f210-4832-be65-df2ff27c3d3d""","""9e2aecc1-f210-4832-be65-df2ff27c3d3d""","""00114Z""","""aggregator_formal""","""AGREGADOR""","""Kushki Acquirer Processor""","""111714521603217222""","""APPROVED""","""SALE""",1000,1714521603217,"""51567694""","""credit""","""8068"""
"""821714521604724672790""","""5f4a6536-b664-48c3-82b9-fe239b667439""","""000322""","""aggregator_formal""","""Aplazo""","""Kushki Acquirer Processor""","""821714521604724672790""","""APPROVED""","""SALE""",10,1714521604000,"""55790701""","""debit""","""6199"""
"""31be6822-a8d9-4da5-ba77-d77b57b4b9ff""","""31be6822-a8d9-4da5-ba77-d77b57b4b9ff""","""695986""","""aggregator_formal""","""JUMAMEX SALAS""","""Kushki Acquirer Processor""","""111714521606647029""","""APPROVED""","""SALE""",300,1714521606647,"""41523141""","""debit""","""0161"""
"""f64d54d6-7744-4c4a-9200-1a140112555f""","""f64d54d6-7744-4c4a-9200-1a140112555f""","""696000""","""aggregator_formal""","""ASTORIA2""","""Kushki Acquirer Processor""","""111714521606692073""","""APPROVED""","""SALE""",200,1714521606692,"""41523142""","""debit""","""0029"""


In [29]:
# Get remanent data to conciliate and combine it with the time-window data.
# Deduplicate on `_id`, not on whole rows: results are later merged on `_id`,
# so two rows sharing an `_id` but differing in any other column would both
# survive a whole-row `unique()`. keep="first" with maintain_order=True keeps
# the time-window version (ksh_df comes first in the concat).
ksh_remanent_df = get_remanent_internal_df(rmnt_int_prjt_exp, schema)
ksh_complete_df = pl.concat([ksh_df, ksh_remanent_df]).unique(
    subset="_id", keep="first", maintain_order=True
)
ksh_complete_df.collect()

_id,reference_transaction_code,approval_code,processor_type,merchant_name,processor_name,transaction_code,transaction_status_type,transaction_type,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code
str,str,str,str,str,str,str,str,str,f64,i64,str,str,str
"""821714521600608331390""","""528d6584-9eda-4182-933c-d54d218cb228""","""847924""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521600608331390""","""APPROVED""","""SALE""",1,1714521600000,"""52885191""","""credit""","""3566"""
"""821714521600784557783""","""fa3fcf02-405e-4a9e-a6d5-280609e5c271""","""694055""","""aggregator_formal""","""WINPOT OA VIP""","""Kushki Acquirer Processor""","""821714521600784557783""","""APPROVED""","""SALE""",500,1714521600000,"""48151630""","""debit""","""9059"""
"""821714521601280264609""","""17e9ec37-ad2a-461b-b9df-d35f94d81218""","""001431""","""aggregator_formal""","""Justo MX""","""Kushki Acquirer Processor""","""821714521601280264609""","""APPROVED""","""SALE""",339,1714521601000,"""49156663""","""debit""","""0560"""
"""821714521601853677768""","""6987854e-1af7-464f-bff0-07d9e209a7dc""","""510139""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521601853677768""","""APPROVED""","""SALE""",399,1714521601000,"""53492613""","""debit""","""9181"""
"""821714521601967915232""","""c7bc0b7a-5936-491f-9c7f-5acc692cda0c""","""073396""","""aggregator_formal""","""UNDOSTRES""","""Kushki Acquirer Processor""","""821714521601967915232""","""APPROVED""","""SALE""",1,1714521601000,"""42131661""","""debit""","""2003"""
"""d684b61e-7783-4fa5-bc28-f3e1821b7e8e""","""d684b61e-7783-4fa5-bc28-f3e1821b7e8e""","""694904""","""aggregator_formal""","""ASTORIA2""","""Kushki Acquirer Processor""","""111714521602554397""","""APPROVED""","""SALE""",300,1714521602554,"""41523139""","""debit""","""9126"""
"""9e2aecc1-f210-4832-be65-df2ff27c3d3d""","""9e2aecc1-f210-4832-be65-df2ff27c3d3d""","""00114Z""","""aggregator_formal""","""AGREGADOR""","""Kushki Acquirer Processor""","""111714521603217222""","""APPROVED""","""SALE""",1000,1714521603217,"""51567694""","""credit""","""8068"""
"""821714521604724672790""","""5f4a6536-b664-48c3-82b9-fe239b667439""","""000322""","""aggregator_formal""","""Aplazo""","""Kushki Acquirer Processor""","""821714521604724672790""","""APPROVED""","""SALE""",10,1714521604000,"""55790701""","""debit""","""6199"""
"""31be6822-a8d9-4da5-ba77-d77b57b4b9ff""","""31be6822-a8d9-4da5-ba77-d77b57b4b9ff""","""695986""","""aggregator_formal""","""JUMAMEX SALAS""","""Kushki Acquirer Processor""","""111714521606647029""","""APPROVED""","""SALE""",300,1714521606647,"""41523141""","""debit""","""0161"""
"""f64d54d6-7744-4c4a-9200-1a140112555f""","""f64d54d6-7744-4c4a-9200-1a140112555f""","""696000""","""aggregator_formal""","""ASTORIA2""","""Kushki Acquirer Processor""","""111714521606692073""","""APPROVED""","""SALE""",200,1714521606692,"""41523142""","""debit""","""0029"""


In [30]:
# Load the external records to conciliate from the CSV file below.
# NOTE(review): result rows are stamped with EXTERNAL_SOURCE_NAME
# ("EXTERNAL_ARCHIVE.csv", see the matching output), not with this file's
# name — confirm that is the intended lineage.
external_csv_path = "files/odl_2024-05-01_30_mins.csv"
external_df = get_external_df_from_file(rename_exp, external_csv_path)
external_df.collect()

external_row_number,_id,referencia,approved_transaction_amount,tipo_de_transaccion,codigo_aprobacion,estado_transaccion,create_timestamp,bin_code,card_type,last_four_digit_code,transaction_code,processor_name,country_name,processor_type,ticket_code
str,str,str,f64,str,str,str,i64,str,str,str,str,str,str,str,str
"""1""","""821714522384647225341""","""bb4a9b93-0bfa-4518-b8de-ed8ca82d4a9d""",5264.73,"""PREAUTHORIZATION""","""934247""","""APPROVED""",1714522384000,"""48151430""","""credit""","""4837""","""821714522384647225341""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714522384656276"""
"""2""","""821714523398297851988""","""f530c55a-92ce-4471-9439-af5e333c989a""",1887.65,"""SALE""","""001207""","""APPROVED""",1714523398000,"""49317300""","""credit""","""1985""","""821714523398297851988""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523398295552"""
"""3""","""821714523395264732530""","""2cbb709f-6054-4de0-83c9-f722441b589b""",567.5,"""SALE""","""324937""","""APPROVED""",1714523395000,"""41982200""","""debit""","""4620""","""821714523395264732530""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523395291795"""
"""4""","""821714523394560829528""","""e631b6e8-b0c6-4501-a6cb-fb47892bfc1e""",1,"""SALE""","""080234""","""APPROVED""",1714523394000,"""52041656""","""debit""","""2295""","""821714523394560829528""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523394555132"""
"""5""","""821714523393191606388""","""42c0b887-2c5f-48df-8264-7d20cc611f66""",116.87,"""SALE""","""358371""","""APPROVED""",1714523393000,"""44217700""","""debit""","""2273""","""821714523393191606388""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523393218671"""
"""6""","""821714523392439769236""","""09122040-ff86-4deb-8930-f5e4e85b3d02""",10,"""SALE""","""675415""","""APPROVED""",1714523392000,"""41982200""","""debit""","""4620""","""821714523392439769236""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523392483651"""
"""7""","""82171452339025457444""","""ca0e9461-bfc6-4bc8-bf03-73024751ee75""",417.82,"""SALE""","""545366""","""APPROVED""",1714523390000,"""54287803""","""debit""","""1627""","""82171452339025457444""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523390023658"""
"""8""","""821714523390712104648""","""192b184b-3cc9-41fb-82be-36ebddda2d0c""",10,"""SALE""","""083826""","""APPROVED""",1714523390000,"""52567838""","""debit""","""2993""","""821714523390712104648""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523390739168"""
"""9""","""821714523387733410597""","""01dff8db-9d6d-4125-8d7f-646e050cd2cf""",10,"""SALE""","""173013""","""APPROVED""",1714523387000,"""45890910""","""credit""","""8076""","""821714523387733410597""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523387742512"""
"""10""","""821714523383906395907""","""9b8ec1f0-6891-45b0-bdb6-d776483a2f43""",50,"""SALE""","""569004""","""APPROVED""",1714523383000,"""52677711""","""credit""","""3623""","""821714523383906395907""","""Kushki Acquirer Processor""","""Mexico""","""aggregator_formal""","""821714523383933460"""


In [31]:
# Get the matching records between the internal and external data
matching_df = get_matching_records(
    ksh_complete_df,
    external_df,
    join_exp,
)
matching_df.collect()

['transaction_code', 'approved_transaction_amount', 'create_timestamp', 'bin_code', 'card_type', 'last_four_digit_code']


_id,external_row_number,conciliation_status,conciliation_timestamp,conciliation_date,execution_id,execution_type,execution_date,execution_timestamp,conciliation_key_code,conciliation_key_description,external_source_name,transaction_create_timestamp,processor_name,transaction_status_type,approved_transaction_amount
str,str,str,i64,str,str,str,str,i64,str,str,str,i64,str,str,f64
"""821714522384647225341""","""1""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714522384000,"""Kushki Acquirer Processor""","""APPROVED""",5264.73
"""821714523398297851988""","""2""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523398000,"""Kushki Acquirer Processor""","""APPROVED""",1887.65
"""821714523395264732530""","""3""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523395000,"""Kushki Acquirer Processor""","""APPROVED""",567.5
"""821714523394560829528""","""4""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523394000,"""Kushki Acquirer Processor""","""APPROVED""",1
"""821714523393191606388""","""5""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523393000,"""Kushki Acquirer Processor""","""APPROVED""",116.87
"""821714523392439769236""","""6""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523392000,"""Kushki Acquirer Processor""","""APPROVED""",10
"""82171452339025457444""","""7""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523390000,"""Kushki Acquirer Processor""","""APPROVED""",417.82
"""821714523390712104648""","""8""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523390000,"""Kushki Acquirer Processor""","""APPROVED""",10
"""821714523387733410597""","""9""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523387000,"""Kushki Acquirer Processor""","""APPROVED""",10
"""821714523383906395907""","""10""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523383000,"""Kushki Acquirer Processor""","""APPROVED""",50


In [32]:
# Rows of matching_df whose `_id` appears more than once.
# NOTE(review): the output shows the same internal `_id` matched by several
# external rows (e.g. external_row_number "2" and "1537") — check whether the
# external file repeats records or the join keys are too loose.
has_repeated_id = pl.col("_id").is_duplicated()
duplicate_df = matching_df.filter(has_repeated_id)
duplicate_df.collect()

_id,external_row_number,conciliation_status,conciliation_timestamp,conciliation_date,execution_id,execution_type,execution_date,execution_timestamp,conciliation_key_code,conciliation_key_description,external_source_name,transaction_create_timestamp,processor_name,transaction_status_type,approved_transaction_amount
str,str,str,i64,str,str,str,str,i64,str,str,str,i64,str,str,f64
"""821714523398297851988""","""2""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523398000,"""Kushki Acquirer Processor""","""APPROVED""",1887.65
"""821714523395264732530""","""3""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523395000,"""Kushki Acquirer Processor""","""APPROVED""",567.5
"""821714523394560829528""","""4""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523394000,"""Kushki Acquirer Processor""","""APPROVED""",1.0
"""821714523398297851988""","""1537""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523398000,"""Kushki Acquirer Processor""","""APPROVED""",1887.65
"""821714523395264732530""","""1538""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523395000,"""Kushki Acquirer Processor""","""APPROVED""",567.5
"""821714523394560829528""","""1539""","""CONCILIATED""",1712188800000,"""2024-01-02""","""9134fad3-10e9-40eb-a3ca-c7a983a738de""","""AUTOMATIC""","""2024-05-27""",1716843408000,"""transaction_code,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code""","""Descripción""","""EXTERNAL_ARCHIVE.csv""",1714523394000,"""Kushki Acquirer Processor""","""APPROVED""",1.0


In [33]:
# Internal (Kushki) records with no counterpart in the external data
a_to_b_unmatching_df = get_internal_not_matching_records(
    ksh_complete_df,
    external_df,
    join_exp,
)
a_to_b_unmatching_df.collect()

_id,conciliation_status,conciliation_timestamp,conciliation_date,execution_id,execution_type,execution_date,execution_timestamp,transaction_create_timestamp,processor_name,transaction_status_type,approved_transaction_amount
str,str,i64,str,str,str,str,i64,i64,str,str,f64


In [34]:
# NOTE(review): `a_to_b_nm_df` is not assigned in any cell visible here, and its
# schema (raw internal columns) differs from `a_to_b_unmatching_df` above.
# Confirm it is defined by an earlier cell; otherwise this cell fails on
# Restart & Run All.
a_to_b_nm_df.collect()

_id,reference_transaction_code,approval_code,processor_type,merchant_name,processor_name,transaction_code,transaction_status_type,transaction_type,approved_transaction_amount,create_timestamp,bin_code,card_type,last_four_digit_code
str,str,str,str,str,str,str,str,str,f64,i64,str,str,str


In [35]:
# External records with no counterpart in the internal (Kushki) data
b_to_a_unmatching_df = get_external_not_matching_records(
    external_df,
    ksh_complete_df,
    join_exp,
)
b_to_a_unmatching_df.collect()

_id,external_row_number,conciliation_status,conciliation_timestamp,conciliation_date,execution_id,execution_type,execution_date,execution_timestamp,external_source_name,processor_name,transaction_create_timestamp
str,str,str,i64,str,str,str,str,i64,str,str,i64


In [36]:
# NOTE(review): `b_to_a_nm_df` is not assigned in any cell visible here. Its
# schema (raw external columns) matches `external_df` rather than
# `b_to_a_unmatching_df` above. Confirm it is defined by an earlier cell;
# otherwise this cell fails on Restart & Run All.
b_to_a_nm_df.collect()

external_row_number,_id,referencia,approved_transaction_amount,tipo_de_transaccion,codigo_aprobacion,estado_transaccion,create_timestamp,bin_code,card_type,last_four_digit_code,transaction_code,processor_name,country_name,processor_type,ticket_code
str,str,str,f64,str,str,str,i64,str,str,str,str,str,str,str,str


In [18]:
# Disabled: one-off bulk write of the collected internal data into TMP_COLL.
# NOTE(review): the output shown under this cell ({'insertedCount': ...}) is
# stale. It was produced while these lines were active, and the cell's
# execution count (In [18]) is out of order. Delete the cell or re-enable it
# deliberately.
# results_coll = client[ODL_DB][TMP_COLL]
# write(results_coll, ksh_df.collect())

{'insertedCount': 254548}