
# Apache Beam
Apache Beam is a framework that gives us the tools to transform, process, aggregate and manipulate data based on user requirements. It is an open-source, unified programming model for defining and executing data processing pipelines, including ETL, batch and stream processing. Its name combines "Batch" and "strEAM", reflecting its ability to handle both batch and streaming data. Beam pipelines are defined using one of the provided SDKs and executed on one of Beam's supported runners, including Apache Apex, Apache Flink, Apache Gearpump, Apache Samza, Apache Spark, and Google Cloud Dataflow.


**Prerequisite**

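Google Colab provides a hosted VM with a Python runtime. Pip-install Apache Beam in a runtime environment with a compatible Python 3 version (this notebook originally targeted Python 3.5 or greater; recent Beam releases require a newer Python 3, so check the supported versions for the release you install). Once Beam is installed, create an output directory and upload the data file.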

In [0]:
# install apache beam
!pip install --quiet apache-beam

In [0]:
# Create a local directory to store the output data
!mkdir -p beam_output

In [3]:
# Upload df.csv from your local machine
from google.colab import files
upload = files.upload()

In [4]:
# view content
!cat df.csv

149633CM,Marco,10,Accounts,1/1/2019
212539MU,Rebekah,10,Accounts,1/1/2019
231555ZZ,Itoe,10,Accounts,1/1/2019
503996WI,Edouard,10,Accounts,1/1/2019
704275DC,Kyle,10,Accounts,1/1/2019
957149WC,Kyle,10,Accounts,1/1/2019
241316NX,Kumiko,10,Accounts,1/1/2019
796656IE,Gaston,10,Accounts,1/1/2019
331593PS,Beryl,20,HR,1/1/2019
560447WH,Olga,20,HR,1/1/2019
222997TJ,Leslie,20,HR,1/1/2019
171752SY,Mindy,20,HR,1/1/2019
153636AS,Vicky,20,HR,1/1/2019
745411HT,Richard,20,HR,1/1/2019
298464HN,Kirk,20,HR,1/1/2019
783950BW,Kaori,20,HR,1/1/2019
892691AR,Beryl,20,HR,1/1/2019
245668UZ,Oscar,20,HR,1/1/2019
231206QD,Kumiko,30,Finance,1/1/2019
357919KT,Wendy,30,Finance,1/1/2019
472418ZG,Cristobal,30,Finance,1/1/2019
442292OI,Erika,30,Finance,1/1/2019
503647MN,Sebastien,30,Finance,1/1/2019
245319LD,Valerie,30,Finance,1/1/2019
818776XR,Dolly,30,Finance,1/1/2019
259229ZU,Emily,30,Finance,1/1/2019
349360YC,Kaori,30,Finance,1/1/2019
951594MT,Hitomi,30,Finance,1/1/2019
149633CM,Marco,10,

# Beam Main Components
A typical Beam program is built in five major steps:

1. Create a pipeline by instantiating the `Pipeline` class.
2. Read a file from a source, which creates the program's first PCollection.
3. Apply PTransforms according to the user's requirements.
4. Write the resulting PCollection to a sink.
5. Run the pipeline.

### Step 1
Instantiate the Apache Beam `Pipeline` class.

**Pipeline:** A Pipeline encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a Pipeline. When you create the Pipeline, you must also specify the execution options that tell the Pipeline where and how to run.


In [0]:
import apache_beam as beam

# Create a pipeline by instantiating the Pipeline class and bind it to a variable
P1 = beam.Pipeline()

# User-defined functions used by the transforms below
def SplitComma(element):
    # Turn one CSV line into a list of fields
    return element.split(',')

def FilterRecord(record):
    # Keep only rows whose department name (4th field) is Accounts
    return record[3] == 'Accounts'
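`SplitComma` breaks on quoted fields that contain commas, and `FilterRecord` raises `IndexError` on a row with fewer than four fields. A sturdier variant, sketched under the assumption that such rows may occur; `parse_row` and `is_accounts` are hypothetical names, and the quoted and truncated rows are made-up examples, not rows from df.csv.

```python
import csv


def parse_row(line):
    """Split one CSV line into fields; csv.reader also handles quoted commas."""
    return next(csv.reader([line]))


def is_accounts(record):
    """Keep only complete Accounts rows; short or malformed rows are dropped."""
    return len(record) >= 4 and record[3] == "Accounts"


rows = [
    "149633CM,Marco,10,Accounts,1/1/2019",
    "331593PS,Beryl,20,HR,1/1/2019",
    '999999XX,"Smith, Jo",10,Accounts,1/1/2019',  # hypothetical quoted name
    "149633CM,Marco",                            # hypothetical truncated row
]
kept = [r for r in map(parse_row, rows) if is_accounts(r)]
print([r[1] for r in kept])  # ['Marco', 'Smith, Jo']
```

Both functions can be dropped into the pipeline in place of the originals, e.g. `beam.Map(parse_row)` and `beam.Filter(is_accounts)`.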

### Step 2
Read the file to create a PCollection, then apply transformations (PTransforms) to it and write the result.

**PCollection**: A PCollection represents a distributed data set that your Beam pipeline operates on. The data set can be bounded, meaning it comes from a fixed source like a file, or unbounded, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial PCollection by reading data from an external data source, but you can also create a PCollection from in-memory data within your driver program. From there, PCollections are the inputs and outputs for each step in your pipeline and are immutable. 

**PTransform**: A PTransform represents a data processing operation, or a step, in your pipeline. Every PTransform takes one or more PCollection objects as input, performs a processing function that you provide on the elements of that PCollection, and produces zero or more output PCollection objects.

In [6]:
# Initial PCollection: read the data source, then transform and write it
name_count = (
    P1
    | 'Read lines' >> beam.io.ReadFromText('df.csv')
    | 'Split row' >> beam.Map(SplitComma)
    | 'Filter Accounts' >> beam.Filter(FilterRecord)
    # Pair each name with the value 4; use 1 instead to count rows per name
    | 'Pair each employee' >> beam.Map(lambda record: (record[1], 4))
    # Sum the paired values for each name
    | 'Aggregate sum' >> beam.CombinePerKey(sum)
    | 'Write results' >> beam.io.WriteToText('beam_output/output_file')
)
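To see what the chain above computes, here is a plain-Python equivalent that runs without Beam. It is only an illustration of the semantics on a few rows copied from df.csv; `name_totals` is a hypothetical helper, and `value=4` mirrors the notebook's pairing. Since each matching row adds 4, every total is a multiple of 4, as in the sample output below.

```python
from collections import defaultdict


def name_totals(lines, department="Accounts", value=4):
    """Plain-Python equivalent of Split -> Filter -> Map -> CombinePerKey(sum)."""
    totals = defaultdict(int)
    for line in lines:                  # ReadFromText: one element per line
        record = line.split(",")        # Map(SplitComma)
        if record[3] != department:     # Filter(FilterRecord)
            continue
        name = record[1]                # Map(lambda record: (record[1], 4))
        totals[name] += value           # CombinePerKey(sum)
    return dict(totals)


lines = [
    "149633CM,Marco,10,Accounts,1/1/2019",
    "704275DC,Kyle,10,Accounts,1/1/2019",
    "957149WC,Kyle,10,Accounts,1/1/2019",
    "331593PS,Beryl,20,HR,1/1/2019",
]
print(name_totals(lines))  # {'Marco': 4, 'Kyle': 8}
```

Beam performs the same per-key sum, but distributes the work and makes no promise about the order of the results.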




### Step 3
Run the pipeline

In [7]:
# Run the pipeline and block until it finishes
P1.run().wait_until_finish()

# Sample the first 20 results; remember there are no ordering guarantees.
!head -n 20 beam_output/output_file-00000-of-00001

('Marco', 124)
('Rebekah', 124)
('Itoe', 124)
('Edouard', 124)
('Kyle', 248)
('Kumiko', 124)
('Gaston', 124)
('Ayumi', 120)
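The `head` command above reads only the first shard, and `WriteToText` may split output across several shards (`-00000-of-0000N`). A sketch that collects every shard and sorts the pairs so the view is stable despite Beam's lack of ordering guarantees; `read_results` is a hypothetical helper, and it assumes each line is the `str()` of a `(name, total)` tuple, as in the output above.

```python
import ast
import glob


def read_results(prefix="beam_output/output_file"):
    """Read all WriteToText shards for prefix and sort by total, then name."""
    results = []
    for path in sorted(glob.glob(prefix + "-*-of-*")):
        with open(path) as f:
            # Each line looks like "('Marco', 124)"; literal_eval rebuilds the tuple.
            results.extend(ast.literal_eval(line) for line in f if line.strip())
    # Highest totals first; ties broken alphabetically by name.
    return sorted(results, key=lambda pair: (-pair[1], pair[0]))


for name, total in read_results()[:5]:
    print(name, total)
```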
