# 1. Initialization Code

## Beam

### Initialize helper functions to run Java inside cells.

In [1]:
# https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/try-apache-beam-java.ipynb#scrollTo=CgTXBdTsBn1F
# Run and print a shell command.
def run(cmd, progress = True, verbose = False):
  if progress:
      print('>> {}'.format(cmd))
    
  if verbose:
      !{cmd}  # This is magic to run 'cmd' in the shell.
      print('')
  else:
      ! {cmd} > /dev/null 2>&1

import os

# Download the gradle source.
gradle_version = 'gradle-5.0'
gradle_path = f"/opt/{gradle_version}"
if not os.path.exists(gradle_path):
  run(f"wget -q -nc -O gradle.zip https://services.gradle.org/distributions/{gradle_version}-bin.zip")
  run('unzip -q -d /opt gradle.zip')
  run('rm -f gradle.zip')

# We're choosing to use the absolute path instead of adding it to the $PATH environment variable.
def gradle(args):
  run(f"{gradle_path}/bin/gradle --console=plain {args}")

gradle('-v')

! mkdir -p src/main/java/samples/quickstart/
print('Done')
        

>> /opt/gradle-5.0/bin/gradle --console=plain -v
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Done


### Definition for ___%%java___ Python magic cell function.

In [51]:
from IPython.core.magic import register_line_magic, register_cell_magic, register_line_cell_magic
@register_cell_magic
def java(line, cell):
    """
    Written by Joseph Gagliardo Jr.
    joegagliardo@gmail.com
    2021-12-22
    """
    gradle_text = """
plugins {
  // id 'idea'     // Uncomment for IntelliJ IDE
  // id 'eclipse'  // Uncomment for Eclipse IDE

  // Apply java plugin and make it a runnable application.
  id 'java'
  id 'application'

  // 'shadow' allows us to embed all the dependencies into a fat jar.
  id 'com.github.johnrengelman.shadow' version '4.0.3'
}

// This is the path of the main class, stored within ./src/main/java/
mainClassName = 'samples.quickstart.{class_name}'

// Declare the sources from which to fetch dependencies.
repositories {
  mavenCentral()
}

// Java version compatibility.
sourceCompatibility = 1.8
targetCompatibility = 1.8

// Use the latest Apache Beam major version 2.
// You can also lock into a minor version like '2.9.+'.
ext.apacheBeamVersion = '2.+'

// Declare the dependencies of the project.
dependencies {
  shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"

  runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
  compile "org.apache.beam:beam-sdks-java-extensions-sql:$apacheBeamVersion"
  compile "com.google.auto.value:auto-value-annotations:1.6"
  annotationProcessor "com.google.auto.value:auto-value:1.6"
  runtime "org.slf4j:slf4j-api:1.+"
  runtime "org.slf4j:slf4j-jdk14:1.+"

  testCompile "junit:junit:4.+"
}



// Configure 'shadowJar' instead of 'jar' to set up the fat jar.
shadowJar {
  baseName = '{class_name}' // Name of the fat jar file.
  classifier = null       // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
  manifest {
    attributes('Main-Class': mainClassName)  // Specify where the main class resides.
  }
}
"""   
    start = cell.find('class ')
    end = cell.find(' {')
    class_name = cell[start+6:end]
    progress = 'noprogress' not in line.lower()
    verbose = 'verbose' in line.lower()
    output = 'nooutput' not in line.lower()

    # if len(line) == 0:
    #     start = cell.find('class ')
    #     end = cell.find(' {')
    #     class_name = cell[start+6:end]
    # else:
    #     class_name = line
        
    
    run('rm src/main/java/samples/quickstart/*.java')
    run('rm build/libs/*.jar')
    run('rm -rf /tmp/outputs*', progress = progress, verbose = verbose)

    with open('build.gradle', 'w') as f:
        f.write(gradle_text.replace('{class_name}', class_name))

    with open(f'src/main/java/samples/quickstart/{class_name}.java', 'w') as f:
        f.write(cell)
        
    # Build the project.
    run(f"{gradle_path}/bin/gradle --console=plain build", progress = progress, verbose = verbose)
    run('ls -lh build/libs/', progress = progress, verbose = verbose)
    run(f"{gradle_path}/bin/gradle --console=plain runShadow", progress = progress, verbose = verbose)
    # run('head -n 20 /tmp/outputs*')
    if output:
        run('cat /tmp/outputs*', progress = False, verbose = True)

    print('Done')

print('Done')

Done


## Spark

### Install a Spark docker using the following commands.

In [3]:
! docker pull bitnami/spark && \
docker network create spark_network && \
docker run -d --name spark --network=spark_network -e SPARK_MODE=master bitnami/spark
! ln -s /opt/conda/lib/libtinfo.so /opt/conda/lib/libtinfor.so.6
print('Done')

/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Using default tag: latest
latest: Pulling from bitnami/spark
Digest: sha256:ba1b7f08660ffaa48a98abf71de712c89b92b194f9efaef4a8af4575c1809af9
Status: Image is up to date for bitnami/spark:latest
docker.io/bitnami/spark:latest
Error response from daemon: network with name spark_network already exists
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
ln: failed to create symbolic link '/opt/conda/lib/libtinfor.so.6': File exists
Done


### Install pyspark.

In [4]:
import pip

def install(package):
    if hasattr(pip, 'main'):
        pip.main(['install', package])
    else:
        pip._internal.main(['install', package])

install('pyspark')
        
print('Done')

Please see https://github.com/pypa/pip/issues/5599 for advice on fixing the underlying issue.
To avoid this problem you can invoke Python with '-m pip' instead of running pip directly.


Done


### Initialize the Spark context variables.

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *

def initspark(appname = "Notebook", servername = "local[*]"):
    print ('initializing pyspark')
    conf = SparkConf().setAppName(appname).setMaster(servername)
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.appName(appname).enableHiveSupport().getOrCreate()
    sc.setLogLevel("ERROR")
    print ('pyspark initialized')
    return sc, spark, conf

sc, spark, conf = initspark()
print(sc, spark)
print('Done')

#

***

# 2. ___Create___ allows you to upload data into a PCollection.

## Python

### Non Beam example of applying a map function to a collection.

In [None]:
x = ['one', 'two', 'three', 'four']
print(list(map(str.title, x)))

### Simple transformation, turn the local collection into a PCollection and apply a Map PTransform on it.

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    lines = (
        p | beam.Create(['one', 'two', 'three', 'four'])
          | beam.Map(str.title)
          | beam.Map(print)
    )

# lines is a PCollection object
print('lines = ', lines)


### Simple transformation using a lambda instead of a built in function.

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    lines = (
        p | beam.Create(['one', 'two', 'three', 'four'])
          | beam.Map(lambda x : x.title())
          | beam.Map(print)
    )


