# Apache Beam - Schemas

A schema is metadata that describes the structure of the elements in a `PCollection`.  The schema is *attached* to the `PCollection`.

A schema is defined as a set of named fields, each of which has a type.  The primitive field types are:

* BOOLEAN
* BYTE
* BYTES
* DATETIME
* DECIMAL
* DOUBLE
* FLOAT
* INT16
* INT32
* INT64
* STRING

In addition, a field can be defined as a ROW, which means that the field holds a nested schema.  Fields can also be one of the following container types:

* ARRAY - The field can contain 0 or more values.
* ITERABLE - The field can contain 0 or more values which can be iterated over.
* MAP - The field contains name/value pairs.
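As a minimal sketch of how these field kinds combine, the following builds a schema that mixes a primitive field, a nested ROW, an ARRAY and a MAP.  The class name, the `ADDRESS` schema and all field names are hypothetical and chosen only for illustration; the builder methods are those of `Schema.Builder`.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

public class NestedSchemaSketch {
  // Hypothetical nested schema, used below as the type of a ROW field.
  static final Schema ADDRESS = Schema.builder()
      .addStringField("city")
      .addStringField("zip")
      .build();

  static Schema employeeSchema() {
    return Schema.builder()
        .addStringField("name")                                     // primitive
        .addRowField("address", ADDRESS)                            // ROW (nested schema)
        .addArrayField("skills", FieldType.STRING)                  // ARRAY of STRING
        .addMapField("ratings", FieldType.STRING, FieldType.INT32)  // MAP of STRING to INT32
        .build();
  }

  public static void main(String[] args) {
    // Prints the field definitions in insertion order.
    System.out.println(employeeSchema());
  }
}
```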

The schema becomes useful when the elements of the `PCollection` are objects (records).  Imagine I told you that a `PCollection` contains Employee records.  Now suppose I also told you that an Employee record contains:

* name: String
* salary: Double
* tenure: Integer

By declaring a schema, we are implicitly declaring that we have a mechanism for getting and setting values on an object.  With the ability to obtain a named value, we have opened up a new capability in our data processing: we can now write transforms that are aware of schemas.  Imagine we built a transform that takes as input the name of a field and a function that takes a value and returns true or false.  We could then imagine writing something like the following pseudocode:

```
newPCollection<Employee> = originalPCollection.apply(Filter.on("salary", Double d -> d > 10000))
```

We are now getting an inkling that by having Beam know the logical structure of elements, we can build new transforms that take advantage of it.

Schemas can be defined explicitly or can be inferred.  We can infer a schema using the `@DefaultSchema` annotation.

If a `PCollection` has a schema, we can get its definition by calling `getSchema()`.  Not every `PCollection` has a schema.  We can determine whether a `PCollection` has a schema by calling `hasSchema()`.
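The following sketch illustrates the check.  It builds two small `PCollection`s, one of `Row` elements (Beam's generic schema element type) created with an explicit schema and one of plain strings, and asks each whether it has a schema.  The class name, the `ITEM` schema and the step names are assumptions made for this example.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SchemaCheckSketch {
  // Hypothetical one-field schema.
  static final Schema ITEM = Schema.builder().addStringField("item").build();

  static PCollection<Row> withSchema(Pipeline p) {
    Row row = Row.withSchema(ITEM).addValue("blue").build();
    // withRowSchema attaches the schema to the created PCollection.
    return p.apply("Rows", Create.of(row).withRowSchema(ITEM));
  }

  static PCollection<String> withoutSchema(Pipeline p) {
    return p.apply("Strings", Create.of("blue"));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<Row> rows = withSchema(p);
    PCollection<String> strings = withoutSchema(p);

    // Guard getSchema() with hasSchema() when the input type is not known in advance.
    if (rows.hasSchema()) {
      System.out.println(rows.getSchema());
    }
    System.out.println(strings.hasSchema()); // a plain String collection has no schema
  }
}
```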

Beam provides a class called `Row` that can be used to represent an element in a `PCollection` with an associated schema without having to create a corresponding Java Bean.

We can create a `Row` using:

```
Row myRow = Row.withSchema(mySchema).withFieldValue("field1Name", field1Value)....
```

The resulting row is immutable (can't change its values) but does have getters to obtain fields by name or ordinal.
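A small sketch of building a `Row` and reading it back by name and by ordinal might look like this.  The class name, the `EMPLOYEE` schema and the values are invented for illustration.

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RowSketch {
  // Hypothetical two-field schema; field order defines the ordinals.
  static final Schema EMPLOYEE = Schema.builder()
      .addStringField("name")
      .addDoubleField("salary")
      .build();

  static Row neil() {
    return Row.withSchema(EMPLOYEE)
        .withFieldValue("name", "Neil")
        .withFieldValue("salary", 10000.0)
        .build();
  }

  public static void main(String[] args) {
    Row row = neil();
    System.out.println(row.getString("name"));            // access by field name
    System.out.println(row.getDouble(1));                 // access by ordinal (salary is field 1)
    System.out.println(row.getSchema().getFieldNames());  // the schema travels with the row
  }
}
```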

Q: What is `SchemaProvider`?
Q: What is `SchemaRegistry`?

Since our notebook is going to use the Google Cloud SDK JARs, we must include them in our dependencies.  Specifically, we need to include:

```
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>2.43.0</version>
</dependency>
```

Normally we would load our dependencies using the IJava Jupyter cell magic called `%%loadFromPom`.  Unfortunately, this doesn't work ([issue](https://github.com/SpencerPark/IJava/issues/139)).  A workaround is to download the dependencies outside of Jupyter and then launch Jupyter with the downloaded dependencies in the classpath.

```
mvn dependency:copy-dependencies
export IJAVA_CLASSPATH="./target/dependency/*"
jupyter notebook
```



See also:
* [Programming Guide - Schemas](https://beam.apache.org/documentation/programming-guide/#schemas)
* [Schema Patterns](https://beam.apache.org/documentation/patterns/schema/)
* [JavaDoc: Row](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/values/Row.html)
* [JavaDoc: Schema](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/Schema.html)
* [JavaDoc: SqlTransform](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/extensions/sql/SqlTransform.html)
* [Video: Schema-aware PCollections and Beam SQL (Beam Summit Europe 2019)](https://www.youtube.com/watch?v=aRIZXtQiCHw)
* [Schema Aware Generic Pipelines Using Apache Beam Rows](https://medium.com/@gauravmishra_82578/schema-aware-generic-pipelines-using-apache-beam-rows-e0a36b11b929)

Next we define our imports required for execution.

In [1]:
import java.util.Arrays;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.joda.time.Instant;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.schemas.transforms.Filter;
import org.apache.beam.sdk.schemas.transforms.DropFields;
import org.apache.beam.sdk.schemas.transforms.RenameFields;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.AddFields;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.transforms.ToJson;

String[] args = new String[] {"--tempLocation=gs://kolban-tmp"};
var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

// Disable block on run for direct runner
options.as(DirectOptions.class).setBlockOnRun(false);

public class LoggingDoFn<T> extends DoFn<T, T>  {
  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<T> out) {
    System.out.println(element);
    out.output(element);
  }
}

One way to define a schema is by using Java Beans.  Each of the properties of the bean (as defined by introspection to determine the getters and setters) becomes a field in the schema.  To define a schema from a Java Bean, annotate its declaration with `@DefaultSchema(JavaBeanSchema.class)`.

In [2]:
@DefaultSchema(JavaBeanSchema.class)
public class Employee {
  private String name;
  private Double salary;

  public  String getName() { return name;}
  public  void   setName(String name) { this.name = name; }
  public  Double getSalary() { return salary; }
  public  void   setSalary(Double salary) { this.salary = salary; }

  public  String toString() {
    return "Employee: name: " + name + ", salary: " + salary;
  }

  @SchemaCreate
  public Employee(String name, Double salary) {
    this.name = name; this.salary = salary;
  }
} // Employee
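To see what schema Beam infers from an annotated bean, one option is to ask a `SchemaRegistry` for it.  The sketch below assumes a hypothetical `Item` bean (not part of this notebook) with a no-argument constructor and matching getters and setters; it prints the inferred schema rather than assuming a particular field order.

```java
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

public class InferredSchemaSketch {
  // Hypothetical bean used only for this example.
  @DefaultSchema(JavaBeanSchema.class)
  public static class Item {
    private String label;
    private Double price;

    public Item() {}

    public String getLabel() { return label; }
    public void setLabel(String label) { this.label = label; }
    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }
  }

  static Schema inferred() throws NoSuchSchemaException {
    // The default registry honors the @DefaultSchema annotation on the class.
    return SchemaRegistry.createDefault().getSchema(Item.class);
  }

  public static void main(String[] args) throws NoSuchSchemaException {
    // Each bean property (getter/setter pair) shows up as a schema field.
    System.out.println(inferred());
  }
}
```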

Another way to create a schema is to construct one explicitly using the `Schema` class and its `builder()` method.  Invoking `Schema.builder()` returns a `Schema.Builder` object that allows us to add fields to our schema.  The order of insertion is remembered so that we can refer to the fields by ordinal as well as by name.  We can add fields using the convenience methods of the form `add<Type>Field(<fieldName>)`, for example `addStringField("item")`, but with this form we can't specify non-default values for things such as the description or options.  Instead, we can create the field using `Schema.Field.of(...)` and add it to the schema using `addFields(...)`.

* [JavaDoc: Schema](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/Schema.html)

In [38]:
Schema saleSchema = Schema
  .builder()
  .addStringField("item")
  .addDoubleField("amount")
  .addDateTimeField("time")
  .build();
System.out.println(saleSchema);

Fields:
Field{name=item, description=, type=STRING NOT NULL, options={{}}}
Field{name=amount, description=, type=DOUBLE NOT NULL, options={{}}}
Field{name=time, description=, type=DATETIME NOT NULL, options={{}}}
Encoding positions:
{item=0, amount=1, time=2}
Options:{{}}UUID: null


And here is an equivalent using explicit field definitions, which allows us to add a description:

In [39]:
Schema saleSchema = Schema
  .builder()
  .addFields(Schema.Field.of("item",   Schema.FieldType.STRING).withDescription("Purchased Item"))
  .addFields(Schema.Field.of("amount", Schema.FieldType.DOUBLE).withDescription("Purchased Cost"))
  .addFields(Schema.Field.of("time",   Schema.FieldType.DATETIME).withDescription("Purchased Time"))
  .build();
System.out.println(saleSchema);

Fields:
Field{name=item, description=Purchased Item, type=STRING NOT NULL, options={{}}}
Field{name=amount, description=Purchased Cost, type=DOUBLE NOT NULL, options={{}}}
Field{name=time, description=Purchased Time, type=DATETIME NOT NULL, options={{}}}
Encoding positions:
{item=0, amount=1, time=2}
Options:{{}}UUID: null


Now let's look at some PTransforms that are schema aware.

## Select

First we have `Select`.  This transform keeps (selects) only the named fields from the input.  It returns a `PCollection<Row>` where each `Row` contains only the named fields from the source.

* [JavaDoc: Select](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/Select.html)

In [4]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Select names", Select.fieldNames("name"))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Row: 
name:Bob

Row: 
name:Neil



DONE

## DropFields
The inverse of `Select` is `DropFields`.  This transform drops the named fields from the input `PCollection`.  The output of the transform is a `PCollection<Row>`.  It can't be anything else since we have created data that doesn't map to a class.

* [JavaDoc: DropFields](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/DropFields.html)

In [5]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Drop names", DropFields.fields("name"))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Row: 
salary:20000.0

Row: 
salary:10000.0



DONE

## RenameFields
We can rename fields using `RenameFields`.  The output of the transform is a `PCollection<Row>`.  It can't be anything else since we have created data that doesn't map to a class.

* [JavaDoc: RenameFields](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/RenameFields.html)

In [6]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Rename salary", RenameFields.<Employee>create().rename("salary", "pay"))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Row: 
name:Bob
pay:20000.0

Row: 
name:Neil
pay:10000.0



DONE

## Filter
Next we will look at `Filter`.  This transform applies a predicate to each element and only the elements that evaluate to true are included in the output PCollection.  In the following example, we filter (keep) only elements that have a salary greater than 10000.  This transform returns a `PCollection<T>` matching the input type.

* [JavaDoc: Filter](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/Filter.html)

In [7]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Filter salaries", Filter.<Employee>create().whereFieldName("salary",
    new SerializableFunction<Double, Boolean>() {
      public Boolean apply(Double amount) {
        return amount > 10000.0;
      }
    }))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Employee: name: Bob, salary: 20000.0


DONE

## Convert
We can convert from one Java type to another as long as they have identical schemas.  The example below converts `Employee` objects to `Row`s and back again.

* [JavaDoc: Convert](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/Convert.html)

In [8]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Convert to rows", Convert.toRows())
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()))
  .apply("Convert from rows", Convert.fromRows(Employee.class))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Row: 
name:Neil
salary:10000.0

Row: 
name:Bob
salary:20000.0

Employee: name: Neil, salary: 10000.0
Employee: name: Bob, salary: 20000.0


DONE

## AddFields
We can add fields to a schema using `AddFields`.  In the example below, the new `isManager` field is a BOOLEAN with a default value of false.

* [JavaDoc: AddFields](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/AddFields.html)

In [9]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00)
  ))
  .apply("Add fields", AddFields.<Employee>create().field("isManager", Schema.FieldType.BOOLEAN, Boolean.FALSE))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
  
pipeline.run().waitUntilFinish();

Row: 
name:Neil
salary:10000.0
isManager:false

Row: 
name:Bob
salary:20000.0
isManager:false



DONE

## Group
We can group together elements by field using the `Group` transform.  The output of the transform is a `PCollection<Row>` but we need to take a few moments to discuss the schema of the resulting `Row`.

The resulting `Row` has the following schema (shown informally):

```
{
  "key": {
    "type": "ROW",
    "row": {
      "<field1>": "<Field1Type>"
    }
  },
  "value": {
    "type": "ITERABLE,ROW"
  }
}
```

This transform also supports aggregation, including global aggregation (aggregating over the whole `PCollection` rather than per key).

* [JavaDoc: Group](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/Group.html)

In [10]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00),
    new Employee("Neil", 15000.00)
  ))
  .apply("Group", Group.byFieldNames("name"))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
pipeline.run().waitUntilFinish();

Row: 
key:Row: 
name:Bob

value:[Row: 
name:Bob
salary:20000.0
, ]

Row: 
key:Row: 
name:Neil

value:[Row: 
name:Neil
salary:10000.0
, Row: 
name:Neil
salary:15000.0
, ]



DONE
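The aggregation support mentioned above can be sketched as follows.  This example sums the `salary` field per `name` using `aggregateField` with `Sum.ofDoubles()`.  To stay self-contained it uses `Row` elements with an explicit schema instead of the `Employee` bean; the class name, the `EMPLOYEE` schema and the output field name `total_salary` are assumptions made for illustration.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class GroupAggregateSketch {
  // Hypothetical schema mirroring the Employee bean used in this notebook.
  static final Schema EMPLOYEE = Schema.builder()
      .addStringField("name")
      .addDoubleField("salary")
      .build();

  static Row employee(String name, double salary) {
    return Row.withSchema(EMPLOYEE).addValues(name, salary).build();
  }

  static PCollection<Row> totals(Pipeline p) {
    return p
        .apply("Create rows", Create.of(
            employee("Neil", 10000.00),
            employee("Bob", 20000.00),
            employee("Neil", 15000.00)).withRowSchema(EMPLOYEE))
        // Group by name and combine the salary field of each group into total_salary.
        .apply("Sum salary per name", Group.<Row>byFieldNames("name")
            .aggregateField("salary", Sum.ofDoubles(), "total_salary"));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // The output schema is known at construction time, before the pipeline runs.
    System.out.println(totals(p).getSchema());
  }
}
```

With aggregation, the `value` field of each output `Row` holds the aggregated fields rather than an iterable of the grouped rows.  To see the actual totals, append the logging step used in the cells above and run the pipeline.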

## CoGroup
* [JavaDoc: CoGroup](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/CoGroup.html)

## Join
* [JavaDoc: Join](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/schemas/transforms/Join.html)

## SqlTransform
There is a separate notebook dedicated to Beam SQL but this is also a good place to illustrate that we don't *have* to input a `PCollection<Row>`.  Any PCollection which has an attached schema can be used as input to the `SqlTransform`.

In [11]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00),
    new Employee("Neil", 15000.00)
  ))
  .apply("SQL", SqlTransform.query("SELECT SUM(salary) as total_salary FROM PCOLLECTION"))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
pipeline.run().waitUntilFinish();

Row: 
total_salary:45000.0



DONE

## JsonToRow
Through the use of a schema, we can parse JSON strings.  To do so, we define a schema and pass it to the `JsonToRow.withSchema(schema)` method.  The resulting transform takes as input a `PCollection<String>` and returns a `PCollection<Row>`.

* [JavaDoc: JsonToRow](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/JsonToRow.html)

In [12]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of("{\"item\": \"blue\", \"amount\": 1.23, \"time\": \"2022-12-21\"}"))
  .apply("JSON", JsonToRow.withSchema(saleSchema))
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
pipeline.run().waitUntilFinish();

Row: 
item:blue
amount:1.23
time:2022-12-21T00:00:00.000Z



DONE

## ToJson
We can also convert a `PCollection` which has a schema to a `PCollection<String>` of JSON encodings using the `ToJson` transform.

* [JavaDoc: ToJson](https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/ToJson.html)

In [13]:
var pipeline = Pipeline.create(options);
pipeline
  .apply("Create elements", Create.of(
    new Employee("Neil", 10000.00),
    new Employee("Bob",  20000.00),
    new Employee("Neil", 15000.00)
  ))
  .apply("To JSON", ToJson.of())
  .apply("Print elements", ParDo.of(new LoggingDoFn<>()));
pipeline.run().waitUntilFinish();

{"name":"Neil","salary":15000.0}
{"name":"Bob","salary":20000.0}
{"name":"Neil","salary":10000.0}


DONE

## Schema Options
So far we have seen that a Beam schema describes the metadata of a record.  That metadata describes the *structure* of the record.  Specifically, it defines the fields and their types.  However, we may wish to associate other metadata with a record.  Examples include:

* Description
* Lineage
* Creation time
* Allowed values for fields (data quality)
* ... many more

To accommodate this, the Beam schema allows us to associate arbitrary *options* with the schema.  An option is defined as a name, a type and a value.

To obtain the options associated with a schema, we can call the `getOptions()` method.  This returns a `Schema.Options` object.

See also:
* [Video: Beam Schema Options](https://www.youtube.com/watch?v=Oi946DJVE7g)

In [35]:
Schema saleSchema = Schema
  .builder()
  .addFields(Schema.Field
    .of("item", Schema.FieldType.STRING)
    .withDescription("Purchased Item")
    .withOptions(Schema.Options.builder().setOption("myFieldOption", Schema.FieldType.STRING, "Hello")))
  .addDoubleField("amount")
  .addDateTimeField("time")
  .setOptions(Schema.Options.builder().setOption("mySchemaOption", Schema.FieldType.STRING, "Hello"))
  .build();
System.out.println(saleSchema);

Fields:
Field{name=item, description=Purchased Item, type=STRING NOT NULL, options={{myFieldOption=Option{type=STRING NOT NULL, value=Hello}}}}
Field{name=amount, description=, type=DOUBLE NOT NULL, options={{}}}
Field{name=time, description=, type=DATETIME NOT NULL, options={{}}}
Encoding positions:
{item=0, amount=1, time=2}
Options:{{mySchemaOption=Option{type=STRING NOT NULL, value=Hello}}}UUID: null
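Reading an option back is a matter of querying the `Schema.Options` object.  The sketch below sets one schema-level option and then reads it; the class name is hypothetical, and the option name and value simply mirror the cell above.

```java
import org.apache.beam.sdk.schemas.Schema;

public class SchemaOptionsSketch {
  static Schema saleSchema() {
    return Schema.builder()
        .addStringField("item")
        // Attach a schema-level option: name, type and value.
        .setOptions(Schema.Options.builder()
            .setOption("mySchemaOption", Schema.FieldType.STRING, "Hello"))
        .build();
  }

  public static void main(String[] args) {
    Schema.Options options = saleSchema().getOptions();

    // Check for the option before reading it.
    if (options.hasOption("mySchemaOption")) {
      String value = options.getValue("mySchemaOption", String.class);
      System.out.println(value);
    }
    System.out.println(options.getOptionNames());
  }
}
```

Options set on an individual field can be read the same way by calling `getOptions()` on the `Schema.Field`.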
