# Billpocket Data Engineer Challenge Winning Entry
The one notebook to rule them all and to land a job at Billpocket.

As per the original instructions file in `../docs/Test.docx` (translated and adapted for this notebook):

>Instructions:
>
>Inside the `../data/raw/` there exists a single file in CSV format, build a Python notebook to make the following tasks:
>	- Take the value of the columns and put them in the correct format.
>	- Remove from dataset all the transactions where the chargeback is greater than the amount.
>	- What is the total amount of the completed and failed transactions?
>	- Compute the percentage of completed transactions by merchant and month.
>	- Create a flag by merchant and month that marks if the merchant had at least one transaction in each status. Use 0 to indicate that it didn't have all statuses in the given month and 1 to indicate it had them all.
>	- Through Banxico's API, fetch the exchange rate (Tipo de cambio para solventar obligaciones denominadas en moneda extranjera Fecha de determinación (FIX)). https://www.banxico.org.mx/SieAPIRest/service/v1/
>	- Transform dollar monetary amounts to pesos (Use the monthly average of exchange rates).
>	- Export the resulting DataFrame to a csv file.
>Considerations
>	- The transaction id column must be numeric and with unique values.
>	- The operation month column must be of date type.



- ## Take the value of the columns and put them in the correct format:

In [1]:
import findspark
from ruamel import yaml
import os
from pyspark.sql import SparkSession, DataFrame, Column
from pyspark.sql.window import Window
from pyspark.sql import types as T
from pyspark.sql import functions as F
import httpx
from typing import Callable, Union
from datetime import date 
from functools import reduce
import pandas as pd
from dotenv import load_dotenv

# Read configuration environment variables from .env
load_dotenv()

# Show up to 15 cols, 50 rows by default
pd.set_option('display.max_columns', 15)
pd.set_option('display.max_rows', 50)

# Suitable default display for floats
pd.set_option('display.float_format', '{:,.2f}'.format)
pd.set_option('display.precision', 2)

# Adds PySpark path to sys.path at runtime
findspark.init()

spark : SparkSession = SparkSession.builder.appName(
    os.environ.get('PYSPARK_APP_NAME', 'billpocket-de-challenge')
).master(
    os.environ.get('PYSPARK_MASTER', 'local[4]')
).getOrCreate()
"""Current Spark Session

The spark session is initialized using the best practices of automatically
finding (Py)Spark via findspark and configuring session with environment variables.
"""

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/31 03:39:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
def extract(source : str) -> DataFrame:
    """Extracts a Spark Data Frame from a source schema definition YAML.
    
    Parameters
    ----------
    source : `str`
        the source schema definition YAML.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame described within the schema definition YAML.
        
    Examples
    --------
    >>> extract('../lib/schemas/Examen.sdf.yaml').printSchema()
    root
    |-- tx_agg_id: string (nullable = true)
    |-- merchant_id: string (nullable = true)
    |-- op_month_date: date (nullable = true)
    |-- status: string (nullable = true)
    |-- cc: string (nullable = true)
    |-- tx_cnt: string (nullable = true)
    |-- tx_amt: string (nullable = true)
    |-- cb_amt: string (nullable = true)
    
    >>> extract('../lib/schemas/Examen.sdf.yaml').inputFiles()
    ['file:///Users/masterquiroga/Documents/Interviews/Billpocket/billpocket-de-challenge/data/raw/Examen.csv']

    >>> extract('../lib/schemas/Examen.sdf.yaml').schema.jsonValue()
    {'type': 'struct',
    'fields': [{'name': 'tx_agg_id',
    'type': 'string',
    'nullable': True,
    'metadata': {'description': 'Transaction aggregate ID in a four digits format.',
        'validators': [{'type': 'long'}, {'min': 1001}, {'unique': True}],
        'alias': 'tx_agg_id',
        'column': 'Id transacci�n',
        'comments': "Best practice is to use snake-case and common field abreviations such as 'tx' or 'agg' for transaction. I also use a validator pipeline field based in a mongoose-like validation (since it is suitable as general-purpose validation) and a parser pipeline field based on an intuitive standard (useful for the later transformations). Since, by the visual inspection of the CSV, there's a rogue '+' character we delete it as well to keep data as tidy as possible.\n"}},
    {'name': 'merchant_id',
    'type': 'string',
    'nullable': True,
    'metadata': {'validators': [{'type': 'long'}, {'min': 0}],
        'alias': 'merchant_id',
        'column': 'Comercio Id',
        'description': 'Merchant ID in digits format.'}},
    {'name': 'op_month_date',
    'type': 'date',
    'nullable': True,
    'metadata': {'description': 'Month of operation of this transaction coded as a date.',
        'validators': [{'type': 'date'}],
        'alias': 'op_month_date',
        'column': 'Mes Operacion',
        'comments': "Another best practice from data-governance is to use data suffixes in order to avoid confusion such as 'date', which conveniently allows date arithmetic.\n"}},
    {'name': 'status',
    ...
    'nullable': True,
    'metadata': {'validators': [{'min': 0.0}],
        'alias': 'cb_amt',
        'column': 'Monto Contracargo (Venta)',
        'desription': None}}]}

    See Also
    --------
    ../lib/schemas/Examen.sdf.yaml :
        For a detailed rationale and descriptions of each field.
    """
    with open(file=source, mode='rb') as file:
        metadata, reader, schema = tuple(yaml.load_all(file, yaml.SafeLoader))
        
        sdf = spark.read.format(
            reader['format']
        ).options(
            **reader['options']
        ).schema(
            T.StructType.fromJson(schema)
        ).load(
            reader['path']
        ).dropDuplicates()
        
        for field in schema['fields']:
            sdf = sdf.withColumnRenamed(
                field['name'],
                field['metadata']['alias']
            ).withMetadata(
                field['metadata']['alias'],
                {
                    "column": field['name'],
                    **field['metadata']
                }
            )
        
        file.close()
        
        return sdf

