In [None]:
import apache_beam as beam

# Base Form

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  (p | beam.Create(range(1, 20))
     | beam.Filter(lambda num: num % 2 == 0)
     | beam.Map(print))

# Custom Options

In [None]:
from apache_beam.options.pipeline_options import PipelineOptions

beam_options = PipelineOptions()

In [None]:
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

# Adding name to steps in the pipeline

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  (p | "Create set of numbers from 1 to 20" >> beam.Create(range(1, 20))
     | "Step 2" >> beam.Filter(lambda num: num % 2 == 0)
     | "Step 3" >>beam.Map(print))

# Filters

## Filtering with a function

In [None]:
import apache_beam as beam

def is_perennial(plant):
  return plant['duration'] == 'perennial'

with beam.Pipeline() as p:
  perennials = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter perennials' >> beam.Filter(is_perennial)
      | beam.Map(print))


In [None]:
## Filtering with a lambda function

In [None]:
import apache_beam as beam

def is_perennial(plant):
  return plant['duration'] == 'perennial'

with beam.Pipeline() as p:
  perennials = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter perennials' >> beam.Filter(lambda item: item["duration"] == "perennial")
      | beam.Map(print))

In [None]:
## Filtering with multiple arguments

In [None]:
import apache_beam as beam

def has_duration(plant, duration):
  return plant['duration'] == duration

with beam.Pipeline() as p:
  perennials = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter perennials' >> beam.Filter(has_duration, "biennial")
      | beam.Map(print))

## Filtering with side inputs as singletons

In [None]:

# If the PCollection has a single value, such as the average from another computation,
# passing the PCollection as a singleton accesses that value.
# In this example, we pass a PCollection the value perennial as a singleton. 
# We then use that value to filter out perennials.

import apache_beam as beam

with beam.Pipeline() as p:
  perennial = p | 'Perennial' >> beam.Create(['perennial'])

  perennials = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter perennials' >> beam.Filter(
          lambda plant,
          duration: plant['duration'] == duration,
          duration=beam.pvalue.AsSingleton(perennial),
      )
      | beam.Map(print))

In [None]:
# Filtering with side input as iterators

In [None]:
# If the PCollection has multiple values, pass the PCollection as an iterator.
# This accesses elements lazily as they are needed, so it is possible to iterate
# over large PCollections that won’t fit into memory.

import apache_beam as beam

with beam.Pipeline() as p:
  valid_durations = p | 'Valid durations' >> beam.Create([
      'annual',
      'biennial',
      'perennial',
  ])

  valid_plants = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'
          },
      ])
      | 'Filter valid plants' >> beam.Filter(
          lambda plant,
          valid_durations: plant['duration'] in valid_durations,
          valid_durations=beam.pvalue.AsIter(valid_durations),
      )
      | beam.Map(print))

## Filtering with side inputs as dictionaries

In [None]:
# If a PCollection is small enough to fit into memory, then that PCollection
# can be passed as a dictionary. Each element must be a (key, value) pair.
# Note that all the elements of the PCollection must fit into memory for this. 
# If the PCollection won’t fit into memory, use beam.pvalue.AsIter(pcollection) instead.

import apache_beam as beam

with beam.Pipeline() as p:
  keep_duration = p | 'Duration filters' >> beam.Create([
      ('annual', False),
      ('biennial', False),
      ('perennial', True),
  ])

  perennials = (
      p | 'Gardening plants' >> beam.Create([
          {
              'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'
          },
          {
              'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'
          },
          {
              'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'
          },
          {
              'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'
          },
          {
              'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'
          },
      ])
      | 'Filter plants by duration' >> beam.Filter(
          lambda plant,
          keep_duration: keep_duration[plant['duration']],
          keep_duration = beam.pvalue.AsDict(keep_duration),
      )
      | beam.Map(print))

In [None]:
import apache_beam as beam
import string


def has_duration(plant):
  return ("A" in plant)

with beam.Pipeline() as p:
    str = "To be, or not to be: that is the question: Whether 'tis nobler in the mind to suffer The slings and arrows of outrageous fortune, Or to take arms against a sea of troubles, And by opposing end them. To die: to sleep"

    input = (p | beam.Create([str.split()])
             | beam.Filter(has_duration)
            | beam.Map(print))
    

# Aggregations

## Count

In [None]:
import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:
  (p | beam.Create(range(1, 14))
   # beam.combiners.Count.Globally() to return the count of numbers from `PCollection`.
   | beam.combiners.Count.Globally()
   | "Print" >> Output(prefix='Input has elements:'))

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  total_elements = (
      p | 'Create plants' >> beam.Create(['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
      | 'Count all elements' >> beam.combiners.Count.Globally()
      | beam.Map(print))

## Count by Key

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  total_elements = (
      p | 'Create plants' >> beam.Create([
          ('spring', '🍓'),
          ('spring', '🥕'),
          ('summer', '🥕'),
          ('fall', '🥕'),
          ('spring', '🍆'),
          ('winter', '🍆'),
          ('spring', '🍅'),
          ('summer', '🍅'),
          ('fall', '🍅'),
          ('summer', '🌽'),])
      | 'Count all elements' >> beam.combiners.Count.PerKey()
      | beam.Map(print))

In [None]:
import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=' '):
    self.delimiter = delimiter

  def process(self, text):
    for word in text.split(self.delimiter):
      yield word


with beam.Pipeline() as p:
  total_unique_elements = (
      p | beam.Create(["To be, or not to be: that is the question: Whether 'tis nobler in the mind to suffer, the slings and arrows of outrageous fortune, or to take arms against a sea of troubles, and by opposing end them. To die: to sleep"])
  | beam.ParDo(SplitWords()) | beam.combiners.Count.PerElement()
  | Output(prefix='PCollection filtered value: '))


In [None]:
## Count Unique keys

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  total_unique_elements = (
      p | 'Create produce' >> beam.Create(['🍓', '🥕', '🥕', '🥕', '🍆', '🍆', '🍅', '🍅', '🍅', '🌽'])
      | 'Count unique elements' >> beam.combiners.Count.PerElement()
      | beam.Map(print))


## Sum

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  total = (
      p | 'Create numbers' >> beam.Create([3, 4, 1, 2])
      | 'Sum values' >> beam.CombineGlobally(sum)
      | beam.Map(print))

## Sum per each Key

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  totals_per_key = (
      p | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),])
      | 'Sum values per key' >> beam.CombinePerKey(sum)
      | beam.Map(print))


