In [1]:
pip install --quiet apache-beam

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m15.7/15.7 MB[0m [31m71.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m49.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.8/40.8 MB[0m [31m18.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
!pip install --upgrade apache-beam[gcp]


Collecting google-apitools<0.5.32,>=0.5.31 (from apache-beam[gcp])
  Downloading google-apitools-0.5.31.tar.gz (173 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m173.5/173.5 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting google-cloud-pubsublite<2,>=1.2.0 (from apache-beam[gcp])
  Downloading google_cloud_pubsublite-1.11.1-py2.py3-none-any.whl.metadata (5.6 kB)
Collecting google-cloud-storage<3,>=2.18.2 (from apache-beam[gcp])
  Downloading google_cloud_storage-2.18.2-py2.py3-none-any.whl.metadata (9.1 kB)
Collecting google-cloud-spanner<4,>=3.0.0 (from apache-beam[gcp])
  Downloading google_cloud_spanner-3.49.1-py2.py3-none-any.whl.metadata (10 kB)
Collecting google-cloud-dlp<4,>=3.0.0 (from apache-beam[gcp])
  Downloading google_cloud_dlp-3.25.0-py2.py3-none-any.whl.metadata (5.4 kB)
Collecting google-cloud-videointelligence<3,>=2.0 (from apache-beam[gcp])
  Downloading google_cloud_videointelli

In [3]:
!pip install apache-beam[gcp]



In [4]:
#Basic Apache Beam Pipeline

import apache_beam as beam

# Custom DoFn used with beam.ParDo below.
class SplitWords(beam.DoFn):
    """Break one line of text into its whitespace-separated words.

    Returns a list; Beam emits each list item as a separate output element.
    """

    def process(self, element):
        tokens = element.split()
        return tokens

# Build the pipeline; leaving the `with` block runs it.
with beam.Pipeline() as pipeline:
    # In-memory source (beam.Create) standing in for a real read.
    input_lines = pipeline | 'Read from Input' >> beam.Create(
        ['This is Apache Beam', 'Exploring Beam Features', 'Done by Apurva Karne'])
    # One element per word.
    split_words = input_lines | 'Split words' >> beam.ParDo(SplitWords())
    # Text sink; WriteToText writes sharded files with the 'output.txt' prefix.
    words = split_words | 'Write to Output' >> beam.io.WriteToText('output.txt')



In [5]:
#Composite Transform

# Define a composite transform
class ProcessText(beam.PTransform):
    """Composite transform: split lines into words and keep only the longer words.

    Args:
        min_length: A word is kept only if it has strictly more than this many
            characters. Defaults to 3, matching the original behaviour
            ("Beam" is kept, "is" is dropped).
    """

    def __init__(self, min_length=3):
        super().__init__()
        self.min_length = min_length

    def expand(self, pcoll):
        # Split with str.split directly rather than the notebook-level SplitWords
        # DoFn, so this transform does not change behaviour depending on which
        # SplitWords class happens to be defined in the kernel when it runs.
        min_length = self.min_length  # bind locally so the lambda does not capture the transform
        return (
            pcoll
            | 'Split text' >> beam.FlatMap(str.split)
            | 'Filter short words' >> beam.Filter(lambda word: len(word) > min_length)
        )

# Run the composite transform end to end and write the surviving words.
composite_inputs = ['Apache Beam Pipeline', 'Learning transforms', 'Apurva Karne']
with beam.Pipeline() as pipeline:
    long_words = (
        pipeline
        | 'Create input' >> beam.Create(composite_inputs)
        | 'Process text' >> ProcessText()
    )
    result = long_words | 'Write to text' >> beam.io.WriteToText('composite_output.txt')


In [6]:
# Windowing with fixed windows (default trigger; no custom triggers are configured here)

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from datetime import datetime

# Size of each fixed window, in seconds.
WINDOW_SIZE_SECONDS = 10
STREAM_MESSAGE = "This is a test message for apache beam testing- Apurva"


class AttachEventTimestamp(beam.DoFn):
    """Turn a ``(text, datetime)`` pair into a text element stamped with that time.

    FixedWindows assigns windows from each element's event timestamp, not from
    values stored inside the element. beam.Create does not read the datetime in
    each tuple, so without this step the windowing ignored those times.
    A separate class name is used so the word-splitting SplitWords DoFn defined
    earlier is not shadowed by an incompatible tuple-handling version.
    """

    def process(self, element):
        text, event_time = element
        # datetime.timestamp() gives POSIX seconds (naive datetimes are treated as local time).
        yield beam.window.TimestampedValue(text, event_time.timestamp())


# Define the pipeline for windowing
with beam.Pipeline() as pipeline:
    windowed_data = (
        pipeline
        # Simulated stream: (message, event time) tuples
        | 'Create stream data' >> beam.Create(
            [(STREAM_MESSAGE, datetime.now()) for _ in range(10)])
        | 'Attach event timestamps' >> beam.ParDo(AttachEventTimestamp())
        | 'Window into fixed intervals' >> beam.WindowInto(FixedWindows(WINDOW_SIZE_SECONDS))
        # Elements are now plain strings, so a plain split is enough.
        | 'Process elements' >> beam.FlatMap(str.split)
        | 'Write windowed output' >> beam.io.WriteToText('window_output.txt')
    )


In [7]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import pickle

# Train a RandomForest on the first two Iris features (sepal length and sepal
# width, in cm) and pickle it for the Beam inference cells below.
RANDOM_SEED = 42

# Load the Iris dataset
iris = load_iris()

# Select only 2 features for model training
X = iris.data[:, :2]
y = iris.target

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=RANDOM_SEED)

# Seed the forest as well; without random_state the fitted trees (and so the
# predictions written by the Beam cells) change on every run.
model = RandomForestClassifier(random_state=RANDOM_SEED)
model.fit(X_train, y_train)

# Check the model on the held-out split, which was otherwise never used.
test_accuracy = model.score(X_test, y_test)
print(f'Held-out accuracy (2 features): {test_accuracy:.3f}')

# Save the model
model_path = 'iris_feature_model.pkl'
with open(model_path, 'wb') as model_file:
    pickle.dump(model, model_file)


In [8]:
# Define the ParDo for model inference with 2 features
class SklearnInference(beam.DoFn):
    """Score dict records with the pickled two-feature Iris model.

    The model is loaded once per DoFn instance in ``setup``. The original read
    the notebook's global ``model``, which only resolves when the DoFn runs in
    the kernel's own process. This version loads from disk, so it also works
    on remote runners.

    Args:
        model_path: Path of the pickled scikit-learn estimator written by the
            training cell.
        feature_keys: Keys read from each input dict, in the column order the
            model was trained on. Any other keys are ignored.
    """

    def __init__(self, model_path='iris_feature_model.pkl',
                 feature_keys=('feature1', 'feature2')):
        super().__init__()
        self.model_path = model_path
        self.feature_keys = tuple(feature_keys)
        self._model = None

    def setup(self):
        # Local import so the name resolves even where the notebook's globals don't.
        import pickle
        # SECURITY: pickle.load can execute arbitrary code; only load model files you created.
        with open(self.model_path, 'rb') as model_file:
            self._model = pickle.load(model_file)

    def process(self, element):
        row = [[element[key] for key in self.feature_keys]]
        prediction = self._model.predict(row)
        # .item() turns the numpy scalar into a plain Python value, so the text
        # sink writes e.g. 1 rather than np.int64(1).
        yield {'input': element, 'prediction': prediction[0].item()}

# Score two hand-written records and write one line per prediction.
# NOTE(review): these values sit well below typical Iris sepal measurements,
# so the predictions are extrapolations — confirm this is intended.
sample_records = [
    {'feature1': 0.5, 'feature2': 1.5},
    {'feature1': 0.3, 'feature2': 0.7},
]
with beam.Pipeline() as pipeline:
    scored = (
        pipeline
        | 'Create sample data' >> beam.Create(sample_records)
        | 'Run manual model inference' >> beam.ParDo(SklearnInference())
    )
    predictions = scored | 'Write predictions' >> beam.io.WriteToText('ml_predictions.txt')


In [9]:
# Run the same inference on records that carry two extra fields.
# The model was trained on feature1/feature2 only. SklearnInference, defined in
# the previous cell, reads just those keys, so feature3/feature4 appear in
# 'input' but never reach the model. Reusing that class rather than redefining
# it keeps a single definition in the notebook.
with beam.Pipeline() as pipeline:
    predictions = (
        pipeline
        | 'Create sample data' >> beam.Create([
            {'feature1': 0.5, 'feature2': 1.5, 'feature3': 2.5, 'feature4': 3.5},
            {'feature1': 0.3, 'feature2': 0.7, 'feature3': 1.2, 'feature4': 2.0},
        ])
        | 'Run manual model inference' >> beam.ParDo(SklearnInference())
        # Distinct path: writing to 'ml_predictions.txt' again would overwrite
        # the previous cell's predictions.
        | 'Write predictions' >> beam.io.WriteToText('ml_predictions_extra_features.txt')
    )
