# 1. <font color='blue' face="Fixedsys, monospace" size="+2">ReadFromText</font> allows you to read a text file into a <font color='green' size="+2">PCollection</font>.

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### It's a good idea to start naming the steps for debugging and monitoring later. Names must be unique in the pipeline.

In [2]:
! rm /tmp/outputs*

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

regionsfilename = '/class/datasets/northwind/CSV/regions/regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.Map(lambda x : x.split(','))
          | 'Transform' >> beam.Map(lambda x : (int(x[0]), x[1].upper()))
          | 'Write' >> WriteToText('/tmp/outputs')
#          | 'Print' >> beam.Map(print)
    )
    #p.run() # implicit in Python when using with block

! cat /tmp/outputs*

(1, 'EASTERN')
(2, 'WESTERN')
(3, 'NORTHERN')
(4, 'SOUTHERN')


### Read from CSV and use <font color='green' size="+2">ParDo</font>.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        #return [(int(regionid), regionname)] # ParDo's need to return a list
        yield (int(regionid), regionname) # Can also use yield instead of returning a list
#        yield (int(regionid), regionname.upper()) # Include a transformation instead of doing it as a separate step

regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.ParDo(RegionParseTuple())
          #| 'Write' >> WriteToText('regions.out')
          | 'Print' >> beam.Map(print)
    )


## <img src="images/java.png" width=40 height=40 /><font color='indigo' size="+2">Java</font>

### Read from CSV and use <font color='blue' face="Fixedsys, monospace" size="+2">Map</font> with <font color='green' >lambda</font>.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.TextIO;

public class ReadRegions1 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", MapElements.into(TypeDescriptors.strings()).via((String element) -> element.toUpperCase()));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
}


### <font color='green' size="+2">ParDo</font> Example using anonymous class inline.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;

public class ReadRegions2 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";


        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    String element = c.element();
                    // String[] elements = element.split(",");
                    c.output(element + "*");
                }
            }));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
}



### <font color='green' size="+2">ParDo</font> using a defined class instead of an anonynous class.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;

public class ReadRegions3 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";


        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", ParDo.of(new AddStar()));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
    
    static class AddStar extends DoFn<String, String> {
        @ProcessElement
        public void process(@Element String line, OutputReceiver<String> out) {
            out.output(line + "*");
        }
    }
}



# __ __ __ __ __ __ __ __ __ __ __ __

# 2. Parse into a model class.


## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### Create a model based on <font color='blue' face="Fixedsys, monospace" size="+2">typing.NamedTuple</font> so you can use properties instead of keys for <font color='green' size="+2">dict</font> or position for <font color='green' size="+2">tuple</font> and use the <font color='blue' face="Fixedsys, monospace" size="+2">Filter</font> <font color='green' size="+2">PTransform</font> with <font color='blue' face="Fixedsys, monospace" size="+2">lambda</font>.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter 1' >> beam.Filter(lambda x : x.regionid % 2 == 0)
          | 'Filter 2' >> beam.Filter(lambda x : x.territoryname.startswith('S'))
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Use <font color='blue' face="Fixedsys, monospace" size="+2">Filter</font> with a UDF.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

def startsWithS(element):
    return element.territoryname.startswith('S')

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter' >> beam.Filter(startsWithS)
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Use a <font color='green' size="+2">ParDo</font> class to accomplish filtering.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

class StartsWithSFilter(beam.DoFn):
    def process(self, element):
        if element.territoryname.startswith('S'):
            yield element
            
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter' >> beam.ParDo(StartsWithSFilter())
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Put the parsing and filtering all into one <font color='green' size="+2">ParDo</font>.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        if territoryname.startswith('S'):
            yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


In [None]:
with beam.Pipeline() as p:
  # records = (p | 'Read' >> beam.io.ReadFromAvro('gs://joey-shared-bucket/datasets/northwind/AVRO/categories/categories.avro')
  records = (p | 'Read' >> beam.io.ReadFromText('gs://joey-shared-bucket/datasets/northwind/CSV/categories/categories.csv')
             | beam.Map(print))
    
    

## <img src="images/java.png" width=40 height=40 /><font color='indigo' size="+2">Java</font>

### Parse a CSV into a class and filter it using a <font color='green' size="+2">Pardo</font>.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("Filter", ParDo.of(new FilterTerritories()))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));

        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


### Parse a CSV into a class and filter it using and anonymous class to create the condition.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("Filter", Filter.by(new SerializableFunction<Territory, Boolean>() {
                @Override
                public Boolean apply(Territory t) {
                    return t.territoryID % 2 == 0 && t.territoryName.startsWith("S");
                }
            }))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


### Parse a CSV into a class and filter it in one step.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                if (territoryName.startsWith("S")) {
                    c.output(new Territory(territoryID, territoryName, regionID));
                }
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


### There are special methods like <font color='blue' face="Fixedsys, monospace" size="+2">whereFieldName</font> but they don't do anything differently than just using a regular <font color='green' size="+2">ParDo</font>. This code doesn't actually run, but shows what it would look like.

In [None]:
%%java verbose
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
//import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.schemas.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("Filter", Filter.<Territory>create().whereFieldName("regionID", (Long regionID) -> regionID == 1))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
}


# __ __ __ __ __ __ __ __ __ __ __ __

# 3. Create multiple outputs from a single read.

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### Send the same data down multiple paths, such as to group it on two different keys with one read from the source. Also show how to read AVRO.

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        yield Territory(int(element['territoryid']), element['territorydescription'], int(element['regionid']))

territoriesfilename = 'datasets/northwind/AVRO/territories/territories.avro'
with beam.Pipeline() as p:
    territories = (p | 'Read' >> beam.io.ReadFromAvro(territoriesfilename)
                     | 'Parse' >> beam.ParDo(TerritoryParseClass())
                  )

    # Branch 1
    (territories 
         | 'Lowercase' >> beam.Map(lambda x : (x.territoryid, x.territoryname.lower(), x.regionid))
         | 'Write Lower' >> WriteToText('/tmp/territories_lower.out')
    )
    
    # Branch 2
    (territories 
         | 'Uppercase' >> beam.Map(lambda x : (x.territoryid, x.territoryname.upper(), x.regionid))
         | 'Write Upper' >> WriteToText('/tmp/territories_upper.out')
    )

! echo "Lower" && cat /tmp/territories_lower.out* && echo "Upper" && cat /tmp/territories_upper.out*
    

### Branching uses <font color='blue' face="Fixedsys, monospace" size="+2">TaggedOutput</font> in the <font color='green' size="+2">ParDo</font> to split data into two different paths with different data on each. Also show how to read Parquet.

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class OddEvenTerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = int(element['territoryid']), element['territoryname'], int(element['regionid'])
        if int(regionid) % 2 == 0:
            yield pvalue.TaggedOutput('Even', Territory(int(territoryid), territoryname, int(regionid)))
        else:
            yield pvalue.TaggedOutput('Odd', Territory(int(territoryid), territoryname, int(regionid)))

territoriesfilename = 'datasets/northwind/PARQUET/territories/territories.parquet'

with beam.Pipeline() as p:
    territories = p | 'Read' >> beam.io.ReadFromParquet(territoriesfilename) 
    # territories would return a tuple of the two tagged outputs
    # unpack the two outputs to two separate variables to process differently
    evens, odds = territories | 'Parse' >> beam.ParDo(OddEvenTerritoryParseClass()).with_outputs("Even", "Odd")
    
    evens | 'Write Even' >> WriteToText('/tmp/territories_even.out')
    
    odds | 'Write Odd' >> WriteToText('/tmp/territories_odd.out')

! echo "Evens" && cat /tmp/territories_even.out* && echo "Odds" && cat /tmp/territories_odd.out*

## <img src="images/java.png" width=40 height=40 /><font color='indigo' size="+2">Java</font>