## Mean

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  mean_per_key = (
      p | 'Create produce' >> beam.Create([
          (1, 36),(2, 91),(3, 33),(3, 11),(4, 67),])
      | 'Mean' >> beam.combiners.Mean.PerKey()
      | beam.Map(print))

## Min

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  min_element = (
      p | 'Create numbers' >> beam.Create([3, 44,23,6767, 100, 2])
      | 'Get min value' >> beam.CombineGlobally(lambda elements: min(elements or [-1]))
      | beam.Map(print))

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  elements_with_min_value_per_key = (
      p | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),])
      | 'Get min value per key' >> beam.CombinePerKey(min)
      | beam.Map(print))


In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  elements_with_min_value_per_key = (
      p | beam.Create([(1, 360),(1, 91),(3, 33),(3, 11),(2, 67),])
      | 'Get min value per key' >> beam.CombinePerKey(min)
      | 'Top 2' >> beam.combiners.Top.Smallest(2)
      | beam.Map(print))


## Max

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  max_element = (
      p | 'Create numbers' >> beam.Create([3, 4, 1, 2,9990])
      | 'Get max value' >> beam.CombineGlobally(lambda elements: max(elements or [None]))
      | beam.Map(print))

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
  elements_with_max_value_per_key = (
      p | 'Create produce' >> beam.Create([
          ('🥕', 3),
          ('🥕', 2),
          ('🍆', 1),
          ('🍅', 4),
          ('🍅', 5),
          ('🍅', 3),])
      | 'Get max value per key' >> beam.CombinePerKey(max)
      | beam.Map(print))


In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    (p | beam.Create([
        (1, 36),
        (2, 91),
        (3, 33),
        (3, 11),
        (4, 67),
        (2, 679),
    ]) 
       # | beam.CombinePerKey(max)
       | beam.combiners.Top.Largest(2)
       | beam.Map(print) )
# (1, 36)
# (2, 679)
# (3, 33)
# (4, 67)

# WithKey Example

In [14]:
# WithKeys takes a PCollection<V> and produces a PCollection<KV<K, V>> by associating each
# input element with a key.

import apache_beam as beam

with beam.Pipeline() as p:
  (p | beam.Create(['apple', 'banana', 'cherry', 'durian', 'guava', 'melon', 'banana'])
     | beam.WithKeys(lambda word: word)
     | beam.Map(print))

('apple', 'apple')
('banana', 'banana')
('cherry', 'cherry')
('durian', 'durian')
('guava', 'guava')
('melon', 'melon')
('banana', 'banana')


# Challenge

In [None]:
import apache_beam as beam

def parse_csv_line(line):
    # Split the line by comma (adjust delimiter if needed)
    fields = line.split(',')
    # Assuming headers exist in the first line, skip the header row
    # if line.startswith('VendorID,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount'):
    if fields[16] == '"total_amount"':
        yield None
    else:        
          # Extract data from each field based on your schema
        yield float(fields[16])  # Assuming field2 is numerical
              # ... extract other fields based on their data types
          # }

with beam.Pipeline() as pipeline:
  # Read the CSV file line by line
    lines = ( pipeline | beam.io.ReadFromText('/home/sanvir/Downloads/sample1000.csv'))

    parsed_lines = (lines | beam.ParDo(parse_csv_line))
    # Apply a ParDo transform to parse each line
    above = (parsed_lines
            | beam.Filter(lambda cost: cost and cost >= 15)
            | "sum above" >> beam.CombineGlobally(sum) 
            | beam.WithKeys(lambda word: "above")
            | "print above" >> beam.Map(print))
    
    below = (parsed_lines
            | beam.Filter(lambda cost: cost and cost < 15)
            | "sum below" >> beam.CombineGlobally(sum) 
            | beam.WithKeys(lambda word: "below")
            | beam.Map(print))

# Core Beam transforms
Beam provides the following core transforms, each of which represents a different processing paradigm:

- ParDo
- GroupByKey
- CoGroupByKey
- Combine
- Flatten
- Partition

# ParDo

## one to one

In [None]:
import apache_beam as beam

class MultiplyByTenDoFn(beam.DoFn):
    def process(self, element):
        yield element * 10


with beam.Pipeline() as p:
  (p | beam.Create([1, 2, 3, 4, 5])
    # Transform simple DoFn operation
     | beam.ParDo(MultiplyByTenDoFn())
     | beam.Map(print))

## one to many

In [1]:
import apache_beam as beam

class BreakIntoWordsDoFn(beam.DoFn):
    def process(self, element):
        return element.split()

with beam.Pipeline() as p:
  (p | beam.Create(['Hello Beam', 'It is awesome'])
     | beam.ParDo(BreakIntoWordsDoFn())
     | beam.Map(print))

Hello
Beam
It
is
awesome


## Map elements

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    input = p | 'Create words' >> beam.Create(['Hello', 'World', 'How', 'are', 'you'])

    uppercase_words = input | 'Convert to uppercase' >> beam.Map(lambda word: word.upper())

    (uppercase_words | beam.Map(print))



## FlatMap elements

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    words_with_counts = p | 'Create words with counts' >> beam.Create([
    ('Hello', 1), ('World', 2), ('How', 3), ('are', 4), ('you', 5)])

    split_words = words_with_counts | 'Split words' >> beam.FlatMap(
    lambda word_with_count: [word_with_count[0]] * word_with_count[1])
    
    (split_words | beam.Map(print))


## GroupByKey

In [16]:
import apache_beam as beam

with beam.Pipeline() as p:

    input = p | 'Fruits' >> beam.Create([
        ("banana", 2),
        ("apple", 4),
        ("lemon", 3),
        ("Apple", 1),
        ("Banana", 5),
        ("Lemon", 12),
        ("Datyra", 21),
        ("datyra", 22),
        ("sanvir", 20)
    ])
    
    (input 
     | beam.Map(lambda data: (data[0].lower(), data[1]) )
     | beam.GroupByKey()
     | beam.Map(print))

