## 1. Import Modules

In [None]:
#!/usr/bin/env python
import argparse
import json
import os
import logging
import apache_beam as beam
from datetime import datetime
from apache_beam import DoFn
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys,Create,Map , CombineGlobally ,dataframe
from apache_beam import CombineGlobally, CombinePerKey
from apache_beam.runners.interactive import interactive_runner
_root_logger = logging.getLogger()
# Attach a default stderr handler at INFO level. basicConfig does nothing if the
# root logger already has handlers (e.g. configured by the notebook kernel)...
logging.basicConfig(level=logging.INFO)
# ...so set the root level explicitly as well, which takes effect either way.
_root_logger.setLevel(logging.INFO)

## 2. Create function to add window start and end to the aggregated windowed data

This DoFn takes each aggregated, windowed result and adds the start and end times of its window.

In [157]:
# Add the window start and end times to each element after the window aggregation.
class FormatDoFn(beam.DoFn):
    """Turn a windowed ``(key, value)`` pair into a BigQuery row dict.

    The window bounds are rendered in UTC as strings of the form
    ``YYYY-MM-DD HH:MM:SS.ffffff UTC``.
    """

    # Output format for windowStart / windowEnd.
    # NOTE(review): assumes these are TIMESTAMP (or STRING) columns in
    # BIGQUERY_SCHEMA — confirm against the schema defined elsewhere.
    TS_FORMAT = '%Y-%m-%d %H:%M:%S.%f UTC'

    def process(self, element, window=beam.DoFn.WindowParam):
        """Yield one output row for ``element``.

        Args:
            element: a pair whose first item is the key (the sensor ID) and
                whose second item is the aggregated value for that key in
                this window.
            window: the window the element belongs to, injected by Beam.

        Yields:
            dict with keys ``sensorID``, ``sensorValue``, ``windowStart`` and
            ``windowEnd``.
        """
        # Window bounds are Beam Timestamps. Convert them explicitly in UTC;
        # datetime.fromtimestamp() would use the worker's local timezone.
        window_start = window.start.to_utc_datetime().strftime(self.TS_FORMAT)
        window_end = window.end.to_utc_datetime().strftime(self.TS_FORMAT)
        yield {
            'sensorID': element[0],
            'sensorValue': element[1],
            'windowStart': window_start,
            'windowEnd': window_end,
        }

In [None]:
# Identity step: pass each decoded message through unchanged before the raw write.
class ProcessDoFn(beam.DoFn):
    """Emit every input element exactly once, unmodified."""

    def process(self, element):
        # Still a generator, so Beam iterates over a single output per input.
        yield from (element,)

## 3. Define and run the streaming Beam pipeline (Dataflow Runner)
Run this cell only if you want to run this lab as a Dataflow job. THIS WILL CREATE A DATAFLOW JOB.
The `run` function writes both the raw data and the aggregated data into two different BigQuery tables.
The Beam pipeline below groups the data into fixed 3-second windows (`FixedWindows(3)`) and computes the mean sensor value per sensor in each window.

