In [None]:
def run(cmd):
    """Echo a shell command, execute it through IPython, then print a blank line.

    Args:
        cmd: The shell command line, as a single string.
    """
    print(f'>> {cmd}')
    get_ipython().system(cmd)
    print()

# Environment setup: every command below is echoed and executed, in order, via run().
setup_commands = (
    # Upgrade pip itself first.
    'pip install --upgrade pip',
    # Install apache-beam.
    'pip install --quiet apache-beam',
    # Copy the input file into the local file system.
    'mkdir -p data',
    'wget https://storage.googleapis.com/bdt-beam/orders_v_2022.csv -O data/orders.csv',
)
for setup_command in setup_commands:
    run(setup_command)

>> pip install --upgrade pip
Collecting pip
  Downloading pip-23.2.1-py3-none-any.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m25.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 23.1.2
    Uninstalling pip-23.1.2:
      Successfully uninstalled pip-23.1.2
Successfully installed pip-23.2.1

>> pip install --quiet apache-beam
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.2/49.2 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import apache_beam as beam
import re

# Glob for the input files and the path prefix for pipeline outputs.
inputs_pattern, outputs_prefix = 'data/*', 'outputs/part'

## HIDE CODE FOR NOW

In [None]:
# Preview the first lines of the users file.
# NOTE(review): none of the cells shown here download data/users.csv (the setup cell
# only fetches data/orders.csv) — confirm the file exists before relying on this cell.
! cat data/users.csv | head

In [None]:
# Preview the header and the first rows of the downloaded orders file.
! head -n 10 data/orders.csv

order_no;user_id;product_list;date_purchased
1000;1887;Cassava;2000-01-01
1001;838;Calabash, Water Spinach;2000-01-01
1002;2032;Onion, Rapini;2000-01-01
1003;1482;Swiss Chard, Artichoke;2000-01-01
1004;475;Turnip Greens, Plantain;2000-01-01
1005;1627;English Cucumber, Parsley Root, Cauliflower;2000-01-01
1006;2000;Bell Pepper, English Cucumber;2000-01-01
1007;2099;Arugula;2000-01-01
1008;2337;Shallots, Jerusalem Artichoke;2000-01-01


# Sample to check that the code runs

In [None]:
# Keep the first 500 lines (header included) as a small sample file.
! head -n 500 data/orders.csv > data/orders_sample.csv

# Show the first 10 lines of the sample to confirm it was written.
! head -n 10 data/orders_sample.csv

order_no;user_id;product_list;date_purchased
1000;1887;Cassava;2000-01-01
1001;838;Calabash, Water Spinach;2000-01-01
1002;2032;Onion, Rapini;2000-01-01
1003;1482;Swiss Chard, Artichoke;2000-01-01
1004;475;Turnip Greens, Plantain;2000-01-01
1005;1627;English Cucumber, Parsley Root, Cauliflower;2000-01-01
1006;2000;Bell Pepper, English Cucumber;2000-01-01
1007;2099;Arugula;2000-01-01
1008;2337;Shallots, Jerusalem Artichoke;2000-01-01


# 5.1 Write to CSV

In [None]:
import apache_beam as beam
from apache_beam.transforms.window import SlidingWindows
from datetime import datetime
import time

class OrdersXform1(beam.DoFn):
    """Rewrite one ``;``-separated order line.

    The fourth field is re-rendered from ``%Y-%m-%d`` to ``%Y-%m-%d %H:%M:%S``,
    the first two fields swap places, and the record is joined back with ``;``.
    """

    def process(self, element):
        """Yield the rewritten form of the text line ``element``."""
        fields = element.split(';')
        purchased = datetime.strptime(fields[3], '%Y-%m-%d')
        fields[3] = purchased.strftime('%Y-%m-%d %H:%M:%S')
        # Swap the two leading columns.
        fields[:2] = fields[1], fields[0]
        yield ';'.join(fields)

class AddTimestampDoFn(beam.DoFn):
    """Turn ``(date_text, value)`` into a timestamped ``(epoch_seconds, value)`` pair."""

    def process(self, element):
        """Parse ``element[0]`` and emit the pair stamped with the parsed time.

        ``time.mktime`` interprets the parsed struct as local time, so the epoch
        value depends on the machine's timezone.
        """
        parsed = time.strptime(element[0], '%Y-%m-%d %H:%M:%S')
        epoch_seconds = time.mktime(parsed)
        keyed = (epoch_seconds, element[1])
        yield beam.window.TimestampedValue(keyed, epoch_seconds)