('banana', [2, 5])
('apple', [4, 1])
('lemon', [3, 12])
('datyra', [21, 22])
('sanvir', [20])


## CoGroupByKey

In [None]:
# CoGroupByKey is a transformation used in Apache Beam for performing joins on multiple datasets

import apache_beam as beam

# // Mock data
emails_list = [
    ('amy', 'amy@example.com'),
    ('carl', 'carl@example.com'),
    ('julia', 'julia@example.com'),
    ('carl', 'carl@email.com'),
]
phones_list = [
    ('amy', '111-222-3333'),
    ('james', '222-333-4444'),
    ('amy', '333-444-5555'),
    ('carl', '444-555-6666'),
]

def join_info(name_info):
  (name, info) = name_info
  return '%s; %s; %s' %\
      (name, sorted(info['emails']), sorted(info['phones']))

# // Creating PCollections
with beam.Pipeline() as p:
    emails = p | 'CreateEmails' >> beam.Create(emails_list)
    phones = p | 'CreatePhones' >> beam.Create(phones_list)

    # // Apply CoGroupByKey
    results = ({'emails': emails, 'phones': phones} | beam.CoGroupByKey())

    # contact_lines = results | beam.Map(join_info)
    
    (results | beam.Map(print))


In [None]:
## Tags for multiple outputs (withOutputs)

In [None]:
import apache_beam as beam

# To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the
# expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in
# with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the
# corresponding output PCollections.

# The result is also iterable, ordered in the same order that the tags were passed to with_outputs(),
# the main tag (if specified) first.

class ProcessWords(beam.DoFn):
    def process(self, element, cutoff_length, marker):
        if len(element) <= cutoff_length:
            # Emit this short word to the main output.
            yield element
        else:
            # Emit this word's long length to the 'above_cutoff_lengths' output.
            yield beam.pvalue.TaggedOutput('above_cutoff_lengths', len(element))
        if element.startswith(marker):
            # Emit this word to a different output with the 'marked strings' tag.
            yield beam.pvalue.TaggedOutput('marked strings', element)

with beam.Pipeline() as p:   
    words = (p | beam.Create(["em","elements","multiple","output","xico"]))
    below, above, marked = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
                                .with_outputs('above_cutoff_lengths','marked strings',main='below_cutoff_strings'))

    (above | 'Above'>> beam.Map(print))
    
    (below | 'Below'>> beam.Map(print))
    
    (marked | 'Marked'>> beam.Map(print))


In [None]:
import apache_beam as beam

# Producing multiple outputs is also available in Map and FlatMap.
# Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.
def even_odd(x):
  yield beam.pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
  if x % 10 == 0:
    yield x

with beam.Pipeline() as p: 
    results = ( p |  beam.Create(range(10))
        | beam.FlatMap(even_odd).with_outputs() )

    evens = results.even
    odds = results.odd
    tens = results[None]  # the undeclared main output

    (evens | 'Above'>> beam.Map(print))
    
    # (odds | 'Below'>> beam.Map(print))
#     
    # (tens | 'Marked'>> beam.Map(print))


In [None]:
## Branching PCollection

In [None]:
# Example how can we move the logic to an function
func applyTransform(s beam.Scope, input beam.PCollection) (beam.PCollection, beam.PCollection) {
    return beam.ParDo2(s, func(element string, upperCaseWords, lowerCaseWords func(string)) {
        if element==strings.Title(element) {
                upperCaseWords(element)
                return
            }
            lowerCaseWords(element)
        }, input)
}

# Sample of Branching
reversed = input | reverseString(...)
toUpper = input | toUpperString(...)

## Combine

In [None]:
import apache_beam as beam

# Is a Beam transform for combining collections of elements or values in your data. 
# Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.

input = [1, 10, 100, 1000]

def bounded_sum(values, bound=500):
  return min(sum(values), bound)

with beam.Pipeline() as p: 
    small_sum = input | beam.CombineGlobally(bounded_sum)  # [500]
    large_sum = input | beam.CombineGlobally(bounded_sum, bound=5000)
    
    (small_sum | beam.Map(print))

In [None]:
import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

def concat(strings):
    total = ""

    for word in strings:
        total += word

    return total

with beam.Pipeline() as p:
  (p | beam.Create(["quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"])
     | beam.CombineGlobally(concat)
     | Output())



## CombineFn

In [None]:
# Is a class that specifies how to combine a collection of elements into a single output value. 
# It's a powerful tool for performing aggregations and summarizing data within your Beam pipelines.


# It defines three methods:

# createAccumulator(): Creates a new accumulator to hold intermediate results. This is called for each partition (batch) of data.
# addInput(AccumT accumulator, InputT input): Combines a single input element with the current accumulator, updating the accumulator to reflect the new information.
# mergeAccumulators(Iterable<AccumT> accumulators): Merges multiple accumulators into a single accumulator. This is useful when processing data in parallel.
# (Optional) extractOutput(AccumT accumulator): Transforms the final accumulator into the desired output format.

import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

class AverageFn(beam.CombineFn):

    def create_accumulator(self):
        return 0.0, 0

    def add_input(self, accumulator, element):
        (sum, count) = accumulator
        return sum + element, count + 1

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, accumulator):
        (sum, count) = accumulator
        return sum / count if count else float('NaN')

with beam.Pipeline() as p:
  (p | beam.Create([10, 20, 50, 70, 90, 90000])
     | beam.CombineGlobally(AverageFn())
     | Output())

## Combine-per-key

In [None]:
# It defines three methods:

#     createAccumulator(): Creates a new accumulator to hold intermediate results. This is called for each partition (batch) of data.
#     addInput(AccumT accumulator, InputT input): Combines a single input element with the current accumulator, updating the accumulator to reflect the new information.
#     mergeAccumulators(Iterable<AccumT> accumulators): Merges multiple accumulators into a single accumulator. This is useful when processing data in parallel.
#     (Optional) extractOutput(AccumT accumulator): Transforms the final accumulator into the desired output format.

import apache_beam as beam

with beam.Pipeline() as p:
    input_data = [('a', 1), ('b', 2), ('a', 3), ('c', 4), ('b', 5)]
    input_pcoll = p | beam.Create(input_data)
    output_pcoll = input_pcoll | beam.CombinePerKey(sum)

    (output_pcoll | beam.Map(print))