sdf : DataFrame = extract('../lib/schemas/Examen.sdf.yaml')
"""The main Spark Data Frame used in this challenge.

It consists of a transactional aggregate of:
- transaction frequencies,
- transactional volume amount,
- and chargeback severity amount;

by:
- merchant,
- (operational) month
- and transaction status.
"""

sdf.cache()

sdf.describe().toPandas()

                                                                                

Unnamed: 0,summary,tx_agg_id,merchant_id,status,cc,tx_cnt,tx_amt,cb_amt
0,count,2000.0,2000.0,2000,2000,2000.0,2000,2000.0
1,mean,1999.9357752132464,125.047,,,196.67074317968016,400.871452991453,6.732720970537261
2,stddev,577.6373112455477,74.48365390036379,,,264.8798341061732,290.332694386573,65.6765376485799
3,min,1001.0,0.0,CANCELLED,AED,1.0,"$11,525,192.80",0.0
4,max,3000.0,99.0,IN_PROGRESS,USD,989.0,998299943.52,97993.0


In [3]:
def with_adequate_format(sdf : DataFrame) -> DataFrame:
    """Takes the columns values and puts them in correct format.
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame with the columns values in correct format.
        
    See Also
    --------
    `sdf`

    Examples
    --------
    >>> sdf.transform(with_adequate_format).show(5)
    +---------+-----------+-------------+-----------+---+-------+-------------+------+
    |tx_agg_id|merchant_id|op_month_date|     status| cc| tx_cnt|       tx_amt|cb_amt|
    +---------+-----------+-------------+-----------+---+-------+-------------+------+
    |     1001|          0|   2021-07-01|IN_PROGRESS|MXN|3679780|1.192891046E9|   0.0|
    |     1002|          0|   2021-07-01|  COMPLETED|MXN|  12366|  2.3472915E7|   0.0|
    |     1003|          0|   2021-07-01|     FAILED|MXN|   5115|   7124189.72|   0.0|
    |     1004|          0|   2021-08-01|IN_PROGRESS|MXN|2536306|8.193985268E9|   0.0|
    |     1005|          0|   2021-08-01|  CANCELLED|MXN| 876854|       2786.0|   0.0|
    +---------+-----------+-------------+-----------+---+-------+-------------+------+
    only showing top 5 rows
    """
    non_digits = str(r'[^\d\.]')
    return sdf.select(
        F.regexp_replace('tx_agg_id', non_digits, '').cast('long').alias('tx_agg_id'),
        F.col('merchant_id').cast('long'),
        'op_month_date',
        'status',
        'cc',
        F.regexp_replace('tx_cnt', non_digits, '').cast('long').alias('tx_cnt'),
        F.regexp_replace('tx_amt', non_digits, '').cast('double').alias('tx_amt'),
        F.regexp_replace('cb_amt', non_digits, '').cast('double').alias('cb_amt'),
    ).orderBy('tx_agg_id')
    
sdf.transform(with_adequate_format).toPandas()

                                                                                

