
TensorFrames user guide


Experimental TensorFlow binding for Scala and Apache Spark.

TensorFrames (TensorFlow on Spark DataFrames) lets you manipulate Apache Spark's DataFrames with TensorFlow programs.

This package is experimental and is provided as a technical preview only. While the interfaces are all implemented and working, there are still some areas of low performance.

Supported platforms:

This package officially supports only Linux 64-bit platforms as a target. Contributions are welcome for other platforms.

See the file project/Dependencies.scala to add support for your own platform.

Officially TensorFrames supports Spark 2.3+ and Scala 2.11.

Core API

The core API provides some primitives to express transformations of DataFrames using TensorFlow programs. These programs can be written in Python using the official TensorFlow API, in Scala using TensorFrames, or directly by passing a protocol buffer description of the operations graph.

The simplest way to use TensorFrames in a Python program is to import TensorFlow and TensorFrames into PySpark:

import tensorflow as tf
import tensorframes as tfs

Additionally, this guide will make use of the following imports from PySpark:

from pyspark.sql import Row
from pyspark.sql.functions import *

Basic concepts

At its core, TensorFlow expresses operations on tensors: homogeneous data structures that consist of an array and a shape. They can be interpreted as a generalization of vectors and matrices:

  • a scalar (real or integer) is a tensor of dimension zero,
  • a vector is a tensor of one dimension,
  • a matrix is a tensor of two dimensions

and so on. Tensors of order higher than four are less common and are not well supported in TensorFrames right now. For more information about tensors, the TensorFlow tutorial contains extensive documentation.
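
For illustration, here is a minimal TensorFlow snippet (using the imports above, and not specific to TensorFrames) that builds tensors of rank zero, one and two; the values are arbitrary:

scalar = tf.constant(3.0)                       # rank 0: shape ()
vector = tf.constant([1.0, 2.0])                # rank 1: shape (2,)
matrix = tf.constant([[1.0, 2.0], [3.0, 4.0]])  # rank 2: shape (2, 2)
print(scalar.shape, vector.shape, matrix.shape)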

Blocks

In Spark SQL, tensors are represented as single values, as arrays, or as arrays of arrays (or arrays of arrays of arrays, etc.). Consider the following table with some reals x and some vectors y.

x y
1 [1.1 1.2]
2 [2.1 2.2]
3 [3.1 3.2]

Spark SQL may decide to chunk this table into several pieces to distribute them across a cluster. For example, it may split the previous table into a first block:

x y
1 [1.1 1.2]

and a second block:

x y
2 [2.1 2.2]
3 [3.1 3.2]

TensorFlow is optimized for operations that manipulate large vectors of numbers at a time, and TensorFrames provides most operations in two forms: a row-based version and a block-based version. In the row-based version, TensorFrames works on one row after the other. In our example, it will process the table as follows:

process_row: x = 1, y = [1.1 1.2]
process_row: x = 2, y = [2.1 2.2]
process_row: x = 3, y = [3.1 3.2]

In the block-based version, TensorFrames will consider each column in a block as a single tensor. In our example, a block transform will process the table as follows:

process_block: x = [1], y = [1.1 1.2]
process_block: x = [2 3], y = [[2.1 2.2]
                               [3.1 3.2]]

The block transforms are usually more efficient: there is less overhead in calling TensorFlow, and they can manipulate more data at once. There are a couple of points to take into account, though:

  • the rank of a block is the rank of the elements in a column, incremented by one
  • the leading dimension (the size of the block) is usually unknown, because it is selected by Spark and may vary between blocks. This is not an issue for TensorFlow, which accepts placeholders with unknown dimensions (see the sketch below).
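
As a small sketch, the column y from the table above (vectors of 2 doubles) corresponds to a block placeholder of rank 2 whose leading dimension is left unknown:

# The block for a column of 2-element double vectors has one extra leading
# dimension; its size (the block size) is chosen by Spark, so it is left as None.
y = tf.placeholder(tf.double, shape=[None, 2], name="y")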

Since blocks present such advantages, why offer a row-based interface? In some cases, it is not possible to consider a sequence of rows as a single tensor because the data must be homogeneous. Consider the following table, in which each vector has a different size:

x y
1 [1]
2 [1 2]
3 [1 2 3]

Each row contains a vector, but the vectors have different sizes, which prevents TensorFlow from considering them all at once. However, each vector can be processed individually.
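
For illustration, such a ragged table can be built directly in PySpark (the names used here are only for this example):

# Each 'y' vector has a different length, so the rows cannot be stacked into a
# single block tensor, but they can still be processed one row at a time.
ragged = [Row(x=float(i), y=[float(j) for j in range(1, i + 1)]) for i in range(1, 4)]
ragged_df = sqlContext.createDataFrame(ragged)
ragged_df.show()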

Shape analysis

In some cases, TensorFrames must know in advance the shape of the elements in a table. This only needs to happen at the first ingest, as most shapes are inferred from the outputs of a TensorFlow program. TensorFrames provides functions to analyze a DataFrame and to print the result of the analysis. We create here a small dataframe:

data = [Row(x=[float(x), float(2 * x)],
            key=str(x % 2),
            z=float(x + 1)) for x in range(1, 6)]
df = sqlContext.createDataFrame(data)

Here is the data:

key x z
1 [1.0, 2.0] 2.0
0 [2.0, 4.0] 3.0
1 [3.0, 6.0] 4.0
0 [4.0, 8.0] 5.0
1 [5.0, 10.0] 6.0

And here is the output from the analysis:

tfs.print_schema(df)
root
 |-- key: string (nullable = true) <no tensor info>
 |-- x: array (nullable = true) double[?,?]
 |-- z: double (nullable = true) double[?]

TensorFrames has correctly interpreted z to be rows of doubles, hence represented as vectors when they are packed into blocks. The ? means that the exact value of the dimension is unknown. The x column was found to contain vectors of doubles (hence making matrices when they are put together into blocks). Both the number of rows and the number of columns of these blocks are unknown.

The analyze() method can be used to gather more information about the shapes. It performs a full analysis of each element in the dataframe, checks that all the shapes are homogeneous, and reports any issues. As such, it is relatively expensive to run.

df2 = tfs.analyze(df)
tfs.print_schema(df2)
root
 |-- key: string (nullable = true) <no tensor info>
 |-- x: array (nullable = true) double[1,2]
 |-- z: double (nullable = true) double[1]

In this case, all the ? have been replaced by numbers. The first number (1) means that Spark represents our dataframe as a collection of 5 blocks, each of which contains exactly one row. This is not very efficient. Note that for the x column, TensorFrames has correctly found that all the vectors in this column have exactly 2 elements. This information lets TensorFlow run more efficient algorithms on aligned data.

In most cases, since we know the input dataframe, we can automatically infer the shape of the elements coming in. See tfs.row and tfs.block for more information.
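
As a brief sketch, using the analyzed df2 from above, both helpers return placeholders whose data type and shape are inferred from the column's tensor information:

with tf.Graph().as_default() as g:
    x_row = tfs.row(df2, "x")      # a placeholder for a single row of column 'x'
    x_block = tfs.block(df2, "x")  # a placeholder for a whole block of column 'x'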

Mapping

The mapping operation adds some extra columns based on the input dataframe. Here is an example with rows:

data = [Row(x=float(x)) for x in range(5)]
df = sqlContext.createDataFrame(data)
with tf.Graph().as_default() as g:
    # The placeholder that corresponds to column 'x'
    x = tf.placeholder(tf.double, shape=[None], name="x")
    # The output that adds 3 to x
    z = tf.add(x, 3, name='z')
    # The resulting dataframe
    df2 = tfs.map_blocks(z, df)

df2.show()
+---+---+
|  z|  x|
+---+---+
|3.0|0.0|
|4.0|1.0|
|5.0|2.0|
|6.0|3.0|
|7.0|4.0|
+---+---+

In the previous example, we manually built the placeholder with the proper shape of the block. This placeholder can be inferred automatically from the dataframe using the block function:

with tf.Graph().as_default() as g:
    x = tfs.block(df, "x")
    z = tf.add(x, 3, name='z')
    df2 = tfs.map_blocks(z, df)

df2.show()

We can also use the row-wise interface that works one row at a time. This program performs a similar transform.

with tf.Graph().as_default() as g:
    # The placeholder that corresponds to rows in column 'x'
    x = tf.placeholder(tf.double, shape=[], name="x")
    z = tf.add(x, 3, name='z')
    df2 = tfs.map_rows(z, df)

Again, instead of writing the placeholder ourselves, we can infer it from the content of the column:

with tf.Graph().as_default() as g:
    # The placeholder that corresponds to rows in column 'x'
    x = tfs.row(df, "x")
    z = tf.add(x, 3, name='z')
    df2 = tfs.map_rows(z, df)

Reducing

Reduction operations coalesce a pair or a collection of rows and transform them into a single row, until there is one row left. Under the hood, TensorFrames minimizes the data transfer between computers by reducing all the rows on each computer first and then performing the reduction of the remainder.

The transforms must be algebraic: the order in which they are done should not matter. In mathematical terms, given some function f and some function inputs a, b, c, the following must hold:

f(f(a, b), c) == f(a, f(b, c))

Some examples of algebraic reductions are sum, min and count. One typical non-algebraic transform is mean, which is the ratio of a sum and a count. Usually, most numerical reductions can be transformed into algebraic reductions with some additional pre-processing and post-processing. In the case of mean, the sum and the count can be computed separately and then divided (a sketch appears at the end of this section).

TensorFrames uses naming conventions to figure out how to feed the TensorFlow program. Here is a simple example that computes the sum of a column by repeatedly summing pairs of columns together:

data = [Row(x=float(x)) for x in range(5)]
df = sqlContext.createDataFrame(data)

with tf.Graph().as_default() as g:
    # The placeholders that correspond to column 'x'.
    # Note the convention of calling them with '_1' and '_2'
    x_1 = tf.placeholder(tf.double, shape=[], name="x_1")
    x_2 = tf.placeholder(tf.double, shape=[], name="x_2")
    # We sum the two inputs.
    x = tf.add(x_1, x_2, name='x')
    # The resulting number
    res = tfs.reduce_rows(x, df)

This program executes immediately and returns a single value, or a list of values if multiple columns are requested.

10.0

The same transform can be expressed in a vector form:

with tf.Graph().as_default() as g:
    # The placeholder that corresponds to column 'x'
    # Note the convention of calling it with a '_input' suffix.
    x_input = tf.placeholder(tf.double, shape=[None], name="x_input")
    x = tf.reduce_sum(x_input, name='x')
    res = tfs.reduce_blocks(x, df)

Again, the same program can be written with automatic extraction of the placeholders. That version has an additional advantage: it works for numerical data of any type (not only doubles), without any changes to the code.

with tf.Graph().as_default() as g:
    # We supply the name of the input as a second argument.
    x_input = tfs.block(df, "x", tf_name="x_input")
    x = tf.reduce_sum(x_input, name='x')
    res = tfs.reduce_blocks(x, df)
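
Coming back to the mean mentioned earlier, here is a hedged sketch of how a non-algebraic reduction can be decomposed into algebraic ones. It reuses the dataframe df from above; the column name 'one' and the intermediate dataframe df_ones are choices made for this example only:

# Step 1: add a column of ones, so that its sum gives the count.
with tf.Graph().as_default() as g:
    x = tfs.block(df, "x")
    one = tf.ones_like(x, name="one")
    df_ones = tfs.map_blocks(one, df)

# Step 2: two algebraic reductions (sums), following the block conventions above.
with tf.Graph().as_default() as g:
    x_input = tfs.block(df_ones, "x", tf_name="x_input")
    x = tf.reduce_sum(x_input, name='x')
    total = tfs.reduce_blocks(x, df_ones)

with tf.Graph().as_default() as g:
    one_input = tfs.block(df_ones, "one", tf_name="one_input")
    one = tf.reduce_sum(one_input, name='one')
    count = tfs.reduce_blocks(one, df_ones)

# Step 3: a simple post-processing step outside of Spark.
mean = total / count  # 2.0 for the dataframe above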

Aggregation

Here is an example that computes the harmonic mean of some values, aggregated by a key. Since we are using TensorFrames, all the operations are vectorized under the hood. The following program works for any tensor shape at the input (reals, vectors, matrices):

data = [Row(x=[float(x), float(2 * x)], key=str(x % 2)) for x in range(1, 6)]
# The rest of the example works without modification if we replace the data with scalars:
# data = [Row(x=float(x), key=str(x % 2)) for x in range(1, 6)]
# The analysis is not required if x is a real
df = tfs.analyze(sqlContext.createDataFrame(data))
col_name = "x"
col_key = "key"

We first compute the counts and element-wise inverses of the elements:

with tf.Graph().as_default() as g:
    x = tfs.block(df, col_name)
    invs = tf.divide(tf.to_double(1.0), tf.to_double(x), name="invs")
    df2 = tfs.map_blocks([invs, tf.ones_like(invs, name="count")], df)

Then we gather all the data by key and compute the sum of the inverses, as well as the counts for each key. The computation is distributed, including within a key: TensorFrames works block by block, without having to coalesce all the values of a key on a single machine.

gb = df2.select(col_key, "invs", "count").groupBy(col_key)
with tf.Graph().as_default() as g:
    # Look at the documentation of tfs.aggregate for the naming conventions of the placeholders.
    x_input = tfs.block(df2, "invs", tf_name="invs_input")
    count_input = tfs.block(df2, "count", tf_name="count_input")
    x = tf.reduce_sum(x_input, [0], name='invs')
    count = tf.reduce_sum(count_input, [0], name='count')
    df3 = tfs.aggregate([x, count], gb)

The final values can now be computed. Up to this point, all the transforms have been lazy and the final show() will trigger the computations.

with tf.Graph().as_default() as g:
    invs = tfs.block(df2, "invs")
    count = tfs.block(df2, "count")
    harmonic_mean = tf.div(tf.to_double(count), invs, name="harmonic_mean")
    df4 = tfs.map_blocks(harmonic_mean, df3).select("key", "harmonic_mean")

df4.show()
+---+---------------------------------------+
|key|harmonic_mean                          |
+---+---------------------------------------+
|0  |[2.6666666666666665, 5.333333333333333]|
|1  |[1.956521739130435, 3.91304347826087]  |
+---+---------------------------------------+

As expected, we find that the harmonic mean scales linearly with the data: the second component of each result is exactly twice the first. This example can be compared with the UDAF example in Scala, which performs a similar task. There are a few differences:

  • the TensorFrames program cannot be registered as a SQL function
  • it works on any tensor shapes without change to the code
  • it can use efficient vectorized instructions without having to program them