### Simple transformation using a user defined function.

In [None]:
import apache_beam as beam

def title(x):
    return x.title()

with beam.Pipeline() as p:
    lines = (
        p | beam.Create(['one', 'two', 'three', 'four'])
          | beam.Map(title)
          | beam.Map(print)
    )


### Python the pipe `|` is actually just an operator overload to call the apply method of the pipeline. You would never do this in python, but it helps to understand what is going on under the hood.

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
        lines = ((p | beam.Create(['one', 'two', 'three', 'four']))
             .apply(beam.Map(str.title)) 
             .apply(beam.Map(print))
        )

### The Spark equivalent would be to pload a local Python list into a Spark RDD and do a simple transformation.

In [None]:
rdd1 = ( sc.parallelize(['one', 'two', 'three', 'four'])
        
#           .map(str.title)
       )
rdd1.collect()


## Java

### Simple transformation using a ___lambda___.


In [40]:
%%java verbose
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.TextIO;

import java.util.*;

public class Create1 {
    public static void main(String[] args) {

        String outputsPrefix = "/tmp/outputs";
        Pipeline p = Pipeline.create();
        
        PCollection<String> lines = p.apply(Create.of("one", "two", "three", "four"));
        lines = lines.apply(MapElements.into(TypeDescriptors.strings()).via((String line) -> line.toUpperCase()));
        lines.apply(TextIO.write().to(outputsPrefix));

        p.run().waitUntilFinish();
    }
}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)

>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
[m> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :jar
> Task :startScripts UP-TO-DATE
> Task :distTar
> Task :distZip
> Task :shadowJar
> Task :startShadowScripts
> Task :shadowDistTar
> Task :shadowDistZip
> Task :assemble
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test NO-SOURCE
> Task :check UP-TO-DATE
> Task :build

BUILD SUCCESSFUL in 25s
9 actionable ta

### Simple transformation using ___SimpleFunction___ instead of lambda.


In [46]:
%%java verbose
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.io.TextIO;
import java.util.*;

public class Create2 {
    public static void main(String[] args) {

        String outputsPrefix = "/tmp/outputs";
        Pipeline p = Pipeline.create();
        
        PCollection<String> lines = p.apply(Create.of("one", "two", "three", "four"));
        lines = lines.apply(MapElements.via(
            new SimpleFunction<String, String>() {
              @Override
              public String apply(String line) {
                String ret = line.toUpperCase();
                //System.out.println("** " + ret);
                return ret;
              }
            }));

        lines.apply("Write", TextIO.write().to(outputsPrefix));

        p.run().waitUntilFinish();
    }
}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)

>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
[m> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :jar
> Task :startScripts UP-TO-DATE
> Task :distTar
> Task :distZip
> Task :shadowJar
> Task :startShadowScripts
> Task :shadowDistTar
> Task :shadowDistZip
> Task :assemble
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test NO-SOURCE
> Task :check UP-TO-DATE
> Task :build

BUILD SUCCESSFUL in 25s
9 actionable ta

### Java simple transformation using ___SimpleFunction___ to wrap a User Defined Function.


In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.io.TextIO;
import java.util.*;

public class Create3 {
    public static void main(String[] args) {

        String outputsPrefix = "/tmp/outputs";
        Pipeline p = Pipeline.create();
        
        PCollection<String> lines = p.apply(Create.of("one", "two", "three", "four"));
        lines = lines.apply(MapElements.via(
            new SimpleFunction<String, String>() {
              @Override
              public String apply(String line) {
                return upper(line);
              }
            }));

        lines.apply("Write", TextIO.write().to(outputsPrefix));

        p.run().waitUntilFinish();
    }
    
    public static String upper(String line) {
        return line.toUpperCase();
    }
}


#

***

# 3. ___ReadFromText___ allows you to read a text file into a __PCollection__.

## Python


### It's a good idea to start naming the steps for debugging and monitoring later. Names must be unique in the pipeline.

In [None]:
! rm /tmp/outputs*

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.Map(lambda x : x.split(','))
          | 'Transform' >> beam.Map(lambda x : (int(x[0]), x[1].upper()))
          | 'Write' >> WriteToText('/tmp/outputs')
#          | 'Print' >> beam.Map(print)
    )
    #p.run() # implicit in Python when using with block

! cat /tmp/outputs*

### Read from CSV and use ParDo

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        #return [(int(regionid), regionname)] # ParDo's need to return a list
        yield (int(regionid), regionname) # Can also use yield instead of returning a list
#        yield (int(regionid), regionname.upper()) # Include a transformation instead of doing it as a separate step

regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(regionsfilename)
          | 'Parse' >> beam.ParDo(RegionParseTuple())
          #| 'Write' >> WriteToText('regions.out')
          | 'Print' >> beam.Map(print)
    )


## Java


### Read from CSV and use Map with ___lambda___.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.io.TextIO;

public class ReadRegions1 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", MapElements.into(TypeDescriptors.strings()).via((String element) -> element.toUpperCase()));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
}


### ___ParDo___ Example using anonymous class inline.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;

public class ReadRegions2 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";


        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    String element = c.element();
                    // String[] elements = element.split(",");
                    c.output(element + "*");
                }
            }));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
}



### ___ParDo___ using a defined class instead of an anonynous class.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;

public class ReadRegions3 {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String regionsInputFileName = "datasets/northwind/CSV/regions/regions.csv";
        String outputsPrefix = "/tmp/outputs";


        PCollection<String> regions = p
            .apply("Read", TextIO.read().from(regionsInputFileName))
            .apply("Parse", ParDo.of(new AddStar()));
        
        regions.apply(TextIO.write().to(outputsPrefix));
        p.run().waitUntilFinish();
    }
    
    static class AddStar extends DoFn<String, String> {
        @ProcessElement
        public void process(@Element String line, OutputReceiver<String> out) {
            out.output(line + "*");
        }
    }
}



#

***

# 4. Parse into a model class.


## Python

### Create a model based on ___typing.NamedTuple___ so you can use properties instead of keys and use the Filter __PTransform__ with ___lambda___.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter 1' >> beam.Filter(lambda x : x.regionid % 2 == 0)
          | 'Filter 2' >> beam.Filter(lambda x : x.territoryname.startswith('S'))
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Use Filter with a UDF.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

def startsWithS(element):
    return element.territoryname.startswith('S')

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter' >> beam.Filter(startsWithS)
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Use a ParDo class to accomplish filtering.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname, int(regionid))

class StartsWithSFilter(beam.DoFn):
    def process(self, element):
        if element.territoryname.startswith('S'):
            yield element
            
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Filter' >> beam.ParDo(StartsWithSFilter())
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


