# Machine Learning on Streams with Apache Beam

Read-only link: [notebook](https://colab.research.google.com/drive/1RWtxEWsjzlrltbx4zrKMYOUt_P0KCkwA?usp=sharing)


# What we will build 

In this workshop, we will learn how to use Apache Beam to detect potential disasters occurring in the world. We will receive a stream of Tweets as input, use machine learning to detect anomalies, and send alert events to an output stream. These alerts can be consumed in real time by any interested service.


# Streaming Data Processing

Data processing is either offline (batch) or online (streaming). Many applications rely on streaming/online machine learning, where a continuous flow of data arrives and must be transformed as it comes in. The output can then be stored or sent directly to another output stream.
For instance, you can detect anomalies in a continuous stream of events (IoT sensors, transactions, ...) and send alerts to an output stream.


# Apache Beam

**Apache Beam** is a unified programming model to define batch and **streaming** data processing jobs. It is compatible with multiple execution engines (runners) such as *Apache Spark*, *Apache Flink* or *Google Cloud Dataflow*. It follows the philosophy "Write once, run everywhere" and provides SDKs in multiple languages, such as **Python**, Java and Go.





## Assumptions

We assume basic knowledge of machine learning (mainly pandas and scikit-learn) and of the Python programming language.






# Dataset

The dataset comes from a Kaggle competition: [Natural Language Processing with Disaster Tweets](https://www.kaggle.com/c/nlp-getting-started).

The objective is to predict which Tweets talk about disasters and which do not.

Example of disaster Tweet:
> Heard about #earthquake in different cities, stay safe everyone

Example of normal Tweet:
> No don't tell me that!



# Technologies
1. **Google Cloud Storage**: bucket to store the data and the trained model
2. **Apache Beam**: Python SDK to create data processing jobs
3. **Google Cloud Pub/Sub**: ingestion platform for event-driven systems and streaming analytics
4. **Google Cloud Dataflow**: cloud executor for jobs expressed with the Apache Beam SDK






# Overall System
![overall pipeline](https://i.postimg.cc/sDSB2Tzm/overall-pipeline.png
"Overall System")





# Beam Pipeline

![beam pipeline](https://i.postimg.cc/T1rRkTD9/beam-pipeline.png
"Beam Pipeline")

# Plan

1. Setup libraries and configuration
2. Prepare the input (Tweets) and output (predictions) streams
3. Train a simple model and save it in a Google Cloud bucket
4. Build a Beam streaming pipeline to process the Tweets

# Setup and Configuration
Import libraries and authenticate

```python
# Do not forget to restart the runtime after running this cell,
# otherwise the newly installed libraries won't be available
!pip install --quiet google-cloud-pubsub
!pip install --quiet "apache-beam[gcp]"
!pip install --quiet fsspec gcsfs
```

```python
from typing import Tuple, List, Dict, Iterable, Any

import google.auth
from google.colab import auth
from google.cloud import pubsub
from google.api_core import exceptions as gexc
```

```python
# { display-mode: "form" }
project_id = "starthack-workshop" #@param {type:"string"}
```

```python
%env GCLOUD_PROJECT=starthack-workshop
```

```
env: GCLOUD_PROJECT=starthack-workshop
```


```python
auth.authenticate_user()
```

# A glance at Google Cloud Pub/Sub

Pub/Sub is the service used in this workshop to handle streams of events. We briefly introduce the two main concepts needed to understand how it works: **Topics** and **Subscriptions**.


## Topics: enqueuing messages

A topic can be seen as a queue: it is the place where you push (publish) messages. In our setup, we want one topic to push Tweets (input) and one topic to push predictions (output).
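To make the idea of pushing Tweets concrete, here is a minimal sketch of how a Tweet could be serialized before publishing. It assumes that Tweets are JSON objects with at least a `text` field. The helper names `encode_tweet` and `decode_tweet` are hypothetical. Pub/Sub message payloads are raw bytes, so the Beam pipeline later decodes them from UTF-8 and parses the JSON.

```python
import json
from typing import Any, Dict


def encode_tweet(tweet: Dict[str, Any]) -> bytes:
    """Hypothetical helper: serialize a Tweet record into a bytes payload."""
    return json.dumps(tweet).encode("utf-8")


def decode_tweet(payload: bytes) -> Dict[str, Any]:
    """Hypothetical helper: inverse of encode_tweet (bytes -> text -> JSON)."""
    return json.loads(payload.decode("utf-8"))


# Hypothetical Tweet record; the model only reads the "text" field
tweet = {"id": 1, "text": "Heard about #earthquake in different cities, stay safe everyone"}
payload = encode_tweet(tweet)

# With the real client from this notebook, publishing would look roughly like:
#   future = pub_client.publish(topic_path_in, data=payload)
#   future.result()  # blocks until Pub/Sub returns the message ID
```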

## Subscriptions: dequeuing (consuming) messages

Once messages are pushed to the topics (*queues*), we use subscriptions to pull messages out of them (*dequeue*). Any application that needs to access the messages of a topic does it through a subscription. In our case, we want one subscription to pull Tweets from the first topic and one subscription to pull predictions from the second topic.

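A topic can have multiple subscriptions. Each one receives its own copy of every message published to the topic. Pub/Sub guarantees at-least-once delivery, so a message that is not acknowledged is delivered again.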
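The fan-out behaviour can be illustrated with a small in-memory toy. The toy simplifies things by assuming that delivery is instantaneous and happens exactly once. `ToyBroker` and its methods are hypothetical and are not the Pub/Sub client API. Real Pub/Sub also requires acknowledgements and may redeliver messages.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional


class ToyBroker:
    """Hypothetical in-memory model of topics and subscriptions (not Pub/Sub)."""

    def __init__(self) -> None:
        self.subs_by_topic: Dict[str, List[str]] = defaultdict(list)  # topic -> subscription names
        self.queues: Dict[str, Deque[bytes]] = {}  # subscription -> pending messages

    def create_subscription(self, name: str, topic: str) -> None:
        self.subs_by_topic[topic].append(name)
        self.queues[name] = deque()

    def publish(self, topic: str, data: bytes) -> None:
        # Every subscription attached to the topic gets its own copy
        for name in self.subs_by_topic[topic]:
            self.queues[name].append(data)

    def pull(self, name: str) -> Optional[bytes]:
        # Dequeue the oldest pending message of this subscription, if any
        queue = self.queues[name]
        return queue.popleft() if queue else None


broker = ToyBroker()
broker.create_subscription("subscription_in", topic="tweets-in")
broker.create_subscription("archive", topic="tweets-in")  # hypothetical second consumer
broker.publish("tweets-in", b"tweet-1")
```

In this toy, a subscription created after a message was published never sees that message. By default, real subscriptions behave the same way.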



## Overview
![overall pipeline](https://i.postimg.cc/zvL1f2Rk/pub-sub.png "Pub/Sub Overview")



```python
# Create the publisher and subscriber clients
sub_client = pubsub.SubscriberClient()
pub_client = pubsub.PublisherClient()
# Choose topic names for the input and output streams
topic_path_in = pub_client.topic_path(project_id, "tweets-in")
topic_path_out = pub_client.topic_path(project_id, "predictions-out")
# Choose subscription names
subscription_path_in = sub_client.subscription_path(project_id, "subscription_in")
subscription_path_out = sub_client.subscription_path(project_id, "subscription_out")
# Store the project path, used to list topics and subscriptions
project_path = f"projects/{project_id}"
```

# Topic creation

First, we need to create the Pub/Sub topics to push and pull streams of events. In our case, we will have one topic for incoming Tweets, *tweets-in*, and one topic for outgoing predictions, *predictions-out*.

```python
# Keyword arguments work with both the 1.x and 2.x versions of google-cloud-pubsub
try:
    topic_in = pub_client.create_topic(name=topic_path_in)
    print(f"Created topic: {topic_in.name}")
except gexc.AlreadyExists:
    print("Skipping topic creation, already exists")

try:
    topic_out = pub_client.create_topic(name=topic_path_out)
    print(f"Created topic: {topic_out.name}")
except gexc.AlreadyExists:
    print("Skipping topic creation, already exists")
```

```
Skipping topic creation, already exists
Skipping topic creation, already exists
```


```python
for topic in pub_client.list_topics(project=project_path):
    print(topic.name)
```

```
projects/starthack-workshop/topics/tweets-in
projects/starthack-workshop/topics/predictions-out
```


# Subscription Creation

Then we need subscriptions to pull the events pushed to the topics. In our case, we have one subscription to get Tweets from the *tweets-in* topic and one subscription to pull predictions from the *predictions-out* topic.

```python
try:
    subscription_in = sub_client.create_subscription(name=subscription_path_in, topic=topic_path_in)
    print(f"Created subscription: {subscription_in.name}")
except gexc.AlreadyExists:
    print("Skipping subscription creation, already exists")

try:
    subscription_out = sub_client.create_subscription(name=subscription_path_out, topic=topic_path_out)
    print(f"Created subscription: {subscription_out.name}")
except gexc.AlreadyExists:
    print("Skipping subscription creation, already exists")
```

```
Skipping subscription creation, already exists
Skipping subscription creation, already exists
```


```python
for subscription in sub_client.list_subscriptions(project=project_path):
    print(subscription.name)
```

```
projects/starthack-workshop/subscriptions/subscription_out
projects/starthack-workshop/subscriptions/subscription_in
```


# Modeling

We need a model to classify tweets as disaster or not. 

**The purpose of this workshop is not modeling, so we will not spend too much time on it and will go for an easy solution. The preprocessing and modeling are purposely rudimentary.**

We build the model as a scikit-learn `Pipeline`, which has the advantage of bundling multiple steps into a single estimator. This way, the preprocessing is included in the model itself:

1. First we select the column of interest in the DataFrame, in our case the text column. This column does not contain any NA values so ...
2. We vectorize the raw text using TFIDF ([TFIDF details](https://en.wikipedia.org/wiki/Tf%E2%80%93idf), [Sklearn TFIDF](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html))
3. We apply a Random Forest classifier ([Sklearn RandomForestClassifier](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestClassifier.html))
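To see what the TF-IDF step computes, here is a minimal pure-Python sketch. It mirrors what we understand to be the defaults of scikit-learn's `TfidfVectorizer`:

- lowercasing,
- the default token pattern (words of two or more characters),
- smoothed IDF `ln((1 + n) / (1 + df)) + 1`,
- L2 normalization of each document vector.

It is meant for intuition only; the pipeline itself uses scikit-learn.

```python
import math
import re
from collections import Counter
from typing import Dict, List

# scikit-learn's default token_pattern: tokens of 2+ word characters
TOKEN_PATTERN = re.compile(r"(?u)\b\w\w+\b")


def tfidf(docs: List[str]) -> List[Dict[str, float]]:
    """Return one sparse {term: weight} vector per document."""
    tokenized = [TOKEN_PATTERN.findall(doc.lower()) for doc in docs]
    n_docs = len(docs)
    # Document frequency: in how many documents each term appears
    doc_freq = Counter(term for tokens in tokenized for term in set(tokens))
    # Smoothed inverse document frequency
    idf = {term: math.log((1 + n_docs) / (1 + df)) + 1 for term, df in doc_freq.items()}
    vectors = []
    for tokens in tokenized:
        # Raw term count times IDF, then L2 normalization
        weights = {term: count * idf[term] for term, count in Counter(tokens).items()}
        norm = math.sqrt(sum(w * w for w in weights.values()))
        vectors.append({t: w / norm for t, w in weights.items()} if norm else weights)
    return vectors


vectors = tfidf(["Fire fire", "no fire"])
```

Because "fire" appears in every document, it gets the lowest IDF. The rarer word "no" therefore dominates the second vector.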

```python
import pickle

import pandas as pd
from google.cloud import storage
from sklearn.model_selection import cross_validate
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import FunctionTransformer
```

```python
# Read the data from our Google Cloud bucket
df_train = pd.read_csv("gs://starthack-workshop-data/train.csv")

df_train.dropna().head()
```

|    | id | keyword | location | text | target |
|----|----|---------|----------|------|--------|
| 31 | 48 | ablaze | Birmingham | @bbcmtd Wholesale Markets ablaze http://t.co/l... | 1 |
| 32 | 49 | ablaze | Est. September 2012 - Bristol | We always try to bring the heavy. #metal #RT h... | 0 |
| 33 | 50 | ablaze | AFRICA | #AFRICANBAZE: Breaking news:Nigeria flag set a... | 1 |
| 34 | 52 | ablaze | Philadelphia, PA | Crying out for more! Set me ablaze | 0 |
| 35 | 53 | ablaze | London, UK | On plus side LOOK AT THE SKY LAST NIGHT IT WAS... | 0 |


# Model Pipeline


```python
def select_text(df: pd.DataFrame) -> pd.Series:
    return df["text"]

# Model pipeline
pipe = Pipeline([
    ('selector', FunctionTransformer(select_text)),
    ('tfidf', TfidfVectorizer(lowercase=True)),
    ('model', RandomForestClassifier(random_state=42))
])
```


```python
# OPTIONAL - cross-validation step to get an idea of the model's performance
cv_results = cross_validate(
    pipe, df_train, df_train["target"],
    cv=3, scoring=["roc_auc"], return_train_score=True, n_jobs=-1,
)
cv_results = pd.DataFrame(cv_results)
cv_results.agg(("mean", "std"))
```

|      | fit_time | score_time | test_roc_auc | train_roc_auc |
|------|---------:|-----------:|-------------:|--------------:|
| mean | 7.989036 | 0.267601 | 0.764883 | 0.999892 |
| std  | 1.193525 | 0.055022 | 0.037528 | 8.2e-05 |
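The `roc_auc` score can be read as the probability that a randomly chosen disaster Tweet gets a higher score than a randomly chosen normal Tweet, with ties counted as one half. Below is a minimal sketch of that pairwise definition. The function name is ours; scikit-learn's `roc_auc_score` computes the same quantity more efficiently.

```python
from typing import Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("ROC AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


auc = roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
```

Read this way, the gap in the table between the mean train score (about 1.0) and the mean test score (about 0.76) suggests that the forest overfits the training folds. That is acceptable given the rudimentary modeling goal of this workshop.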


```python
model = pipe.fit(df_train, df_train["target"])
```

```python
# We pickle the fitted model and save it to our Google Cloud bucket
# (run the previous cell first so that `model` is defined)
with open("model.pkl", "wb") as f:
    pickle.dump(model, f)
bucket = storage.Client().get_bucket("starthack-workshop-data")
blob = bucket.blob("model.pkl")
blob.upload_from_filename("model.pkl")
```
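The upload above goes through a local file. Later, `ApplyModel` restores the model by calling `pickle.loads` on the downloaded bytes. The sketch below checks that round trip in memory, without any cloud access. `KeywordModel` is a hypothetical stand-in for the fitted scikit-learn pipeline. As an alternative to the file, `blob.upload_from_string(payload)` uploads the bytes directly. Unpickling only works where the same classes are importable (and, for scikit-learn models, where library versions are compatible). This matters when the model is loaded on remote workers.

```python
import pickle
from typing import List


class KeywordModel:
    """Hypothetical stand-in for the fitted scikit-learn pipeline."""

    def __init__(self, keywords: List[str]) -> None:
        self.keywords = set(keywords)

    def predict(self, texts: List[str]) -> List[int]:
        # 1 if any keyword appears in the lowercased text, else 0
        return [int(any(k in t.lower() for k in self.keywords)) for t in texts]


model = KeywordModel(["earthquake", "fire", "crash"])
payload = pickle.dumps(model)     # bytes, like the content of model.pkl
restored = pickle.loads(payload)  # what ApplyModel does with the downloaded blob
```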



# Beam Pipeline Overview

We can summarize Beam pipelines as follows:
1. Read data from any source into a `PCollection` A
2. Transform A into another `PCollection` B by applying a `PTransform` (map, filter, ...)
3. Transform B into another `PCollection`
4. ... keep chaining transforms
5. Save the final output somewhere


<br/>

![beam-pipeline-doc](https://beam.apache.org/images/design-your-pipeline-linear.svg "Linear pipeline")
[*Image taken from official documentation*](https://beam.apache.org/documentation/programming-guide/#transforms)

<br/>

## Initialization

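We first need a pipeline. It will be the entry point:

```python
import apache_beam as beam

pipeline = beam.Pipeline()
```

Or, to create a pipeline and run it directly when the `with` block exits (locally, with the default `DirectRunner`):

```python
import apache_beam as beam

with beam.Pipeline() as pipeline:
    ...
```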

## Applying a transform on your pipeline

Use the **pipe** operator `|` to apply a transform to a collection. You can pair it with the `>>` operator to give a name to the transformation:

Without name:

```python
pipeline | transform
```

With name:

```python
pipeline | "step_name" >> transform
```
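This syntax is ordinary Python operator overloading. Beam implements it with its own classes. The toy below, with hypothetical classes `ToyTransform` and `ToyCollection`, only mimics the mechanics. `>>` binds tighter than `|`, so `c | "name" >> t` is evaluated as `c | ("name" >> t)`. Since `str` does not know how to `>>` a transform, Python falls back to the transform's reflected method `__rrshift__`.

```python
from typing import Any, Callable, Iterable, List, Optional


class ToyTransform:
    """Hypothetical transform: applies `fn` to every element."""

    def __init__(self, fn: Callable[[Any], Any], label: str = "unnamed") -> None:
        self.fn = fn
        self.label = label

    def __rrshift__(self, label: str) -> "ToyTransform":
        # Called for `"label" >> transform`: return a named copy
        return ToyTransform(self.fn, label)


class ToyCollection:
    """Hypothetical collection that records the names of applied steps."""

    def __init__(self, elements: Iterable[Any], history: Optional[List[str]] = None) -> None:
        self.elements = list(elements)
        self.history = list(history or [])

    def __or__(self, transform: ToyTransform) -> "ToyCollection":
        # Called for `collection | transform`: build a new collection
        return ToyCollection([transform.fn(e) for e in self.elements],
                             self.history + [transform.label])


result = (ToyCollection([1, 2, 3])
          | "add_one" >> ToyTransform(lambda x: x + 1)
          | "double" >> ToyTransform(lambda x: x * 2))
```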

## Read data


```python
import apache_beam as beam
```

```python
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )
```

```
1, 2, 3, 4, 5,
```


## Common operators

### beam.Map (map a function on a collection)

```python
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "add_one" >> beam.Map(lambda x: x + 1)
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )
```

```
2, 3, 4, 5, 6,
```

### beam.ParDo with beam.DoFn

Beam has a generic transform for data processing: `beam.ParDo`.
It takes as input the processing function to apply, written as a `beam.DoFn`.

#### ParDo
1. runs on each element of the input `PCollection`
2. applies a processing function
3. emits **zero**, **one**, or **many outputs** for each input element

It is generic and can implement any usual transform: map, flatmap, filter, ...
Indeed, all of these are special cases of `beam.ParDo` processing.


#### DoFn
`beam.DoFn` represents a processing function applied by `beam.ParDo`.

The `process` method of a `beam.DoFn` **should always return an iterable or None**. `beam.ParDo` flattens the returned iterable into the output `PCollection`, and `None` emits nothing.


For instance, if you want to return one element, you need to wrap it into a list or tuple:

```
1. input element:              1
2. DoFnMultBy2.process(1)  ->  [2]
3. ParDo flattens [2]      ->  2
```

Only the first level of the output is flattened: returning `[[2]]` would emit the list `[2]` as a single element.
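These flattening rules can be simulated in plain Python. `toy_pardo` is a hypothetical helper, not part of Beam. It calls a `process`-like function on every element, skips `None`, and flattens one level of the returned iterable. This also shows how map, flatmap and filter fall out as special cases.

```python
from typing import Any, Callable, Iterable, List, Optional


def toy_pardo(process: Callable[[Any], Optional[Iterable[Any]]],
              elements: Iterable[Any]) -> List[Any]:
    """Hypothetical simulation of ParDo's output semantics."""
    outputs: List[Any] = []
    for elem in elements:
        result = process(elem)
        if result is None:
            continue  # None means "emit nothing"
        outputs.extend(result)  # flatten only the first level
    return outputs


as_map = toy_pardo(lambda x: [x * 2], [1, 2, 3])
as_flatmap = toy_pardo(lambda x: [x, x], [1, 2])
as_filter = toy_pardo(lambda x: [x] if x % 2 == 0 else None, [1, 2, 3, 4])
nested = toy_pardo(lambda x: [[x]], [1, 2])
```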

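Another nice feature is **internal state**: since `beam.DoFn` is a class, it can keep values in instance attributes between calls. This state is local to each copy of the function. Runners serialize the `DoFn`, and each worker processes its share of the elements with its own copy.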





Quick example: add an incremental index, starting from N, to elements

```
Input: 1, 2, 3, 4, 5

Apply IdxFn

Output:  (N, 1), (N+1, 2), (N+2, 3), (N+3, 4), (N+4, 5)
```

```python
class IdxFn(beam.DoFn):
    """Custom DoFn function to add an index. It will be applied using beam.ParDo"""
    def __init__(self, init_state: int):
        # this is the first number to use as an index, we will increment it for each element
        self.state = init_state

    def process(self, elem: Any) -> Iterable[Any]:
        # create a tuple with the current state as index and the element
        res = (self.state, elem)
        # increment the state
        self.state += 1
        return [res]

print("Like a Map:")
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "with_idx" >> beam.ParDo(IdxFn(init_state=10))
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )
```

```
Like a Map:
(10, 1), (11, 2), (12, 3), (13, 4), (14, 5),
```
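To illustrate why this instance-level state is local, here is a toy simulation in plain Python. It assumes a runner that gives two workers their own copy of the function (modeled here with `copy.deepcopy`) and splits the elements between them. `ToyIdxFn` reproduces the logic of the `IdxFn` above without Beam. With such a split, the indices restart on each worker and are no longer unique. For state that Beam manages itself, the SDK provides a per-key state API in `apache_beam.transforms.userstate`, which is not used in this workshop.

```python
import copy
from typing import Any, List, Tuple


class ToyIdxFn:
    """Plain-Python copy of IdxFn's logic, used only for this simulation."""

    def __init__(self, init_state: int) -> None:
        self.state = init_state

    def process(self, elem: Any) -> List[Tuple[int, Any]]:
        res = (self.state, elem)
        self.state += 1
        return [res]


fn = ToyIdxFn(init_state=10)
# Hypothetical split: each worker gets its own copy and part of the input
worker_a = copy.deepcopy(fn)
worker_b = copy.deepcopy(fn)
out_a = [r for e in [1, 2, 3] for r in worker_a.process(e)]
out_b = [r for e in [4, 5] for r in worker_b.process(e)]
```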

### beam.Filter (filter a collection) - Optional, not used in this workshop


```python
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "keep_even_only" >> beam.Filter(lambda x: (x % 2) == 0)
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )
```

```
2, 4,
```

### beam.FlatMap (map each element to zero or more elements) - Optional, not used in this workshop

```python
with beam.Pipeline() as pipeline:
    (pipeline
        | "read_data" >> beam.Create([1, 2, 3, 4, 5])
        | "duplicate_inputs" >> beam.FlatMap(lambda x: [x, x])
        | "print" >> beam.Map(lambda x: print(x, end=', '))
    )
```

```
1, 1, 2, 2, 3, 3, 4, 4, 5, 5,
```

### beam.window.WindowInto

#### Without window

![without-window](https://i.postimg.cc/N0Nksgrx/without-window.png
"Windowing")

#### With session window

![with-window](https://i.postimg.cc/cLFWzjCV/with-window.png
"Windowing")

```python
with beam.Pipeline() as pipeline:
    input_stream = [(1, 100), (2, 101), (3, 102), (4, 200)]
    input_stream = [beam.window.TimestampedValue(elem, timestamp) for elem, timestamp in input_stream]
    pc = (pipeline
        | beam.Create(input_stream)
        | 'window' >> beam.WindowInto(beam.window.FixedWindows(5))
        | 'group' >> beam.GroupBy()
        | 'pp' >> beam.Map(lambda x: print(f"Batch: {x}"))
    )
```

```
Batch: (Key(), [1, 2, 3])
Batch: (Key(), [4])
```
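The grouping above can be reproduced by hand. Below is a minimal sketch that assumes the assignment rule we understand Beam's `FixedWindows` to use: the window start is `timestamp - (timestamp - offset) % size`, and windows are half-open intervals `[start, end)`. `fixed_window` and `group_by_window` are hypothetical helpers. With a size of 5, timestamps 100 to 102 share the window `[100, 105)` while 200 falls in `[200, 205)`, giving the same two batches.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Window = Tuple[float, float]


def fixed_window(timestamp: float, size: float, offset: float = 0) -> Window:
    """Return the half-open [start, end) fixed window containing `timestamp`."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def group_by_window(events: List[Tuple[Any, float]], size: float) -> Dict[Window, List[Any]]:
    """Group (value, timestamp) pairs by their fixed window."""
    groups: Dict[Window, List[Any]] = defaultdict(list)
    for value, ts in events:
        groups[fixed_window(ts, size)].append(value)
    return dict(groups)


batches = group_by_window([(1, 100), (2, 101), (3, 102), (4, 200)], size=5)
```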


# Our Beam Pipeline

As a reminder, this is what we want:

![beam pipeline](https://i.postimg.cc/xT5TvZnJ/beam-pipeline-focus.png
"Beam Pipeline")

We will focus on two important parts: **Grouping by chunk** of time and **Predicting**.






# Predict

The most custom and critical step here is the **prediction**. We need a custom `beam.DoFn` with an initialization step that pulls the pickled model from Google Cloud Storage and loads it. The function then uses the model to label the incoming Tweets.

Here are the steps for the prediction part:
1. we implement a custom `beam.DoFn` class called `ApplyModel`
2. in an initialization step, we pull and load the model
3. we implement the `process` method, which receives a batch of Tweets and uses the model to label them as *DISASTER* or *NORMAL*
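The logic of step 3 can be sketched in plain Python first. This is a simplified sketch: `KeywordModel` is a hypothetical stand-in for the pickled scikit-learn pipeline, and `label_batch` receives the model as an argument. Unlike the real model, which gets a DataFrame, this stand-in receives a list of texts.

```python
from typing import Any, Dict, Iterable, List, Tuple


class KeywordModel:
    """Hypothetical stand-in for the pickled scikit-learn pipeline."""

    KEYWORDS = ("earthquake", "fire", "flood", "crash")

    def predict(self, texts: List[str]) -> List[int]:
        return [int(any(k in t.lower() for k in self.KEYWORDS)) for t in texts]


def label_batch(model: KeywordModel,
                group: Tuple[Any, Iterable[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Label every Tweet of a (key, batch) group as DISASTER or NORMAL."""
    _, batch = group
    records = [dict(r) for r in batch]  # copy so the input records are not mutated
    labels = model.predict([r["text"] for r in records])
    for record, label in zip(records, labels):
        record["prediction"] = "DISASTER" if label == 1 else "NORMAL"
    return records


group = ("window-key", [
    {"id": 1, "text": "Heard about #earthquake in different cities, stay safe everyone"},
    {"id": 2, "text": "No don't tell me that!"},
])
labeled = label_batch(KeywordModel(), group)
```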


![dofn](https://i.postimg.cc/KYpdxdBT/dofn.png "ApplyModel DoFn")

```python
import re
import json
```

```python
class ApplyModel(beam.DoFn):
    """Label batches of Tweets as DISASTER or NORMAL with the pickled model"""

    def __init__(self):
        """Initialize the function (runs once, when the pipeline is built)"""
        self._model = None
        self._pd = None

    def setup(self):
        """Pull and load the model.

        Beam calls `setup` once per DoFn instance on the worker, before any
        element is processed. Loading the model here (rather than in `__init__`)
        avoids serializing the whole model together with the DoFn.
        """
        # We import here to have the libraries available even when we run the
        # pipeline on remote executors such as Dataflow
        from google.cloud import storage
        import pandas as pd
        import pickle
        self._pd = pd
        bucket = storage.Client().get_bucket('starthack-workshop-data')
        blob = bucket.get_blob('model.pkl')
        self._model = pickle.loads(blob.download_as_bytes())

    def process(self, group: Tuple[Any, Iterable[Any]]) -> Iterable[Any]:
        """Process every batch of Tweets"""
        # extract the batch and build a DataFrame
        _, batch = group
        df = self._pd.DataFrame(list(batch))
        # predict, then map the 0/1 labels to readable names
        y = self._model.predict(df)
        df["prediction"] = ["DISASTER" if label == 1 else "NORMAL" for label in y]
        return df.to_dict(orient="records")


def build_pipeline(pipeline: beam.Pipeline) -> beam.Pipeline:
    """Takes an empty beam.Pipeline as input and returns the full beam.Pipeline"""

    # first read the incoming Tweets from Pub/Sub
    messages = (
        pipeline
        | 'read_pubsub' >> beam.io.ReadFromPubSub(subscription=subscription_path_in).with_output_types(bytes)
    )

    # decode from bytes to text, then parse the JSON
    parse = (messages
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'to_json' >> beam.Map(json.loads)
    )

    # group the Tweets in 4-second time windows: these are our batches
    group = (parse
        | 'window' >> beam.WindowInto(beam.window.FixedWindows(4))
        | 'group' >> beam.GroupBy()
    )

    def debug_fn(json_with_prediction: Dict) -> Dict:
        """Pretty print for model predictions: NORMAL on the left, DISASTER on the right"""
        txt = json_with_prediction["text"]

        # split the text (and a blank filler of the same length) into 80-character lines
        blanks = re.findall('.{1,80}', ' '*len(txt))
        txt = re.findall('.{1,80}', txt)

        for txt_line, blank_line in zip(txt, blanks):
            if json_with_prediction["prediction"] == "NORMAL":
                ff = f"{txt_line:80s} | {blank_line:80s}"
            else:
                ff = f"{blank_line:80s} | {txt_line:80s}"
            print(ff)

        print(f"{' '*80} | {' '*80}")

        return json_with_prediction

    # make predictions, then convert them to JSON and to bytes
    predict = (
        group
        | 'predict' >> beam.ParDo(ApplyModel())
        | 'debug' >> beam.Map(debug_fn)
        | 'to_bytes' >> beam.Map(lambda x: json.dumps(x).encode("utf-8"))
    )

    # write the results to Pub/Sub
    write = (
        predict
        | "Write to PubSub" >> beam.io.WriteToPubSub(topic=topic_path_out, with_attributes=False)
    )

    return pipeline
```

# How to run a pipeline

## Locally

We can run a Beam pipeline locally using the `DirectRunner`, which is useful for debugging and testing.

## Remotely
In production, we want to run the pipeline on a remote executor for better performance and scaling capabilities.
In this workshop, we will use the Google Cloud Dataflow executor.

## How we proceed

We will first run it locally to make sure that everything runs smoothly.

Then we will run it on the Google Cloud Dataflow runner to simulate a production application.

```python
from apache_beam.runners import DataflowRunner
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions, GoogleCloudOptions
```

```python
print()
print(f"{'NORMAL':80s} | {'DISASTER':80s}")
print(f"{'':80s} | {'':80s}")

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    build_pipeline(pipeline)
```

```
NORMAL                                                                           | DISASTER
                                                                                 |
Apocalypse lighting. #Spokane #wildfires                                         |
                                                                                 |
                                                                                 | Just happened a terrible car crash
                                                                                 |
```


```python
# Set up Apache Beam pipeline options.
options = PipelineOptions(streaming=True)

# Set the project to the default project in your current Google Cloud
# environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'europe-west6'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://starthack-workshop-data/dataflow'

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'

# Set the SDK location. This is used by Dataflow to locate the
# SDK needed to run the pipeline.
#options.view_as(SetupOptions).sdk_location = (
#    '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
#    beam.version.__version__)

# Build the pipeline with the same options and submit it to Dataflow.
# A distinct name avoids overwriting the scikit-learn `pipe` defined earlier.
dataflow_pipeline = build_pipeline(beam.Pipeline(options=options))
runner = DataflowRunner()
runner.run_pipeline(dataflow_pipeline, options=options)
```

# Subscribe to the output stream

```python
import pprint

def callback(message) -> None:
    """Callback to deal with each received Pub/Sub message"""
    x = json.loads(message.data.decode("utf-8"))
    print()
    pprint.pprint(x)
    print()
    # acknowledge the message so that Pub/Sub does not redeliver it
    message.ack()


# Subscribe to the output subscription
streaming_pull_future = sub_client.subscribe(subscription_path_out, callback=callback)
print(f"Listening for messages on {subscription_path_out} ...\n")

try:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first: interrupt the cell to stop
    streaming_pull_future.result()
except KeyboardInterrupt:
    streaming_pull_future.cancel()  # trigger the shutdown
    streaming_pull_future.result()  # block until the shutdown is complete
```

# Conclusion


## Machine Learning on Streams 
Processing streams with machine learning has many applications:
1. IoT sensors
2. Server logs ...


## Apache Beam
Apache Beam facilitates stream manipulation. It has numerous advantages:
1. Multiple compatible backends
2. Available in multiple programming languages
3. Built-in support for batch and **streaming** processing


## Serverless
We used multiple managed services (Storage, Pub/Sub, Dataflow). They are simple to set up and scale smoothly.
Just be careful with the bill, features like auto-scaling can surprise you.





# Thank You For Attending!

## We hope this workshop will be helpful!