In [2]:
import apache_beam as beam

class ConcatString(beam.CombineFn):
    def create_accumulator(self):
        return ""

    def add_input(self, accumulator, input):
        return accumulator + "," + input

    def merge_accumulators(self, accumulators):
        return ''.join(accumulators)

    def extract_output(self, accumulator):
        return accumulator

with beam.Pipeline() as p:    
    input = (p | 'Create Cities To Time KV' >> beam.Create([
             ('a', 'apple'),
             ('o', 'orange'),
             ('a', 'avocado'),
             ('l', 'lemon'),
             ('l', 'limes')])
    )

    output = input | 'Combine Per Key' >> beam.CombinePerKey(ConcatString())

    (output | beam.Map(print) )


('a', ',apple,avocado')
('o', ',orange')
('l', ',lemon,limes')


# Composite Example

In [None]:
# More than one ParDo, Combine, GroupByKey, or even other composite transforms.
# These transforms are called composite transforms


In [None]:
import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

class ExtractAndMultiplyNumbers(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                # First operation
                | beam.FlatMap(lambda line: map(int, line.split(',')))
                # Second operation
                | beam.Map(lambda num: num * 10)
                )

with beam.Pipeline() as p:
  (p | beam.Create(['1,2,3,4,5', '6,7,8,9,10']) \
     | ExtractAndMultiplyNumbers() \
     | Output())

# Flatten

In [1]:
# Flatten is a Beam transform for PCollection objects that store the same data type.
# Flatten merges multiple PCollection objects into a single logical PCollection.

import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:
  # List of elements start with a
  wordsStartingWithA = p | 'Words starting with A' >> beam.Create(['apple', 'ant', 'arrow'])

  # List of elements start with b
  wordsStartingWithB = p | 'Words starting with B' >> beam.Create(['ball','book', 'bow'])

  test = p | 'Test' >> beam.Create(range(20))
    
  # Accept two PCollection data types are the same combines and returns one PCollection
  ((wordsStartingWithA, wordsStartingWithB, test) | beam.Flatten() | Output())

apple
ant
arrow
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ball
book
bow


# Pertition

In [None]:
# Partition is a Beam transform for PCollection objects that store the same data type. 
# Partition splits a single PCollection into a fixed number of smaller collections.
import apache_beam as beam

def partition_fn(number, num_partitions):
    if number > 100:
        return 0
    else:
        return 1

with beam.Pipeline() as p:
  results = (p | beam.Create([1, 2, 3, 4, 5, 100, 110, 150, 250])
        # Accepts PCollection and returns the PCollection array
         | beam.Partition(partition_fn, 2))

  results[0] | 'Log numbers > 100' >> beam.Map(print)
  results[1] | 'Log numbers <= 100' >> beam.Map(print)

# Side inputs

In [17]:
# You can provide additional inputs to a ParDo transform in the form of side inputs.
import apache_beam as beam

class Person:
    def __init__(self, name, city, country=''):
        self.name = name
        self.city = city
        self.country = country

    def __str__(self):
        return 'Person[' + self.name + ',' + self.city + ',' + self.country + ']'


class EnrichCountryDoFn(beam.DoFn):
    # Get city from cities_to_countries and set person
    def process(self, element, cities_to_countries):
        yield Person(element.name, element.city,
                     cities_to_countries[element.city])

with beam.Pipeline() as p:
  # List of elements
  cities_to_countries = {
        'Beijing': 'China',
        'London': 'United Kingdom',
        'San Francisco': 'United States',
        'Singapore': 'Singapore',
        'Sydney': 'Australia'
    }

  persons = [
        Person('Henry', 'Singapore'),
        Person('Jane', 'San Francisco'),
        Person('Lee', 'Beijing'),
        Person('John', 'Sydney'),
        Person('Alfred', 'London')
    ]

  (p | beam.Create(persons)
     | beam.ParDo(EnrichCountryDoFn(), cities_to_countries)
     | beam.Map(print) )

Person[Henry,Singapore,Singapore]
Person[Jane,San Francisco,United States]
Person[Lee,Beijing,China]
Person[John,Sydney,Australia]
Person[Alfred,London,United Kingdom]


# Schemas for programming language types

In [4]:
import typing

class Transaction(typing.NamedTuple):
  bank: str
  purchase_amount: float

class ShippingAddress(typing.NamedTuple):
  street_address: str
  city: str
  state: typing.Optional[str]
  country: str
  postal_code: str

class Purchase(typing.NamedTuple):
  user_id: str  # The id of the user who made the purchase.
  item_id: int  # The identifier of the item that was purchased.
  shipping_address: ShippingAddress  # The shipping address, a nested type.
  cost_cents: int  # The cost of the item
  transactions: typing.Sequence[Transaction]  # The transactions that paid for this purchase (a list, since the purchase might be spread out over multiple credit cards).

# Challenges

In [None]:
import apache_beam as beam

def partition_fn(word, num_partitions):
    if word.isupper():  
      return 0
    elif word[0].isupper():
      return 1
    else:
      return 2    

with beam.Pipeline() as p:
    parts = ( p | beam.io.ReadFromText('/home/sanvir/Desktop/kinglear.txt')
            | beam.combiners.Sample.FixedSizeGlobally(1)
            | beam.FlatMap(lambda line: line)
            | beam.FlatMap(lambda sentence: sentence.split()))

    result = ( parts | beam.Partition(partition_fn,3) )  
  
    allLetterUpperCase = result[0] 
    firstLetterUpperCase = result[1]
    allLetterLowerCase = result[2]
    
    (allLetterUpperCase | "Lower" >> beam.Map(print))
    
    (firstLetterUpperCase | "First" >> beam.Map(print))
    
    (allLetterLowerCase | "Upper" >> beam.Map(print))


# Windowing

## Example GlobalWindows

In [None]:
import apache_beam as beam
from apache_beam import window

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:
  (p | beam.Create(['Hello Beam','It`s windowing'])
     | 'window' >>  beam.WindowInto(window.GlobalWindows())
     | 'Log words' >> Output())

## Windowing X seconds

In [None]:
import apache_beam as beam
from apache_beam import window

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:
  (p | beam.Create(['Hello Beam','It`s windowing'])
     | 'window' >> beam.WindowInto(window.FixedWindows(60))
     | 'Log words' >> Output())