### Put the parsing and filtering all into one ParDo.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        if territoryname.startswith('S'):
            yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'
with beam.Pipeline() as p:
    regions = (
        p | 'Read' >> ReadFromText(territoriesfilename)
          | 'Parse' >> beam.ParDo(TerritoryParseClass())
          | 'Print' >> beam.Map(print)
#          | 'Write' >> WriteToText('regions.out')
    )


In [None]:
with beam.Pipeline() as p:
  # records = (p | 'Read' >> beam.io.ReadFromAvro('gs://joey-shared-bucket/datasets/northwind/AVRO/categories/categories.avro')
  records = (p | 'Read' >> beam.io.ReadFromText('gs://joey-shared-bucket/datasets/northwind/CSV/categories/categories.csv')
             | beam.Map(print))
    
    

## Java

### Parse a CSV into a class and filter it using a Pardo

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("Filter", ParDo.of(new FilterTerritories()))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


### Parse a CSV into a class and filter it using and anonymous class to create the condition.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("Filter", Filter.by(new SerializableFunction<Territory, Boolean>() {
                @Override
                public Boolean apply(Territory t) {
                    return t.territoryID % 2 == 0 && t.territoryName.startsWith("S");
                }
            }))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


### Parse a CSV into a class and filter it in one step.

In [None]:
%%java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
        ;                   
        
        territories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                if (territoryName.startsWith("S")) {
                    c.output(new Territory(territoryID, territoryName, regionID));
                }
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    static class FilterTerritories extends DoFn<Territory, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(FilterTerritories.class);

        @ProcessElement
        public void process(@Element Territory t, OutputReceiver<Territory> o) {
            if (t.territoryID % 2 == 0 && t.territoryName.startsWith("S")) {
                o.output(t);
            }
        }
    }
}


#

***

# 5. Create multiple outputs from a single read.

## Python

### Send the same data down multiple paths, such as to group it on two different keys with one read from the source. Also show how to read AVRO.

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        yield Territory(int(element['territoryid']), element['territorydescription'], int(element['regionid']))

territoriesfilename = 'datasets/northwind/AVRO/territories/territories.avro'
with beam.Pipeline() as p:
    territories = (p | 'Read' >> beam.io.ReadFromAvro(territoriesfilename)
                     | 'Parse' >> beam.ParDo(TerritoryParseClass())
                  )

    # Branch 1
    (territories 
         | 'Lowercase' >> beam.Map(lambda x : (x.territoryid, x.territoryname.lower(), x.regionid))
         | 'Write Lower' >> WriteToText('/tmp/territories_lower.out')
    )
    
    # Branch 2
    (territories 
         | 'Uppercase' >> beam.Map(lambda x : (x.territoryid, x.territoryname.upper(), x.regionid))
         | 'Write Upper' >> WriteToText('/tmp/territories_upper.out')
    )

! echo "Lower" && cat /tmp/territories_lower.out* && echo "Upper" && cat /tmp/territories_upper.out*
    

### Use TaggedOutput in the ParDo to split data into two different paths with different data on each. Also show how to read Parquet.

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

territoriesfilename = 'datasets/northwind/PARQUET/territories/territories.parquet'

with beam.Pipeline() as p:
  records = p | 'Read' >> beam.io.ReadFromParquet(territoriesfilename) | beam.Map(print)

In [None]:
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
import typing

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class OddEvenTerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = int(element['territoryid']), element['territoryname'], int(element['regionid'])
        if int(regionid) % 2 == 0:
            yield pvalue.TaggedOutput('Even', Territory(int(territoryid), territoryname, int(regionid)))
        else:
            yield pvalue.TaggedOutput('Odd', Territory(int(territoryid), territoryname, int(regionid)))

territoriesfilename = 'datasets/northwind/PARQUET/territories/territories.parquet'

with beam.Pipeline() as p:
    territories = p | 'Read' >> beam.io.ReadFromParquet(territoriesfilename) 
    # territories would return a tuple of the two tagged outputs
    # unpack the two outputs to two separate variables to process differently
    evens, odds = territories | 'Parse' >> beam.ParDo(OddEvenTerritoryParseClass()).with_outputs("Even", "Odd")
    
    evens | 'Write Even' >> WriteToText('/tmp/territories_even.out')
    
    odds | 'Write Odd' >> WriteToText('/tmp/territories_odd.out')

! echo "Evens" && cat /tmp/territories_even.out* && echo "Odds" && cat /tmp/territories_odd.out*

## Java

### Send the same output down two different paths.

In [None]:
! rm /tmp/territories*

### Incomplete AVRO example

In [7]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 Pipeline p = ...;

#  // Read Avro-generated classes from files on GCS
#  PCollection<AvroAutoGenClass> records =
#      p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));

#  // Read GenericRecord's of the given schema from files on GCS
#  Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
#  PCollection<GenericRecord> records =
#      p.apply(AvroIO.readGenericRecords(schema)
#                 .from("gs://my_bucket/path/to/records-*.avro"));
 
    
  pipeline.apply("Read Avro files",
      AvroIO.readGenericRecords(schemaJson).from(options.getInputFile()))
      .apply("Convert Avro to CSV formatted data",
          ParDo.of(new ConvertAvroToCsv(schemaJson, options.getCsvDelimiter())))
      .apply("Write CSV formatted data", TextIO.write().to(options.getOutput())
          .withSuffix(".csv"));
    
    
    