class AverageFn(beam.CombineFn):
    """Average the second item of every input, tracked as a ``(sum, count)`` pair."""

    def create_accumulator(self):
        """Start with a zero sum and a zero count."""
        return (0.0, 0)

    def add_input(self, accumulator, input):
        """Fold one input into the accumulator by adding ``input[1]``."""
        running_sum, seen = accumulator
        return (running_sum + input[1], seen + 1)

    def merge_accumulators(self, accumulators):
        """Combine partial ``(sum, count)`` pairs into a single pair."""
        partial_sums, partial_counts = zip(*accumulators)
        return (sum(partial_sums), sum(partial_counts))

    def extract_output(self, accumulator):
        """Return the mean as a 4-decimal string, or float NaN if nothing was added."""
        running_sum, seen = accumulator
        if seen:
            return f"{running_sum / seen:.4f}"
        return float('NaN')

SECONDS_PER_DAY = 24 * 3600


def _write_rolling_average_csv(timestamped, days, path_prefix):
    """Write one rolling-average branch as ``date,average`` text lines.

    Args:
        timestamped: PCollection of ``(key, 1)`` pairs with event timestamps, as
            emitted by ``AddTimestampDoFn``.
        days: Sliding-window width in days; a new window starts every day.
        path_prefix: Output path prefix; shards get a ``.csv`` suffix.

    Returns:
        Whatever the final ``WriteToText`` step returns.
    """
    return (
        timestamped
        | f'{days}-day window' >> beam.WindowInto(
            SlidingWindows(size=days * SECONDS_PER_DAY, period=SECONDS_PER_DAY))
        # Per-key sum in one step (replaces GroupByKey + CombineValues(sum)).
        | f'sum {days}-day' >> beam.CombinePerKey(sum)
        # Mean of the per-key sums inside each window.
        | f'average {days}-day' >> beam.CombineGlobally(AverageFn()).without_defaults()
        # Label each average with the date of the timestamp Beam attaches to it.
        | f'attach timestamp {days}-day' >> beam.Map(
            lambda element, timestamp=beam.DoFn.TimestampParam: (
                datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d'), element))
        | f'format for {days}-day csv' >> beam.Map(
            lambda element: f"{element[0]},{element[1]}")
        # The extension goes in file_name_suffix so shards end in ".csv"; with
        # ".csv" inside the prefix the shard counter was appended after it.
        | f'Write {days}-day results to CSV' >> beam.io.WriteToText(
            path_prefix, file_name_suffix='.csv')
    )


with beam.Pipeline() as pipeline:
    items = (
        pipeline
        # skip_header_lines is a line count: 1 drops the header row.
        | 'read orders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
        | 'format orders' >> beam.ParDo(OrdersXform1())
        | 'split order records' >> beam.Map(lambda x: x.split(';'))
        # Key every order by its formatted purchase timestamp, with a count of 1.
        | 'kv-pair orders' >> beam.Map(lambda element: (str(element[3]), 1))
    )

    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

    # 7-day rolling window
    seven_day_rolling = _write_rolling_average_csv(timestamped_items, 7, 'output_7_day')

    # 30-day rolling window
    thirty_day_rolling = _write_rolling_average_csv(timestamped_items, 30, 'output_30_day')




# 5.2 Write to Parquet

In [None]:
pip install pyarrow

In [None]:
import apache_beam as beam
from apache_beam.transforms.window import SlidingWindows
from datetime import datetime
import time
import pyarrow

class OrdersXform1(beam.DoFn):
    """Rewrite one ``;``-separated order line.

    The fourth field is re-rendered from ``%Y-%m-%d`` to ``%Y-%m-%d %H:%M:%S``,
    the first two fields swap places, and the record is joined back with ``;``.
    """

    def process(self, element):
        """Yield the rewritten form of the text line ``element``."""
        fields = element.split(';')
        purchased = datetime.strptime(fields[3], '%Y-%m-%d')
        fields[3] = purchased.strftime('%Y-%m-%d %H:%M:%S')
        # Swap the two leading columns.
        fields[:2] = fields[1], fields[0]
        yield ';'.join(fields)

class AddTimestampDoFn(beam.DoFn):
    """Turn ``(date_text, value)`` into a timestamped ``(epoch_seconds, value)`` pair."""

    def process(self, element):
        """Parse ``element[0]`` and emit the pair stamped with the parsed time.

        ``time.mktime`` interprets the parsed struct as local time, so the epoch
        value depends on the machine's timezone.
        """
        parsed = time.strptime(element[0], '%Y-%m-%d %H:%M:%S')
        epoch_seconds = time.mktime(parsed)
        keyed = (epoch_seconds, element[1])
        yield beam.window.TimestampedValue(keyed, epoch_seconds)

class AverageFn(beam.CombineFn):
    """Average the second item of every input, tracked as a ``(sum, count)`` pair."""

    def create_accumulator(self):
        """Start with a zero sum and a zero count."""
        return (0.0, 0)

    def add_input(self, accumulator, input):
        """Fold one input into the accumulator by adding ``input[1]``."""
        running_sum, seen = accumulator
        return (running_sum + input[1], seen + 1)

    def merge_accumulators(self, accumulators):
        """Combine partial ``(sum, count)`` pairs into a single pair."""
        partial_sums, partial_counts = zip(*accumulators)
        return (sum(partial_sums), sum(partial_counts))

    def extract_output(self, accumulator):
        """Return the mean as a 4-decimal string, or float NaN if nothing was added."""
        running_sum, seen = accumulator
        if seen:
            return f"{running_sum / seen:.4f}"
        return float('NaN')

SECONDS_PER_DAY = 24 * 3600


def _write_rolling_average_parquet(timestamped, days, path_prefix):
    """Write one rolling-average branch as Parquet rows.

    Each row has a ``timestamp`` string column and an ``avg_<days>`` float column.

    Args:
        timestamped: PCollection of ``(key, 1)`` pairs with event timestamps, as
            emitted by ``AddTimestampDoFn``.
        days: Sliding-window width in days; a new window starts every day.
        path_prefix: Output path prefix; shards get a ``.parquet`` suffix.

    Returns:
        Whatever the final ``WriteToParquet`` step returns.
    """
    column = f'avg_{days}'
    return (
        timestamped
        | f'{days}-day window' >> beam.WindowInto(
            SlidingWindows(size=days * SECONDS_PER_DAY, period=SECONDS_PER_DAY))
        # Per-key sum in one step (replaces GroupByKey + CombineValues(sum)).
        | f'sum {days}-day' >> beam.CombinePerKey(sum)
        # Mean of the per-key sums inside each window.
        | f'average {days}-day' >> beam.CombineGlobally(AverageFn()).without_defaults()
        # AverageFn returns a formatted string, so convert back to float for the
        # float64 column; the date comes from the timestamp Beam attaches.
        | f'window info for {days}-day' >> beam.Map(
            lambda element, timestamp=beam.DoFn.TimestampParam: {
                'timestamp': datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d'),
                column: float(element)})
        # Explicit re-window before the write, kept from the original pipeline.
        | f'Re-window into global windows for {days}-day' >> beam.WindowInto(
            beam.window.GlobalWindows())
        | f'Write {days}-day results to Parquet' >> beam.io.WriteToParquet(
            file_path_prefix=path_prefix,
            schema=pyarrow.schema([
                pyarrow.field('timestamp', pyarrow.string()),
                pyarrow.field(column, pyarrow.float64())
            ]),
            file_name_suffix=".parquet"
        )
    )


with beam.Pipeline() as pipeline:
    items = (
        pipeline
        # skip_header_lines is a line count: 1 drops the header row.
        | 'read orders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
        | 'format orders' >> beam.ParDo(OrdersXform1())
        | 'split order records' >> beam.Map(lambda x: x.split(';'))
        # Key every order by its formatted purchase timestamp, with a count of 1.
        | 'kv-pair orders' >> beam.Map(lambda element: (str(element[3]), 1))
    )

    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

    # 7-day rolling window
    seven_day_rolling = _write_rolling_average_parquet(timestamped_items, 7, 'output_7_day')

    # 30-day rolling window
    thirty_day_rolling = _write_rolling_average_parquet(timestamped_items, 30, 'output_30_day')

Directory listing commands: confirm that the Parquet output files were written.

In [None]:
ls output_7_day*.parquet

In [None]:
ls output_30_day*.parquet

In [None]:
import pyarrow.parquet as pq

# Load the 7-day Parquet shard into an Arrow table.
seven_day_shard = 'output_7_day-00000-of-00001.parquet'
table = pq.read_table(seven_day_shard)

# Convert to a pandas DataFrame and print its first rows.
df = table.to_pandas()
print(df.head())

    timestamp   avg_7
0  2000-01-07  176.50
1  2000-01-06  176.50
2  2000-01-05  185.60
3  2000-01-04  124.75
4  2000-01-03   59.00


In [None]:
# Load the 30-day Parquet shard into an Arrow table.
thirty_day_shard = 'output_30_day-00000-of-00001.parquet'
table = pq.read_table(thirty_day_shard)

# Convert to a pandas DataFrame and print its first rows.
df = table.to_pandas()
print(df.head())

    timestamp    avg_30
0  2000-01-30  176.4483
1  2000-01-29  168.6786
2  2000-01-28  173.6667
3  2000-01-27  179.6154
4  2000-01-26  180.9200


# Hide

In [None]:
import apache_beam as beam
from apache_beam.transforms.window import SlidingWindows
from datetime import datetime
import time
import pyarrow

class OrdersXform1(beam.DoFn):
    """Rewrite one ``;``-separated order line.

    The fourth field is re-rendered from ``%Y-%m-%d`` to ``%Y-%m-%d %H:%M:%S``,
    the first two fields swap places, and the record is joined back with ``;``.
    """

    def process(self, element):
        """Yield the rewritten form of the text line ``element``."""
        fields = element.split(';')
        purchased = datetime.strptime(fields[3], '%Y-%m-%d')
        fields[3] = purchased.strftime('%Y-%m-%d %H:%M:%S')
        # Swap the two leading columns.
        fields[:2] = fields[1], fields[0]
        yield ';'.join(fields)

class AddTimestampDoFn(beam.DoFn):
    """Turn ``(date_text, value)`` into a timestamped ``(epoch_seconds, value)`` pair."""

    def process(self, element):
        """Parse ``element[0]`` and emit the pair stamped with the parsed time.

        ``time.mktime`` interprets the parsed struct as local time, so the epoch
        value depends on the machine's timezone.
        """
        parsed = time.strptime(element[0], '%Y-%m-%d %H:%M:%S')
        epoch_seconds = time.mktime(parsed)
        keyed = (epoch_seconds, element[1])
        yield beam.window.TimestampedValue(keyed, epoch_seconds)

class AverageFn(beam.CombineFn):
    """Average the second item of every input, tracked as a ``(sum, count)`` pair."""

    def create_accumulator(self):
        """Start with a zero sum and a zero count."""
        return (0.0, 0)

    def add_input(self, accumulator, input):
        """Fold one input into the accumulator by adding ``input[1]``."""
        running_sum, seen = accumulator
        return (running_sum + input[1], seen + 1)

    def merge_accumulators(self, accumulators):
        """Combine partial ``(sum, count)`` pairs into a single pair."""
        partial_sums, partial_counts = zip(*accumulators)
        return (sum(partial_sums), sum(partial_counts))

    def extract_output(self, accumulator):
        """Return the mean as a 4-decimal string, or float NaN if nothing was added."""
        running_sum, seen = accumulator
        if seen:
            return f"{running_sum / seen:.4f}"
        return float('NaN')

# Stand-alone run of the 30-day rolling average written to Parquet.
# NOTE(review): the saved output of this cell shows "RuntimeError: ignored"; the cause
# is not visible in the notebook — confirm the cell completes after a fresh restart.
with beam.Pipeline() as pipeline:
    items = (
        pipeline
        # skip_header_lines is a line count: 1 drops the header row.
        | 'read orders' >> beam.io.ReadFromText('data/orders.csv', skip_header_lines=1)
        | 'format orders' >> beam.ParDo(OrdersXform1())
        | 'split order records' >> beam.Map(lambda x: x.split(';'))
        # Key every order by its formatted purchase timestamp, with a count of 1.
        | 'kv-pair orders' >> beam.Map(lambda element: (str(element[3]), 1))
    )

    timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

    # 30-day rolling window
    thirty_day_rolling = (
        timestamped_items
        | '30-day window' >> beam.WindowInto(SlidingWindows(size=30*24*3600, period=24*3600))
        # Per-key sum in one step (replaces GroupByKey + CombineValues(sum)).
        | 'sum 30-day' >> beam.CombinePerKey(sum)
        # Mean of the per-key sums inside each window.
        | 'average 30-day' >> beam.CombineGlobally(AverageFn()).without_defaults()
        # AverageFn returns a formatted string, so convert back to float for the
        # float64 column; the date comes from the timestamp Beam attaches.
        | 'window info for 30-day' >> beam.Map(lambda element, timestamp=beam.DoFn.TimestampParam: {
            'timestamp': datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d'),
            'avg_30': float(element)})
        | 'Re-window into global windows for 30-day' >> beam.WindowInto(beam.window.GlobalWindows())
        | 'Write 30-day results to Parquet' >> beam.io.WriteToParquet(
            file_path_prefix='output_30_day',
            schema=pyarrow.schema([
                pyarrow.field('timestamp', pyarrow.string()),
                pyarrow.field('avg_30', pyarrow.float64())
            ]),
            file_name_suffix=".parquet"
        )
    )

RuntimeError: ignored