## Windowing sliding function

In [None]:
# The following example code shows how to apply Window to divide a PCollection into sliding time windows. 
# Each window is 30 seconds in length, and a new window begins every five seconds:

from apache_beam import window

sliding_windowed_items = (
    input | 'window' >> beam.WindowInto(window.SlidingWindows(30, 5)))


## Window Session

In [None]:
# The following example code shows how to apply Window to divide a PCollection into session windows, where each session must be separated by a time gap of at least 10 minutes (600 seconds):
from apache_beam import window

with beam.Pipeline() as p:
  (p | beam.Create(['Hello Beam','It`s windowing'])
     | 'window' >>  beam.WindowInto(window.Sessions(10 * 60))
     | 'Log words' >> beam.Map(print))

# Triggers

In [None]:
# You can set triggers for your PCollections to change this default behavior. Beam provides several pre-built triggers 
#    •Event time triggers. These triggers operate on the event time, as indicated by the timestamp on each data element. Beam’s default trigger is event time-based.
#    •Processing time triggers. These triggers operate on the processing time – the time when the data element is processed at any given stage in the pipeline.
#    •Data-driven triggers. These triggers operate by examining the data as it arrives in each window and firing when that data meets a certain property. 
# Currently, data-driven triggers only support firing after a certain number of data elements.
#    •Composite triggers. These triggers combine multiple triggers in various ways.

## Handling late data

# If you want your pipeline to process data that arrives after the watermark passes the end 
# of the window, you can apply an allowed lateness when you set your windowing configuration.
# This gives your trigger the opportunity to react to the late data. If allowed lateness
# is set, the default trigger will emit new results immediately whenever late data arrives.7
# You set the allowed lateness by using .withAllowedLateness() when you set your windowing
# function:input = [Initial PCollection]
input | beam.WindowInto(
            FixedWindows(60),
            trigger=AfterProcessingTime(60),
            allowed_lateness=1800) # 30 minutes
     | ...
    


## Types de built-in TRIGGERS:
- AfterAll: It fires when all sub-triggers defined through of(List<Trigger> triggers) method are ready.
- AfterEach: Are defined in inOrder(List<Trigger> triggers) method.The sub-trigger are executed in order, one by one.
- AfterFirst: Executes when at least one of defined sub-triggers fires.
- AfterPane: It uses elementCountAtLeast(int countElems).
- AfterProcessingTime: Processing time-based trigger.
- AfterWatermark: Its method pastEndOfWindow() creates a trigger firing the pane after the end of the window.
- DefaultTrigger: It's the class used by default that is an equivalent to repeatable execution of AfterWatermak trigger.
- NeverTrigger: The pane if fired only after the passes window plus allowed lateness delay.
- OrFinallyTrigger: Special Trigger constructed througth Trigger's orFinally(OnceTrigger until method)
- Repeatedly: To execute given trigger repeatedly. The sub-trigger is defined in forever(Trigger repeatable) method.
    
### Window accumulation
    
When you specify a trigger, you must also set the window's accumulation mode.

To set a window to accumulate the panes that are produced when the trigger fires, invoke.accumulatingFiredPanes() when you set the trigger. To set a window to discard fired panes, invoke .discardingFiredPanes().
    
#### Accumulating mode
    
First trigger firing:  [5, 8, 3]
Second trigger firing: [5, 8, 3, 15, 19, 23]
Third trigger firing:  [5, 8, 3, 15, 19, 23, 9, 13, 10]
    
    ```
    input | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.ACCUMULATING)
    ```

#### Discarding mode

First trigger firing:  [5, 8, 3]
Second trigger firing:           [15, 19, 23]
Third trigger firing:                         [9, 13, 10]
  
    ```
    input | WindowInto(
    FixedWindows(1 * 60),
    trigger=AfterProcessingTime(1 * 60),
    accumulation_mode=AccumulationMode.DISCARDING)
    ```

# Event time trigger
Is a trigger in Apache Beam that fires based on the timestamps of the elements in a pipeline,
as opposed to the current processing time.

## Modes of this type are

### Discarding
Any late data is discaded and only the data that arrives before the trigger fires is processed.

#### Accumulating
Late data is included and the trigger fires whenever the trigger condition are meet.

```
(p | beam.Create(['Hello Beam','It`s trigger'])
   | 'window' >>  beam.WindowInto(FixedWindows(2),
                                                trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
                                                accumulation_mode=trigger.AccumulationMode.DISCARDING,
                                                timestamp_combiner=trigger.TimestampCombiner.OUTPUT_AT_EOW) \
   | ...)
```

##  Data driven trigger

Fires when specific conditions on the data being processed are met.

Apache Beam provides several options for data-driven triggers, including element
count triggers, processing time triggers, and custom triggers.

```
(p | beam.Create(['Hello Beam','It`s trigger'])
| 'window' >> beam.WindowInto(FixedWindows(2),trigger=trigger.AfterCount(2),accumulation_mode=trigger.AccumulationMode.DISCARDING) \
| ... )
```

## Processing trigger

Processing time trigger is a trigger in Apache Beam that fires based on the current processing time of the pipeline.

```
(p | beam.Create(['Hello Beam','It`s trigger'])
   | 'window' >>  beam.WindowInto(FixedWindows(2),
                                                trigger=trigger.AfterProcessingTime(1),
                                                accumulation_mode=trigger.AccumulationMode.DISCARDING) \
   | ...)
```


## Composite trigger

Allows to specify multiple triggers to be used in combination. when any of the trigger fire, the composite trigger will fire.

```
processing_time_trigger = trigger.AfterProcessingTime(60)
event_time_trigger = trigger.AfterWatermark(early=trigger.AfterCount(100),
                                             late=trigger.AfterCount(200))

