# -*- coding: utf-8 -*-
from __future__ import print_function

import re
import sys
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from dateutil.parser import parse as parse_datetime
from dateutil.tz import tzlocal
class Print(beam.PTransform):
    """Composite transform that prints every element of its input to stdout.

    The builtin ``print`` is handed to ``beam.ParDo`` as a plain callable.
    NOTE(review): since ``print`` returns ``None`` the output PCollection is
    presumably empty, i.e. this behaves as a sink -- confirm against the Beam
    version in use.
    """

    def expand(self, pcoll):
        print_each = beam.ParDo(print)
        return pcoll | print_each
class PrintWindowFn(beam.DoFn):
    """DoFn that prints each element alongside the window it was assigned to.

    Each line has the form ``[<window start>, <window end>) @ <element>``.
    ``process`` yields nothing, so no elements flow downstream of this step.
    """

    def process(self, element, window=beam.DoFn.WindowParam):
        # Half-open interval notation: the window end is exclusive.
        start_text = window.start.isoformat()
        end_text = window.end.isoformat()
        print('[%s, %s) @ %s' % (start_text, end_text, element))
# First "[...]" field of a line. For an access log this is presumably the
# request time, e.g. "[10/Oct/2000:13:55:36 -0700]" -- confirm for your format.
_BRACKETED_FIELD_RE = re.compile(r'\[(.*?)\]')


def extract_timestamp(log):
    """Return the event time embedded in a log line as integer Unix seconds.

    The first bracketed field of *log* is parsed with dateutil in fuzzy mode.
    The result is converted to the local timezone and then to epoch seconds
    via ``time.mktime``.

    If the line has no bracketed field, or the field cannot be parsed or
    converted, the current wall-clock (processing) time is returned instead.
    This guarantees every element still receives a usable timestamp.

    Args:
        log: a single log line.

    Returns:
        int: seconds since the Unix epoch.
    """
    try:
        match = _BRACKETED_FIELD_RE.search(log)
        if match is not None:
            # fuzzy=True lets the parser skip tokens it does not understand
            # (e.g. the ':' between date and time in access-log stamps).
            parsed = parse_datetime(match.group(1), fuzzy=True)
            local_dt = parsed.astimezone(tzlocal())
            return int(time.mktime(local_dt.timetuple()))
    except Exception:
        # Deliberate best-effort: one malformed line must not fail the whole
        # pipeline, so any parse/convert error falls back to processing time.
        pass
    # Reached when there is no bracketed field (the original code returned
    # None here, which TimestampedValue cannot use) or parsing failed.
    return int(time.time())
class AddTimestampDoFn(beam.DoFn):
    """DoFn that re-emits each log line stamped with its own event time.

    The timestamp comes from ``extract_timestamp``. It is attached with
    ``beam.window.TimestampedValue`` so that downstream windowing uses it.
    """

    def process(self, element):
        yield beam.window.TimestampedValue(element, extract_timestamp(element))
# Sliding-window geometry in seconds. Each window spans 10 minutes and a new
# one starts every 5 minutes, so each element falls into two windows.
WINDOW_SIZE_SECS = 600
WINDOW_PERIOD_SECS = 300

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    # One element per line of the log file.
    lines = p | 'Create' >> beam.io.ReadFromText('access.log')
    windowed_counts = (
        lines
        | 'Timestamp' >> beam.ParDo(AddTimestampDoFn())
        | 'Window' >> beam.WindowInto(
            beam.window.SlidingWindows(WINDOW_SIZE_SECS, WINDOW_PERIOD_SECS))
        # Outside the global window CombineGlobally cannot emit a default
        # value. Beam rejects it unless without_defaults() is used, and then
        # empty windows simply produce no count.
        | 'Count' >> (beam.CombineGlobally(beam.combiners.CountCombineFn())
                      .without_defaults())
    )
    # Prints "[start, end) @ count" for every window; nothing is emitted.
    windowed_counts = windowed_counts | beam.ParDo(PrintWindowFn())