# Process data from OSIsoft PI Server via Google Cloud Pub/Sub using Dataflow

### Example slightly modified from the Google Dataflow example notebook: 02-Streaming_Word_count.ipynb

In [2]:
import json
import re
from datetime import timedelta

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
import pandas as pd
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

# Full resource path (projects/<project>/topics/<topic>) of the Google Cloud
# Pub/Sub topic that this notebook reads from.
topic = "projects/osi-pi-gcp-accelerator/topics/mof_explore_pubsub"

In [3]:
# Lift pandas' display limits so frames are rendered in full
# (all rows, all columns, no line wrapping, untruncated cell text).
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
try:
    # None is the supported "no limit" value from pandas 1.0 onwards;
    # the old -1 sentinel is deprecated there and rejected by newer releases.
    pd.set_option('display.max_colwidth', None)
except ValueError:
    # pandas < 1.0 only accepts integers for this option and uses -1 for "no limit".
    pd.set_option('display.max_colwidth', -1)

In [4]:
# Let Interactive Beam capture streaming data for ten minutes (600 seconds).
ib.options.capture_duration = timedelta(minutes=10)

In [5]:
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
# google.auth.default() returns a (credentials, project_id) pair; unpack into
# explicit names rather than `_`, because assigning to `_` at notebook scope
# overrides IPython's "last output" variable.
_credentials, default_project = google.auth.default()
options.view_as(GoogleCloudOptions).project = default_project

In [6]:
# Build the pipeline on the interactive runner, using the options configured above.
p = beam.Pipeline(runner=InteractiveRunner(), options=options)

In [7]:
#beam.runners.interactive.options.capture_control.evict_captured_data()

In [8]:
# Source step: read messages from the configured Pub/Sub topic
# (a subscription is created for the topic).
stream = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic=topic)
)

In [7]:
# Assign every element to a fixed window; the window size is given in seconds.
windowed_stream = stream | "window" >> beam.WindowInto(
    beam.window.FixedWindows(size=300)
)

In [9]:
# Optional text sink, intentionally disabled. Both lines of the statement must
# stay commented out together: leaving the continuation line active makes the
# cell fail with a syntax error.
#output = (windowed_stream
#  | "retrieve" >> beam.io.WriteToText(file_path_prefix='/home/jupyter'))

In [None]:
# Visualize the windowed elements together with their window metadata.
ib.show(
    windowed_stream,
    include_window_info=True,
)

In [8]:
# Collect the windowed elements, plus their window metadata, into a DataFrame.
df = ib.collect(
    windowed_stream,
    include_window_info=True,
)

In [48]:
# Preview the first five collected rows.
df.head(n=5)

Unnamed: 0,0,event_time,windows,pane_info
0,"{""Element1"":""Element1"",""Attribute1"":18.873699768066409,""Attribute2"":1.4809300231933595,""Timestamp"":""2020-08-22T05:31:44.9650000+00:00""}",1598074339195000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
1,"b'{""Element1"":""Element1"",""Attribute1"":18.698853363037109,""Attribute2"":1.4600553131103515,""Timestamp"":""2020-08-22T05:32:44.9650000+00:00""}'",1598074399250000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
2,"b'{""Element1"":""Element1"",""Attribute1"":19.14048083496094,""Attribute2"":1.5010699768066404,""Timestamp"":""2020-08-22T05:33:44.9650000+00:00""}'",1598074459316000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
3,"b'{""Element1"":""Element1"",""Attribute1"":18.805055313110351,""Attribute2"":1.478,""Timestamp"":""2020-08-22T05:34:44.9650000+00:00""}'",1598074519225000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
4,"b'{""Element1"":""Element1"",""Attribute1"":18.681749420166014,""Attribute2"":1.4581049652099609,""Timestamp"":""2020-08-22T05:35:44.9650000+00:00""}'",1598074579213000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"


In [37]:
# take a copy
#df_backup = df.copy(deep=True)

In [143]:
# restore backup
#df = df_backup.copy(deep=True)

In [145]:
# Give the payload column (labelled 0 by the collect step) a meaningful name.
# Reassign instead of mutating in place so the cell stays chainable and
# re-running it is harmless.
df = df.rename(columns={0: "event"})