Unnamed: 0,tx_agg_id,merchant_id,op_month_date,status,cc,tx_cnt,tx_amt,cb_amt
0,1001,0,2021-07-01,IN_PROGRESS,MXN,3679780,1192891046.00,0.00
1,1002,0,2021-07-01,COMPLETED,MXN,12366,23472915.00,0.00
2,1003,0,2021-07-01,FAILED,MXN,5115,7124189.72,0.00
3,1004,0,2021-08-01,IN_PROGRESS,MXN,2536306,8193985268.00,0.00
4,1005,0,2021-08-01,CANCELLED,MXN,876854,2786.00,0.00
...,...,...,...,...,...,...,...,...
1995,2996,259,2021-08-01,FAILED,MXN,1636,1966402.39,0.00
1996,2997,259,2021-08-01,COMPLETED,MXN,826,991902.66,9154.00
1997,2998,260,2021-08-01,CANCELLED,MXN,1,217369.92,0.00
1998,2999,260,2021-07-01,COMPLETED,MXN,1713,4374585.07,0.00


  - ## Remove from dataset all the transactions where the chargeback is greater than the amount.

In [4]:
def without_excessive_chargebacks(sdf : DataFrame) -> DataFrame:
    """Removes from dataset all the transactions where the chargeback is greater than the amount
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame with all the transactions where the chargeback is greater than the amount removed.
    
    Examples
    --------
    >>>
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    |tx_agg_id|merchant_id|op_month_date|     status| cc| tx_cnt|       tx_amt|   cb_amt|
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    |     1001|          0|   2021-07-01|IN_PROGRESS|MXN|3679780|1.192891046E9|      0.0|
    |     1002|          0|   2021-07-01|  COMPLETED|MXN|  12366|  2.3472915E7|      0.0|
    |     1003|          0|   2021-07-01|     FAILED|MXN|   5115|   7124189.72|      0.0|
    |     1004|          0|   2021-08-01|IN_PROGRESS|MXN|2536306|8.193985268E9|      0.0|
    |     1005|          0|   2021-08-01|  CANCELLED|MXN| 876854|       2786.0|      0.0|
    |     1006|          0|   2021-08-01|  COMPLETED|MXN|  11693|  2.2654486E7|      0.0|
    |     1007|          0|   2021-08-01|     FAILED|MXN|   4629|   6616879.32|  38500.0|
    |     1008|          1|   2021-07-01|IN_PROGRESS|MXN|1748668|2.975559643E9|      0.0|
    |     1009|          1|   2021-07-01|  COMPLETED|MXN|   8218|    9188680.0|      0.0|
    |     1010|          1|   2021-07-01|     FAILED|MXN|   3969|   3114530.59|      0.0|
    |     1011|          1|   2021-08-01|IN_PROGRESS|MXN|2024423|3.373727716E9|      0.0|
    |     1012|          1|   2021-08-01|  COMPLETED|MXN|   8433|  1.0123512E7|      0.0|
    |     1013|          1|   2021-08-01|     FAILED|MXN|   3923|   3188906.68|      0.0|
    |     1014|          2|   2021-07-01|     FAILED|MXN|1740857| 1.43075558E8|      0.0|
    |     1015|          2|   2021-07-01|  COMPLETED|MXN| 190740| 1.49914214E7|292951.89|
    |     1016|          2|   2021-07-01|      ERROR|MXN|   5528|     453723.8|      0.0|
    |     1017|          2|   2021-07-01|IN_PROGRESS|MXN|     27|      2306.56|      0.0|
    |     1018|          2|   2021-08-01|     FAILED|MXN|1399830|   1046906.47|      0.0|
    |     1020|          2|   2021-08-01|      ERROR|MXN|    555|     42137.63|      0.0|
    |     1021|          2|   2021-08-01|  CANCELLED|MXN|      8|   1748135.91|      0.0|
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    only showing top 20 rows

    
    See Also
    --------
    `sdf`
    """
    return sdf.where('cb_amt <= tx_amt')