composite_trigger = trigger.AfterAll(processing_time_trigger,event_time_trigger)
```


# Triggers Exercise

In [None]:
#   Licensed to the Apache Software Foundation (ASF) under one
import apache_beam as beam

# Output PCollection
class Output(beam.PTransform):
    class _OutputFn(beam.DoFn):
        def __init__(self, prefix=''):
            super().__init__()
            self.prefix = prefix

        def process(self, element):
            print(self.prefix+str(element))

    def __init__(self, label=None,prefix=''):
        super().__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._OutputFn(self.prefix))

with beam.Pipeline() as p:
    processing_time_trigger = beam.trigger.AfterProcessingTime(60)
    # Define an event time trigger
    event_time_trigger = beam.trigger.AfterWatermark(early=beam.trigger.AfterCount(100),
                                             late=beam.trigger.AfterCount(200))

    # Combine the processing time and event time triggers using the Or method
    composite_trigger = beam.trigger.AfterAll(processing_time_trigger,event_time_trigger)

    (p | beam.Create(['Hello Beam','It`s trigger','this','is','a','sample'])
     | 'window' >>  beam.WindowInto(beam.window.FixedWindows(2),
                                                trigger=composite_trigger ,
                                                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)
     | 'Log words' >> Output())

# TextIO

## Read file

In [None]:
# This provide a way to read and write text files in a pipeline.
# We can use this same way to read GCS, S3, and HDFS

import apache_beam as beam

p = beam.Pipeline()
lines = p | beam.io.ReadFromText('/home/sanvir/Desktop/kinglear.txt')
lines | beam.Map(print)
p.run()

## Write file

In [12]:
# This provide a way to read and write text files in a pipeline.
# We can use this same way to write GCS, S3, and HDFS

import apache_beam as beam

p = beam.Pipeline()
data = ['Hello, World!', 'Apache Beam']
p | beam.Create(data) | beam.io.WriteToText('/home/sanvir/Desktop/samplefile.txt')
p.run()


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7fdd48056580>

# TexIO GCS write

In [None]:
# Example to set up authentication to use GCS

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/staging'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
google_cloud_options.region = 'us-central1'

# set credentials
credentials = GoogleCredentials.get_application_default()


# BigQueryIO

## Reading BigQuery table

In [None]:
# BigQueryIO allows you to read from a BigQuery table and read the results. 
# By default, Beam invokes a BigQuery export request when you apply a BigQueryIO read transform. 
# In Java Beam SDK, readTableRows returns a PCollection of BigQuery TableRow objects. 
# Each element in the PCollection represents a single row in the table.


```
p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations',
                                                            method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
```

## Reading BigQuery query results

In [None]:

```
lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT max_temperature FROM `tess-372508.fir.xasw`'))
```

## BigQueryIO write table-schema

In [None]:
"""
    You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, 
    possibly with different schemas.The dynamic destinations feature groups your user type by a user-defined
    destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements
    to the computed destination.In addition, you can also write your own types that have a mapping function to TableRow, 
    and you can use side inputs in all DynamicDestinations methods.
"""

```
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

```

In [None]:
import apache_beam as beam

p = beam.Pipeline()

table_spec = bigquery.TableReference(
                 projectId='project-id',
                 datasetId='dataset',
                 tableId='table')

table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

input = p | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

# It defines the schema (table_schema) of the table. The table has two fields: source and quote, both of type STRING.
# The source field is nullable, while the quote field is required.

# It creates the input data which is a collection of dictionaries. Each dictionary represents a row in the BigQuery table.

# Finally, it writes the data to the BigQuery table using the beam.io.WriteToBigQuery function. The write_disposition
# parameter is set to WRITE_TRUNCATE which means that if the table already exists, it will be replaced with the new
# data. The create_disposition parameter is set to CREATE_IF_NEEDED which means the table will be created if it does
# not exist.

input | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)


## KafkaIO

### Read

When reading data from Kafka topics using Apache Beam, developers can use the ReadFromKafka transform to create a PCollection of Kafka messages. 
This transform takes the following parameters:
•consumer_config: a dictionary that contains the Kafka consumer configuration properties, such as the Kafka broker addresses, the group ID of the consumer group, and the deserializer classes for the key and value of the Kafka messages.
•bootstrap.servers: is a configuration property in Apache Kafka that specifies the list of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster.
•topic: the name of the Kafka topic to write the data to.
•with_metadata: a boolean flag that specifies whether to include the Kafka metadata for each message, such as the topic, partition, and offset.For detailed information

In [None]:
input_topic = 'input-topic'
output_topic = 'output-topic'

(p | "Read from Kafka" >> ReadFromKafka(
      topics=[input_topic],
      bootstrap_servers='localhost:9092')
 | "Process data" >> beam.Map(process_data))


In [None]:
import apache_beam as beam

def process_data(element):
    # Do some processing on the data
    return element

options = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=options)

input_topic = 'input-topic'
output_topic = 'output-topic'

bootstrap_servers = {"bootstrap.servers": "kafka_server:9092"}

# Set Kafka parameters: The Kafka topic to read from (input_topic), the Kafka topic to write to (output_topic),
# and the Kafka brokers to connect to (bootstrap_servers) are specified.

# Read from Kafka topic: A KafkaIO ReadFromKafka transform is created, where the topics method is used to specify the
# Kafka topic to read from and the consumer_config method is used to specify the Kafka brokers to connect to.

# Process the data: The data read from Kafka is processed using the beam.Map(process_data) method. In this case,
# the data is simply passed to the process_data function defined earlier.



# (p | "Read from Kafka" >> ReadFromKafka(
#       topics=[input_topic],
#       consumer_config=bootstrap_servers)
#  | "Process data" >> beam.Map(process_data))

p.run().wait_until_finish()


## Write

To use the WriteToKafka transform, developers need to provide the following parameters:
•producer_config: a dictionary that contains the Kafka producer configuration properties, 
such as the Kafka broker addresses and the number of acknowledgments to wait for before 
considering a message as sent.
•bootstrap.servers: is a configuration property in Apache Kafka that specifies the list
of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster.
•topic: the name of the Kafka topic to write the data to.
•key: a function that takes an element from the input PCollection and returns the key to use
for the Kafka message. The key is optional and can be None.
•value: a function that takes an element from the input PCollection and returns the value 
to use for the Kafka message.

In [None]:
(input |  "Write to Kafka" >> WriteToKafka(
       topic=output_topic,
       producer_config = bootstrap_servers,
       key='key',
       value='value'))

In [None]:

import apache_beam as beam

def process_data(element):
    # Do some processing on the data
    return element

options = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=options)