### Send the same output down two different paths.

In [None]:
! rm /tmp/territories*

In [None]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territory", ParDo.of(new ParseTerritories()))
        ;                   
        
            
        territories
            .apply("Upper", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toUpperCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_upper").withFormatFunction(new SerializeTerritory()));

        territories
            .apply("Lower", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toLowerCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_lower").withFormatFunction(new SerializeTerritory()));

        
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

}


In [None]:
! echo "Upper" && cat /tmp/territories_upper* && echo "Lower" && cat /tmp/territories_lower*


### Branching uses <font color='blue' face="Fixedsys, monospace" size="+2">TupleTag</font> to split the output into two separate paths.

In [None]:
! rm /tmp/territories*

In [None]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {

    final static TupleTag<Territory> evenTag = new TupleTag<Territory>() {};
    final static TupleTag<Territory> oddTag = new TupleTag<Territory>() {};

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollectionTuple territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("OddEvenSplit", ParDo.of(new ParseTerritoriesOddEvenSplit()).withOutputTags(evenTag, TupleTagList.of(oddTag)))
        ;                   
        
        PCollection<Territory> evenTerritories = territories.get(evenTag);
        evenTerritories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix + "_even").withFormatFunction(new SerializeTerritory()));

        PCollection<Territory> oddTerritories = territories.get(oddTag);
        oddTerritories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix + "_odd").withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritoriesOddEvenSplit extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritoriesOddEvenSplit.class);

        @ProcessElement
        public void process(ProcessContext c) {


            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                if (regionID % 2 == 0) {
                    c.output(evenTag, new Territory(territoryID, territoryName, regionID));
                } else {
                    c.output(oddTag, new Territory(territoryID, territoryName, regionID));
                }
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

}


In [None]:
! echo "Odd" && cat /tmp/outputs_odd* && echo "Even" && cat /tmp/outputs_even*


# __ __ __ __ __ __ __ __ __ __ __ __

# 4. Group and Join

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### <font color='blue' face="Fixedsys, monospace" size="+2">WithKeys</font> will reshape your data first, then <font color='blue' face="Fixedsys, monospace" size="+2">GroupByKey</font> will cluster the elements as a list under each unique key. The data must be in a <font color='green' size="+2">KV</font> tuple pair first. Also not how to read a JSON file.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
import json
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = int(element['territoryid']), element['territoryname'], int(element['regionid'])
        yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/JSON/territories/territories.json'

with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText(territoriesfilename)
                    | 'From json' >> beam.Map(json.loads)
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'Territories With Keys' >> beam.util.WithKeys(lambda x : x.regionid)
#                    | 'Group Territories' >> beam.GroupByKey() 
                    | 'Print Territories' >> beam.Map(print)
                  )


### <font color='blue' face="Fixedsys, monospace" size="+2">Combine</font> is equivalent to a SQL <font color='blue' face="Fixedsys, monospace" size="+2">GROUP BY</font> query.
### <font color='blue' face="Fixedsys, monospace" size="+2">SELECT key, sum(value) as total FROM source GROUP BY key.</font>

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', 10), ('a', 20), ('b', 30), ('b', 40), ('c', 50), ('a', 60)])
          | 'Combine' >> beam.CombinePerKey(sum)
          | 'Print' >> beam.Map(print)
    )


### Custom <font color='blue' face="Fixedsys, monospace" size="+2">CombineFn.</font>

In [None]:
import apache_beam as beam

class CustomCombine(beam.CombineFn):
    """
    This custom combiner will calculate the max of the first element, sum of the second element and a count of total elements
    The final step will also return the average of the second element.
    """
    def create_accumulator(self):
        # method defining how to create an empty accumulator
        return dict()

    def add_input(self, accumulator, input):
        # get the input and split it up for easier manipulation
        k, v = input
        # get the values from the accumulator for the input key or initialize it if it's the first time we see this key
        m, s, c = accumulator.get(k, (0, 0, 0))

        # take the max for the first element of the tuple and sum the second element and count for the third
        accumulator[k] = (v[0] if v[0] > m else m, s + v[1], c + 1)
        return accumulator

    def merge_accumulators(self, accumulators):
        # merge the accumulators from the various workers once they have finished accumulating locally
        merged = dict()
        for accum in accumulators:
          for k, v in accum.items():
            m, s, c = merged.get(k, (0, 0, 0))
            merged[k] = (v[0] if v[0] > m else m, s + v[1], c + v[2])
        return merged

    def extract_output(self, accumulator):
        # called when all the works accumulators have been merge to render the final output
        # return the max, the sum, the count and the average for the key
        return {k : (v[0], v[1], v[2], v[1]/v[2]) for k, v in accumulator.items()}

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', (1, 10)), ('a', (2, 20)), 
                                     ('b', (3, 30)), ('c', (5, 50)), 
                                     ('b', (4, 40)), ('a', (6, 60))])
          | 'Combine' >> beam.CombineGlobally(CustomCombine())
          | 'Print' >> beam.Map(print)
    )


### Create a nested repeating output.
### First, create a dataset. Here is Python code for the equivalent bq command of <font color='blue' face="Fixedsys, monospace" size="+2">bq mk dataflow</font>.


In [None]:
# same as doing bq mk dataflow

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set dataset_id to the ID of the dataset to create.
PROJECT_ID = 'qwiklabs-gcp-04-b1b7cded1c4b'
dataset_id = f"{PROJECT_ID}.dataflow" #.format(client.project)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# # Construct a full Dataset object to send to the API.
# dataset = bigquery.Dataset(dataset_id)


try:
    client.get_dataset(dataset_id)  # Make an API request.
    print("Dataset {} already exists".format(dataset_id))
except:
    print("Dataset {} is not found".format(dataset_id))
    dataset = bigquery.Dataset(dataset_id)
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
    
    

schema = [
    bigquery.SchemaField("regionid", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("regionname", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("territories", "RECORD", mode="REPEATED", 
            fields=[
                    bigquery.SchemaField("territoryid", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("territoryname", "STRING", mode="REQUIRED")
                   ]
                        )
]

# create table dataflow.region_territory
# (regionid NUMERIC
# ,regionname STRING
# ,territories ARRAY<STRUCT<territoryid NUMERIC, territoryname STRING>>)

table_id = f"{PROJECT_ID}.dataflow.region_territory"

try:
    table = client.get_table(table_id)  # Make an API request.
    print("Table {} already exists.".format(table_id))
    print(table)
except:
    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print("Table {} created.".format(table_id))



### The code here is tricky: 
### First, parse the two tables into <font color='green' size="+2">tuples</font>, <font color='black' face="Fixedsys, monospace" size="+1">(regionid, regionname)</font> & <font color='black' face="Fixedsys, monospace" size="+1">(regionid, {'territoryid':territoryid, 'territoryname':territoryname})</font>
### <font color='blue' face="Fixedsys, monospace" size="+2">CoGroupByKey</font> yields a shape like <font color='black' face="Fixedsys, monospace" size="+1">(regionid, {'regions':['regionname'], 'territories':[{}])</font> so we need to reshape it to <font color='green' size="+2">dicts</font> to write it to BQ.


In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield (int(regionid), regionname) # Can also use yield instead of returning a list

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(regionid), {'territoryid': int(territoryid), 'territoryname':territoryname})

class SortTerritories(beam.DoFn):
    #{'regionid': 1, 'regionname': 'Eastern', 'territories': [{'territoryid': 1730, 'territoryname': 'Bedford'}, 
    def process(self, element):
        territories = element['territories']
        element['territories'] = sorted(territories, key = lambda x : x['territoryid'])
        yield element

regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

#PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseTuple())
#                | 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#                    | 'Print Territories' >> beam.Map(print)
                  )
    nested = ( 
        {'regions':regions, 'territories':territories} 
              | 'Nest territories into regions' >> beam.CoGroupByKey()
              | 'Reshape to dict' >> beam.Map(lambda x : {'regionid': x[0], 'regionname': x[1]['regions'][0], 
                                                        'territories': x[1]['territories']})
              | 'Sort by territoryid' >> beam.ParDo(SortTerritories())
#              | 'Print' >> beam.Map(print)
    )
    nested | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
                                                                             , project = PROJECT_ID
                                                                             , method = 'STREAMING_INSERTS'
                                                                             )