def transform(
    sdf : DataFrame,
    pipeline : list[Callable[[DataFrame], DataFrame]]
) -> DataFrame:
    """Transforms a Spark Data Frame with a given transformation pipeline.

    Params
    ------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
    pipeline : `list[Callable[[pyspark.sql.DataFrame], pyspark.sql.DataFrame]]`
        a list of transformations to apply in order to this Spark Data Frame.

    Returns
    -------
    `pyspark.sql.DataFrame`
        the resulting Spark Data Frame after chaining the transformations
    
    Example
    -------
    >>>
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    |tx_agg_id|merchant_id|op_month_date|     status| cc| tx_cnt|       tx_amt|   cb_amt|
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    |     1001|          0|   2021-07-01|IN_PROGRESS|MXN|3679780|1.192891046E9|      0.0|
    |     1002|          0|   2021-07-01|  COMPLETED|MXN|  12366|  2.3472915E7|      0.0|
    |     1003|          0|   2021-07-01|     FAILED|MXN|   5115|   7124189.72|      0.0|
    |     1004|          0|   2021-08-01|IN_PROGRESS|MXN|2536306|8.193985268E9|      0.0|
    |     1005|          0|   2021-08-01|  CANCELLED|MXN| 876854|       2786.0|      0.0|
    |     1006|          0|   2021-08-01|  COMPLETED|MXN|  11693|  2.2654486E7|      0.0|
    |     1007|          0|   2021-08-01|     FAILED|MXN|   4629|   6616879.32|  38500.0|
    |     1008|          1|   2021-07-01|IN_PROGRESS|MXN|1748668|2.975559643E9|      0.0|
    |     1009|          1|   2021-07-01|  COMPLETED|MXN|   8218|    9188680.0|      0.0|
    |     1010|          1|   2021-07-01|     FAILED|MXN|   3969|   3114530.59|      0.0|
    |     1011|          1|   2021-08-01|IN_PROGRESS|MXN|2024423|3.373727716E9|      0.0|
    |     1012|          1|   2021-08-01|  COMPLETED|MXN|   8433|  1.0123512E7|      0.0|
    |     1013|          1|   2021-08-01|     FAILED|MXN|   3923|   3188906.68|      0.0|
    |     1014|          2|   2021-07-01|     FAILED|MXN|1740857| 1.43075558E8|      0.0|
    |     1015|          2|   2021-07-01|  COMPLETED|MXN| 190740| 1.49914214E7|292951.89|
    |     1016|          2|   2021-07-01|      ERROR|MXN|   5528|     453723.8|      0.0|
    |     1017|          2|   2021-07-01|IN_PROGRESS|MXN|     27|      2306.56|      0.0|
    |     1018|          2|   2021-08-01|     FAILED|MXN|1399830|   1046906.47|      0.0|
    |     1020|          2|   2021-08-01|      ERROR|MXN|    555|     42137.63|      0.0|
    |     1021|          2|   2021-08-01|  CANCELLED|MXN|      8|   1748135.91|      0.0|
    +---------+-----------+-------------+-----------+---+-------+-------------+---------+
    only showing top 20 rows    
        
    See Also
    --------
    `extract`
    `transform`
    """
    def _transformation_chain(
        sdf : DataFrame,
        pipe: Callable[[DataFrame], DataFrame]
    ) -> DataFrame:
        return sdf.transform(pipe)
        
    return reduce(
        _transformation_chain,
        pipeline,
        sdf
    )
    
transform(sdf, [
    with_adequate_format,
    without_excessive_chargebacks
]).toPandas()

                                                                                

Unnamed: 0,tx_agg_id,merchant_id,op_month_date,status,cc,tx_cnt,tx_amt,cb_amt
0,1001,0,2021-07-01,IN_PROGRESS,MXN,3679780,1192891046.00,0.00
1,1002,0,2021-07-01,COMPLETED,MXN,12366,23472915.00,0.00
2,1003,0,2021-07-01,FAILED,MXN,5115,7124189.72,0.00
3,1004,0,2021-08-01,IN_PROGRESS,MXN,2536306,8193985268.00,0.00
4,1005,0,2021-08-01,CANCELLED,MXN,876854,2786.00,0.00
...,...,...,...,...,...,...,...,...
1976,2996,259,2021-08-01,FAILED,MXN,1636,1966402.39,0.00
1977,2997,259,2021-08-01,COMPLETED,MXN,826,991902.66,9154.00
1978,2998,260,2021-08-01,CANCELLED,MXN,1,217369.92,0.00
1979,2999,260,2021-07-01,COMPLETED,MXN,1713,4374585.07,0.00


  - ## What is the total amount of the completed and failed transactions?

In [5]:
def finding_total_amount_of_terminal_transactions(sdf : DataFrame) -> DataFrame:
    """What is the total amount of completed and failed transactions?
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame with the total amount of terminal transactions aggregated by status and currency code.
        
    See Also
    --------
    `sdf`
    
    Examples
    --------
    >>> sdf.transform(
    ...    with_adequate_format
    ... ).transform(
    ...    computing_total_amount_of_terminal_transactions
    ... ).show()
    +---------+---+--------------------+
    |   status| cc|        total_tx_amt|
    +---------+---+--------------------+
    |COMPLETED|ARS|             10915.0|
    |COMPLETED|TWD|            184790.0|
    |COMPLETED|AED|            212380.0|
    |COMPLETED|USD|          2182832.23|
    |COMPLETED|EUR|        2.58241293E7|
    |COMPLETED|PKR|        1.04163539E8|
    |COMPLETED|CLP|        1.71798798E8|
    |COMPLETED|COP|       4.844836359E9|
    |COMPLETED|JPY|       7.790508917E9|
    |COMPLETED|MXN|1.291928067539999...|
    |   FAILED|TWD|             92330.0|
    |   FAILED|AED|            147808.0|
    |   FAILED|USD|          1110901.17|
    |   FAILED|ARS|           4062502.0|
    |   FAILED|EUR|           7036735.5|
    |   FAILED|CLP|         5.8623761E7|
    |   FAILED|PKR|         6.3533041E7|
    |   FAILED|JPY|        9.00587003E8|
    |   FAILED|COP|       2.148912107E9|
    |   FAILED|MXN| 9.063828382330002E9|
    +---------+---+--------------------+
    
    """
    return sdf.filter( F.col('status').isin(['COMPLETED', 'FAILED']) ).groupBy('status', 'cc').agg(
        F.sum('tx_amt').alias('total_tx_amt')
    ).orderBy('status','total_tx_amt')    