input_topic = 'input-topic'
output_topic = 'output-topic'
bootstrap_servers = {"bootstrap.servers": "kafka_server:9092"}

input = p | beam.Create([{"key": "foo", "value": "bar"}])

"""
This pipeline is an example of how you can use Apache Beam's KafkaIO (in Python SDK) to write data to a Kafka 
topic.Make sure your Kafka server is accessible and running, and the topic exists. 
"""

# (input |  "Write to Kafka" >> WriteToKafka(
#       topic=output_topic,
#       producer_config = bootstrap_servers)
# )

p.run().wait_until_finish()


# REST API

```
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

```

Final Challenge

In [None]:
import apache_beam as beam
import logging
import re
from apache_beam.transforms import window, trigger
from apache_beam.transforms.combiners import CountCombineFn

class Transaction:
    def __init__(self, transaction_no, date, product_no, product_name, price, quantity, customer_no, country):
        self.transaction_no = transaction_no
        self.date = date
        self.product_no = product_no
        self.product_name = product_name
        self.price = price
        self.quantity = quantity
        self.customer_no = customer_no
        self.country = country

    def __str__(self):
        return f"Transaction(transaction_no={self.transaction_no}, date='{self.date}', product_no='{self.product_no}', product_name='{self.product_name}', price={self.price}, quantity={self.quantity}, customer_no={self.customer_no}, country='{self.country}')"

def run():
    with beam.Pipeline() as pipeline:
      transactions = (pipeline
                        | 'Read from text file' >> beam.io.ReadFromText('input.csv')
                        | 'window' >>  beam.WindowInto(beam.window.FixedWindows(30))
                        | 'Filter' >>
                     )


if __name__ == '__main__':
    run()


# Define an event time trigger
# event_time_trigger = beam.trigger.AfterWatermark(early=beam.trigger.AfterCount(100),
                                         # late=beam.trigger.AfterCount(200))

# Combine the processing time and event time triggers using the Or method
composite_trigger = beam.trigger.AfterAll(processing_time_trigger,event_time_trigger)

# (p | beam.Create(['Hello Beam','It`s trigger','this','is','a','sample'])
#  | 'window' >>  beam.WindowInto(beam.window.FixedWindows(2),
#                                             trigger=composite_trigger ,
#                                             accumulation_mode=beam.trigger.AccumulationMode.DISCARDING)

# FINAL CHALLENGE

Final challenge 1You’re given a csv file with purchase transactions. 
Write a Beam pipeline to prepare a report every 30 seconds.
The report needs to be created only for transactions where quantity is more than 20.
Report should consist of two files named "price_more_than_10.txt" and "price_less_than_10.txt":
•Total transactions amount grouped by ProductNo for products with price greater than 10
•Total transactions amount grouped by ProductNo for products with price less than 10Example rows from input file:

Sample of Data:
581538,12/9/2019,22561,Wooden School Colouring Set,7.24,1,14446,United Kingdom     
581538,12/9/2019,84519A,Tomato Charlie+Lola Coaster Set,7.24,1,14446,United Kingdom
581538,12/9/2019,84519B,Carrot Charlie+Lola Coaster Set,7.24,1,14446,United Kingdom
581538,12/9/2019,35004B,Set Of 3 Black Flying Ducks,7.24,2,14446,United Kingdom    
581538,12/9/2019,22068,Black Pirate Treasure Chest,7.24,1,14446,United Kingdom     
581538,12/9/2019,23353,6 Gift Tags Vintage Christmas,6.19,1,14446,United Kingdom   
581538,12/9/2019,21591,Cosy Hour Cigar Box Matches,6.19,1,14446,United Kingdom     
581538,12/9/2019,22197,Popcorn Holder,6.19,4,14446,United Kingdom                  
581538,12/9/2019,23320,Giant 50'S Christmas Cracker,6.19,1,14446,United Kingdom


In [None]:
import apache_beam as beam
import logging
import re
from apache_beam.transforms import window, trigger
from apache_beam.transforms.combiners import CountCombineFn

class Transaction:
    def __init__(self, transaction_no, date, product_no, product_name, price, quantity, customer_no, country):
        self.transaction_no = transaction_no
        self.date = date
        self.product_no = product_no
        self.product_name = product_name
        self.price = price
        self.quantity = quantity
        self.customer_no = customer_no
        self.country = country

    def __str__(self):
        return f"Transaction(transaction_no={self.transaction_no}, date='{self.date}', product_no='{self.product_no}', product_name='{self.product_name}', price={self.price}, quantity={self.quantity}, customer_no={self.customer_no}, country='{self.country}')"


class linesToObject(beam.DoFn):
    def process(self, line):  
        spl = line.split(',')        
        if spl[0] !=  'TransactionNo' :    
            yield  Transaction(spl[0],spl[1],spl[2],spl[3],spl[4],spl[5],spl[6],spl[7])    

def partitonTrans(transaction):
    if float(transaction.price) >= 10.0:
        return 0
    else:
        return 1              
    

def run():
    with beam.Pipeline() as pipeline:
        transactions = (pipeline
                        | 'Read from text file' >> beam.io.ReadFromText('input.csv'))


        transOb = (transactions | 'ConvertLinesToTransactionObject' >> beam.ParDo(linesToObject()))

        windowed_transactions = (transOb | 'Window' >> beam.WindowInto(window.FixedWindows(30), trigger=trigger.AfterWatermark( early=trigger.AfterProcessingTime(5).has_ontime_pane(), late=trigger.AfterAll()), allowed_lateness=30, accumulation_mode=trigger.AccumulationMode.DISCARDING))                

        filter20 = (transOb | 'Quantity>20' >>beam.Filter(lambda t: int(t.quantity) > 20))

        groups = (filter20  | 'partitions' >> beam.Partition(partitionTrans, 2))  

        priceGreater10 = groups[0]    
        priceLower10 = groups[1]           

        (priceGreater10 
                  | 'Dict>10' >> beam.Map(lambda t: (t.transaction_no, float(t.price)))
                  | 'SumByKey>10' >> beam.CombinePerKey(sum) 
                  | 'Save' >> beam.io.WriteToText('price_more_than_10.txt'))

        (priceLower10 
                  | 'Dict>10' >> beam.Map(lambda t: (t.transaction_no, float(t.price)))
                  | 'SumByKey<10' >> beam.CombinePerKey(sum) 
                  | 'Save' >> beam.io.WriteToText('price_less_than_10.txt'))