#    nested | 'Print' >> beam.Map(print)
             
#help(beam.io.WriteToBigQuery)    
#(1, {'regions': ['Eastern'], 'territories': [{'territoryid': 1730, 'territoryname': 'Bedford'}, {'territoryid': 1581, 'territoryname': 'Westboro'}, {'territoryid': 1833, 'territoryname': 'Georgetow'}, {'territoryid': 2116, 'territoryname': 'Bosto
#{'regionid': 1, 'regionname':'Eastern', 'territories' : [{'territoryid':1, 'territoryname':'name1'}, {}, {}]}

### Query the table to show it was populated.

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

table_id = f"{PROJECT_ID}.dataflow.region_territory"

query_job = client.query(f"""SELECT * FROM {table_id}""")

results = query_job.result()  # Waits for job to complete.
display(list(results))

### Helper functions to make a generic transform to nest children.

In [None]:
import apache_beam as beam

class NestJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key, sort = lambda x : x):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key
        self.sort = sort

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToDict(item):
            ret = {self.parent_key : item[0]}
            ret.update(item[1][self.parent_pipeline_name][0])
            ret[self.child_pipeline_name] = item[1][self.child_pipeline_name]
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToDict)
                  | f'Sort the nested data' >> beam.Map(self.sort)
            
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}
      
class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

def sort_territories(element):
    territories = element['territories']
    element['territories'] = list(sorted(territories, key = lambda x : x['territoryid']))
    return element

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | NestJoin('regions', 'regionid', 'territories', 'regionid', sort = sort_territories)
    nestjoin | 'Print Nest Join' >> beam.Map(print)
#     nestjoin | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
#                                                                              , project = PROJECT_ID
#                                                                              , method = 'STREAMING_INSERTS'
#                                                                              )



### Simulate an Outer Join with <font color='blue' face="Fixedsys, monospace" size="+2">CoGroup</font>.

In [None]:
import apache_beam as beam

class LeftJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToFlatDict(item):
            parent = {self.parent_key : item[0]}
            parent.update(item[1][self.parent_pipeline_name][0])
            ret = []
            for row1 in item[1][self.child_pipeline_name]:
                row = parent.copy()
                row.update(row1)
                ret.append(row)
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToFlatDict)
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | LeftJoin('regions', 'regionid', 'territories', 'regionid')
    nestjoin | 'Print Nest Join' >> beam.Map(print)



## <img src="images/java.png" width=40 height=40 /><font color='indigo' size="+2">Java</font>

### For Java, you don't need to group into KV shape first; instead you could use the <font color='blue' face="Fixedsys, monospace" size="+2">Group</font> and <font color='blue' face="Fixedsys, monospace" size="+2">Select</font> methods.

In [None]:
%%java 
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.transforms.Convert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Result> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("GroupBy regionID", Group.<Territory>byFieldNames("regionID")
                                            .aggregateField("territoryID", Count.combineFn(), "cnt"))
            .apply("Select", Select.fieldNames("key.regionID", "value.cnt"))
            .apply(Convert.fromRows(Result.class))
                   
        ;                   
        
        territories.apply(TextIO.<Result>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeResult()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Result {
        Long regionID;
        Long cnt;
        
        Result() {}
        
        Result(Long regionID, Long cnt) {
            this.regionID = regionID;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, cnt = %d)", regionID, cnt);
        }
    }
    static class SerializeResult implements SerializableFunction<Result, String> {
        @Override
        public String apply(Result input) {
          return input.toString();
        }
    }
}


### For the <font color='blue' face="Fixedsys, monospace" size="+2">JOIN</font> extension function, you still need to shape the data into a KV pair and then unnest it when done.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.transforms.WithKeys;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JoinTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        
        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<KV<Long, Region>> regions = p
            .apply("Read Regions", TextIO.read().from(regionsInputFileName))
            .apply("Parse Regions", ParDo.of(new CSVToRegion()))
            .apply("Regions KV", WithKeys.of(new SerializableFunction<Region, Long>() {
                @Override
                public Long apply(Region r) {
                  return r.regionid;
                }}));
          ;
        
        PCollection<KV<Long, Territory>> territories = p
            .apply("Read Territories", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territories", ParDo.of(new ParseTerritories()))
            .apply("Territories KV", WithKeys.of(new SerializableFunction<Territory, Long>() {
                @Override
                public Long apply(Territory t) {
                  return t.regionid;
                }}));
          ;
        
        PCollection<KV<Long, KV<Region, Territory>>> result =
            Join.innerJoin(regions, territories);  
        
        PCollection<Result> result2 = result
        
            .apply("Unnest KV", ParDo.of(new DoFn<KV<Long, KV<Region, Territory>>, Result>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    KV<Long, KV<Region, Territory>> e = c.element();
                    Long regionid = e.getKey();
                    KV<Region, Territory> v = e.getValue();
                    Region r = v.getKey();
                    Territory t = v.getValue(); 
                    String regionname = r.regionname;
                    Long territoryid = t.territoryid;
                    String territoryname = t.territoryname;
                    //c.output(new Result(1L, "regionname", 2L, "territoryname"));
                    c.output(new Result(regionid, regionname, territoryid, territoryname));
                }
                
            })
            );

        
        result2.apply(TextIO.<Result>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeResult()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Region {
        Long regionid;
        String regionname;
        
        Region() {}
        
        Region(Long regionid, String regionname) {
            this.regionid = regionid;
            this.regionname = regionname;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, regionname = %s)", regionid, regionname);
        }
    }
    
    static class SerializeRegion implements SerializableFunction<Region, String> {
        @Override
        public String apply(Region input) {
          return input.toString();
        }
    }

    static class CSVToRegion extends DoFn<String, Region> {
        private static final Logger LOG = LoggerFactory.getLogger(CSVToRegion.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long regionid = Long.parseLong(columns[0].trim());
                String regionname = columns[1].trim();
                c.output(new Region(regionid, regionname));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("CSVToRegion: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Territory {
        Long territoryid;
        String territoryname;
        Long regionid;
        
        Territory() {}
        
        Territory(long territoryid, String territoryname, long regionid) {
            this.territoryid = territoryid;
            this.territoryname = territoryname;
            this.regionid = regionid;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryid = %d, territoryname = %s, regionid = %d)", territoryid, territoryname, regionid);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryid = Long.parseLong(columns[0].trim());
                String territoryname = columns[1].trim();
                Long regionid = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryid, territoryname, regionid));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Result {
        Long regionid;
        String regionname;
        Long territoryid;
        String territoryname;
        
        Result() {}
        
        Result(Long regionid, String regionname, Long territoryid, String territoryname) {
            this.regionid = regionid;
            this.regionname = regionname;
            this.territoryid = territoryid;
            this.territoryname = territoryname;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, regionname = %s, territoryid = %d, territoryname = %s)", regionid, regionname, territoryid, territoryname);
        }
    }
    static class SerializeResult implements SerializableFunction<Result, String> {
        @Override
        public String apply(Result input) {
          return input.toString();
        }
    }
}
                   
                   
// KV{3, KV{(regionid = 3, regionname = Northern), (territoryid = 3801, territoryname = Portsmouth, regionid = 3)}}

# __ __ __ __ __ __ __ __ __ __ __ __

