<a href="https://colab.research.google.com/github/tataan40/apache/blob/main/Apache.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install apache-beam

In [3]:
import apache_beam as beam 
import re
from collections import Counter


In [23]:
from apache_beam.transforms.window import *
from apache_beam.io.textio import WriteToText

Streaming Case Study

In [None]:
class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        unix_timestamp = element["timestamp"]
        element = (element["userId"], element["click"])

        yield TimestampedValue(element, unix_timestamp)

In [None]:
with beam.Pipeline() as pipeline:
  logs = (
      pipeline
      | beam.Create([
            {"userId": "Andy", "click": 1, "timestamp": 1603112520},  # Event time: 13:02
            {"userId": "Sam", "click": 1, "timestamp": 1603113240},  # Event time: 13:14
            {"userId": "Andy", "click": 1, "timestamp": 1603115820},  # Event time: 13:57
            {"userId": "Andy", "click": 1, "timestamp": 1603113600},  # Event time: 13:20
      ])
      | beam.ParDo(AddTimestampDoFn()) 
      | beam.WindowInto(Sessions(gap_size=30*60))
      | beam.CombinePerKey(sum)
      | WriteToText(file_path_prefix="file")
      #| beam.Map(print)
  )



Text Filtering


In [None]:
class TextSplit(beam.DoFn):
  def process(self, element):
    words = element.split() 
    yield words


class ScoreWords(beam.DoFn):
  def process(self, element):
    counts = Counter(element)
    yield counts

class Filter(beam.DoFn):
    def process(self, element):
      list_filtered = list(filter(lambda x : r'^le' in x, element))
      #counts = Counter(list_filtered)
      return [list_filtered]
    


In [None]:
with beam.Pipeline() as pipeline:
  pipe1 = (pipeline
           | beam.Create(["L’Europe contribue désormais à plus de la moitié des nouveaux décès liés au Covid-19 dans le monde. En particulier la Russie, l’Ukraine et la Roumanie. Rien d’étonnant, puisque le continent est au cœur de la reprise épidémique qui s’installe depuis quelques semaines au gré des premiers frimas de l’hiver. Le chiffre n’en reste pas moins inquiétant, alors que l’Europe représentait jusque-là un peu plus du quart des quelque 5 millions de morts provoqués par l’épidémie depuis mars 2020. Un cap symbolique franchi le 1er novembre, selon les données officielles fournies par les différents pays et compilées notamment par l’Organisation mondiale de la santé (OMS)."])
           #| beam.Map(lambda x: x.split()) Do the same thing with a lambda function
           | beam.ParDo(TextSplit())
           | beam.ParDo(Filter())
           | beam.ParDo(ScoreWords())
           | beam.Map(print)
           )




Counter()


Streaming Stingray Use Case

In [107]:
class SessionDuration(beam.DoFn):
  def process(self,element):
    user_id = element["userId"]
    duration = 10
    unix_timestamp = element["timestamp"]
    start = element["start"]
    end = element["end"]
    yield TimestampedValue((user_id,duration),unix_timestamp)


In [110]:
with beam.Pipeline() as pipeline:
  logs = (
      pipeline
      | beam.Create([
            {"userId": "001", "start": 1, "end": 0, "timestamp": 1603112520},  # Event time: 13:02 for User 001
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112530},
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112540},
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112550},
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112560},
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112570},
            {"userId": "001", "start": 0, "end": 0, "timestamp": 1603112580},
            {"userId": "004", "start": 1, "end": 0, "timestamp": 1603113240},  # Event time: 13:14 for User 004
            {"userId": "004", "start": 0, "end": 0, "timestamp": 1603113250},
            {"userId": "004", "start": 0, "end": 0, "timestamp": 1603113260},
            {"userId": "004", "start": 0, "end": 0, "timestamp": 1603113270},
            {"userId": "004", "start": 0, "end": 0, "timestamp": 1603113280},
            {"userId": "002", "start": 1, "end": 0, "timestamp": 1603115820},  # Event time: 13:57 for User 002
            {"userId": "002", "start": 0, "end": 0, "timestamp": 1603115830},
            {"userId": "002", "start": 0, "end": 0, "timestamp": 1603115840},
            {"userId": "002", "start": 0, "end": 0, "timestamp": 1603115850},
            {"userId": "002", "start": 0, "end": 0, "timestamp": 1603115860},
            {"userId": "001", "start": 1, "end": 0, "timestamp": 1603113600},  # Event time: 13:20 for User 001
      ])
      | beam.ParDo(SessionDuration())
      | beam.WindowInto(Sessions(100))
      | beam.CombinePerKey(sum)
      #| beam.io.WriteToText(file_path_prefix="Streaming")
      | beam.Map(print)
  )



('001', 70)
('001', 10)
('004', 50)
('002', 50)
