In [2]:
import apache_beam as beam
from apache_beam.metrics import Metrics

class TransactionMetricsDoFn(beam.DoFn):
    """Pass-through DoFn that records Beam counters for each transaction.

    Counters (all registered under this class as the metric namespace):
        total_amount_eur: running sum of amounts whose currency is 'EUR'.
        total_amount_gbp: running sum of amounts whose currency is 'GBP'.
        total_transactions: number of elements seen, whatever the currency.

    Elements are emitted unchanged, so the transform can be inserted
    anywhere in a pipeline whose elements are mappings with 'currency'
    and 'amount' keys.
    """

    def __init__(self):
        # Let the DoFn base class run its own initialisation before we
        # attach our state.
        super().__init__()

        # Define the metrics.
        self.total_amount_eur = Metrics.counter(self.__class__, 'total_amount_eur')
        self.total_amount_gbp = Metrics.counter(self.__class__, 'total_amount_gbp')
        self.total_transactions = Metrics.counter(self.__class__, 'total_transactions')

    def process(self, element):
        """Update the counters for one transaction and re-emit it.

        Args:
            element: mapping with a 'currency' key and, for EUR/GBP
                transactions, an 'amount' key convertible by ``int()``.

        Yields:
            The same ``element`` object, unmodified.

        Raises:
            KeyError: if 'currency' is missing, or 'amount' is missing
                for an EUR/GBP transaction.
        """
        currency = element['currency']

        # Add the amount to the counter of the matching currency. Other
        # currencies contribute only to the transaction count below.
        # int() truncates any fractional part toward zero.
        # NOTE(review): presumably done because the counter expects
        # integer increments -- confirm; fractional amounts are lost.
        if currency == 'EUR':
            self.total_amount_eur.inc(int(element['amount']))
        elif currency == 'GBP':
            self.total_amount_gbp.inc(int(element['amount']))

        # Count every transaction.
        self.total_transactions.inc()

        yield element

def parse_transaction(line):
    """Convert one comma-separated input line into a transaction dict.

    The fields are read positionally as id, name, amount, currency, city.
    The amount is converted to ``float`` and only the city (the last field
    used) is stripped of surrounding whitespace; any extra fields are
    ignored.

    Raises:
        IndexError: if the line has fewer than five comma-separated fields.
        ValueError: if the amount field is not a valid float.
    """
    # Split once instead of once per field.
    fields = line.split(',')
    return {
        "id": fields[0],
        "name": fields[1],
        "amount": float(fields[2]),
        "currency": fields[3],
        "city": fields[4].strip(),
    }


# Define the pipeline
p = beam.Pipeline()

# Read from data.txt, parse each line, record metrics, and write the
# (unchanged) parsed elements back out.
transactions = (
    p
    | "Read from data.txt" >> beam.io.ReadFromText('data.txt')
    | "Convert to Dict" >> beam.Map(parse_transaction)
    | "Apply Custom Metrics" >> beam.ParDo(TransactionMetricsDoFn())
    | "Write Results" >> beam.io.WriteToText('outputmetrics.txt')
)

# Run the pipeline and block until it has finished, so the metrics
# queried below are final.
result = p.run()
result.wait_until_finish()

# Print the name and value of every counter reported by the run.
metrics = result.metrics().query()
for metric in metrics['counters']:
    print(metric.key.metric.name, metric.result)



total_amount_eur 380
total_transactions 11
total_amount_gbp 150
