# Cloud Function to Beam Pipeline

In this example we'll be converting a series of Cloud Functions to an Apache Beam pipeline.

### Background

Your team receives invoice data from your ordering system in JSON format. Currently, the data is processed by a series of Cloud Functions.


The data arrives as a list of invoices, each encoded as a JSON object with the following structure:
```json
{
  "invoice_id": int,
  "customer_id": int,
  "line_items": [
    {
      "line_no": int,
      "product_name": str,
      "quantity": int,
      "price_ea": float
    }
  ]
}
```

### Create the Sample Data

The cell below contains a function that generates sample data. Let's run it to generate some records for testing our pipeline.

In [19]:
import random
import json

def generate_sample_data(n_records: int = 100, n_customers: int = 10, seed: "int | None" = None) -> list[str]:
    """
    Generate random sample invoices, each formatted as a JSON string.

    params:
        n_records (int): The number of sample records to create. Default = 100
        n_customers (int): The number of unique customer_ids to include. Default = 10
        seed (int | None): Optional seed for a private random generator, making the
            output reproducible. Default = None, which draws from the module-level
            `random` state exactly as before.

    out:
        sample invoices (list[str]): a list of sample records formatted as JSON strings.
            Invoice IDs are sequential starting at 1, customer IDs are drawn from
            [1, n_customers], and every invoice has at least one line item.
    """
    # products and their corresponding prices
    products = ['Scissors', 'Tape', 'Printer Paper', 'Box', 'Envelope']
    prices = [9.99, 0.99, 7.99, 2.49, 1.49]

    # A dedicated Random instance keeps seeded runs isolated from the global random
    # state. Without a seed, fall back to the global module so existing callers (and any
    # external random.seed() calls) see exactly the same sequence as before.
    rng = random.Random(seed) if seed is not None else random

    # initialize variables
    invoice_no = 0
    invoices = []

    while len(invoices) < n_records:
        # generate a random customer ID
        customer_id = rng.randint(1, n_customers)

        # each product gets its own line item on the invoice
        line_items = []
        for price, product in zip(prices, products):
            # get a random quantity for each product
            quantity = rng.randint(0, 5)
            # if quantity is zero, skip that line
            if quantity <= 0:
                continue

            line_items.append(
                {'line_no': len(line_items) + 1, 'product_name': product, 'quantity': quantity, 'price_ea': price}
            )

        # empty invoices are discarded and do not consume an invoice number
        if line_items:
            invoice_no += 1
            invoices.append(
                json.dumps({'invoice_id': invoice_no, 'customer_id': customer_id, 'line_items': line_items}, indent=2)
            )

    return invoices

# Generate the default 100 sample invoices (customer IDs 1-10) as JSON strings.
sample_data = generate_sample_data()

### Review the Data

Let's check out the sample data. Run the cell below to see what one record of the sample data looks like.

In [2]:
# Display the first generated invoice (a JSON string pretty-printed with indent=2).
print(sample_data[0])

{
  "invoice_id": 1,
  "customer_id": 3,
  "line_items": [
    {
      "line_no": 1,
      "product_name": "Scissors",
      "quantity": 3,
      "price_ea": 9.99
    },
    {
      "line_no": 2,
      "product_name": "Tape",
      "quantity": 4,
      "price_ea": 0.99
    },
    {
      "line_no": 3,
      "product_name": "Printer Paper",
      "quantity": 5,
      "price_ea": 7.99
    },
    {
      "line_no": 4,
      "product_name": "Box",
      "quantity": 1,
      "price_ea": 2.49
    },
    {
      "line_no": 5,
      "product_name": "Envelope",
      "quantity": 4,
      "price_ea": 1.49
    }
  ]
}


### Cloud Functions

Your company chose a series of Cloud Functions as its ETL solution because the process has multiple stages, and each stage's output is consumed by multiple downstream stages. Rather than building one massive pipeline, you broke each stage out into its own Cloud Function to make the system easier to maintain and update.

In [None]:
# Cloud Function 1
def get_total_by_invoice(invoice_list: list):
    """
    Compute the total amount of each invoice.

    params:
        invoice_list (list[str]): invoices, each formatted as a JSON string

    out:
        invoice totals (list[str]): one JSON string per invoice that has line items,
            shaped {"invoice_id": ..., "invoice_total": <total rounded to 2 places>}.
            Invoices without line items are skipped; a missing invoice_id becomes null.
    """
    invoice_totals = []
    for invoice_json in invoice_list:
        invoice = json.loads(invoice_json)
        line_items = invoice.get('line_items', [])

        if not line_items:
            continue

        running_total = 0.00
        for line_item in line_items:
            # missing fields contribute nothing rather than raising
            quantity = line_item.get('quantity', 0)
            price_ea = line_item.get('price_ea', 0.00)
            running_total += quantity * price_ea

        # the ID must come from the invoice parsed in this iteration
        invoice_totals.append(
            json.dumps({'invoice_id': invoice.get('invoice_id', None), 'invoice_total': round(running_total, 2)})
        )

    return invoice_totals
            
