In [1]:
!nautilus --browser /media/desktop/01D7E2330EB2B040/google-cloud-ml/training-data-analyst-master/quests/dataflow_python/8b_Stream_Testing_Pipeline/solution

In [2]:
%%writefile lab_10_taxi_streaming_pipeline.py
import json
import typing
import logging
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.combiners import CountCombineFn
import argparse

class TaxiRide(typing.NamedTuple):
    """Typed record for a single taxi-ride message.

    ``JsonToTaxiRide`` builds instances with ``TaxiRide(**row)``, so the field
    names below must match the keys of the decoded JSON object exactly.
    """
    ride_id: str
    point_idx: int
    latitude: float
    longitude: float
    timestamp: str
    meter_reading: float
    meter_increment: float
    ride_status: str  # TaxiCountTransform keeps only rides whose status is 'pickup'
    passenger_count: int

# Register RowCoder as the coder Beam uses for TaxiRide elements.
beam.coders.registry.register_coder(TaxiRide, beam.coders.RowCoder)

class JsonToTaxiRide(beam.DoFn):
    """DoFn that decodes one JSON document into a ``TaxiRide`` record."""

    def process(self, line):
        """Parse ``line`` as JSON and emit exactly one ``TaxiRide``.

        The keys of the decoded object are passed to ``TaxiRide`` as keyword
        arguments.
        """
        fields = json.loads(line)
        ride = TaxiRide(**fields)
        yield ride

class ConvertCountToDict(beam.DoFn):
    """DoFn that pairs an element with the start time of its window."""

    def process(self, element, window=beam.DoFn.WindowParam):
        """Emit one dict with the element and the formatted window start.

        The window start is converted to a UTC datetime and rendered with the
        format ``%Y-%m-%dT%H:%M:%S``.
        """
        start_utc = window.start.to_utc_datetime()
        yield {
            "taxi_rides": element,
            "timestamp": start_utc.strftime("%Y-%m-%dT%H:%M:%S"),
        }


class TaxiCountTransform(beam.PTransform):
    """Composite transform: JSON ride messages in, pickup counts per window out."""

    def expand(self, pcoll):
        """Parse, filter, window and count the incoming messages.

        Stages (labels are part of the pipeline graph and are kept as-is):
          * ParseJson        - JSON text to ``TaxiRide``.
          * FilterForPickups - keep rides whose ``ride_status`` is 'pickup'.
          * WindowByMinute   - 60-second fixed windows, watermark trigger with a
                               late firing per late element, 60 seconds of
                               allowed lateness, accumulating panes.
          * CountPerMinute   - global count per window, without a default value
                               for empty windows.
        """
        rides = pcoll | "ParseJson" >> beam.ParDo(JsonToTaxiRide())

        pickups = rides | "FilterForPickups" >> beam.Filter(
            lambda ride: ride.ride_status == 'pickup')

        windowed = pickups | "WindowByMinute" >> beam.WindowInto(
            beam.window.FixedWindows(60),
            trigger=AfterWatermark(late=AfterCount(1)),
            allowed_lateness=60,
            accumulation_mode=AccumulationMode.ACCUMULATING)

        return windowed | "CountPerMinute" >> beam.CombineGlobally(
            CountCombineFn()).without_defaults()

def run():
    """Build and run the streaming pipeline.

    Reads taxi-ride messages from the public Pub/Sub topic, counts pickups per
    one-minute window with ``TaxiCountTransform`` and appends the counts to a
    BigQuery table.

    Command-line arguments:
        --table_name: required, the output BigQuery table.
        Any other argument is forwarded to Beam as a pipeline option
        (for example ``--runner`` or ``--project``).
    """
    # Local import so this function does not rely on a module-level import
    # that the file does not have.
    from apache_beam.options.pipeline_options import PipelineOptions

    parser = argparse.ArgumentParser(description='Load from Json from Pub/Sub into BigQuery')

    parser.add_argument('--table_name', required=True, help='Output BQ table')

    # parse_known_args keeps flags this parser does not declare so they can be
    # handed to Beam; parse_args would abort on them.
    opts, pipeline_args = parser.parse_known_args()

    # argparse returns a Namespace: values are attributes, not dict items.
    table_name = opts.table_name

    table_schema = {
        "fields": [
            {
                "name": "taxi_rides",
                "type": "INTEGER"
            },
            {
                "name": "timestamp",
                "type": "STRING"
            },

        ]
    }

    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    # save_main_session pickles this module's globals (TaxiRide, the DoFns) so
    # they are available on remote workers.
    options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

    # The context manager runs the pipeline and waits for it on exit; without
    # it the graph would only be constructed, never executed.
    with beam.Pipeline(options=options) as p:

        (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/taxirides-realtime")
           | "TaxiPickupCount" >> TaxiCountTransform()
           | "ConvertToDict" >> beam.ParDo(ConvertCountToDict())
           | 'WriteAggToBQ' >> beam.io.WriteToBigQuery(
                    table_name,
                    schema=table_schema,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                    )
        )

# Only launch the pipeline when this file is executed as a script; the test
# module imports this file and must not trigger a run.
if __name__ == '__main__':

    run()