# 5. BeamSQL

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### SQL Transform uses <font color='green' size="+2">PCOLLECTION</font> as the name of a single source passed into it.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
    
    def __str__(self):
        return f'territoryid = {self.territoryid} territoryname = {self.territoryname} regionid = {self.regionid}'
coders.registry.register_coder(Territory, coders.RowCoder)
        
@beam.typehints.with_output_types(Territory)
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname.title(), int(regionid))
    
class RegionCount(typing.NamedTuple):
    regionid: int
    count: int
    
    def __str__(self):
        return f'regionid = {self.regionid} count = {self.count}'
coders.registry.register_coder(RegionCount, coders.RowCoder)
        
        
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
#                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass()).with_output_types(Territory) # if we didn't have with_output_types decorator
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'SQL Territories' >> SqlTransform("""SELECT regionid, count(*) as `count` FROM PCOLLECTION GROUP BY regionid""")
#                    | 'Map Territories for Print' >> beam.Map(lambda x : f'regionid = {x.regionid}  count = {x.count}')
#                    | 'Convert to RegionCount Class' >> beam.Map(lambda x : RegionCount(x.regionid, x.count))
                    | 'Print SQL' >> beam.Map(print)
                    )
    


### For a SQL query that has more than one source, bundle the sources together in a dictionary, they keys become the table names inside the SQL string.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

with beam.Pipeline() as p:
    parent = (
            p | 'Create Parent' >> beam.Create([(1, 'Vowel'), (2, 'Consonant'), (4, 'Unknown')])
              | 'Map Parent' >> beam.Map(lambda x : beam.Row(parent_id = x[0], parent_name = x[1]))
    )

    child = (
            p | 'Create Child' >> beam.Create([('Alpha', 1), ('Beta', 2), ('Gamma', 2), ('Delta', 2), ('Epsilon', 1), ('Pi', 3)])
              | 'Map Child' >> beam.Map(lambda x : beam.Row(child_name = x[0], parent_id = x[1]))
    )
    
    result = ( {'parent': parent, 'child' : child} 
         | SqlTransform("""
             SELECT p.parent_id, p.parent_name, c.child_name 
             FROM parent as p 
             INNER JOIN child as c ON p.parent_id = c.parent_id
             """)
        | 'Format Output' >> beam.Map(lambda x : f'{x.parent_id}, {x.parent_name}, {x.child_name}')
        | 'Print Join' >> beam.Map(print)
        )


### Real example

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

class Region(typing.NamedTuple):
    regionid: int
    regionname: str
beam.coders.registry.register_coder(Region, beam.coders.RowCoder)
        
class RegionParseClass(beam.DoFn):
    def process(self, element):
        yield Region(int(element['regionid']), element['regiondescription'])

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        yield Territory(int(element['territoryid']), element['territorydescription'], int(element['regionid']))

class Result(typing.NamedTuple):
    regionid: int
    regionname: str
    cnt: int
beam.coders.registry.register_coder(Result, beam.coders.RowCoder)
               
regionsfilename = 'datasets/northwind/AVRO/regions/regions.avro'
territoriesfilename = 'datasets/northwind/AVRO/territories/territories.avro'
with beam.Pipeline() as p:
    regions = (p | 'Read Regions' >> beam.io.ReadFromAvro(regionsfilename)
                     | 'Parse Regions' >> beam.ParDo(RegionParseClass())
                  )
    territories = (p | 'Read Territories' >> beam.io.ReadFromAvro(territoriesfilename)
                     | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                  )

    result = ( {'regions': regions, 'territories' : territories} 
         | SqlTransform("""
SELECT r.regionid AS regionid, r.regionname AS regionname, SUM(1) AS cnt 
FROM regions AS r 
JOIN territories AS t on t.regionid = r.regionid 
GROUP BY r.regionid, r.regionname
""")
        | 'Convert to Result Class' >> beam.Map(lambda x : Result(x.regionid, x.regionname, x.cnt))
#        | 'Format Output' >> beam.Map(lambda x : f'{x.regionid}, {x.regionname}, {x.cnt}')
        | 'Print Join' >> beam.Map(print)
             )


## <img src="images/java.png" width=40 height=40 /><font color='indigo' size="+2">Java</font>

### Beam SQL using Pojo with a simple query.

In [None]:
%%java verbose
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.transforms.Convert;
import com.google.gson.Gson;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ReadTerritories {
    public static void main(String[] args) {
        System.getProperties().put("org.apache.commons.logging.simplelog.defaultlog","fatal");

        Pipeline p = Pipeline.create();
        p.getSchemaRegistry().registerPOJO(Territory.class);
 
        String territoriesInputFileName = "datasets/northwind/JSON/territories/territories.json";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> result = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new JsonToTerritory()))
            .apply(SqlTransform.query("SELECT territoryid, upper(territoryname) as territoryname, regionid FROM PCOLLECTION WHERE regionid = 1"))
            .apply(Convert.fromRows(Territory.class))
        ;

        /*
        result.apply(MapElements.via(
            new SimpleFunction<Territory, Territory>() {
              @Override
              public Territory apply(Territory t) {
                System.out.println("** " + t);
                return t;
              }
            })); 
        */
        
        result.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        
        p.run().waitUntilFinish();
    }
    

    @DefaultSchema(JavaFieldSchema.class)
    static class Territory {
        Long territoryid;
        String territoryname;
        Long regionid;
        
        Territory() {}
        
        Territory(long territoryid, String territoryname, long regionid) {
            this.territoryid = territoryid;
            this.territoryname = territoryname;
            this.regionid = regionid;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryid, territoryname, regionid);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class JsonToTerritory extends DoFn<String, Territory> {
        @ProcessElement
        public void process(@Element String json, OutputReceiver<Territory> r) throws Exception {
            Gson gson = new Gson();
            Territory t = gson.fromJson(json, Territory.class);
            r.output(t);
        }
    }
}