transform(sdf, [
    with_adequate_format,
    # Since this is the total amount I won't omit the excess chargebacks
    finding_total_amount_of_terminal_transactions
]).toPandas()

                                                                                

Unnamed: 0,status,cc,total_tx_amt
0,COMPLETED,ARS,10915.0
1,COMPLETED,TWD,184790.0
2,COMPLETED,AED,212380.0
3,COMPLETED,USD,2182832.23
4,COMPLETED,EUR,25824129.3
5,COMPLETED,PKR,104163539.0
6,COMPLETED,CLP,171798798.0
7,COMPLETED,COP,4844836359.0
8,COMPLETED,JPY,7790508917.0
9,COMPLETED,MXN,12919280675.4


  - ## Compute the percentage of completed transactions by merchant and month.

In [6]:
def computing_transaction_completion_percentage(sdf : DataFrame) -> DataFrame:
    """Computes the percentage of completed transactions by merchant and month.
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame with the (frequency) percentage of completed transactions aggregated by merchant and month.
        
    See Also
    --------
    `sdf`
        
    Examples
    --------
    >>> sdf.transform(
    ...      with_adequate_format
    ... ).transform(
    ...     computing_completed_transactions
    ... )
    +-----------+-------------+--------------------+
    |merchant_id|op_month_date|completed_tx_cnt_per|
    +-----------+-------------+--------------------+
    |          0|   2021-07-01|0.003344638098311...|
    |          0|   2021-08-01|0.003409552812932099|
    |          1|   2021-07-01|0.004667050949680695|
    |          1|   2021-08-01|0.004140360834435155|
    |          2|   2021-07-01| 0.09846413704242103|
    |          2|   2021-08-01| 0.10753197325386649|
    |          3|   2021-07-01|  0.4195941939380095|
    |          3|   2021-08-01|  0.4339967929202303|
    |          4|   2021-07-01|  0.6588401219586622|
    |          4|   2021-08-01|  0.6833754499294737|
    |          5|   2021-07-01| 0.03594910135093989|
    |          5|   2021-08-01| 0.04550878398972769|
    |          6|   2021-07-01| 0.39829061128751586|
    |          6|   2021-08-01|  0.3998209039028428|
    |          7|   2021-07-01|  0.2959017697067839|
    |          7|   2021-08-01|  0.4705738894572795|
    |          8|   2021-07-01|  0.2778922155688623|
    |          8|   2021-08-01| 0.29089263346125077|
    |          9|   2021-07-01|  0.8413678556936439|
    |          9|   2021-08-01|  0.8411236448946838|
    +-----------+-------------+--------------------+
    only showing top 20 rows    
    
    """
    merchant_month_window = Window.partitionBy(
        'merchant_id', 
        'op_month_date'
    ).orderBy(
        'merchant_id', 
        'op_month_date'
    )
    return sdf.select(
        F.col('merchant_id'),
        F.col('op_month_date'),
        (
            F.sum(
                F.when(F.col('status') == 'COMPLETED', F.col('tx_cnt')).otherwise(F.lit(0))
            ).over(merchant_month_window) / F.sum('tx_cnt').over(merchant_month_window)
        ).alias('completed_tx_cnt_per')
    ).distinct().orderBy('merchant_id', 'op_month_date')
    
transform(sdf, [
    with_adequate_format,
    # It was rather unspecified, but should be needed
    # the excess chargebacks can be removed by just
    # uncommenting the following line:
    # without_excessive_chargebacks,
    computing_transaction_completion_percentage
]).toPandas().style.format({
    'completed_tx_cnt_per': '{:.1%}'.format
})

                                                                                

Unnamed: 0,merchant_id,op_month_date,completed_tx_cnt_per
0,0,2021-07-01,0.3%
1,0,2021-08-01,0.3%
2,1,2021-07-01,0.5%
3,1,2021-08-01,0.4%
4,2,2021-07-01,9.8%
5,2,2021-08-01,10.8%
6,3,2021-07-01,42.0%
7,3,2021-08-01,43.4%
8,4,2021-07-01,65.9%
9,4,2021-08-01,68.3%


  - ## Create a flag by merchant and month that marks if the merchant had at least one transaction in each status.