# Cloud Function 2
def get_total_by_customer(invoice_list: list):
    """
    Compute the total amount spent by each customer across all invoices.

    params:
        invoice_list (list[str]): invoices, each formatted as a JSON string

    out:
        customer totals (dict): maps customer_id -> total spent, rounded to 2 places.
            Invoices with no line items, or with a missing, null, or negative
            customer_id, are skipped.
    """
    customer_totals = {}
    for invoice_json in invoice_list:
        invoice = json.loads(invoice_json)
        customer_id = invoice.get('customer_id', -1)
        line_items = invoice.get('line_items', [])

        # A JSON null customer_id parses to None, which cannot be compared with 0,
        # so reject it explicitly alongside missing (-1) and negative IDs.
        if not line_items or customer_id is None or customer_id < 0:
            continue

        running_total = 0.00
        for line_item in line_items:
            # missing fields contribute nothing rather than raising
            quantity = line_item.get('quantity', 0)
            price_ea = line_item.get('price_ea', 0.00)
            running_total += quantity * price_ea

        customer_totals[customer_id] = customer_totals.get(customer_id, 0.00) + running_total

    # Round once at the end (matching Cloud Function 1's 2-decimal totals) so repeated
    # float additions don't leak noise such as 0.30000000000000004 into the output.
    return {cust_id: round(total, 2) for cust_id, total in customer_totals.items()}

In [10]:
import apache_beam as beam
from apache_beam import Create, Map, GroupByKey, FlatMap
from apache_beam.transforms.util import WithKeys

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [11]:
# Build a pipeline on the InteractiveRunner so intermediate PCollections can be
# displayed in the notebook with ib.show().
p = beam.Pipeline(InteractiveRunner())

# Each sample record is a JSON string; parse it into a dict so later transforms
# can read fields with .get().
invoices = (p | 'Create Invoice PColl' >> Create(sample_data)
              | 'Convert JSON to PColl' >> Map(json.loads))

# Display the parsed invoices.
ib.show(invoices)

In [12]:
def get_invoice_totals(element):
    """
    Compute the total amount of a single parsed invoice.

    params:
        element (dict): a parsed invoice record

    out:
        (dict): {'invoice_id': <id, or None if absent>,
                 'invoice_total': <sum of quantity * price_ea, rounded to 2 places>}
    """
    items = element.get('line_items', [])

    # per-line amounts; missing quantity/price fields count as zero
    amounts = [item.get('quantity', 0) * item.get('price_ea', 0.00) for item in items]

    total = 0.00
    for amount in amounts:
        total += amount

    invoice_id = element.get('invoice_id', None)
    return {'invoice_id': invoice_id, 'invoice_total': round(total, 2)}

# One {'invoice_id': ..., 'invoice_total': ...} dict per parsed invoice.
invoice_totals = invoices | 'Get Invoice Total Amount' >> Map(get_invoice_totals)

ib.show(invoice_totals)

In [13]:
def get_customer_totals(element):
    """
    Key a single invoice's total by its customer ID, ready for GroupByKey.

    params:
        element (dict): a parsed invoice record

    out:
        (tuple): (customer_id or None if absent,
                  sum of quantity * price_ea rounded to 2 places)
    """
    line_items = element.get('line_items', [])
    customer_id = element.get('customer_id', None)

    running_total = 0.00
    for line_item in line_items:
        # missing fields contribute nothing rather than raising
        quantity = line_item.get('quantity', 0)
        price_ea = line_item.get('price_ea', 0.00)
        running_total += quantity * price_ea

    # Beam's GroupByKey consumes (key, value) 2-tuples, so emit a tuple, not a list.
    return (customer_id, round(running_total, 2))

# Key each invoice's total by customer_id, then group by that key so each element
# becomes (customer_id, iterable of that customer's invoice totals).
customer_invoice_totals = (invoices | 'Get Total For Each Invoice' >> Map(get_customer_totals)
                                    | 'Group By Customer ID' >> GroupByKey())

ib.show(customer_invoice_totals)

# # TODO: Get the total amount spent per customer

# total_per_customer = customer_invoice_totals | 'Get Total Spent per Customer' >> # DoSomething

# ib.show(total_per_customer)

In [18]:
# NOTE(review): Map emits one *list* of line-item dicts per invoice, so each element
# of `products` is a whole list. If the intent is one element per line item, FlatMap
# (imported at the top of the Beam cells) would flatten them; confirm intended output.
# NOTE(review): this transform has no explicit label, so re-running the cell may hit a
# duplicate-label error in the pipeline; consider adding a 'Label' >> prefix.
products = invoices | Map(lambda x: x.get('line_items', []))

ib.show(products)