### Beam SQL using multiple sources.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.transforms.Convert;
import com.google.gson.Gson;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import java.io.Serializable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ReadTerritories {
    public static void main(String[] args) {
        System.getProperties().put("org.apache.commons.logging.simplelog.defaultlog","fatal");

        Pipeline p = Pipeline.create();
        p.getSchemaRegistry().registerPOJO(Region.class);
        p.getSchemaRegistry().registerPOJO(Territory.class);
        p.getSchemaRegistry().registerPOJO(Result.class);
 
        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String territoriesInputFileName = "datasets/northwind/JSON/territories/territories.json";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Region> regions = p
            .apply("Read Regions", TextIO.read().from(regionsInputFileName))
            .apply("Parse Regions", ParDo.of(new CSVToRegion()));

        PCollection<Territory> territories = p
            .apply("Read Territories", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territories", ParDo.of(new JsonToTerritory()));
        
         PCollectionTuple joinSources = PCollectionTuple
                                        .of(new TupleTag<>("regions"), regions)
                                        .and(new TupleTag<>("territories"), territories);                                          
                                                    


        PCollection<Result> result = joinSources
            .apply(SqlTransform.query("SELECT r.regionid AS regionid, r.regionname AS regionname, SUM(1) AS cnt FROM regions AS r JOIN territories AS t on t.regionid = r.regionid group by r.regionid, r.regionname"))
            .apply(Convert.fromRows(Result.class))
        ;

        result.apply(TextIO.<Result>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeResult()));
        
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Region {
        Long regionid;
        String regionname;
        
        Region() {}
        
        Region(Long regionid, String regionname) {
            this.regionid = regionid;
            this.regionname = regionname;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, regionname = %s)", regionid, regionname);
        }
    }
    
    static class SerializeRegion implements SerializableFunction<Region, String> {
        @Override
        public String apply(Region input) {
          return input.toString();
        }
    }

    static class CSVToRegion extends DoFn<String, Region> {
        private static final Logger LOG = LoggerFactory.getLogger(CSVToRegion.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long regionid = Long.parseLong(columns[0].trim());
                String regionname = columns[1].trim();
                c.output(new Region(regionid, regionname));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("CSVToRegion: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Territory {
        Long territoryid;
        String territoryname;
        Long regionid;
        
        Territory() {}
        
        Territory(Long territoryid, String territoryname, Long regionid) {
            this.territoryid = territoryid;
            this.territoryname = territoryname;
            this.regionid = regionid;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryid = %d, territoryname = %s, regionID = %d)", territoryid, territoryname, regionid);
        }
        /*
        @Override
        public boolean equals (Object o) {
            if (o == this)
                return true;
            return false;
         }
        */
    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class JsonToTerritory extends DoFn<String, Territory> {
        @ProcessElement
        public void process(@Element String json, OutputReceiver<Territory> r) throws Exception {
            Gson gson = new Gson();
            Territory t = gson.fromJson(json, Territory.class);
            r.output(t);
        }
    }
     
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Result {
        Long regionid;
        String regionname;
        int cnt;
        
        Result() {}
        
        Result(Long regionid, String regionname, int cnt) {
            this.regionid = regionid;
            this.regionname = regionname;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, regionname = %s, cnt = %d)", regionid, regionname, cnt);
        }
        /*
        @Override
        public boolean equals (Object o) {
            if (o == this)
                return true;
            return false;
         }
        */
    }
    
    static class SerializeResult implements SerializableFunction<Result, String> {
        @Override
        public String apply(Result input) {
          return input.toString();
        }
    }
}


### Example from Beam documentation.

In [None]:
%%java verbose
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

/**
 * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
 *
 * <p>Run the example from the Beam source root with
 *
 * <pre>
 *   ./gradlew :sdks:java:extensions:sql:runBasicExample
 * </pre>
 *
 * <p>The above command executes the example locally using direct runner. Running the pipeline in
 * other runners require additional setup and are out of scope of the SQL examples. Please consult
 * Beam documentation on how to run pipelines.
 */
class BeamSqlExample {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // define the input row format
    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();

    Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
    Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
    Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();

    // create a source PCollection with Create.of();
    PCollection<Row> inputTable =
        PBegin.in(p).apply(Create.of(row1, row2, row3).withRowSchema(type));

    // Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
    PCollection<Row> outputStream =
        inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

    // print the output record of case 1;
    outputStream
        .apply(
            "log_result",
            MapElements.via(
                new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                    // expect output:
                    //  PCOLLECTION: [3, row, 3.0]
                    //  PCOLLECTION: [2, row, 2.0]
                    System.out.println("PCOLLECTION: " + input.getValues());
                    return input;
                  }
                }))
        .setRowSchema(type);

    // Case 2. run the query with SqlTransform.query over result PCollection of case 1.
    PCollection<Row> outputStream2 =
        PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
            .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

    // print the output record of case 2;
    outputStream2
        .apply(
            "log_result",
            MapElements.via(
                new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                    // expect output:
                    //  CASE1_RESULT: [row, 5.0]
                    System.out.println("CASE1_RESULT: " + input.getValues());
                    return input;
                  }
                }))
        .setRowSchema(
            Schema.builder().addStringField("stringField").addDoubleField("doubleField").build());

    p.run().waitUntilFinish();
  }
}



### Beam SQL using Pojo into a Result Pojo.

In [None]:
%%java 
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.transforms.Convert;
import com.google.gson.Gson;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ReadTerritories {
    public static void main(String[] args) {
        System.getProperties().put("org.apache.commons.logging.simplelog.defaultlog","fatal");

        Pipeline p = Pipeline.create();
        p.getSchemaRegistry().registerPOJO(Result.class);
 
        String territoriesInputFileName = "datasets/northwind/JSON/territories/territories.json";
        String outputsPrefix = "/tmp/outputs";

        // Define the schema to hold the results.
        Schema resultSchema = Schema.of(
            Schema.Field.of("regionid", Schema.FieldType.INT64), 
            Schema.Field.of("cnt", Schema.FieldType.INT64));

        PCollection<Result> result = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new JsonToTerritory()))
            .apply(SqlTransform.query("SELECT regionid, COUNT(*) as cnt FROM PCOLLECTION GROUP BY regionid"))
            .apply(Convert.fromRows(Result.class))
        ;
        
        result.apply(TextIO.<Result>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeResult()));
        
        p.run().waitUntilFinish();
    }
    

    @DefaultSchema(JavaFieldSchema.class)
    static class Territory {
        Long territoryid;
        String territoryname;
        Long regionid;
        
        Territory() {}
        
        Territory(long territoryid, String territoryname, long regionid) {
            this.territoryid = territoryid;
            this.territoryname = territoryname;
            this.regionid = regionid;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryid, territoryname, regionid);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class JsonToTerritory extends DoFn<String, Territory> {
        @ProcessElement
        public void process(@Element String json, OutputReceiver<Territory> r) throws Exception {
            Gson gson = new Gson();
            Territory t = gson.fromJson(json, Territory.class);
            r.output(t);
        }
    }
     
    @DefaultSchema(JavaFieldSchema.class)
    static class Result {
        Long regionid;
        Long cnt;
        
        Result() {}
        
        Result(Long regionid, Long cnt) {
            this.regionid = regionid;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionid = %d, cnt = %d)", regionid, cnt);
        }
        @Override
        public boolean equals (Object o) {
            if (o == this)
                return true;
            return false;
         }
    }
    
    static class SerializeResult implements SerializableFunction<Result, String> {
        @Override
        public String apply(Result input) {
          return input.toString();
        }
    }
}


### BeamSQL Java working wrong way with schemas.

In [None]:
%%java verbose nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.transforms.Convert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
        ;                   
        
        // Define the schema for the records.
        Schema territorySchema = Schema
          .builder()
          .addInt64Field("territoryID")
          .addStringField("territoryName")
          .addInt64Field("regionID")
          .build();
        // Define the schema to hold the results.
        
        Schema resultSchema = Schema.of(
            Schema.Field.of("regionID", Schema.FieldType.INT64), 
            Schema.Field.of("cnt", Schema.FieldType.INT64));
        
        // Convert them to Rows with the same schema as defined above via a DoFn.
        PCollection<Row> territories2 = territories
          .apply(
          ParDo.of(new DoFn<Territory, Row>() {
            @ProcessElement
            public void process(ProcessContext c) {
              // Get the current POJO instance
              Territory t = c.element();

              // Create a Row with the appSchema schema
              // and values from the current POJO
              Row territoryRow =
                    Row
                      .withSchema(territorySchema)
                      .addValues(
                        t.territoryID,
                        t.territoryName,
                        t.regionID)
                      .build();

              // Output the Row representing the current POJO
              c.output(territoryRow);
            }
          })).setRowSchema(territorySchema);
        
          PCollection<Row> territories3 = territories2.apply(Convert.toRows()).apply(
             SqlTransform.query("SELECT regionID, COUNT(*) as cnt from PCOLLECTION GROUP BY regionID")).setRowSchema(resultSchema);
        
          territories3.apply(
              "Print", MapElements.via(new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                      System.out.println("SQL Result: " + input.getValues());
                      return input;
                  }
              }
          )).setRowSchema(resultSchema);
//        territories3.apply(TextIO.<Row>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }

/*    
    @schemultSchema(AutoValueSchema.class)
    @AutoValue
    public static abstract class Territory {
      public abstract Long getTerritoryID();
      public abstract String getTerritoryName();
      public abstract Long getRegionID();

      @SchemaCreate
      public static Territory create(Long territoryID, String territoryName, Long regionID) {
        return new AutoValue_TerritoryClass(territoryID, territoryName, regionID);
      }
*/    
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }
    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    

    
     
