**Install Apache Beam**

In [31]:
pip install apache-beam[interactive]



**Mount Google Drive**

In [32]:
# Mount Google Drive at /content/gdrive so that files stored on Drive can be
# addressed by filesystem path. `google.colab` is only importable inside Colab.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


**Importing Libraries**

In [33]:
import apache_beam as beam

In [34]:
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode

**Extract the Id and SalePrice from the House Prices Dataset**

In [35]:
class ExtractAndConvertFieldsFn(beam.DoFn):
    """Turn one comma-separated text line into a ``(house_id, sale_price)`` pair."""

    def process(self, element):
        """Emit the first column unchanged and the last column parsed as a float.

        Args:
            element: One line of text whose columns are separated by commas.

        Returns:
            A one-item list holding the ``(house_id, sale_price)`` tuple.

        Raises:
            ValueError: If the last column cannot be converted to ``float``.
        """
        # Only the outermost columns are needed, so cut once from each end
        # instead of materialising every column.
        house_id = element.split(',', 1)[0]
        sale_price = float(element.rsplit(',', 1)[-1])
        return [(house_id, sale_price)]

**Compute the average sale price of the houses**

In [36]:
class ComputeAverageFn(beam.CombineFn):
    """Average the value stored at index 1 of each input element.

    The accumulator is a ``(running_total, element_count)`` tuple.
    """

    def create_accumulator(self):
        """Return the empty accumulator: a zero total and a zero count."""
        return (0.0, 0)  # sum, count

    def add_input(self, sum_count, input):
        """Fold one element into the accumulator.

        Args:
            sum_count: Current ``(total, count)`` accumulator.
            input: Indexable element; ``input[1]`` is the value being averaged.

        Returns:
            A new ``(total, count)`` tuple that includes the element.
        """
        # Local names avoid shadowing the builtin `sum`.
        total, count = sum_count
        return total + input[1], count + 1

    def merge_accumulators(self, accumulators):
        """Combine several partial accumulators into one.

        Args:
            accumulators: Iterable of ``(total, count)`` tuples.

        Returns:
            The element-wise sum of all accumulators, or an empty accumulator
            when the iterable yields nothing.
        """
        partials = list(accumulators)
        if not partials:
            # zip(*[]) yields nothing and cannot be unpacked into two names,
            # so an empty merge is answered with the identity accumulator.
            return self.create_accumulator()
        totals, counts = zip(*partials)
        return sum(totals), sum(counts)

    def extract_output(self, sum_count):
        """Return the mean, or NaN when no element was accumulated."""
        total, count = sum_count
        return total / count if count else float('NaN')

**Create a pipeline**

In [37]:
from apache_beam.options.pipeline_options import PipelineOptions

# Build the pipeline with the --allow_unsafe_triggers flag set.
# NOTE(review): presumably needed because the windowing step further down uses a
# bare AfterCount trigger, which Beam may reject as unsafe without this flag — confirm.
options = PipelineOptions(flags=['--allow_unsafe_triggers'])
p = beam.Pipeline(options=options)

**Read the CSV data, apply windowing and triggers, and compute the average sale price**

In [38]:
# Assemble the graph: read the CSV (skipping its one header line), parse every line
# into an (Id, SalePrice) pair, window the pairs, average the prices, and print each
# average. Nothing executes until p.run() is called.
# Windowing uses FixedWindows(300) with an AfterCount(10) trigger in DISCARDING mode.
# NOTE(review): the saved run printed a single average, so presumably all rows landed
# in one window and one trigger firing — confirm before relying on per-window results.
# NOTE: `averages` is bound to the output of the final "Print" step, not to the
# PCollection of averages produced by "Compute Average".
averages = (p
        | "Read CSV" >> beam.io.ReadFromText('/content/gdrive/MyDrive/Datasets/House Prices.csv', skip_header_lines=1)
        | "Extract and Convert Fields" >> beam.ParDo(ExtractAndConvertFieldsFn())
        | "Window" >> beam.WindowInto(FixedWindows(300),
                                      trigger=AfterCount(10),
                                      accumulation_mode=AccumulationMode.DISCARDING)
        | "Compute Average" >> beam.CombineGlobally(ComputeAverageFn()).without_defaults()
        | "Print" >> beam.Map(print))



**Run the pipeline**

In [39]:
# Run the windowed pipeline. The "Print" step writes the average to the cell output,
# and the result object returned by run() is displayed as the cell's last expression.
p.run()

180921.19589041095


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d8912d15240>

In [40]:
from apache_beam import PTransform

**Composite Transforms - Bundling field extraction from the CSV and the average computation into a single composite transform**

In [41]:
class ExtractAndComputeAverage(PTransform):
    """Composite transform: parse CSV lines, then average the parsed prices."""

    def expand(self, pcoll):
        """Apply the two inner steps to ``pcoll`` and return the combined output.

        Args:
            pcoll: Input collection of comma-separated text lines.

        Returns:
            The output of the global combine, built without default values.
        """
        parsed = pcoll | "Extract and Convert Fields" >> beam.ParDo(ExtractAndConvertFieldsFn())
        averaging = beam.CombineGlobally(ComputeAverageFn()).without_defaults()
        return parsed | "Compute Average" >> averaging

In [42]:
# Create a fresh pipeline with default options. This rebinds `p`, so the earlier
# pipeline object is no longer reachable through that name.
p = beam.Pipeline()

In [43]:
# Read the CSV data (skipping its one header line), apply our composite transform,
# and print the results. No windowing or trigger is applied in this pipeline.
# NOTE: `averages` is bound to the output of the final "Print" step.
averages = (p
            | "Read CSV" >> beam.io.ReadFromText('/content/gdrive/MyDrive/Datasets/House Prices.csv', skip_header_lines=1)
            | "Extract Fields and Compute Average" >> ExtractAndComputeAverage()
            | "Print" >> beam.Map(print))

In [44]:
# Run the composite-transform pipeline. The "Print" step writes the average to the
# cell output, and the result object returned by run() is displayed afterwards.
p.run()

180921.19589041095


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d8912c8d360>