In [7]:
def adding_all_statuses_flag(sdf : DataFrame) -> DataFrame:
    """Creates a flag by merchant and month that marks if the merchant had at least one transaction in each status.
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame with a flag by merchant and month that marks if the merchant had at least one transaction in each status.
        
    See Also
    --------
    `sdf`
    
    Examples
    --------
    >>> sdf.transform(
    ...     with_adequate_format
    ... ).transform(
    ...     adding_all_statuses_flag
    ... ).toPandas()
    +-----------+-------------+-----------------+
    |merchant_id|op_month_date|all_statuses_flag|
    +-----------+-------------+-----------------+
    |          0|   2021-07-01|                0|
    |          0|   2021-08-01|                0|
    |          1|   2021-07-01|                0|
    |          1|   2021-08-01|                0|
    |          2|   2021-07-01|                0|
    |          2|   2021-08-01|                0|
    |          3|   2021-07-01|                0|
    |          3|   2021-08-01|                0|
    |          4|   2021-07-01|                0|
    |          4|   2021-08-01|                0|
    |          5|   2021-07-01|                0|
    |          5|   2021-08-01|                0|
    |          6|   2021-07-01|                0|
    |          6|   2021-08-01|                0|
    |          7|   2021-07-01|                0|
    |          7|   2021-08-01|                0|
    |          8|   2021-07-01|                0|
    |          8|   2021-08-01|                0|
    |          9|   2021-07-01|                0|
    |          9|   2021-08-01|                0|
    +-----------+-------------+-----------------+
    only showing top 20 rows
    """
    merchant_month_window = Window.partitionBy('merchant_id', 'op_month_date').orderBy(
        'merchant_id',
        'op_month_date'
    )
    
    return sdf.select(
        'merchant_id',
        'op_month_date',
        (
            F.size(F.collect_set('status').over(merchant_month_window)) == 
            F.lit(sdf.select('status').distinct().count())
        ).cast('integer').alias('all_statuses_flag')
    ).distinct().orderBy('all_statuses_flag', 'merchant_id', 'op_month_date')
    
    
sdf.transform(
    with_adequate_format
).transform(
    adding_all_statuses_flag
).toPandas()

                                                                                

Unnamed: 0,merchant_id,op_month_date,all_statuses_flag
0,0,2021-07-01,0
1,0,2021-08-01,0
2,1,2021-07-01,0
3,1,2021-08-01,0
4,2,2021-07-01,0
...,...,...,...
517,260,2021-08-01,0
518,11,2021-07-01,1
519,11,2021-08-01,1
520,65,2021-07-01,1


  - ## Through Banxico's API, fetch the exchange rate (Tipo de cambio para solventar obligaciones denominadas en moneda extranjera Fecha de determinación (FIX)). 

In [8]:
def broadcast_bmx_fix(_broadcast : bool = True) -> DataFrame:
    """ Monthly average USD to MXN Banxico FIX rates in a broadcasted DataFrame.

    Since we have the rates since 1991, it's better to use a broadcasted dataframe
    (e. g. instead of a UDF) to reduce time complexity with a space complexity approach.
    
    Params
    ------
    _broadcast : `bool` 
        Should we broadcast this dataframe (`True` by default)?
    
    Returns
    -------
    `pyspark.sql.DataFrame`

    Examples
    --------
    >>> broadcast_bmx_fix(False).show()
    +----------+-------+
    |         t|      r|
    +----------+-------+
    |1991-01-01|    0.0|
    |1992-01-01| 3.0685|
    |1993-01-01|   3.11|
    |1994-01-01| 3.1075|
    |1995-01-01| 5.5133|
    |1996-01-01|  7.476|
    |1997-01-01| 7.8271|
    |1998-01-01| 8.2177|
    |1999-01-01|10.1351|
    |2000-01-01| 9.4878|
    |2001-01-01| 9.7766|
    |2002-01-01| 9.1616|
    |2003-01-01|10.6203|
    |2004-01-01|10.9151|
    |2005-01-01|11.2607|
    |2006-01-01| 10.547|
    |2007-01-01|10.9529|
    |2008-01-01|  10.91|
    |2009-01-01|13.8921|
    |2010-01-01|12.8019|
    +----------+-------+
    only showing top 20 rows

    """
    
    bmx_fix : DataFrame = spark.createDataFrame(
        httpx.get(
            url=os.environ.get('BANXICO_API_URL', 'https://www.banxico.org.mx/SieAPIRest/service/v1/') +
            'series/' + os.environ.get('BANXICO_FIX_SERIES_ID', 'SF17908') + '/datos',
            headers={
                'Accept': 'application/json',
                'Bmx-Token': os.environ.get('BANXICO_API_TOKEN', "a5ff4a8475701e98228af2f8f85fa8e81aa31e0b4ca8adc5d56e56ef711ba683")
            }
        ).json()['bmx']['series'][0]['datos']
    ).select(
        F.to_date('fecha', 'dd/MM/yyyy').alias('t'),
        F.col('dato').cast('double').alias('r')
    ).orderBy('fecha')
    
    return F.broadcast(bmx_fix) if _broadcast else bmx_fix
    