Writing lab_10_taxi_streaming_pipeline.py


In [4]:
%%writefile lab_10_taxi_streaming_pipeline_test.py
import logging
import json
import unittest
import sys

import apache_beam as beam

from lab_10_taxi_streaming_pipeline import *
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to_per_window
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, IntervalWindow
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def main(out = sys.stderr, verbosity = 2):
    """Run every test case defined in this module and report to ``out``.

    Args:
        out: Writable text stream that receives the unittest report.
        verbosity: Verbosity level passed to ``unittest.TextTestRunner``.

    Returns:
        The ``unittest.TestResult`` of the run, so a caller can inspect
        ``wasSuccessful()`` instead of having to parse the report.
    """
    loader = unittest.TestLoader()

    # Collect the tests from whichever module this function lives in.
    suite = loader.loadTestsFromModule(sys.modules[__name__])
    return unittest.TextTestRunner(out, verbosity = verbosity).run(suite)


class TaxiWindowingTest(unittest.TestCase):
    """Checks that on-time rides are filtered and counted per one-minute window."""

    def test_windowing_behavior(self):
        # Feeds pickups and one non-pickup ride through TaxiCountTransform and
        # compares the per-window counts.

        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True

        with TestPipeline(options=options) as p:

            base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                         "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                         "\"ride_status\":\"pickup\",\"passenger_count\":1}"

            # Must NOT be a pickup: this element exists to prove that
            # FilterForPickups drops rides with any other status.
            base_json_enroute = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                         "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                         "\"ride_status\":\"enroute\",\"passenger_count\":1}"


            test_stream = TestStream().advance_watermark_to(0).add_elements([
                TimestampedValue(base_json_pickup, 0),
                TimestampedValue(base_json_pickup, 0),
                TimestampedValue(base_json_enroute, 0),
                TimestampedValue(base_json_pickup, 60)
            ]).advance_watermark_to(60).advance_processing_time(60).add_elements([
                TimestampedValue(base_json_pickup, 120)
            ]).advance_watermark_to_infinity()

            taxi_counts = (p | test_stream
                             | TaxiCountTransform()
                          )

            # Window [0, 60) receives two pickups and one enroute ride; only
            # the pickups are counted.
            EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [2],
                                      IntervalWindow(60,120): [1],
                                      IntervalWindow(120,180): [1]}

            assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
                        reify_windows=True)

class TaxiLateDataTest(unittest.TestCase):
    """Checks the panes produced for window [0, 60) when data arrives late."""

    def test_late_data_behavior(self):
        # Two on-time pickups, one late pickup inside the allowed lateness, and
        # one pickup that arrives after the watermark has reached 300.

        streaming_options = PipelineOptions()
        streaming_options.view_as(StandardOptions).streaming = True

        with TestPipeline(options=streaming_options) as p:

            pickup_msg = (
                "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0,"
                "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1,"
                "\"ride_status\":\"pickup\",\"passenger_count\":1}"
            )

            on_time_batch = [
                TimestampedValue(pickup_msg, 0),
                TimestampedValue(pickup_msg, 0),
            ]
            late_batch = [TimestampedValue(pickup_msg, 0)]
            too_late_batch = [TimestampedValue(pickup_msg, 0)]

            events = (
                TestStream()
                .advance_watermark_to(0)
                .add_elements(on_time_batch)
                .advance_watermark_to(60)
                .advance_processing_time(60)
                .add_elements(late_batch)
                .advance_watermark_to(300)
                .advance_processing_time(240)
                .add_elements(too_late_batch)
            )

            # First value: on-time pane; second value: accumulated pane after
            # the late element. No pane is expected for the last element.
            expected_panes = {IntervalWindow(0, 60): [2, 3]}

            late_counts = p | events | TaxiCountTransform()

            assert_that(
                late_counts,
                equal_to_per_window(expected_panes),
                reify_windows=True,
            )

# When executed as a script, write the unittest report to
# lab_10_testingout.txt (relative to the current working directory)
# instead of the default stderr.
if __name__ == '__main__':
    with open('lab_10_testingout.txt', 'w') as f:
        main(f)

Overwriting lab_10_taxi_streaming_pipeline_test.py


```bash
# Resolve the active project and its numeric project number.
PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)")
# Service account name built from the project number.
export serviceAccount="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
# Grant that service account the Dataflow worker role on the project.
gcloud projects add-iam-policy-binding "$PROJECT_ID" --member="serviceAccount:${serviceAccount}" --role="roles/dataflow.worker"
```

Run the following in a terminal:
```bash
conda activate beam
# NOTE(review): /path looks like a placeholder - replace it with the directory
# that contains the lab files.
cd /path
# NOTE(review): $workdir is not defined anywhere in the visible notebook; set it
# before running this line.
python $workdir/lab_10_taxi_streaming_pipeline_test.py
# The test script writes its report to this file in the current directory.
cat lab_10_testingout.txt
```

In [1]:
!cat lab_10_testingout.txt

test_late_data_behavior (__main__.TaxiLateDataTest) ... ok
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok

----------------------------------------------------------------------
Ran 2 tests in 2.022s

OK