In [170]:
def run(runneroption):
    """Build and run the streaming sensor pipeline.

    Reads JSON messages from a Pub/Sub subscription and feeds two branches:

    * every decoded message is written unchanged to the raw BigQuery table;
    * messages are keyed by ``SensorID``, grouped into fixed 3-second windows,
      averaged per key, annotated with the window bounds by ``FormatDoFn`` and
      written to the aggregate BigQuery table.

    Args:
        runneroption: Beam runner name such as ``"DataflowRunner"`` or
            ``"DirectRunner"``. Dataflow is recognised case-insensitively and
            with or without the ``Runner`` suffix. In that case the
            Dataflow-specific options (project, region, network, ...) are set.

    Command-line flags, parsed with ``parse_known_args``, override the
    subscription, table and schema defaults. Anything unrecognised is forwarded
    to ``PipelineOptions``. The defaults and the Dataflow settings are
    module-level constants defined in another cell.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."',
        default=PUBSUB_SUBSCRIPTION,
    )
    parser.add_argument(
        "--output_raw_table", help="Output BigQuery Table", default=BIGQUERY_RAW_TABLE
    )
    parser.add_argument(
        "--output_agg_table", help="Output BigQuery Table", default=BIGQUERY_AGG_TABLE
    )
    parser.add_argument(
        "--output_raw_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_RAW_SCHEMA,
    )
    parser.add_argument(
        "--output_schema",
        help="Output BigQuery Schema in text format",
        default=BIGQUERY_SCHEMA,
    )
    known_args, pipeline_args = parser.parse_known_args()

    # Compare case-insensitively: "dataflowrunner" must also get the Dataflow
    # options, otherwise project/region/network are silently dropped.
    is_dataflow = runneroption.lower() in ("dataflowrunner", "dataflow")

    if is_dataflow:
        pipeline_options = PipelineOptions(
            pipeline_args,
            runner=runneroption,
            project=DEST_PROJECT,
            # NOTE(review): Dataflow rejects a job whose name matches an
            # active job. Make this unique per run if jobs overlap.
            job_name='unique-job-name',
            # presumably a gs:// path — confirm BUCKET_NAME includes the scheme
            temp_location=BUCKET_NAME,
            region=REGION,
            service_account_email=SERVICE_ACCOUNT_FQN,
            network=VPC_FQN,
            subnetwork=SUBNET,
        )
    else:
        pipeline_options = PipelineOptions(pipeline_args, runner=runneroption)
    # Pub/Sub is an unbounded source, so the pipeline runs in streaming mode.
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:
        mapped = (
            p
            # Honour --input_subscription (it defaults to PUBSUB_SUBSCRIPTION).
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription)
            | "Json Loads" >> Map(json.loads)
        )

        # Branch 1: write every message unchanged to the raw table.
        (
            mapped
            | 'Format' >> beam.ParDo(ProcessDoFn())
            | "WriteToBigQueryRaw" >> beam.io.WriteToBigQuery(
                known_args.output_raw_table,
                schema=known_args.output_raw_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

        # Branch 2: per-sensor mean over fixed 3-second windows.
        # NOTE(review): assumes each message carries "SensorID" and
        # "SensorValue" fields — confirm against the publisher.
        (
            mapped
            | "Map Keys" >> Map(lambda x: (x["SensorID"], x["SensorValue"]))
            | "ApplyFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(3))
            # Step label kept for job-graph stability; it computes the mean.
            | "Total Per Key" >> beam.combiners.Mean.PerKey()
            | 'Final Format' >> beam.ParDo(FormatDoFn())
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                known_args.output_agg_table,
                schema=known_args.output_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

In [None]:
if __name__ == "__main__":
    # Use the canonical runner name. run() applies the Dataflow-specific
    # options (project, region, network, ...) only when it recognises the
    # runner as Dataflow.
    run(runneroption="DataflowRunner")

In [None]:
# Fetch sensor values in window order for plotting. `cycle` numbers the rows by
# windowStart. Ordering the result by `cycle` before LIMIT returns the first 500
# windows rather than an arbitrary subset with gaps in `cycle`.
sql = """
SELECT sensorValue, ROW_NUMBER() OVER (ORDER BY windowStart) AS cycle
FROM `general-demo-364117.Asset_Management_Demo.Anomaly-detection-dataflow`
ORDER BY cycle
LIMIT 500;
"""
# NOTE(review): `client` is assumed to be a BigQuery client created in another
# cell. The variable name is kept because the plotting cell reads it.
pct_overlap_terms_by_days_apart = client.query(sql).to_dataframe()


In [None]:
# Scatter each sensor reading (y) against its window sequence number (x).
pct_overlap_terms_by_days_apart.plot.scatter(
    x="cycle",
    y="sensorValue",
    color='red',
    figsize=(20, 10),
)