broadcast_bmx_fix(False).toPandas()

                                                                                

Unnamed: 0,t,r
0,1991-01-01,0.00
1,1992-01-01,3.07
2,1993-01-01,3.11
3,1994-01-01,3.11
4,1995-01-01,5.51
...,...,...
374,2017-12-01,19.18
375,2018-12-01,20.11
376,2019-12-01,19.11
377,2020-12-01,19.97


  - ## Transform dollar monetary amounts to pesos (Use the monthly average of exchange rates).

In [41]:
def with_dollars_to_pesos(sdf : DataFrame) -> DataFrame:
    """Transforms dollar monetary amounts to pesos (Using the monthly average of exchange rates).
    
    Parameters
    ----------
    sdf : `pyspark.sql.DataFrame`
        the input Spark Data Frame.
        
    Returns
    -------
    `pyspark.sql.DataFrame`
        A Spark Data Frame whose dollar monetary amounts are transformed to pesos (and also the currency codes).
        
    See Also
    --------
    `sdf`
    
    Examples
    --------
    >>> sdf.transform(
    ...     with_adequate_format
    ... ).transform(
    ...     adding_all_statuses_flag
    ... ).toPandas()
    """
    bmx_fix_bd : DataFrame = broadcast_bmx_fix()
    
    return sdf.join(
        bmx_fix_bd,
        on=(sdf.op_month_date == bmx_fix_bd.t),
        how='left'
    ).withColumn(
        # The currency codes are transformed in order to reflect the MXN to USD change
        'cc',
        F.when(
            F.col('cc') == F.lit('USD'), 
            F.lit('MXN')
        ).otherwise(F.col('cc'))
    ).withColumn(
        'tx_amt',
        F.when(
            F.col('cc') == F.lit('USD'), 
            F.col('tx_amt') * F.col('r')
        ).otherwise(F.col('tx_amt'))
    ).withColumn(
        # The chargeback severity is also another monetary amount by definition
        'cb_amt',
        F.when(
            F.col('cc') == F.lit('USD'), 
            F.col('cb_amt') * F.col('r')
        ).otherwise(F.col('cb_amt'))
    ).drop(*bmx_fix_bd.columns)
    
sdf.transform(
    with_adequate_format
).transform(
    with_dollars_to_pesos
).toPandas()

                                                                                

Unnamed: 0,tx_agg_id,merchant_id,op_month_date,status,cc,tx_cnt,tx_amt,cb_amt
0,1254,28,2021-08-01,CANCELLED,MXN,2,4201.72,0.00
1,1383,45,2021-07-01,COMPLETED,MXN,27988,9009903.00,158047.00
2,1582,71,2021-07-01,COMPLETED,MXN,10717,12530116.10,585092.01
3,1767,95,2021-07-01,FAILED,MXN,1484,466776.08,0.00
4,1962,119,2021-07-01,COMPLETED,MXN,4558,2133147.00,23749.00
...,...,...,...,...,...,...,...,...
1995,1838,103,2021-08-01,EXPIRED,MXN,94,1202226.95,0.00
1996,1850,105,2021-07-01,ERROR,MXN,19,2750.00,0.00
1997,1924,114,2021-07-01,ERROR,COP,41,1752400.00,0.00
1998,1989,122,2021-07-01,ERROR,MXN,6,35396.76,0.00


In [10]:
def load(sdf : DataFrame, target : str) -> Union[None, str]:
    return sdf.toPandas().to_csv(
        path_or_buf=target,
        header=True,
        index=False
    )   
    
lean_submission = transform(sdf=sdf, pipeline=[
    with_adequate_format,
    without_excessive_chargebacks,
    with_dollars_to_pesos
]).orderBy('tx_agg_id')

load(lean_submission, '../data/out/Examen.lean.csv')

lean_submission.toPandas()

                                                                                