In [171]:
# Normalize every payload to text. Bytes values are decoded as UTF-8 rather
# than stringified: str() on bytes produces the "b'...'" repr (with escape
# sequences), which is not the original JSON text.
# NOTE(review): assumes payloads are UTF-8 encoded JSON; confirm against the publisher.
df["event"] = df["event"].map(
    lambda payload: payload.decode("utf-8") if isinstance(payload, bytes) else str(payload)
)

In [172]:
# Print a summary of the frame: row count, columns, non-null counts and dtypes.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 4 columns):
event         10 non-null object
event_time    10 non-null int64
windows       10 non-null object
pane_info     10 non-null object
dtypes: int64(1), object(3)
memory usage: 448.0+ bytes


In [168]:
# Strip the b'...' wrapper left behind when a bytes payload was stringified.
# A boolean mask with .loc replaces the row loop, which wrote through chained
# indexing (df["event"].iat[...]) and used the index label as a position.
# na=False keeps non-string cells out of the mask instead of producing NaN.
is_bytes_repr = df["event"].str.match(r"^b'.*'$", na=False)
df.loc[is_bytes_repr, "event"] = df.loc[is_bytes_repr, "event"].str[2:-1]

In [173]:
# Display the frame to check the cleaned `event` column.
df

Unnamed: 0,event,event_time,windows,pane_info
0,"{""Element1"":""Element1"",""Attribute1"":18.873699768066409,""Attribute2"":1.4809300231933595,""Timestamp"":""2020-08-22T05:31:44.9650000+00:00""}",1598074339195000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
1,"{""Element1"":""Element1"",""Attribute1"":18.698853363037109,""Attribute2"":1.4600553131103515,""Timestamp"":""2020-08-22T05:32:44.9650000+00:00""}",1598074399250000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
2,"{""Element1"":""Element1"",""Attribute1"":19.14048083496094,""Attribute2"":1.5010699768066404,""Timestamp"":""2020-08-22T05:33:44.9650000+00:00""}",1598074459316000,"[[1598074200.0, 1598074500.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
3,"{""Element1"":""Element1"",""Attribute1"":18.805055313110351,""Attribute2"":1.478,""Timestamp"":""2020-08-22T05:34:44.9650000+00:00""}",1598074519225000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
4,"{""Element1"":""Element1"",""Attribute1"":18.681749420166014,""Attribute2"":1.4581049652099609,""Timestamp"":""2020-08-22T05:35:44.9650000+00:00""}",1598074579213000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
5,"{""Element1"":""Element1"",""Attribute1"":18.900419860839843,""Attribute2"":1.4849300231933595,""Timestamp"":""2020-08-22T05:36:44.9650000+00:00""}",1598074639244000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
6,"{""Element1"":""Element1"",""Attribute1"":18.736139953613282,""Attribute2"":1.462,""Timestamp"":""2020-08-22T05:37:44.9650000+00:00""}",1598074699226000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
7,"{""Element1"":""Element1"",""Attribute1"":18.52752714538574,""Attribute2"":1.4445101623535157,""Timestamp"":""2020-08-22T05:38:44.9650000+00:00""}",1598074759253000,"[[1598074500.0, 1598074800.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
8,"{""Element1"":""Element1"",""Attribute1"":18.538090301513673,""Attribute2"":1.4459300231933594,""Timestamp"":""2020-08-22T05:39:44.9650000+00:00""}",1598074819195000,"[[1598074800.0, 1598075100.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"
9,"{""Element1"":""Element1"",""Attribute1"":18.587571136474612,""Attribute2"":1.4478250579833984,""Timestamp"":""2020-08-22T05:40:44.9650000+00:00""}",1598074879223000,"[[1598074800.0, 1598075100.0)]","PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)"


In [208]:
# Parse each event's JSON text into a Python object, one record per row.
# json.loads raises ValueError (json.JSONDecodeError) on the first malformed payload.
temp = [json.loads(event_text) for event_text in df["event"]]

In [213]:
# Build a tabular view of the parsed events: one row per record, one column per JSON key.
values = pd.DataFrame(temp)