In [None]:
import os
import sys

# Make modules that live one directory up importable from this notebook.
sys.path.append('../')

# Configure the process environment before TensorFlow is imported below.
# Presumably: expose only GPU 0 and silence TensorFlow's native logging.
os.environ.update({
    "CUDA_VISIBLE_DEVICES": "0",
    "TF_CPP_MIN_LOG_LEVEL": "3",
})

from tensorflow.python.client import device_lib

# Show the names of the devices TensorFlow reports for this machine.
print([device.name for device in device_lib.list_local_devices()])

In [43]:
import subprocess
import numpy as np
import tensorflow as tf
from multiprocessing.shared_memory import SharedMemory

class SamplesService:
    """Streams batches of samples produced by an external helper program.

    The helper (``tools samples-service``) is started as a subprocess and
    fills a shared-memory block with one batch at a time.  The handshake used
    by this class is one byte per direction: a byte read from the helper's
    stdout is taken to mean "a batch is ready", and a byte written to its
    stdin lets it start on the next batch.

    Instances own a subprocess and a shared-memory segment.  Call
    :meth:`close` (or use the instance as a context manager) to release them
    deterministically; ``__del__`` is only a fallback.
    """

    # Layout of one sample row in the shared buffer: 3 * 12 uint64 words.
    _ROW_WORDS = 3 * 12
    _WORD_BYTES = 8

    def __init__(self, batch_size):
        """
        Initializes the SamplesService class.

        Args:
            batch_size: The batch size to use (number of rows per batch).

        Raises:
            ValueError: If ``batch_size`` is not positive.
            OSError: If the helper program cannot be started.
        """

        # Define every attribute up front so close() / __del__ stay safe even
        # when construction fails part-way through.
        self.shmem = None
        self.program = None
        self.data = None

        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        words = batch_size * self._ROW_WORDS

        # Create the shared memory file.
        self.shmem = SharedMemory(create=True, size=words * self._WORD_BYTES)

        try:
            # Start the program subprocess.
            args = [
                "../tools/target/release/tools",
                "samples-service",
                "--inputs=../tools/pqr.csv",
                "--shmem=" + self.shmem.name,
                "--batch-size=" + str(batch_size),
                "--feature-set=basic",
                "pqr"
            ]
            self.program = subprocess.Popen(
                args, stdout=subprocess.PIPE, stdin=subprocess.PIPE
            )

            # Initialize the numpy array using the shared memory as buffer.
            # `count` is given explicitly because the platform may round the
            # segment up (e.g. to a page size), which would break the reshape.
            self.data = np.frombuffer(
                buffer=self.shmem.buf,
                dtype=np.uint64,
                count=words
            ).reshape((batch_size, self._ROW_WORDS))
        except BaseException:
            # Do not leak the segment (or a half-started helper) on failure.
            self.close()
            raise

    def __enter__(self):
        """Returns the service itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Releases the subprocess and shared memory when the block exits."""
        self.close()
        return False

    def next_batch(self):
        """
        Gets the next batch of samples.

        Returns:
            A TensorFlow tensor containing the next batch of samples.

        Raises:
            RuntimeError: If the service is closed or the helper program has
                exited (its stdout reached end-of-file).
        """

        if self.program is None or self.data is None:
            raise RuntimeError("SamplesService is closed")

        # Wait until there is a byte in the stdout of the program (meaning data is ready).
        ready = self.program.stdout.read(1)
        if not ready:
            # EOF: the helper is gone, so the buffer would only hold stale data.
            raise RuntimeError(
                "samples-service exited before producing a batch "
                f"(exit code: {self.program.poll()})"
            )

        # Create a TensorFlow tensor using the numpy array.
        # NOTE(review): this relies on tf.constant copying the buffer before
        # the helper is allowed to overwrite it below - confirm.
        with tf.device('/GPU:0'):
            tensor = tf.constant(self.data, dtype=tf.uint64)

        # Write a byte into the program's stdin, so it can start working on the next batch.
        self.program.stdin.write(b'\x00')
        self.program.stdin.flush()

        # Return the TensorFlow tensor.
        return tensor

    def close(self):
        """
        Stops the helper program and releases the shared memory.

        Safe to call more than once and on a partially constructed instance.
        """

        # Drop the numpy view first: SharedMemory.close() refuses to unmap
        # while views of its buffer are still exported.
        self.data = None

        program = getattr(self, "program", None)
        if program is not None:
            self.program = None
            program.kill()
            program.wait()
            for pipe in (program.stdin, program.stdout):
                if pipe is None:
                    continue
                try:
                    pipe.close()
                except OSError:
                    # Best effort: the peer is already dead, so a failed
                    # flush on close carries no useful information.
                    pass

        shmem = getattr(self, "shmem", None)
        if shmem is not None:
            self.shmem = None
            try:
                shmem.close()
            except BufferError:
                # Someone else still holds a view of the buffer; the mapping
                # goes away once that view is released.
                pass
            try:
                # This process created the segment, so it must also remove it.
                shmem.unlink()
            except FileNotFoundError:
                pass

    def __del__(self):
        """
        Destructor.
        """
        print("Samples service cleanup")

        # Kill the subprocess and close the shared memory.
        self.close()

from tqdm import tqdm

# Smoke test: start the service, show two batches, then pull many more while
# tqdm reports the iteration rate.
a = SamplesService(batch_size=4096)
try:
    print(a.next_batch())
    print(a.next_batch())

    for i in tqdm(range(10000)):
        a.next_batch()
finally:
    # Drop the reference even if the loop is interrupted (e.g. Ctrl-C), so the
    # service's destructor can stop the subprocess and release shared memory.
    # NOTE: a live traceback may still reference the object and delay this.
    a = None

tf.Tensor(
[[       274878169856                   0           134218752 ...
  2594073385365405696                   0 4611686018427387904]
 [            4235520          8589934592                   0 ...
  2377900603251621888    9007199254740992 9223372036854775808]
 [          270714112              262144                   0 ...
   720575940379279360                   0      17592186044416]
 ...
 [            4235520          8589934592                   0 ...
  2377900603251621888    9007199254740992 9223372036854775808]
 [          270714112              262144                   0 ...
   720575940379279360                   0      17592186044416]
 [                  0                   0                   0 ...
                    0                   0                   0]], shape=(4096, 36), dtype=uint64)
tf.Tensor(
[[          135324160         68719478784              524288 ...
  2305843009213693952          4294967296 4611686018427387904]
 [           67299840             83

  3%|▎         | 292/10000 [00:06<03:42, 43.72it/s]


KeyboardInterrupt: 