Unnamed: 0,tx_agg_id,merchant_id,op_month_date,status,cc,tx_cnt,tx_amt,cb_amt
0,1001,0,2021-07-01,IN_PROGRESS,MXN,3679780,1192891046.00,0.00
1,1002,0,2021-07-01,COMPLETED,MXN,12366,23472915.00,0.00
2,1003,0,2021-07-01,FAILED,MXN,5115,7124189.72,0.00
3,1004,0,2021-08-01,IN_PROGRESS,MXN,2536306,8193985268.00,0.00
4,1005,0,2021-08-01,CANCELLED,MXN,876854,2786.00,0.00
...,...,...,...,...,...,...,...,...
1976,2996,259,2021-08-01,FAILED,MXN,1636,1966402.39,0.00
1977,2997,259,2021-08-01,COMPLETED,MXN,826,991902.66,9154.00
1978,2998,260,2021-08-01,CANCELLED,MXN,1,217369.92,0.00
1979,2999,260,2021-07-01,COMPLETED,MXN,1713,4374585.07,0.00


In [47]:
def suffixing_columns(sdf : DataFrame) -> DataFrame:
    return sdf.select([(
        F.col(column).alias('_' + column)
    ) for column in sdf.columns])

def with_total_amounts_of_terminal_transactions(sdf : DataFrame) -> DataFrame:
    bd : DataFrame = F.broadcast(transform(sdf, [
        finding_total_amount_of_terminal_transactions,
        suffixing_columns
    ]))
    
    return sdf.join(
        bd,
        on=[(
            sdf.status == bd._status
        ), (
            sdf.cc == bd._cc
        )],
        how='left'
    ).select(
        '*',
        F.coalesce(F.col('_total_tx_amt'), F.lit(0.0)).alias('total_terminal_tx_amt')
    ).drop(*bd.columns)
    

def with_transaction_completion_percentages(sdf : DataFrame) -> DataFrame:
    bd : DataFrame =  F.broadcast(transform(sdf, [
        computing_transaction_completion_percentage,
        suffixing_columns
    ]))
    
    return sdf.join(
        bd,
        on=[(
            sdf.merchant_id == bd._merchant_id
        ), (
            sdf.op_month_date == bd._op_month_date 
        )],
        how='left'
    ).select(
        '*',
        F.coalesce(F.col('_completed_tx_cnt_per'), F.lit(0.0)).alias('completed_tx_cnt_per')
    ).drop(*bd.columns)

def with_all_statuses_flag(sdf : DataFrame) -> DataFrame:
    bd : DataFrame = F.broadcast(transform(sdf, [
        adding_all_statuses_flag,
        suffixing_columns
    ]))
    
    return sdf.join(
        bd,
        on=[(
            sdf.merchant_id == bd._merchant_id
        ), (
            sdf.op_month_date == bd._op_month_date 
        )],
        how='left'
    ).select(
        '*',
        F.coalesce(F.col('_all_statuses_flag'), F.lit(0)).alias('all_statuses_flag')
    ).drop(*bd.columns)

full_submission = transform(sdf, [
    # Take the value of the columns and put them in the correct format.
    with_adequate_format,
    # Remove from dataset all the transactions where the chargeback is greater than the amount.
    without_excessive_chargebacks,
    # What is the total amount of the completed and failed transactions?
    with_total_amounts_of_terminal_transactions,
    # Compute the percentage of completed transactions by merchant and month.
    with_transaction_completion_percentages,
    # Create a flag by merchant and month that marks if the merchant had at least one transaction in each status.
    with_all_statuses_flag,
    # Transform dollar monetary amounts to pesos (Use the monthly average of exchange rates).
    with_dollars_to_pesos
]).orderBy('tx_agg_id')

load(full_submission, '../data/out/Examen.full.csv')

full_submission.toPandas()

                                                                                

Unnamed: 0,tx_agg_id,merchant_id,op_month_date,status,cc,tx_cnt,tx_amt,cb_amt,total_terminal_tx_amt,completed_tx_cnt_per,all_statuses_flag
0,1001,0,2021-07-01,IN_PROGRESS,MXN,3679780,1192891046.00,0.00,0.00,0.00,0
1,1002,0,2021-07-01,COMPLETED,MXN,12366,23472915.00,0.00,12919153058.40,0.00,0
2,1003,0,2021-07-01,FAILED,MXN,5115,7124189.72,0.00,9063826205.33,0.00,0
3,1004,0,2021-08-01,IN_PROGRESS,MXN,2536306,8193985268.00,0.00,0.00,0.00,0
4,1005,0,2021-08-01,CANCELLED,MXN,876854,2786.00,0.00,0.00,0.00,0
...,...,...,...,...,...,...,...,...,...,...,...
1976,2996,259,2021-08-01,FAILED,MXN,1636,1966402.39,0.00,9063826205.33,0.33,0
1977,2997,259,2021-08-01,COMPLETED,MXN,826,991902.66,9154.00,12919153058.40,0.33,0
1978,2998,260,2021-08-01,CANCELLED,MXN,1,217369.92,0.00,0.00,0.00,0
1979,2999,260,2021-07-01,COMPLETED,MXN,1713,4374585.07,0.00,12919153058.40,1.00,0