pipeline
      .apply("Read from Avro", AvroIO.read(BigtableRow.class).from(options.getInputFilePattern()))
      .apply(
          "Transform to Bigtable",
          ParDo.of(
              AvroToBigtableFn.createWithSplitLargeRows(
                  options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
      .apply("Write to Bigtable", write);

  return pipeline.run();



public class ReadTerritoriesAvro {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/AVRO/territories/territories.avro";
        String outputsPrefix = "/tmp/outputs";

        PCollection<AvroAutoGenClass> records= p
            .apply("Read Avro", AvroIO.read(AvroAutoGenClass.class).from(territoriesInputFileName));
/*
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territory", ParDo.of(new ParseTerritories()))
*/
        ;                   
        
/*            
        territories
            .apply("Upper", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toUpperCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_upper").withFormatFunction(new SerializeTerritory()));

        territories
            .apply("Lower", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toLowerCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_lower").withFormatFunction(new SerializeTerritory()));

        
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    */

}


IndentationError: unindent does not match any outer indentation level (<tokenize>, line 54)

In [None]:
TableReference tableRef = new TableReference();
tableRef.setProjectId("project-id");
tableRef.setDatasetId("dataset-name");
tableRef.setTableId("table-name");

List<TableFieldSchema> fieldDefs = new ArrayList<>();
fieldDefs.add(new TableFieldSchema().setName("column1").setType("STRING"));
fieldDefs.add(new TableFieldSchema().setName("column2").setType("FLOAT"));  
For the Pipeline steps,

Pipeline pipeLine = Pipeline.create(options);
pipeLine
.apply("ReadMyFile", 
        TextIO.read().from("path-to-json-file")) 

.apply("MapToTableRow", ParDo.of(new DoFn<String, TableRow>() {
    @ProcessElement
    public void processElement(ProcessContext c) { 
        Gson gson = new GsonBuilder().create();
        HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);

        TableRow row = new TableRow();
        row.set("column1", parsedMap.get("col1").toString());
        row.set("column2", Double.parseDouble(parsedMap.get("col2").toString()));
        c.output(row);
    }
}))


### Branching

In [8]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse Territory", ParDo.of(new ParseTerritories()))
        ;                   
        
            
        territories
            .apply("Upper", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toUpperCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_upper").withFormatFunction(new SerializeTerritory()));

        territories
            .apply("Lower", ParDo.of(new DoFn<Territory, Territory>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    Territory t = c.element();
                    c.output(new Territory(t.territoryID, t.territoryName.toLowerCase(), t.regionID));
                }
            }))
             .apply(TextIO.<Territory>writeCustomType().to("/tmp/territories_lower").withFormatFunction(new SerializeTerritory()));

        
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> ls -lh build/libs/
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> /opt/gradle-5.0/bin/gradle --console=plain runShadow
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Done


In [9]:
! echo "Upper" && cat /tmp/territories_upper* && echo "Lower" && cat /tmp/territories_lower*


/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Upper
(territoryID = 85014, territoryName = PHOENIX, regionID = 2)
(territoryID = 94105, territoryName = SAN FRANCISCO, regionID = 2)
(territoryID = 98004, territoryName = BELLEVUE, regionID = 2)
(territoryID = 1730, territoryName = BEDFORD, regionID = 1)
(territoryID = 2184, territoryName = BRAINTREE, regionID = 1)
(territoryID = 6897, territoryName = WILTON, regionID = 1)
(territoryID = 10038, territoryName = NEW YORK, regionID = 1)
(territoryID = 48084, territoryName = TROY, regionID = 3)
(territoryID = 55439, territoryName = MINNEAPOLIS, regionID = 3)
(territoryID = 78759, territoryName = AUSTIN, regionID = 4)
(territoryID = 11747, territoryName = MELLVILE, regionID = 1)
(territoryID = 20852, territoryName = ROCKVILLE, regionID = 1)
(territoryID = 30346, territoryName = ATLANTA, regionID = 4)
(territoryID = 40222, territoryName = LOUISVILLE, regionID = 1)
(territoryID = 85251, territor

### Use TupleTag to split the output into two separate path.

In [None]:
! rm /tmp/territories*

In [10]:
%%java nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {

    final static TupleTag<Territory> evenTag = new TupleTag<Territory>() {};
    final static TupleTag<Territory> oddTag = new TupleTag<Territory>() {};

    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollectionTuple territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("OddEvenSplit", ParDo.of(new ParseTerritoriesOddEvenSplit()).withOutputTags(evenTag, TupleTagList.of(oddTag)))
        ;                   
        
        PCollection<Territory> evenTerritories = territories.get(evenTag);
        evenTerritories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix + "_even").withFormatFunction(new SerializeTerritory()));

        PCollection<Territory> oddTerritories = territories.get(oddTag);
        oddTerritories.apply(TextIO.<Territory>writeCustomType().to(outputsPrefix + "_odd").withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritoriesOddEvenSplit extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritoriesOddEvenSplit.class);

        @ProcessElement
        public void process(ProcessContext c) {


            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                if (regionID % 2 == 0) {
                    c.output(evenTag, new Territory(territoryID, territoryName, regionID));
                } else {
                    c.output(oddTag, new Territory(territoryID, territoryName, regionID));
                }
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritoriesOddEvenSplit: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> ls -lh build/libs/
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> /opt/gradle-5.0/bin/gradle --console=plain runShadow
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Done


In [11]:
! echo "Odd" && cat /tmp/outputs_odd* && echo "Even" && cat /tmp/outputs_even*


/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
Odd
(territoryID = 48304, territoryName = Bloomfield Hills, regionID = 3)
(territoryID = 1730, territoryName = Bedford, regionID = 1)
(territoryID = 2184, territoryName = Braintree, regionID = 1)
(territoryID = 7960, territoryName = Morristown, regionID = 1)
(territoryID = 14450, territoryName = Fairport, regionID = 1)
(territoryID = 27511, territoryName = Cary, regionID = 1)
(territoryID = 53404, territoryName = Racine, regionID = 3)
(territoryID = 1581, territoryName = Westboro, regionID = 1)
(territoryID = 2903, territoryName = Providence, regionID = 1)
(territoryID = 8837, territoryName = Edison, regionID = 1)
(territoryID = 19428, territoryName = Philadelphia, regionID = 3)
(territoryID = 40222, territoryName = Louisville, regionID = 1)
(territoryID = 45839, territoryName = Findlay, regionID = 3)
(territoryID = 55113, territoryName = Roseville, regionID = 3)
(territoryID = 1833, terri

#

***

# 6. To use Group, Join, Sort you need to reshape the data into a KV pair first.

## Python

### ___WithKeys___ will reshape your data first, then GroupByKey will cluster the elements as a list under each unique key. The data must be in a KV tuple pair first. Also not how to read a JSON file.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
import json

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
beam.coders.registry.register_coder(Territory, beam.coders.RowCoder)
        
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = int(element['territoryid']), element['territoryname'], int(element['regionid'])
        yield Territory(int(territoryid), territoryname, int(regionid))

territoriesfilename = 'datasets/northwind/JSON/territories/territories.json'

with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText(territoriesfilename)
                    | 'From json' >> beam.Map(json.loads)
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'Territories With Keys' >> beam.util.WithKeys(lambda x : x.regionid)
#                    | 'Group Territories' >> beam.GroupByKey() 
                    | 'Print Territories' >> beam.Map(print)
                  )


### Combine is equivalent to a SQL ___GROUP BY___ query
### ___SELECT key, sum(value) as total FROM source GROUP BY key___

In [None]:
import apache_beam as beam

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', 10), ('a', 20), ('b', 30), ('b', 40), ('c', 50), ('a', 60)])
          | 'Combine' >> beam.CombinePerKey(sum)
          | 'Print' >> beam.Map(print)
    )


### Custom Combine Function

In [None]:
import apache_beam as beam

class CustomCombine(beam.CombineFn):
    """
    This custom combiner will calculate the max of the first element, sum of the second element and a count of total elements
    The final step will also return the average of the second element.
    """
    def create_accumulator(self):
        # method defining how to create an empty accumulator
        return dict()

    def add_input(self, accumulator, input):
        # get the input and split it up for easier manipulation
        k, v = input
        # get the values from the accumulator for the input key or initialize it if it's the first time we see this key
        m, s, c = accumulator.get(k, (0, 0, 0))

        # take the max for the first element of the tuple and sum the second element and count for the third
        accumulator[k] = (v[0] if v[0] > m else m, s + v[1], c + 1)
        return accumulator

    def merge_accumulators(self, accumulators):
        # merge the accumulators from the various workers once they have finished accumulating locally
        merged = dict()
        for accum in accumulators:
          for k, v in accum.items():
            m, s, c = merged.get(k, (0, 0, 0))
            merged[k] = (v[0] if v[0] > m else m, s + v[1], c + v[2])
        return merged

    def extract_output(self, accumulator):
        # called when all the works accumulators have been merge to render the final output
        # return the max, the sum, the count and the average for the key
        return {k : (v[0], v[1], v[2], v[1]/v[2]) for k, v in accumulator.items()}

with beam.Pipeline() as p:
    data = (
        p | 'Create' >> beam.Create([('a', (1, 10)), ('a', (2, 20)), 
                                     ('b', (3, 30)), ('c', (5, 50)), 
                                     ('b', (4, 40)), ('a', (6, 60))])
          | 'Combine' >> beam.CombineGlobally(CustomCombine())
          | 'Print' >> beam.Map(print)
    )


### Create a nested repeating output
### First create a dataset. Here is Python code for the equivalent bq command of bq mk dataflow.


In [None]:
# same as doing bq mk dataflow

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set dataset_id to the ID of the dataset to create.
PROJECT_ID = 'qwiklabs-gcp-04-b1b7cded1c4b'
dataset_id = f"{PROJECT_ID}.dataflow" #.format(client.project)

# TODO(developer): Specify the geographic location where the dataset should reside.
dataset.location = "US"

# # Construct a full Dataset object to send to the API.
# dataset = bigquery.Dataset(dataset_id)


try:
    client.get_dataset(dataset_id)  # Make an API request.
    print("Dataset {} already exists".format(dataset_id))
except:
    print("Dataset {} is not found".format(dataset_id))
    dataset = bigquery.Dataset(dataset_id)
    dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
    
    

schema = [
    bigquery.SchemaField("regionid", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("regionname", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("territories", "RECORD", mode="REPEATED", 
            fields=[
                    bigquery.SchemaField("territoryid", "STRING", mode="REQUIRED"),
                    bigquery.SchemaField("territoryname", "STRING", mode="REQUIRED")
                   ]
                        )
]

# create table dataflow.region_territory
# (regionid NUMERIC
# ,regionname STRING
# ,territories ARRAY<STRUCT<territoryid NUMERIC, territoryname STRING>>)

table_id = f"{PROJECT_ID}.dataflow.region_territory"

try:
    table = client.get_table(table_id)  # Make an API request.
    print("Table {} already exists.".format(table_id))
    print(table)
except:
    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print("Table {} created.".format(table_id))



### Sometimes need to manually create a schema for a nested repeating because it cannot use a simple string. In this case we don't really need it but it's included here as a reference in case we do.

In [None]:
from apache_beam.io.gcp.internal.clients import bigquery as bq
region_territory_schema = bq.TableSchema()
regionid = bq.TableFieldSchema(name = 'regionid', type = 'string', mode = 'required')
region_territory_schema.fields.append(regionid)
regionname = bq.TableFieldSchema(name = 'regionname', type = 'string', mode='required')
region_territory_schema.fields.append(regionname)

# A nested field
territories = bq.TableFieldSchema(name = 'territories', type = 'record', mode = 'nullable')
territoryid = bq.TableFieldSchema(name = 'territoryid', type = 'string', mode = 'required')
territories.fields.append(territoryid)
territoryname = bq.TableFieldSchema(name = 'territoryname', type = 'string', mode = 'required')
territories.fields.append(territoryname)

region_territory_schema.fields.append(territories)

print(region_territory_schema)

### The code here is tricky: 
### First parse the two tables into tuples, (regionid, regionname) & (regionid, {'territoryid':territoryid, 'territoryname':territoryname})
### CoGroupByKey yields a shape like (regionid, {'regions':['regionname'], 'territories':[{}]) so we need to reshape it to dicts to write it to BQ


In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText

class RegionParseTuple(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield (int(regionid), regionname) # Can also use yield instead of returning a list

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(regionid), {'territoryid': int(territoryid), 'territoryname':territoryname})

class SortTerritories(beam.DoFn):
    #{'regionid': 1, 'regionname': 'Eastern', 'territories': [{'territoryid': 1730, 'territoryname': 'Bedford'}, 
    def process(self, element):
        territories = element['territories']
        element['territories'] = sorted(territories, key = lambda x : x['territoryid'])
        yield element

regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

#PROJECT_ID = 'qwiklabs-gcp-04-4cf93802c378'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseTuple())
#                | 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#                    | 'Print Territories' >> beam.Map(print)
                  )
    nested = ( 
        {'regions':regions, 'territories':territories} 
              | 'Nest territories into regions' >> beam.CoGroupByKey()
              | 'Reshape to dict' >> beam.Map(lambda x : {'regionid': x[0], 'regionname': x[1]['regions'][0], 
                                                        'territories': x[1]['territories']})
              | 'Sort by territoryid' >> beam.ParDo(SortTerritories())
#              | 'Print' >> beam.Map(print)
    )
    nested | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
                                                                             , project = PROJECT_ID
                                                                             , method = 'STREAMING_INSERTS'
                                                                             )
#    nested | 'Print' >> beam.Map(print)
             
#help(beam.io.WriteToBigQuery)    
#(1, {'regions': ['Eastern'], 'territories': [{'territoryid': 1730, 'territoryname': 'Bedford'}, {'territoryid': 1581, 'territoryname': 'Westboro'}, {'territoryid': 1833, 'territoryname': 'Georgetow'}, {'territoryid': 2116, 'territoryname': 'Bosto
#{'regionid': 1, 'regionname':'Eastern', 'territories' : [{'territoryid':1, 'territoryname':'name1'}, {}, {}]}

### Query the table to show it was populated.

In [None]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

table_id = f"{PROJECT_ID}.dataflow.region_territory"

query_job = client.query(f"""SELECT * FROM {table_id}""")

results = query_job.result()  # Waits for job to complete.
display(list(results))

### Helper functions to make a generic transform to nest children

In [None]:
import apache_beam as beam

class NestJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key, sort = lambda x : x):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key
        self.sort = sort

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToDict(item):
            ret = {self.parent_key : item[0]}
            ret.update(item[1][self.parent_pipeline_name][0])
            ret[self.child_pipeline_name] = item[1][self.child_pipeline_name]
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToDict)
                  | f'Sort the nested data' >> beam.Map(self.sort)
            
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}
      