/*    
    
    
    @DefaultCoder(AvroCoder.class)
    static class Region {
        Long regionID;
        Long cnt regionName;
        
        Region() {}
        
        Region(long regionID, long cnt) {
            this.regionID = regionID;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionID = %d, cnt = %d)", regionID, cnt);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Region, String> {
        @Override
        public String apply(Region input) {
          return input.toString();
        }
    }

    
    
private class Transform extends PTransform<pcollectionlist<row>, PCollection<row>> {
 
    @Override
    public PCollection<row> expand(PCollectionList<row> pinput) {
      checkArgument(
          pinput.size() == 1,
          "Wrong number of inputs for %s: %s",
          BeamUncollectRel.class.getSimpleName(),
          pinput);
      PCollection<row> upstream = pinput.get(0);
 
      // Each row of the input contains a single array of things to be emitted; Calcite knows
      // what the row looks like
      Schema outputSchema = CalciteUtils.toSchema(getRowType());
 
      PCollection<row> uncollected =
          upstream.apply(ParDo.of(new UncollectDoFn(outputSchema))).setRowSchema(outputSchema);
 
      return uncollected;
    }
  }    
    static class ParseRegions extends DoFn<Row, Region> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    
    
*/
    
    
}


# __ __ __ __ __ __ __ __ __ __ __ __

# 6. <font color='green' size="+2">DoFn</font> Lifecycle

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### <font color='green' size="+2">DoFn</font> Lifecycle

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def setup(self):
        self.lookup = {1:'North', 2:'South', 3:'East', 4:'West'}
        print('setup')
        
    def start_bundle(self):
        print('start bundle')
        
    def process(self, element, uppercase = 0):
        #lookuptable = {1:'North', 2:'South', 3:'East', 4:'West'}
        territoryid, territoryname, regionid = element
        region = self.lookup.get(regionid, 'No Region')
        if uppercase == 1:
            region = region.upper()
        yield(territoryid, territoryname, regionid, region)
        
    def finish_bundle(self):
        print('finish bundle')

    def teardown(self):
        print('teardown')
        del self.lookup
    

with beam.Pipeline() as p:
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), uppercase = 1 ) 
        | 'Print Loopup' >> beam.Map(print)
    )
        


# __ __ __ __ __ __ __ __ __ __ __ __

# 7. Side Inputs

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### Side inputs are about passing extra parameters to a function where the parameters are calculated in the pipeline itself.

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText
from apache_beam.transforms.combiners import Sample

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element, uppercase = '0'):
        # It's a bit weird here but what is passed in is a single element array of a string
        #print('***', uppercase)
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname if uppercase[0] == '0' else territoryname.upper(), int(regionid))

        
with beam.Pipeline() as p:
    sideinput = (
        p | 'Read sideinput.txt' >> ReadFromText('sideinput.txt')
          | Sample.FixedSizeGlobally(1)
    )
    
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = ["0"]) # This is not a side input but just passing a fixed parameter
#          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = sideinput)  # fails because sideinput is a PCollection not an integer
#          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = beam.pvalue.AsSingleton(sideinput))  # When the parameter is calculated in the pipeline itself, that makes it a side input
          | 'Print Loopup' >> beam.Map(print)
    )

#    maxregion | 'Print Min' >> beam.Map(print)


### Side input that is a lookup list.
### More realistic example where the entire lookup table is read in the pipeline then distributed to each worker as a side input.

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsList
from apache_beam.io import ReadFromText

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def process(self, element, lookuptable = [{'regionid':1, 'regionname':'North'}, {'regionid':2, 'regionname':'South'}]):
        # {1:'North', 2:'South'}
        territoryid, territoryname, regionid = element
        # Becase the regions PCollection is a different shape, use the following comprehension to make it easier to do a lookup
        lookup = {e['regionid'] : e['regionname'] for e in lookuptable } # {1:'North', 2:'South'}
        yield(territoryid, territoryname, regionid, lookup.get(regionid, 'No Region'))

with beam.Pipeline() as p:
    regions = (
        p | 'Read Regions' >> ReadFromText('regions.csv')
          | 'Parse Regions' >> beam.ParDo(RegionParseDict())
#          | 'Print Regions' >> beam.Map(print)
    )

    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#          | 'Print Territories' >> beam.Map(print)
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), lookuptable = beam.pvalue.AsList(regions))
        | 'Print Loopup' >> beam.Map(print)
    )
        


# __ __ __ __ __ __ __ __ __ __ __ __

# 8. Streaming Sources

## Streaming sources in Beam are not too different than they are in Spark. Running a pipeline with streaming sources is tricky to do because the direct runner does not support running them. Instead, you need to submit jobs to a runner like Flink or Spark or Google DataFlow. For the rest of the code examples, we won't be able to run them here because the environment is not set up to allow it. However, we can use a bounded data source to simulate what we would see.

## <img src="images/python.png" width=40 height=40 /><font color='cadetblue' size="+2">Python</font>

### We need the correct packages installed. This is already done on this machine, so these are for reference.

In [None]:
! pip install apache-beam
! pip install apache-beam[gcp]
! pip install apache-beam[interactive]

## A basic example of reading from a Kafka streaming source.

In [None]:
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam.transforms.window as window

brokers = 'localhost:9092'

kafka_config = {
                  'bootstrap.servers': brokers
                }

pipeline_options = PipelineOptions(streaming = True)
kafka_topic = 'stocks-json'

'''
These are a sample of some of the types of options we would have to send
     runner = "DirectRunner"
     runner = "SparkRunner"
     runner = "FlinkRunner"
     , flink_master="localhost:8081"
     , environment_type="LOOPBACK"
     , streaming="true"
     , checkpointing_interval=1000
     , environment_type = "DOCKER"
'''

with beam.Pipeline(options = pipeline_options) as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromKafka(consumer_config = kafka_config, topics=[kafka_topic]) 
          | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(10))
          | 'Print' >> beam.Map(print)
        )


In [None]:
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

kafka_config = {
                  'bootstrap.servers': brokers
                }

pipeline_options = PipelineOptions(streaming = True)
kafka_tope = 'stocks-json'

def convert_kafka_record_to_dictionary(record):
    # the records have 'value' attribute when --with_metadata is given
    if hasattr(record, 'value'):
      stock_bytes = record.value
    elif isinstance(record, tuple):
      stock_bytes = record[1]
    else:
      raise RuntimeError('unknown record type: %s' % type(record))
    # Converting bytes record from Kafka to a dictionary.
    import ast
    stock = ast.literal_eval(stock_bytes.decode("UTF-8"))
    output = {
        key: stock[key]
        for key in ['timestamp', 'symbol', 'price']
    }
    if hasattr(record, 'timestamp'):
      # timestamp is read from Kafka metadata
      output['timestamp'] = record.timestamp
    print (record, output)
    return output


with beam.Pipeline(options = pipeline_options) as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromKafka(consumer_config = kafka_config, topics=[kafka_topic]) 
          | 'Convert Message' >> beam.Map(convert_kafka_record_to_dictionary)
          | 'Window' >> beam.WindowInto(FixedWindows(10))
#          | beam.Map(lambda x : ('x', 1))
          | beam.CombinePerKey(sum)          
          | 'Print' >> beam.Map(print)
        )



## Some examples of what we might do once we start reading the messages are similar to what we'd do in Spark. We'd have to convert the message into a usable structed object shape, then do whatever filtering, aggregating, windowing, etc. we need, and then write the final data out to some sink or destination, like another message in Kafka or PubSub, or write it to a database.

In [None]:
# Test code to convert a sample message into a structured object
from typing import NamedTuple
import uuid


class Trade(NamedTuple):
    key: str
    timestamp: int
    symbol: str
    event_time: float
    price: float
    quantity: int
    
    
def convert_dict_to_stock(record: dict) -> Trade:
    return Trade(**record)

sample_message = { "key": "6a826a01-4b23-4713-8a3e-f8f2d720ba07"
    , "timestamp": 1645290182886
    , "event_time": "2022-02-19 17:00:42"
    , "symbol": "MSFT"
    , "price": 154.49
    , "quantity": 164}