if __name__ == '__main__':  
    run()

# FINAL CHALLENGE 2

In [None]:
Final challenge 2You are given a file analyzed.csv which maps words to sentiments. 
Using this, analyze kinglear.txt.
Output PCollections counting the number of negative words and positive words as well as PCollections counting the number of positive words with strong modal and positive words with weak modal.
Example rows from input file:
    
    

In [None]:
import re

class SplitWords(beam.DoFn):
    def process(self, element):
        word = element.upper().split(" ")      
        return word  
        
class convertToAnalysis(beam.DoFn):
    def process(self, element):
      sp =  element.split(',')
      if sp[0] != 'Word':
        yield Analysis(sp[0],sp[1],sp[2],sp[3],sp[4],sp[5],sp[6],sp[7])
    
class CountFn(beam.CombineFn):
    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        count = accumulator
        return count + 1

    def merge_accumulators(self, accumulators):
        counts = accumulators
        return sum(counts)

    def extract_output(self, accumulator):
        (count) = accumulator
        return count if count else 0
                                    
class Analysis:
    def __init__(self, word, negative, positive, uncertainty, litigious, strong, weak, constraining):
        self.word = word
        self.negative = negative
        self.positive = positive
        self.uncertainty = uncertainty
        self.litigious = litigious
        self.strong = strong
        self.weak = weak
        self.constraining = constraining

    def __str__(self):
        return (f'Analysis(word={self.word}, negative={self.negative}, positive={self.positive}, '
                f'uncertainty={self.uncertainty}, litigious={self.litigious}, strong={self.strong}, '
                f'weak={self.weak}, constraining={self.constraining})')

def remove_non_letters(word):
    """Removes non-letter characters from a word.

    Args:
      word: The word to remove non-letters from.

    Returns:
      The word with non-letter characters removed.
    """
    return re.sub(r"[^a-zA-Z]+", "", word)

class match(beam.DoFn):
  def process(self, element, dataset):
       if element.word in dataset:
          yield element
     

def run():
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
      kinglear_words = (p
                       | 'Read from text file' >> ReadFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
                       | 'Split into words' >> beam.ParDo(SplitWords())
                       | 'Filter empty words' >> beam.Filter(bool)
                       | 'Remove Non Letters' >> beam.Map(lambda word: remove_non_letters(word)) 
                       | 'Set' >> beam.Distinct() 
                       #| 'Print' >> beam.Map(print)
                       )

      sentiment_words = (p 
                       | 'Read sentiment words from csv' >> ReadFromText('analysis.csv')
                       | 'CSV into words' >> beam.ParDo(convertToAnalysis())  ) 
                       #| 'Print' >> beam.Map(print) )      

      positive_words = (sentiment_words
                       | 'Positive Words' >> beam.Filter(lambda a: int(a.positive) > 0)
                       #| 'Print positives' >> beam.Map(print)                            
                       ) 

      negative_words = (sentiment_words
                       | 'Negative Words' >> beam.Filter(lambda a: int(a.negative) > 0)
                       #| 'Print negatives' >> beam.Map(print)       
                       )                                
                       
      strong_modal = (positive_words
                       | 'Strong Modal' >> beam.Filter(lambda a: int(a.strong) > 0)
                       #| 'Print negatives' >> beam.Map(print)       
                       )      
       
      weak_modal = (positive_words
                       | 'Weak Modal' >> beam.Filter(lambda a: int(a.weak) > 0)
                       #| 'Print negatives' >> beam.Map(print)       
                       )   
                       
      inter_kinglear_positive = ( positive_words 
                       | 'Intersection Positive' >> beam.ParDo(match(), beam.pvalue.AsIter(kinglear_words)) 
                       #| 'Print match' >> beam.Map(print)
                       )
                       
      inter_kinglear_negative = ( negative_words 
                       | 'Intersection Negative' >> beam.ParDo(match(), beam.pvalue.AsIter(kinglear_words)) 
                       #| 'Print match' >> beam.Map(print)
                       )
                       
      inter_kinglear_strong = ( strong_modal 
                       | 'Intersection Strong' >> beam.ParDo(match(), beam.pvalue.AsIter(kinglear_words)) 
                       #| 'Print match' >> beam.Map(print)
                       )
                       
      inter_kinglear_weak = ( weak_modal 
                       | 'Intersection Weak' >> beam.ParDo(match(), beam.pvalue.AsIter(kinglear_words)) 
                       #| 'Print match' >> beam.Map(print)
                       )
      
      count_positives = ( inter_kinglear_positive
                        | 'Count Positives' >>  beam.CombineGlobally(CountFn()).without_defaults()
                        #| 'Print match' >> beam.Map(print)
                        )
                        
      count_negatives = ( inter_kinglear_negative
                        | 'Count Negatives' >>  beam.CombineGlobally(CountFn()).without_defaults()
                        #| 'Print match' >> beam.Map(print)
                        )
                            
      count_positives_strong = ( inter_kinglear_strong
                        | 'Count Positives Strong' >>  beam.CombineGlobally(CountFn()).without_defaults()
                        #| 'Print match' >> beam.Map(print)
                        )  
                        
      count_positives_weak = ( inter_kinglear_weak
                        | 'Count Positives Weak' >>  beam.CombineGlobally(CountFn()).without_defaults()
                        | 'Print match' >> beam.Map(print)
                        )                                                                  

if __name__ == "__main__":
    run()


# Misc

In [14]:
import apache_beam as beam

class Transaction(typing.NamedTuple):
  bank: str
  purchase_amount: float
    


with beam.Pipeline() as p:
    pc = (p 
          | beam.Create([{"bank":"Hola", "purchase_amount":12, "option":True},{"bank":"Saludos", "purchase_amount":40, "option":False}])
          | beam.Map(lambda item: beam.Row(bank=item["bank"],purchase_amount=item["purchase_amount"], option=item["option"]) )
          | beam.Map(print)
         )

Row(bank='Hola', purchase_amount=12, option=True)
Row(bank='Saludos', purchase_amount=40, option=False)
