In [1]:
!pip install tensorflow_io

Collecting tensorflow_io
  Downloading tensorflow_io-0.37.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Downloading tensorflow_io-0.37.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (49.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 MB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorflow_io
Successfully installed tensorflow_io-0.37.1


In [2]:
import numpy as np
import pandas as pd

data = {'label': np.random.binomial(1, 0.5, 10)}
data['x0'] = np.random.randn(10) + 5 * data['label']
data['x1'] = np.random.randn(10) + 5 * data['label']

df = pd.DataFrame(data)

print(df.head())

   label        x0        x1
0      0 -0.003454  0.434645
1      0  0.070342  0.143294
2      1  4.431127  5.518084
3      0  1.162140 -0.817346
4      0 -0.158896  1.471649


In [None]:
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)

# Make an iterator to the dataset
ds_iter = iter(ds)

# Print the first batch
print(next(ds_iter))

In [None]:

import tensorflow_io.arrow as arrow_io
from pyarrow.feather import write_feather

# Write the Pandas DataFrame to a Feather file
write_feather(df, '/path/to/df.feather')

# Create the dataset with one or more filenames
ds = arrow_io.ArrowFeatherDataset(
    ['/path/to/df.feather'],
    columns=(0, 1, 2),
    output_types=(tf.int64, tf.float64, tf.float64),
    output_shapes=([], [], []))

# Iterate over each row of each file
for record in ds:
   label, x0, x1 = record
   # use label and feature tensors

In [None]:
import tensorflow_io.arrow as arrow_io

ds = arrow_io.ArrowStreamDataset.from_pandas(
    df,
    batch_size=2,
    preserve_index=False)

In [None]:
def model_fit(ds):
  """Create and fit a Keras logistic regression model."""

  # Build the Keras model
  model = tf.keras.Sequential()
  model.add(tf.keras.layers.Dense(1, input_shape=(2,),
            activation='sigmoid'))
  model.compile(optimizer='sgd', loss='mean_squared_error',
                metrics=['accuracy'])

  # Fit the model on the given dataset
  model.fit(ds, epochs=5, shuffle=False)
  return model

In [None]:
def read_and_process(filename):
  """Read the given CSV file and yield processed Arrow batches."""

  # Read a CSV file into an Arrow Table with threading enabled and
  # set block_size in bytes to break the file into chunks for granularity,
  # which determines the number of batches in the resulting pyarrow.Table
  opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=4096)
  table = pyarrow.csv.read_csv(filename, opts)

  # Fit the feature transform
  df = table.to_pandas()
  scaler = StandardScaler().fit(df[['x0', 'x1']])

  # Iterate over batches in the pyarrow.Table and apply processing
  for batch in table.to_batches():
    df = batch.to_pandas()

    # Process the batch and apply feature transform
    X_scaled = scaler.transform(df[['x0', 'x1']])
    df_scaled = pd.DataFrame({'label': df['label'],
                              'x0': X_scaled[:, 0],
                              'x1': X_scaled[:, 1]})
    batch_scaled = pa.RecordBatch.from_pandas(df_scaled, preserve_index=False)

    yield batch_scaled

In [None]:
def make_local_dataset(filename):
  """Make a TensorFlow Arrow Dataset that reads from a local CSV file."""

  # Read the local file and get a record batch iterator
  batch_iter = read_and_process(filename)

  # Create the Arrow Dataset as a stream from local iterator of record batches
  ds = arrow_io.ArrowStreamDataset.from_record_batches(
    batch_iter,
    output_types=(tf.int64, tf.float64, tf.float64),
    output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([])),
    batch_mode='auto',
    record_batch_iter_factory=partial(read_and_process, filename))

  # Map the dataset to combine feature columns to single tensor
  ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
  return ds

In [None]:
ds = make_local_dataset(filename)
model = model_fit(ds)

print("Fit model with weights: {}".format(model.get_weights()))

In [1]:
def read_and_process_dir(directory):
  """Read a directory of CSV files and yield processed Arrow batches."""

  for f in os.listdir(directory):
    if f.endswith(".csv"):
      filename = os.path.join(directory, f)
      for batch in read_and_process(filename):
        yield batch

In [2]:
def serve_csv_data(ip_addr, port_num, directory):
  """
  Create a socket and serve Arrow record batches as a stream read from the
  given directory containing CVS files.
  """

  # Create the socket
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.bind((ip_addr, port_num))
  sock.listen(1)

  # Serve forever, each client will get one iteration over data
  while True:
    conn, _ = sock.accept()
    outfile = conn.makefile(mode='wb')
    writer = None
    try:

      # Read directory and iterate over each batch in each file
      batch_iter = read_and_process_dir(directory)
      for batch in batch_iter:

        # Initialize the pyarrow writer on first batch
        if writer is None:
          writer = pa.RecordBatchStreamWriter(outfile, batch.schema)

        # Write the batch to the client stream
        writer.write_batch(batch)

    # Cleanup client connection
    finally:
      if writer is not None:
        writer.close()
      outfile.close()
      conn.close()
  sock.close()

In [3]:
def make_remote_dataset(endpoint):
  """Make a TensorFlow Arrow Dataset that reads from a remote Arrow stream."""

  # Create the Arrow Dataset from a remote host serving a stream
  ds = arrow_io.ArrowStreamDataset(
      [endpoint],
      columns=(0, 1, 2),
      output_types=(tf.int64, tf.float64, tf.float64),
      output_shapes=(tf.TensorShape([]), tf.TensorShape([]), tf.TensorShape([])),
      batch_mode='auto')

  # Map the dataset to combine feature columns to single tensor
  ds = ds.map(lambda l, x0, x1: (tf.stack([x0, x1], axis=1), l))
  return ds