print(convert_dict_to_stock(sample_message))



## Let's read data from a file simulating like it's from a streaming source, so we can see how to convert it and aggregate it.

In [None]:
import apache_beam as beam
import json
from apache_beam.io import ReadFromText, WriteToText
#from apache_beam.combiners import Sample

class Trade(NamedTuple):
    key: str
    timestamp: int
    symbol: str
    event_time: float
    price: float
    quantity: int
    
def convert_dict_to_stock(record):
    x = json.loads(record)
    msg = x['value']
    msg['key'] = x['key']
    msg['timestamp'] = x['timestamp']
    return Trade(**msg)

stocks_filename = 'trades.txt'
with beam.Pipeline() as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
         # | 'Convert message to a dictionary' >> beam.Map(convert_kafka_record_to_dictionary)
          | 'Convert to Trade Object' >> beam.Map(convert_dict_to_stock)
          | 'MSFT trades' >> beam.Filter(lambda t : t.symbol == 'MSFT')
          | 'Limit 10' >> beam.combiners.Sample.FixedSizeGlobally(10)
          | 'Print' >> beam.Map(print)
        )

# Trade(key='de76c3b3-7b4c-4fcb-a4ff-ab4cca374f9d', timestamp=1645290182886, symbol='AAPL', event_time='2022-02-19 17:37:13', price=266.92, quantity=143)


## This is a really simple aggregate that sums up the total number of trades for each stock for the entire dataset.

In [None]:
import apache_beam as beam
import json
from apache_beam.io import ReadFromText, WriteToText

class Trade(NamedTuple):
    key: str
    timestamp: int
    symbol: str
    event_time: float
    price: float
    quantity: int
    
def convert_dict_to_stock(record):
    x = json.loads(record)
    msg = x['value']
    msg['key'] = x['key']
    msg['timestamp'] = x['timestamp']
    return Trade(**msg)

stocks_filename = 'trades.txt'
with beam.Pipeline() as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
          | 'Convert to Trade Object' >> beam.Map(convert_dict_to_stock)
          | 'Aggregate 1' >> beam.GroupBy('symbol').aggregate_field('quantity', sum, 'total_quantity')
          | 'Print' >> beam.Map(print)
        )



## Now let's introduce windowing to do micro-aggregations on the data for a fixed window. In this case, we will do 1 day (60 seconds * 60 minutes * 24 hours)


In [None]:
from apache_beam import window

import apache_beam as beam
import json
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.window import FixedWindows

class Trade(NamedTuple):
    key: str
    timestamp: int
    symbol: str
    event_time: float
    price: float
    quantity: int
    
def convert_dict_to_stock(record):
    x = json.loads(record)
    msg = x['value']
    msg['key'] = x['key']
    msg['timestamp'] = x['timestamp']
    return Trade(**msg)

def add_timestamp(element):
    ## This adds an element to the PCollection to be used for determining which window an item falls into
    unix_timestamp = element.timestamp
    event_time = element.event_time
    return beam.window.TimestampedValue(element, unix_timestamp)

stocks_filename = 'trades.txt'
with beam.Pipeline() as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
          | 'Convert to Trade Object' >> beam.Map(convert_dict_to_stock)
          | 'Timestamp the Trade' >> beam.Map(add_timestamp)
          | 'Window' >> beam.WindowInto(FixedWindows(60 * 60 * 24))
          | 'Aggregate 1' >> beam.GroupBy('symbol').aggregate_field('quantity', sum, 'total_quantity')
          | 'Print' >> beam.Map(print)
        )

# fixed_windowed_items = (
#     timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
#     items | 'window' >> beam.WindowInto(window.FixedWindows(60)))

## It would be nice to see the time ranges that these windows represent. 

In [None]:
# Here we did the same thing but instead the Trade class has event_time as Python datetime datatype
from apache_beam import window
from apache_beam.transforms.combiners import Sample

import apache_beam as beam
import json
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.window import FixedWindows
from datetime import datetime

class Trade(NamedTuple):
    key: str
    timestamp: float
    symbol: str
    event_time: datetime
    price: float
    quantity: int
beam.coders.registry.register_coder(Trade, beam.coders.RowCoder)
    
def convert_dict_to_stock(record):
    x = json.loads(record)
    msg = x['value']
    msg['key'] = x['key']
    msg['timestamp'] = x['timestamp']
    return Trade(**msg)
    
@beam.typehints.with_output_types(Trade)
class ParseStockMessage(beam.DoFn):
    def process(self, record):
        x = json.loads(record)
        msg = x['value']
        msg['event_time'] = datetime.strptime(msg['event_time'], "%Y-%m-%d %H:%M:%S")
        msg['key'] = x['key']
        msg['timestamp'] = x['timestamp']
        t = Trade(**msg)
        yield t
        
def add_timestamp(element):
    unix_timestamp = element.timestamp
    event_time = element.event_time
    return beam.window.TimestampedValue(element, unix_timestamp)
        
class TradeAggregate(NamedTuple):
    window_start: datetime
    window_end: datetime
    symbol: str
    total_quantity: int
    
    def __str__(self):
        return f'{self.window_start.strftime("%Y-%m-%d %H:%M:%S")} - {self.window_end.strftime("%Y-%m-%d %H:%M:%S")} {self.symbol} {self.total_quantity}'

beam.coders.registry.register_coder(TradeAggregate, beam.coders.RowCoder)

@beam.typehints.with_output_types(TradeAggregate)
class AddWindowRange(beam.DoFn):
    def process(self, element,  window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        yield TradeAggregate(window_start, window_end, element.symbol, element.total_quantity)
        
        
stocks_filename = 'trades.txt'
with beam.Pipeline() as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
          | 'Convert to Trade Object' >> beam.Map(convert_dict_to_stock)
          | 'Timestamp the Trade' >> beam.Map(add_timestamp)
          | 'Window' >> beam.WindowInto(FixedWindows(60 * 60 * 24))
          | 'Aggregate 1' >> beam.GroupBy('symbol').aggregate_field('quantity', sum, 'total_quantity')
          | 'AddWindowEndTimestamp' >> (beam.ParDo(AddWindowRange()))
          | 'Print' >> beam.Map(print)
    )
    

## Try a sliding window this time doing a 24-hour period updated hourly. Here we are also introducing a trick useful for debugging which is to convert a Beam PCollection into a Pandas DataFrame.

In [3]:
# pip install apache_beam[dataframe]
from typing import NamedTuple
from apache_beam import window
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection

import json
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.window import FixedWindows, SlidingWindows
from datetime import datetime

import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import pandas as pd
pd.set_option('display.max_rows', None)

class Trade(NamedTuple):
    key: str
    timestamp: float
    symbol: str
    event_time: datetime
    price: float
    quantity: int
beam.coders.registry.register_coder(Trade, beam.coders.RowCoder)
    
def convert_dict_to_stock(record):
    x = json.loads(record)
    msg = x['value']
    msg['key'] = x['key']
    msg['timestamp'] = x['timestamp']
    return Trade(**msg)
    
@beam.typehints.with_output_types(Trade)
class ParseStockMessage(beam.DoFn):
    def process(self, record):
        x = json.loads(record)
        msg = x['value']
        msg['event_time'] = datetime.strptime(msg['event_time'], "%Y-%m-%d %H:%M:%S")
        msg['key'] = x['key']
        msg['timestamp'] = x['timestamp']
        t = Trade(**msg)
        yield t
        
def add_timestamp(element):
    unix_timestamp = element.timestamp
    event_time = element.event_time
    return beam.window.TimestampedValue(element, unix_timestamp)
        

