In [381]:
from apache_beam.io.aws.s3io import parse_s3_path,S3IO
from apache_beam.options import pipeline_options
import os
import boto3
import csv
import pandas as pd

In [382]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.interactive.interactive_beam import *
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.dataframe import convert
from apache_beam.dataframe.io import read_csv
import numpy as np

In [383]:
# AWS connection settings are taken from the environment so that no credential
# is hardcoded in the notebook. Each value is None when its variable is unset.
ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
# The secret key has its own variable; reading AWS_ACCESS_KEY_ID here would
# send the access key id as the secret and make authentication fail.
SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# NOTE(review): the region is read from DEFAULT_REGION, not the AWS-standard
# AWS_DEFAULT_REGION -- confirm this matches the container environment.
REGION = os.getenv('DEFAULT_REGION')

In [397]:
def parse_file(element):
    """Split one line of CSV text into its fields.

    Fields are separated by commas, may be enclosed in double quotes, and
    whitespace that directly follows a delimiter is skipped.

    Args:
        element: A single line of CSV text.

    Returns:
        The list of field strings of the first record the CSV reader
        produces, or None if the reader produces no record.
    """
    reader = csv.reader(
        [element], delimiter=",", quotechar='"', skipinitialspace=True
    )
    # Only one line is supplied, so the first record is the only one of interest.
    return next(reader, None)

In [385]:
def convert_dict(element):
    """Turn a parsed CSV record into a dict with typed fields.

    Args:
        element: An indexable record whose first three positions hold the
            name, the height and the weight, in that order.

    Returns:
        A dict with keys "name", "height" and "weight"; height and weight
        are converted with int().

    Raises:
        IndexError: If the record has fewer than three positions.
        ValueError: If height or weight cannot be converted by int().
    """
    name = element[0]
    height = int(element[1])
    weight = int(element[2])
    return dict(name=name, height=height, weight=weight)

In [386]:
class ConvertToByteArray(beam.DoFn):
    """DoFn that turns each element into a newline-terminated UTF-8 bytearray."""

    def __init__(self):
        """No construction-time state is required."""

    def setup(self):
        """Nothing to initialize."""

    def process(self, row):
        """Yield the text form of ``row`` plus a trailing newline as a bytearray.

        Args:
            row: Any element; it is rendered with str().

        Yields:
            A bytearray holding ``str(row)`` followed by a newline, encoded as UTF-8.
        """
        text_line = str(row) + '\n'
        yield bytearray(text_line.encode('utf-8'))

In [398]:
def write_to_s3(value):
    """Write the grouped lines to a fixed object in the S3 test bucket.

    The S3 client is configured from the module-level REGION, ACCESS_KEY_ID
    and SECRET_ACCESS_KEY values and targets the endpoint
    http://localstack:4566.

    Args:
        value: An indexable whose element at position 1 is an iterable of
            lines to write. Presumably a (key, lines) pair produced by the
            upstream grouping step -- confirm against the caller.

    Returns:
        None.
    """
    s3_options = pipeline_options.S3Options(
        [
            f"--s3_region_name={REGION}",
            f"--s3_access_key_id={ACCESS_KEY_ID}",
            f"--s3_secret_access_key={SECRET_ACCESS_KEY}",
            "--s3_endpoint_url=http://localstack:4566",
        ]
    )
    # Close the stream deterministically: leaving it to garbage collection
    # means buffered data may never be flushed and the upload never completed.
    with S3IO(options=s3_options).open(
        's3://test-bucket/file/output_filename.txt', mode='w'
    ) as s3_file:
        s3_file.writelines(value[1])

In [394]:
# Create pipeline options without explicit flags, then switch streaming off
# through the StandardOptions view so the pipeline runs as a non-streaming job.
options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).streaming = False

In [None]:
# Build and run the pipeline on the interactive runner: read ./test.csv,
# compute the mean height and mean weight, and hand the result to write_to_s3.
with beam.Pipeline(interactive_runner.InteractiveRunner(), options=options) as p:
    # Read the file as text lines, parse each line with parse_file and convert
    # it to a dict with convert_dict.
    # NOTE(review): no header-skipping option is passed to ReadFromText; if
    # test.csv has a header row, convert_dict's int() calls would fail -- confirm.
    input_data = (
        p
        | 'read csv' >> beam.io.ReadFromText('./test.csv')
        | 'parse csv' >> beam.Map(parse_file)
        | 'convert dict' >> beam.Map(convert_dict)
    )
    # Mean over the "height" values of all records.
    height_average = (
        input_data
        | 'extract height' >> beam.Map(lambda x: x['height'])
        | 'compute mean height' >> beam.combiners.Mean.Globally()
    )
    # Mean over the "weight" values of all records.
    weight_average = (
        input_data
        | 'extract weight' >> beam.Map(lambda x: x['weight'])
        | 'compute mean weight' >> beam.combiners.Mean.Globally()
    )
    # Merge both means into one collection, wrap each as {"mean": value},
    # serialize to bytes and pass the grouped result to write_to_s3.
    # NOTE(review): after Flatten both values carry the same "mean" key, so the
    # output does not say which one is height and which is weight -- confirm
    # this is intended.
    # NOTE(review): GroupBy() is called without keys; presumably it gathers all
    # elements into a single group so write_to_s3 (which reads value[1]) runs
    # once -- confirm.
    output_mean = (
        (height_average,weight_average)
        | beam.Flatten()
        | beam.Map(lambda x: {"mean":x})        
        | 'Build byte array' >> beam.ParDo(ConvertToByteArray()) # output destined for S3
        | 'Group' >> beam.GroupBy()
        | 'Write to S3' >> beam.Map(write_to_s3)
    )

In [None]:
# References
# https://future-architect.github.io/articles/20220920a/
# https://dev.classmethod.jp/articles/cloud-dataflow_gcs2bq_python/
# https://python.plainenglish.io/using-apache-beam-and-aws-s3-storage-i-o-transforms-in-python-6cabe2a8d592
# https://towardsdatascience.com/data-pipelines-with-apache-beam-86cd8eb55fd8