<a href="https://colab.research.google.com/github/logsmay/inter-probe/blob/main/Apache_Beam_Colab_LZD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Run and print a shell command.
def run(cmd):
  """Echo ``cmd`` prefixed with '>> ', execute it in the notebook shell, then print a blank line.

  NOTE: ``!{cmd}`` is IPython/Colab shell-escape syntax, so this helper only
  works inside a notebook kernel, not when imported as a plain Python module.
  """
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Install apache-beam.
# NOTE(review): pip may report dependency conflicts with packages preinstalled
# in the notebook image (e.g. oauth2client); check the install output.
run('pip install --quiet apache-beam')

>> pip install --quiet apache-beam
[K     |████████████████████████████████| 3.0MB 9.2MB/s 
[K     |████████████████████████████████| 235kB 57.0MB/s 
[K     |████████████████████████████████| 153kB 60.3MB/s 
[K     |████████████████████████████████| 51kB 3.1MB/s 
[K     |████████████████████████████████| 1.2MB 54.6MB/s 
[K     |████████████████████████████████| 61kB 6.5MB/s 
[K     |████████████████████████████████| 81kB 8.7MB/s 
[K     |████████████████████████████████| 58.1MB 37kB/s 
[K     |████████████████████████████████| 112kB 40.7MB/s 
[?25h  Building wheel for avro-python3 (setup.py) ... [?25l[?25hdone
  Building wheel for dill (setup.py) ... [?25l[?25hdone
  Building wheel for hdfs (setup.py) ... [?25l[?25hdone
  Building wheel for oauth2client (setup.py) ... [?25l[?25hdone
[31mERROR: pydrive 1.3.1 has requirement oauth2client>=4.0.0, but you'll have oauth2client 3.0.0 which is incompatible.[0m
[31mERROR: multiprocess 0.70.9 has requirement dill>=0.3.1, bu

[Apache Beam](https://beam.apache.org/) is a unified programming SDK for designing and executing data processing pipelines (both batch and streaming) on multiple supported runners, such as [Apache Flink](https://flink.apache.org/), [Apache Spark](https://spark.apache.org/), [Apache Apex](https://apex.apache.org/), [Apache Gearpump](http://incubator.apache.org/projects/gearpump.html), [Apache Samza](http://samza.apache.org/) and [Google Dataflow](https://cloud.google.com/dataflow/).

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.coders.coders import Coder
import json
import logging
from datetime import datetime

In [None]:
# Lower the root logger threshold to INFO so the runner's progress messages
# and the INFO events logged by the DoFns are shown in the cell output.
logging.getLogger().setLevel("INFO")

Apache Beam supports [multiple runners](https://beam.apache.org/documentation/runners/capability-matrix/) that execute data pipelines with highly parallel processing. Each runner is selected and configured through pipeline options.

In [None]:
# Pipeline configuration: execute locally with the DirectRunner, i.e. inside
# this notebook's Python kernel, under the job name "notebook".
option_dict = dict(
    runner='DirectRunner',
    job_name='notebook',
)
pipeline_options = PipelineOptions.from_dictionary(option_dict)

In [None]:
# Sample events, one CSV line each. Columns, in order:
#   0 device id (used as the grouping key by the pipeline), 1 date (YYYYMMDD),
#   2 time (h:mm am/pm), 3 event name, 4 os, 5 region code (presumably country).
# NOTE(review): the last row's os value "andorid" looks like a typo for
# "android"; confirm whether it is intentional dirty data before changing it.
input_data = [
    "1b,20181019,10:00 am,click,ios,SG",
    "1b,20181019,10:01 am,install,ios,SG",
    "1c,20181019,10:02 am,page_view,android,SG",
    "1c,20181019,10:03 am,order,android,SG",
    "1b,20181019,11:00 am,page_view,ios,SG",
    "1b,20181019,11:10 am,order,ios,SG",
    "1c,20181019,12:00 pm,page_view,andorid,SG"
]
# NOTE(review): appears unused in the visible cells; the export step writes its
# files under its own 'outputs/out-s1h_' prefix instead.
outputs_prefix = 'outputs/part'

In [None]:
# Start from a clean output directory so files from earlier runs don't linger.
print("removing old files")
run('rm -rf outputs/*')

# Inactivity gap for the Sessions window, in seconds (59 minutes = 3540 s).
# Events exactly this far apart fall into separate sessions, e.g. device 1b's
# 10:01 and 11:00 events end up in two different sessions.
session_gap = 59 * 60

# class to prepare data before processing
# add preprocessed session id field & formatted timestamp field from date & time
class DataPrep(beam.DoFn):
    """Split one CSV event line and append a session-id prefix and an event time.

    The emitted list holds the comma-separated fields of ``element`` followed by:

    * index 6 -- session-id prefix ``"s<field0>_"``, which ``AddSessionId``
      later completes with the start of the session window;
    * index 7 -- the event time (date in field 1, ``h:mm am/pm`` in field 2) as
      POSIX seconds, reading the wall-clock time as UTC.
    """

    def process(self, element):
        # Imported here, like the FileSystems import in ExportFileBySession,
        # so the name is available wherever the DoFn is executed.
        from datetime import timezone

        fields = element.split(",")
        wall_clock = fields[1] + " " + fields[2]
        session_prefix = 's' + fields[0] + "_"
        # Read the wall-clock time as UTC, not machine-local time.
        # AddSessionId renders the window start with to_utc_datetime(), so
        # this keeps the session id equal to the input's wall-clock time on
        # any machine timezone. A naive .timestamp() would shift the id by
        # the local UTC offset.
        event_ts = (
            datetime.strptime(wall_clock, '%Y%m%d %I:%M %p')
            .replace(tzinfo=timezone.utc)
            .timestamp()
        )
        fields.append(session_prefix)
        fields.append(event_ts)
        yield fields

# class to add processed session id based on calculated window parameter
class AddSessionId(beam.DoFn):
    """Complete each row's session id with the start time of its session window.

    ``element`` is a ``(key, rows)`` pair from ``GroupByKey``. Each row is a
    ``DataPrep`` list whose index 6 is the session-id prefix and whose last
    item (index 7) is the event time.

    For every row, this yields a new list that:

    * drops the trailing event time, and
    * has index 6 extended with the window start (UTC) as ``YYYYMMDD_HHMM``.
    """

    def process(self, element, window=beam.DoFn.WindowParam):
        # Every row in this call shares one window, so format its start once.
        window_start = window.start.to_utc_datetime()
        suffix = window_start.strftime("%Y%m%d_%H%M")
        for row in element[1]:
            # Slicing yields a fresh list, so the grouped input row is never
            # mutated (Beam requires DoFns to treat their inputs as immutable).
            out = row[:-1]
            out[6] = out[6] + suffix
            logging.info(" Event >> %s %s %s with start window=%s",
                         row[0], row[3], row[7], window_start)
            yield out

# class to export file by session id
class ExportFileBySession(beam.DoFn):
    """Write all rows of one session to their own CSV file.

    Each input ``element`` is a ``(session_id, rows)`` pair. The file name is
    ``outdir + session_id + '.csv'``. Each row is joined with commas and
    terminated by a newline; the text is written UTF-8 encoded. Nothing is
    emitted downstream.
    """

    def __init__(self, outdir='outputs/out-s1h_'):
        """Create the DoFn.

        Args:
            outdir: Path prefix for output files; defaults to the original
                hard-coded 'outputs/out-s1h_'.
        """
        super().__init__()
        self.outdir = outdir

    def process(self, element):
        from apache_beam.io.filesystems import FileSystems
        print("writing file: " + element[0])
        exp_file_name = self.outdir + element[0] + '.csv'
        # One join keeps this linear in output size; repeated += on a string
        # is quadratic in the worst case.
        session_str = ''.join(','.join(row) + '\n' for row in element[1])
        writer = FileSystems.create(exp_file_name, 'text/plain')
        try:
            writer.write(session_str.encode())
        finally:
            # Close the handle even if the write fails.
            writer.close()

# Create the pipeline object from the options configured above.
ptemp = beam.Pipeline(options=pipeline_options)

# Shared trunk of the pipeline. It parses the events and timestamps them with
# the event time (index 7 from DataPrep). It keys them by field 0 and groups
# them into inactivity-based sessions, stamps each row with its session id,
# then regroups the rows by that id (index 6).
pline = (
    ptemp
    | 'loaddata' >> beam.Create(input_data)  # any beam.io source could be used here
    | 'prepdata' >> beam.ParDo(DataPrep())
    | 'addts' >> beam.Map(lambda fields: beam.window.TimestampedValue(fields, fields[7]))
    | 'addkey' >> beam.Map(lambda fields: (fields[0], fields))
    | 'windowtimer' >> beam.WindowInto(
        beam.window.Sessions(session_gap),
        timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW,
    )
    | 'groupbydevice' >> beam.GroupByKey()
    | 'addsession' >> beam.ParDo(AddSessionId())
    | 'prepexport' >> beam.Map(lambda fields: (fields[6], fields))
    | 'groupbysessionid' >> beam.GroupByKey()
)

# Branch 1: echo every (session_id, rows) group to the console.
pline | 'print' >> beam.Map(print)

# Branch 2: write one CSV file per generated session id.
pline | 'exportsession' >> beam.ParDo(ExportFileBySession())

# Execute on the configured runner and block until it finishes; the final
# state returned here is what the cell displays.
result = ptemp.run()
result.wait_until_finish()

removing old files
>> rm -rf outputs/*



INFO:root:Running ((((ref_AppliedPTransform_loaddata/Impulse_3)+(ref_AppliedPTransform_loaddata/FlatMap(<lambda at core.py:2468>)_4))+(ref_AppliedPTransform_loaddata/MaybeReshuffle/Reshuffle/AddRandomKeys_7))+(ref_AppliedPTransform_loaddata/MaybeReshuffle/Reshuffle/ReshufflePerKey/Map(reify_timestamps)_9))+(loaddata/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Write)
INFO:root:Running ((((((((loaddata/MaybeReshuffle/Reshuffle/ReshufflePerKey/GroupByKey/Read)+(ref_AppliedPTransform_loaddata/MaybeReshuffle/Reshuffle/ReshufflePerKey/FlatMap(restore_timestamps)_14))+(ref_AppliedPTransform_loaddata/MaybeReshuffle/Reshuffle/RemoveRandomKeys_15))+(ref_AppliedPTransform_loaddata/Map(decode)_16))+(ref_AppliedPTransform_prepdata_17))+(ref_AppliedPTransform_addts_18))+(ref_AppliedPTransform_addkey_19))+(ref_AppliedPTransform_windowtimer_20))+(groupbydevice/Write)
INFO:root:Running (((groupbydevice/Read)+(ref_AppliedPTransform_addsession_25))+(ref_AppliedPTransform_prepexport_26))+(groupbys

('s1b_20181019_1000', [['1b', '20181019', '10:00 am', 'click', 'ios', 'SG', 's1b_20181019_1000'], ['1b', '20181019', '10:01 am', 'install', 'ios', 'SG', 's1b_20181019_1000']])
writing file: s1b_20181019_1000
('s1b_20181019_1100', [['1b', '20181019', '11:00 am', 'page_view', 'ios', 'SG', 's1b_20181019_1100'], ['1b', '20181019', '11:10 am', 'order', 'ios', 'SG', 's1b_20181019_1100']])
writing file: s1b_20181019_1100
('s1c_20181019_1002', [['1c', '20181019', '10:02 am', 'page_view', 'android', 'SG', 's1c_20181019_1002'], ['1c', '20181019', '10:03 am', 'order', 'android', 'SG', 's1c_20181019_1002']])
writing file: s1c_20181019_1002
('s1c_20181019_1200', [['1c', '20181019', '12:00 pm', 'page_view', 'andorid', 'SG', 's1c_20181019_1200']])
writing file: s1c_20181019_1200


'DONE'

In [None]:
run('head -n 20 {}*'.format("outputs/"))

>> head -n 20 outputs/*
==> outputs/out-s1b_20181019_1000.csv <==
1b,20181019,10:00 am,click,ios,SG,s1b_20181019_1000
1b,20181019,10:01 am,install,ios,SG,s1b_20181019_1000

==> outputs/out-s1b_20181019_1100.csv <==
1b,20181019,11:00 am,page_view,ios,SG,s1b_20181019_1100
1b,20181019,11:10 am,order,ios,SG,s1b_20181019_1100

==> outputs/out-s1c_20181019_1002.csv <==
1c,20181019,10:02 am,page_view,android,SG,s1c_20181019_1002
1c,20181019,10:03 am,order,android,SG,s1c_20181019_1002

==> outputs/out-s1c_20181019_1200.csv <==
1c,20181019,12:00 pm,page_view,andorid,SG,s1c_20181019_1200