class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

def sort_territories(element):
    territories = element['territories']
    element['territories'] = list(sorted(territories, key = lambda x : x['territoryid']))
    return element

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | NestJoin('regions', 'regionid', 'territories', 'regionid', sort = sort_territories)
    nestjoin | 'Print Nest Join' >> beam.Map(print)
#     nestjoin | 'Write nested region_territory to BQ' >> beam.io.WriteToBigQuery('region_territory', dataset = 'dataflow'
#                                                                              , project = PROJECT_ID
#                                                                              , method = 'STREAMING_INSERTS'
#                                                                              )



### Simulate an Outer Join with CoGroup

In [None]:
import apache_beam as beam

class LeftJoin(beam.PTransform):
    '''
    This PTransform will take a dictionary to the left of the | which will be the collection of the two
    PCollections you want to join together. Both must be a dictionary. You will then pass in the name of each
    PCollection and the key to join them on.
    It will automatically reshape the two dicts into tuples of (key, dict) where it removes the key from each dict
    It then CoGroups them and reshapes the tuple into a dict ready for insertion to a BQ table
    '''
    def __init__(self, parent_pipeline_name, parent_key, child_pipeline_name, child_key):
        self.parent_pipeline_name = parent_pipeline_name
        self.parent_key = parent_key
        self.child_pipeline_name = child_pipeline_name
        self.child_key = child_key

    def expand(self, pcols):
        def reshapeToKV(item, key):
            # pipeline object should be a dictionary
            item1 = item.copy()
            del item1[key]
            return (item[key], item1)

        def reshapeCoGroupToFlatDict(item):
            parent = {self.parent_key : item[0]}
            parent.update(item[1][self.parent_pipeline_name][0])
            ret = []
            for row1 in item[1][self.child_pipeline_name]:
                row = parent.copy()
                row.update(row1)
                ret.append(row)
            return ret

        return (
                {
                self.parent_pipeline_name : pcols[self.parent_pipeline_name] | f'Convert {self.parent_pipeline_name} to KV' 
                    >> beam.Map(reshapeToKV, self.parent_key)
                ,self.child_pipeline_name : pcols[self.child_pipeline_name] | f'Convert {self.child_pipeline_name} to KV'
                    >> beam.Map(reshapeToKV, self.child_key)
                } | f'CoGroupByKey {self.child_pipeline_name} into {self.parent_pipeline_name}'
                    >> beam.CoGroupByKey()
                  | f'Reshape to dictionary'
                    >> beam.Map(reshapeCoGroupToFlatDict)
        )

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid':int(regionid), 'regionname':regionname.title()}