class TradeAggregate(NamedTuple):
    window_start: datetime
    window_end: datetime
    symbol: str
    total_quantity: int
    
    def __str__(self):
        return f'{self.window_start.strftime("%Y-%m-%d %H:%M:%S")} - {self.window_end.strftime("%Y-%m-%d %H:%M:%S")} {self.symbol} {self.total_quantity}'

    def toRow(self):
        return beam.Row(window_start = self.window_start, window_end = self.window_end
                        , symbol = self.symbol, total_quantity = self.total_quantity)
    
beam.coders.registry.register_coder(TradeAggregate, beam.coders.RowCoder)

@beam.typehints.with_output_types(TradeAggregate)
class AddWindowRange(beam.DoFn):
    def process(self, element,  window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        yield TradeAggregate(window_start, window_end, element.symbol, element.total_quantity)

        
stocks_filename = 'trades.txt'
with beam.Pipeline(InteractiveRunner()) as p:
    s = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
          | 'Convert to Trade Object' >> beam.Map(convert_dict_to_stock)
          | 'Timestamp the Trade' >> beam.Map(add_timestamp)
          | 'Window' >> beam.WindowInto(SlidingWindows(60 * 60 * 24, 60 * 60))
          | 'Aggregate 1' >> beam.GroupBy('symbol').aggregate_field('quantity', sum, 'total_quantity')
          | 'AddWindowEndTimestamp' >> (beam.ParDo(AddWindowRange()))
          | 'To Row' >> beam.Map(lambda x : x.toRow())
    )


    # Collect the Beam DataFrame into a Pandas DataFrame.
    df = ib.collect(s)
    df.columns = ['symbol','total_quantity', 'window_start', 'window_end']

    # We can now use any Pandas transforms with our data.
    df2 = df.sort_values(["symbol", "window_start"], ascending = (True, True))
    display(df2, 100)


Unnamed: 0,symbol,total_quantity,window_start,window_end
23,AAPL,5084,2022-02-19 19:00:00,2022-02-18 19:00:00
22,AAPL,12344,2022-02-19 20:00:00,2022-02-18 20:00:00
21,AAPL,18707,2022-02-19 21:00:00,2022-02-18 21:00:00
20,AAPL,25061,2022-02-19 22:00:00,2022-02-18 22:00:00
19,AAPL,30909,2022-02-19 23:00:00,2022-02-18 23:00:00
18,AAPL,37274,2022-02-20 00:00:00,2022-02-19 00:00:00
17,AAPL,45964,2022-02-20 01:00:00,2022-02-19 01:00:00
16,AAPL,52863,2022-02-20 02:00:00,2022-02-19 02:00:00
15,AAPL,57737,2022-02-20 03:00:00,2022-02-19 03:00:00
14,AAPL,63447,2022-02-20 04:00:00,2022-02-19 04:00:00


100

## Some Beam runners support the use of SQL and which features they support can also vary. Here's some examples of how to do the same thing using Beam SQL.

In [None]:
from apache_beam import window
from apache_beam.transforms.sql import SqlTransform

import apache_beam as beam
import json
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.window import FixedWindows
from datetime import datetime

class Trade(NamedTuple):
    key: str
    timestamp: float
    symbol: str
    event_time: str
    price: float
    quantity: int
beam.coders.registry.register_coder(Trade, beam.coders.RowCoder)
    
@beam.typehints.with_output_types(Trade)
class ParseStockMessage(beam.DoFn):
    def process(self, record):
        x = json.loads(record)
        msg = x['value']
        msg['key'] = x['key']
        msg['timestamp'] = x['timestamp']
        t = Trade(**msg)
        yield t


stocks_filename = 'trades.txt'
sql = """
SELECT 
  symbol, 
  TUMBLE_START('INTERVAL 10 MINUTE') as period_start,
  TUMBLE_END('INTERVAL 10 MINUTE') as period_end,
  SUM(quantity) AS total_quantity
FROM PCOLLECTION
GROUP BY symbol,
  TUMBLE(timestamp, 'INTERVAL 10 MINUTE')
"""

# sql = """
# SELECT symbol, 
#   SUM(quantity) AS total_quantity
# FROM PCOLLECTION
# GROUP BY
#   symbol
# """

with beam.Pipeline() as p:
    k = ( p
          | 'Read from Kafka' >> ReadFromText(stocks_filename)
          | 'Convert to Trade Object' >> beam.ParDo(ParseStockMessage())
          | 'SQL' >> SqlTransform(sql)
          | 'Print' >> beam.Map(print)
        )

    

### Sometimes we need to manually create a schema for a nested repeating because it cannot use a simple string. In this case, we don't really need it but it's included here as a reference in case we do.

In [None]:
from apache_beam.io.gcp.internal.clients import bigquery as bq
region_territory_schema = bq.TableSchema()
regionid = bq.TableFieldSchema(name = 'regionid', type = 'string', mode = 'required')
region_territory_schema.fields.append(regionid)
regionname = bq.TableFieldSchema(name = 'regionname', type = 'string', mode='required')
region_territory_schema.fields.append(regionname)

# A nested field
territories = bq.TableFieldSchema(name = 'territories', type = 'record', mode = 'nullable')
territoryid = bq.TableFieldSchema(name = 'territoryid', type = 'string', mode = 'required')
territories.fields.append(territoryid)
territoryname = bq.TableFieldSchema(name = 'territoryname', type = 'string', mode = 'required')
territories.fields.append(territoryname)

region_territory_schema.fields.append(territories)

print(region_territory_schema)

### Incomplete AVRO example.

In [None]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 Pipeline p = ...;

#  // Read Avro-generated classes from files on GCS
#  PCollection<AvroAutoGenClass> records =
#      p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));

#  // Read GenericRecord's of the given schema from files on GCS
#  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
#  PCollection<GenericRecord> records =
#      p.apply(AvroIO.readGenericRecords(schema)
#                 .from("gs://my_bucket/path/to/records-*.avro"));
 
    
  pipeline.apply("Read Avro files",
      AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
      .apply("Convert Avro to CSV formatted data",
          ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
      .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
          .withSuffix(".csv"));
    
    
    
pipeline
      .apply("Read from Avro", AvroIO.read(BigtableRow.class).from(options.getInputFilePattern()))
      .apply(
          "Transform to Bigtable",
          ParDo.of(
              AvroToBigtableFn.createWithSplitLargeRows(
                  options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
      .apply("Write to Bigtable", write);

  return pipeline.run();



public class ReadTerritoriesAvro {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/AVRO/territories/territories.avro";
        String outputsPrefix = "/tmp/outputs";

        PCollection<AvroAutoGenClass> records= p
            .apply("Read Avro", AvroIO.read(AvroAutoGenClass.class).from(territoriesInputFileName));
/*
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territory", ParDo.of(new ParseTerritories()))
*/
        ;                   
        
/*            
        territories
            .apply("Upper", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toUpperCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_upper").withFormatFunction(new SerializeTerritory()));

        territories
            .apply("Lower", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toLowerCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_lower").withFormatFunction(new SerializeTerritory()));

        
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    */

}


In [None]:
TableReference tableRef = new TableReference();
tableRef.setProjectId("project-id");
tableRef.setDatasetId("dataset-name");
tableRef.setTableId("table-name");

List<TableFieldSchema> fieldDefs = new ArrayList<>();
fieldDefs.add(new TableFieldSchema().setName("column1").setType("STRING"));
fieldDefs.add(new TableFieldSchema().setName("column2").setType("FLOAT"));  
For the Pipeline steps,

Pipeline pipeLine = Pipeline.create(options);
pipeLine
.apply("ReadMyFile", 
        TextIO.read().from("path-to-json-file")) 

.apply("MapToTableRow", ParDo.of(new DoFn<String, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) { 
        Gson gson = new GsonBuilder().create();
        HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);

        TableRow row = new TableRow();
        row.set("column1", parsedMap.get("col1").toString());
        row.set("column2", Double.parseDouble(parsedMap.get("col2").toString()));
        c.output(row);
    }
}))
