In [None]:
pip install apache-beam[interactive]

In [None]:
import apache_beam as beam
import re
import apache_beam.runners.interactive.interactive_beam as ib

# Pipeline that the transforms defined below are attached to; it is executed by
# the p1.run() call at the end of this cell. No options are passed, so Beam's
# default runner is used.
p1 = beam.Pipeline()

class FormatElements(beam.DoFn):
  """Explode one winning row into one "producer,year" string per producer.

  Input: a 2-item sequence ``(year, producers)`` where ``producers`` is free
  text listing names separated by ", ", " and " or ", and ".
  Output: a list of "producer,year" strings, one per non-empty producer name.
  """

  def process(self, record):
    year, producers = record
    # Collapse the separators ", and ", " and " and ", " into a bare comma so a
    # single split yields the individual names.
    normalized = re.sub(r'\s*, and | and \s*|, ', ',', producers)

    # Strip leftover whitespace so "A " and "A" group under the same key, and
    # drop empty names (empty field, trailing separator) instead of emitting
    # a ",<year>" row with no producer.
    names = (name.strip() for name in normalized.split(','))
    return [name + "," + str(year) for name in names if name]

class CalculateYearsInterval(beam.DoFn):
  """Compute the gap between a producer's two most recent wins.

  Input: ``(producer, entries)`` from the upstream GroupBy, where each entry is
  a ``[producer, year]`` pair and ``year`` is a numeric string.
  Output: ``[(producer, previous_win, following_win, interval)]`` with integer
  years, or an empty list when fewer than two wins are present.

  NOTE(review): only the two latest wins are compared; for producers with three
  or more wins an earlier gap may be shorter/longer — confirm this is intended.
  """

  def process(self, record):
    key, value = record
    # Convert before sorting so the order is numeric, not lexicographic
    # (string ordering only happens to work for equal-width years).
    years = sorted(int(entry[1]) for entry in value)

    # With fewer than two wins there is no interval: a single win would be
    # paired with itself (interval 0) and an empty group would raise IndexError.
    if len(years) < 2:
      return []

    previous_win, following_win = years[-2], years[-1]
    return [(key, previous_win, following_win, following_win - previous_win)]

# Producer win-interval pipeline. Rows are split on ';'; column 0 and column 3
# are kept as (year, producers) and column 4 is compared with "yes" to keep
# winners — presumably the year;title;studios;producers;winner layout, TODO
# confirm against movielist.csv's header.
producer_intervals = (
    p1
    | "Import movie list" >> beam.io.ReadFromText("movielist.csv", skip_header_lines=1)
    | "Split by semicolon" >> beam.Map(lambda record: record.split(';'))
    | "Filter by winners" >> beam.Filter(lambda record: record[4] == "yes")
    | "Map producers and year columns" >> beam.Map(lambda record: [record[0], record[3]])
    | "Format elements" >> beam.ParDo(FormatElements())
    | "Split Producer and Year by comma" >> beam.Map(lambda producers: producers.split(','))
    | "Group By producer's key" >> beam.GroupBy(lambda producer: producer[0])
    # GroupBy values are an iterable, not guaranteed to support len(); keep
    # producers with at least two wins.
    | "Filter Producers with at least 2 awards" >> beam.Filter(lambda record: len(list(record[1])) > 1)
    | "Calculate Years Interval" >> beam.ParDo(CalculateYearsInterval())
)

# WriteToText's output PCollection holds the names of the files it wrote, not
# the rows, so writing and printing are separate branches off the results.
producer_intervals | "Write to Text" >> beam.io.WriteToText('results.txt')
load_csv = producer_intervals | "Print Results Year" >> beam.Map(print)

result = p1.run()
result.wait_until_finish()

['Allan Carr', '1980']
['Frank Yablans', '1981']
['Mitsuharu Ishii', '1982']
['Robert R. Weston', '1983']
['Bo Derek', '1984']
['Buzz Feitshans', '1985']
['Gloria Katz', '1986']
['Bob Cavallo', '1986']
['Joe Ruffalo', '1986']
['Steve Fargnoli', '1986']
['Bill Cosby', '1987']
['Ted Field', '1988']
['Robert W. Cort', '1988']
['Harve Bennett', '1989']
['Steven Perry', '1990']
['Joel Silver', '1990']
['Bo Derek', '1990']
['Joel Silver', '1991']
['Carol Baum', '1992']
['Howard Rosenman', '1992']
['Sherry Lansing', '1993']
['Buzz Feitshans', '1994']
['David Matalon', '1994']
['Charles Evans', '1995']
['Alan Marshall', '1995']
['Andrew Bergman', '1996']
['Mike Lobell', '1996']
['Kevin Costner', '1997']
['Steve Tisch', '1997']
['Jim Wilson', '1997']
['Ben Myron', '1998']
['Joe Eszterhas', '1998']
['Jon Peters', '1999']
['Barry Sonnenfeld', '1999']
['Jonathan D. Krane', '2000']
['Elie Samaha', '2000']
['John Travolta', '2000']
['Larry Brezner', '2001']
['Howard Lapides', '2001']
['Lauren Lloyd'

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7c158b00fcd0>

# Inspect winning rows: print the producers field of each winning movie

In [None]:
import apache_beam as beam
#import re

p1 = beam.Pipeline()

# Stage 1: read the movie list and keep only rows flagged as winners.
# Note: despite the step label, each line is split on ';'.
winning_rows = (
    p1
    | "Import Data Year" >> beam.io.ReadFromText("movielist.csv", skip_header_lines=1)
    | "Split by comma Year" >> beam.Map(lambda line: line.split(';'))
    | "Filter By Winners Year" >> beam.Filter(lambda cols: cols[4] == "yes")
)

# Stage 2: keep columns 0 and 3 as a pair, then print the pair's second item
# (column 3) for every winning row.
movie_winners_by_year = (
    winning_rows
    | "Map Producers Year2" >> beam.Map(lambda cols: [cols[0], cols[3]])
    | "Print Results Year" >> beam.Map(lambda pair: print(pair[1]))
)

p1.run()