class TerritoryParseDict(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield {'territoryid':int(territoryid), 'territoryname' : territoryname, 'regionid':int(regionid)}
    
regionsfilename = 'datasets/northwind/CSV/regions/regions.csv'
territoriesfilename = 'datasets/northwind/CSV/territories/territories.csv'

with beam.Pipeline() as p:
    regions = (
              p | 'Read Regions' >> ReadFromText(regionsfilename)
                | 'Parse Regions' >> beam.ParDo(RegionParseDict())
                #| 'Print Regions' >> beam.Map(print)
              )
        
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseDict())
                    #| 'Print Territories' >> beam.Map(print)
                  )

    nestjoin = {'regions':regions, 'territories':territories} | LeftJoin('regions', 'regionid', 'territories', 'regionid')
    nestjoin | 'Print Nest Join' >> beam.Map(print)



## Java

#

***

# 7. BeamSQL

## Python

### SQL Transform uses PCOLLECTION as the name of a single source passed into it.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

class Territory(typing.NamedTuple):
    territoryid: int
    territoryname: str
    regionid: int
    
    def __str__(self):
        return f'territoryid = {self.territoryid} territoryname = {self.territoryname} regionid = {self.regionid}'
coders.registry.register_coder(Territory, coders.RowCoder)
        
@beam.typehints.with_output_types(Territory)
class TerritoryParseClass(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield Territory(int(territoryid), territoryname.title(), int(regionid))
    
class RegionCount(typing.NamedTuple):
    regionid: int
    count: int
    
    def __str__(self):
        return f'regionid = {self.regionid} count = {self.count}'
coders.registry.register_coder(RegionCount, coders.RowCoder)
        
        
territoriesfilename = 'territories.csv'
with beam.Pipeline() as p:
    territories = (
                  p | 'Read Territories' >> ReadFromText('territories.csv')
#                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass()).with_output_types(Territory) # if we didn't have with_output_types decorator
                    | 'Parse Territories' >> beam.ParDo(TerritoryParseClass())
                    | 'SQL Territories' >> SqlTransform("""SELECT regionid, count(*) as `count` FROM PCOLLECTION GROUP BY regionid""")
#                    | 'Map Territories for Print' >> beam.Map(lambda x : f'regionid = {x.regionid}  count = {x.count}')
#                    | 'Convert to RegionCount Class' >> beam.Map(lambda x : RegionCount(x.regionid, x.count))
                    | 'Print SQL' >> beam.Map(print)
                    )
    


### For a SQL query that has more than one source, bundle the sources together in a dictionary, they keys become the table names inside the SQL string.

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform

import typing
import json

with beam.Pipeline() as p:
    parent = (
            p | 'Create Parent' >> beam.Create([(1, 'Vowel'), (2, 'Consonant'), (4, 'Unknown')])
              | 'Map Parent' >> beam.Map(lambda x : beam.Row(parent_id = x[0], parent_name = x[1]))
    )

    child = (
            p | 'Create Child' >> beam.Create([('Alpha', 1), ('Beta', 2), ('Gamma', 2), ('Delta', 2), ('Epsilon', 1), ('Pi', 3)])
              | 'Map Child' >> beam.Map(lambda x : beam.Row(child_name = x[0], parent_id = x[1]))
    )
    
    result = ( {'parent': parent, 'child' : child} 
         | SqlTransform("""
             SELECT p.parent_id, p.parent_name, c.child_name 
             FROM parent as p 
             INNER JOIN child as c ON p.parent_id = c.parent_id
             """)
        | 'Map Join' >> beam.Map(lambda x : f'{x.parent_id}, {x.parent_name}, {x.child_name}')
        | 'Print Join' >> beam.Map(print)
        )


## Java

In [None]:
%%java 



  // Define the schema for the records.
    Schema appSchema =
        Schema
          .builder()
          .addInt32Field("appId")
          .addStringField("description")
          .addDateTimeField("rowtime")
          .build();

    // Create a concrete row with that type.
    Row row =
        Row
          .withSchema(appSchema)
          .addValues(1, "Some cool app", new Date())
          .build();

    // Create a source PCollection containing only that row
    PCollection<Row> testApps =
        PBegin
          .in(p)
          .apply(Create
                    .of(row)
                    .withCoder(RowCoder.of(appSchema)));
    

In [77]:
%%java verbose nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        // Define the schema for the records.
        Schema territorySchema = Schema
          .builder()
          .addInt64Field("territoryID")
          .addStringField("territoryName")
          .addInt64Field("regionID")
          .build();

        // Define the schema to hold the results.
        Schema resultSchema = Schema.of(
            Schema.Field.of("regionID", Schema.FieldType.INT64), 
            Schema.Field.of("cnt", Schema.FieldType.INT64));
        
        
        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
            .apply("To Rows", Convert.toRows())
        ;                   
        
        
          PCollection<Row> territories3 = territories.apply(
             SqlTransform.query("SELECT regionID, COUNT(*) as cnt from PCOLLECTION GROUP BY regionID")).setRowSchema(resultSchema);
        
          territories3.apply(
              "Print", MapElements.via(new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                      System.out.println("SQL Result: " + input.getValues());
                      return input;
                  }
              }
          )).setRowSchema(resultSchema);
        
//        territories3.apply(TextIO.<Row>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    

    
     
/*    
    
    
    @DefaultCoder(AvroCoder.class)
    @DefaultSchema(JavaFieldSchema.class)
    static class Region {
        Long regionID;
        Long cnt regionName;
        
        Region() {}
        
        Region(long regionID, long cnt) {
            this.regionID = regionID;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionID = %d, cnt = %d)", regionID, cnt);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Region, String> {
        @Override
        public String apply(Region input) {
          return input.toString();
        }
    }

    
    
private class Transform extends PTransform<pcollectionlist<row>, PCollection<row>> {
 
    @Override
    public PCollection<row> expand(PCollectionList<row> pinput) {
      checkArgument(
          pinput.size() == 1,
          "Wrong number of inputs for %s: %s",
          BeamUncollectRel.class.getSimpleName(),
          pinput);
      PCollection<row> upstream = pinput.get(0);
 
      // Each row of the input contains a single array of things to be emitted; Calcite knows
      // what the row looks like
      Schema outputSchema = CalciteUtils.toSchema(getRowType());
 
      PCollection<row> uncollected =
          upstream.apply(ParDo.of(new UncollectDoFn(outputSchema))).setRowSchema(outputSchema);
 
      return uncollected;
    }
  }    
    static class ParseRegions extends DoFn<Row, Region> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    
    
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MyPersonClass {
  public abstract String getName();
  public abstract Integer getAge();
  public abstract Float getHeight();

  @SchemaCreate
  public static MyPersonClass create(String name, Integer age, Float height) {
    return new AutoValue_MyPersonClass(name, age, height);
  }
}
    
*/
    
    
}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)

>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
[m
> Task :compileJava FAILED
/home/jupyter/dataflowclass1/src/main/java/samples/quickstart/ReadTerritories.java:54: error: incompatible types: inference variable OutputT has incompatible bounds
            .apply("To Rows", Convert.toRows())
                  ^
    equality constraints: PCollection<Row>
    upper bounds: PCollection<Territory>,POutput
  where OutputT,T are type-variables:
    OutputT extends POutput declared in method <OutputT>apply(String

## BeamSQL Java working

In [79]:
%%java verbose nooutput
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.transforms.Convert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadTerritories {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        String territoriesInputFileName = "datasets/northwind/CSV/territories/territories.csv";
        String outputsPrefix = "/tmp/outputs";

        PCollection<Territory> territories = p
            .apply("Read", TextIO.read().from(territoriesInputFileName))
            .apply("Parse", ParDo.of(new ParseTerritories()))
        ;                   
        
        // Define the schema for the records.
        Schema territorySchema = Schema
          .builder()
          .addInt64Field("territoryID")
          .addStringField("territoryName")
          .addInt64Field("regionID")
          .build();
        // Define the schema to hold the results.
        
        Schema resultSchema = Schema.of(
            Schema.Field.of("regionID", Schema.FieldType.INT64), 
            Schema.Field.of("cnt", Schema.FieldType.INT64));
        
        // Convert them to Rows with the same schema as defined above via a DoFn.
        PCollection<Row> territories2 = territories
          .apply(
          ParDo.of(new DoFn<Territory, Row>() {
            @ProcessElement
            public void process(ProcessContext c) {
              // Get the current POJO instance
              Territory t = c.element();

              // Create a Row with the appSchema schema
              // and values from the current POJO
              Row territoryRow =
                    Row
                      .withSchema(territorySchema)
                      .addValues(
                        t.territoryID,
                        t.territoryName,
                        t.regionID)
                      .build();

              // Output the Row representing the current POJO
              c.output(territoryRow);
            }
          })).setRowSchema(territorySchema);
        
          PCollection<Row> territories3 = territories2.apply(Convert.toRows()).apply(
             SqlTransform.query("SELECT regionID, COUNT(*) as cnt from PCOLLECTION GROUP BY regionID")).setRowSchema(resultSchema);
        
          territories3.apply(
              "Print", MapElements.via(new SimpleFunction<Row, Row>() {
                  @Override
                  public Row apply(Row input) {
                      System.out.println("SQL Result: " + input.getValues());
                      return input;
                  }
              }
          )).setRowSchema(resultSchema);
//        territories3.apply(TextIO.<Row>writeCustomType().to(outputsPrefix).withFormatFunction(new SerializeTerritory()));
        p.run().waitUntilFinish();
    }

/*    
    @schemultSchema(AutoValueSchema.class)
    @AutoValue
    public static abstract class Territory {
      public abstract Long getTerritoryID();
      public abstract String getTerritoryName();
      public abstract Long getRegionID();

      @SchemaCreate
      public static Territory create(Long territoryID, String territoryName, Long regionID) {
        return new AutoValue_TerritoryClass(territoryID, territoryName, regionID);
      }
*/    
    
    @DefaultCoder(AvroCoder.class)
    static class Territory {
        Long territoryID;
        String territoryName;
        Long regionID;
        
        Territory() {}
        
        Territory(long territoryID, String territoryName, long regionID) {
            this.territoryID = territoryID;
            this.territoryName = territoryName;
            this.regionID = regionID;
        }
        
        @Override
        public String toString() {
            return String.format("(territoryID = %d, territoryName = %s, regionID = %d)", territoryID, territoryName, regionID);
        }
    }
    
    static class SerializeTerritory implements SerializableFunction<Territory, String> {
        @Override
        public String apply(Territory input) {
          return input.toString();
        }
    }

    static class ParseTerritories extends DoFn<String, Territory> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }
    

    
     
/*    
    
    
    @DefaultCoder(AvroCoder.class)
    static class Region {
        Long regionID;
        Long cnt regionName;
        
        Region() {}
        
        Region(long regionID, long cnt) {
            this.regionID = regionID;
            this.cnt = cnt;
        }
        
        @Override
        public String toString() {
            return String.format("(regionID = %d, cnt = %d)", regionID, cnt);
        }

    }
    
    static class SerializeTerritory implements SerializableFunction<Region, String> {
        @Override
        public String apply(Region input) {
          return input.toString();
        }
    }

    
    
private class Transform extends PTransform<pcollectionlist<row>, PCollection<row>> {
 
    @Override
    public PCollection<row> expand(PCollectionList<row> pinput) {
      checkArgument(
          pinput.size() == 1,
          "Wrong number of inputs for %s: %s",
          BeamUncollectRel.class.getSimpleName(),
          pinput);
      PCollection<row> upstream = pinput.get(0);
 
      // Each row of the input contains a single array of things to be emitted; Calcite knows
      // what the row looks like
      Schema outputSchema = CalciteUtils.toSchema(getRowType());
 
      PCollection<row> uncollected =
          upstream.apply(ParDo.of(new UncollectDoFn(outputSchema))).setRowSchema(outputSchema);
 
      return uncollected;
    }
  }    
    static class ParseRegions extends DoFn<Row, Region> {
        private static final Logger LOG = LoggerFactory.getLogger(ParseTerritories.class);

        @ProcessElement
        public void process(ProcessContext c) {
            
            String[] columns = c.element().split(",");
            try {
                Long territoryID = Long.parseLong(columns[0].trim());
                String territoryName = columns[1].trim();
                Long regionID = Long.parseLong(columns[2].trim());
                c.output(new Territory(territoryID, territoryName, regionID));
            } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
                LOG.info("ParseTerritories: parse error on '" + c.element() + "': " + e.getMessage());
            }
        }
    }

    
    
*/
    
    
}


>> rm src/main/java/samples/quickstart/*.java
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm build/libs/*.jar
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
>> rm -rf /tmp/outputs*
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)

>> /opt/gradle-5.0/bin/gradle --console=plain build
/bin/bash: /opt/conda/lib/libtinfo.so.6: no version information available (required by /bin/bash)
[m> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :jar
> Task :startScripts UP-TO-DATE
> Task :distTar
> Task :distZip
> Task :shadowJar
> Task :startShadowScripts
> Task :shadowDistTar
> Task :shadowDistZip
> Task :assemble
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test NO-SOURCE
> Task :check UP-TO-DATE
> Task :build

BUILD SUCCESSFUL in 26s
9 actionable ta

#

***

# 8. DoFn Lifecycle

## Python

### DoFn Lifecycle

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def setup(self):
        self.lookup = {1:'North', 2:'South', 3:'East', 4:'West'}
        print('setup')
        
    def start_bundle(self):
        print('start bundle')
        
    def process(self, element, uppercase = 0):
        #lookuptable = {1:'North', 2:'South', 3:'East', 4:'West'}
        territoryid, territoryname, regionid = element
        region = self.lookup.get(regionid, 'No Region')
        if uppercase == 1:
            region = region.upper()
        yield(territoryid, territoryname, regionid, region)
        
    def finish_bundle(self):
        print('finish bundle')

    def teardown(self):
        print('teardown')
        del self.lookup
    

with beam.Pipeline() as p:
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), uppercase = 1 ) 
        | 'Print Loopup' >> beam.Map(print)
    )
        


#

***

# 9. Side Inputs

## Python

### Side inputs are about passing extra parameters to a function where the parameters are calculated in the pipeline itself.

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsSingleton, AsDict
from apache_beam.io import ReadFromText
from apache_beam.transforms.combiners import Sample

class TerritoryParseTuple(beam.DoFn):
    # split territory into KV pair of (regionid, (territoryid, territoryname))
    def process(self, element, uppercase = '0'):
        # It's a bit weird here but what is passed in is a single element array of a string
        #print('***', uppercase)
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname if uppercase[0] == '0' else territoryname.upper(), int(regionid))

        
with beam.Pipeline() as p:
    sideinput = (
        p | 'Read sideinput.txt' >> ReadFromText('sideinput.txt')
          | Sample.FixedSizeGlobally(1)
    )
    
    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = ["0"]) # This is not a side input but just passing a fixed parameter
#          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = sideinput)  # fails because sideinput is a PCollection not an integer
#          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple(), uppercase = beam.pvalue.AsSingleton(sideinput))  # When the parameter is calculated in the pipeline itself, that makes it a side input
          | 'Print Loopup' >> beam.Map(print)
    )

#    maxregion | 'Print Min' >> beam.Map(print)


### Side input that is a lookup list
### More realistic example where the entire lookup table is read in the pipeline then distributed to each worker as a side input

In [None]:
import apache_beam as beam
from apache_beam.pvalue import AsList
from apache_beam.io import ReadFromText

class RegionParseDict(beam.DoFn):
    def process(self, element):
        regionid, regionname = element.split(',')
        yield {'regionid': int(regionid), 'regionname': regionname.title()}

class TerritoryParseTuple(beam.DoFn):
    def process(self, element):
        territoryid, territoryname, regionid = element.split(',')
        yield(int(territoryid), territoryname, int(regionid))
        
                
class LookupRegion(beam.DoFn):
    def process(self, element, lookuptable = [{'regionid':1, 'regionname':'North'}, {'regionid':2, 'regionname':'South'}]):
        # {1:'North', 2:'South'}
        territoryid, territoryname, regionid = element
        # Becase the regions PCollection is a different shape, use the following comprehension to make it easier to do a lookup
        lookup = {e['regionid'] : e['regionname'] for e in lookuptable } # {1:'North', 2:'South'}
        yield(territoryid, territoryname, regionid, lookup.get(regionid, 'No Region'))

with beam.Pipeline() as p:
    regions = (
        p | 'Read Regions' >> ReadFromText('regions.csv')
          | 'Parse Regions' >> beam.ParDo(RegionParseDict())
#          | 'Print Regions' >> beam.Map(print)
    )

    territories =  (
        p | 'Read Territories' >> ReadFromText('territories.csv')
          | 'Parse Territories' >> beam.ParDo(TerritoryParseTuple())
#          | 'Print Territories' >> beam.Map(print)
    )
    
    lookup = (
        territories
        | beam.ParDo(LookupRegion(), lookuptable = beam.pvalue.AsList(regions))
        | 'Print Loopup' >> beam.Map(print)
    )
        


#

***