In [1]:
import pandas as pd
import pyarrow.parquet as pq

# Read the first CSV file
dataset_train_df = pd.read_csv("train.csv")

# Read the second CSV file
dataset_supplemental_df = pd.read_csv("supplemental_metadata.csv")

# Concatenate the two dataframes
dataset_df = pd.concat([dataset_train_df, dataset_supplemental_df], ignore_index=True)

# Save the combined CSV file
dataset_df.to_csv("train_full.csv", index=False)

In [2]:
# Read the first row of the DataFrame
path, sequence_id, file_id, phrase = dataset_df.iloc[0][
    ["path", "sequence_id", "file_id", "phrase"]
]
print(f"path: {path}, sequence_id: {sequence_id}, file_id: {file_id}, phrase: {phrase}")

sample_sequence_df = pq.read_table(
    f"{str(path)}",
    filters=[
        [("sequence_id", "=", sequence_id)],
    ],
).to_pandas()
print("Full sequence dataset shape is {}".format(sample_sequence_df.shape))

path: train_landmarks/5414471.parquet, sequence_id: 1816796431, file_id: 5414471, phrase: 3 creekhouse
Full sequence dataset shape is (123, 1630)


In [3]:
# Read the total amount unique files
unique_paths = dataset_df["path"].unique()

sum = unique_paths.shape[0]

print("Total number of files: {}".format(sum))

Total number of files: 121


In [4]:
LIP = [
    61, 185, 40, 39, 37, 267, 269, 270, 409,
    291, 146, 91, 181, 84, 17, 314, 405, 321, 375,
    78, 191, 80, 81, 82, 13, 312, 311, 310, 415,
    95, 88, 178, 87, 14, 317, 402, 318, 324, 308,
]

FACE = [f'x_face_{i}' for i in LIP] + [f'y_face_{i}' for i in LIP] + [f'z_face_{i}' for i in LIP]
LHAND = [f'x_left_hand_{i}' for i in range(21)] + [f'y_left_hand_{i}' for i in range(21)] + [f'z_left_hand_{i}' for i in range(21)]
RHAND = [f'x_right_hand_{i}' for i in range(21)] + [f'y_right_hand_{i}' for i in range(21)] + [f'z_right_hand_{i}' for i in range(21)]
POSE = [f'x_pose_{i}' for i in range(33)] + [f'y_pose_{i}' for i in range(33)] + [f'z_pose_{i}' for i in range(33)]

SEL_COLS = FACE + LHAND + RHAND + POSE
FRAME_LEN = 128

In [26]:
from functools import partial
import gc
import multiprocessing as mp
import tensorflow as tf
import numpy as np
import pyarrow.parquet as pq
from multiprocessing import Pool, Manager
from tqdm import tqdm
from tqdm.notebook import tqdm_notebook
from pathlib import Path

# Create a Manager object for the progress_queue
manager = Manager()
progress_queue = manager.Queue()

def process_file(file_id):
    file_df = dataset_df.loc[dataset_df["file_id"] == file_id]
    path = file_df["path"].values[0]
    parquet_df = pq.read_table(path, columns=["sequence_id"] + SEL_COLS).to_pandas()

    tf_file = f"preprocessed/{file_id}.tfrecord"
    parquet_numpy = parquet_df.to_numpy(copy=False)

    col_to_index = {col: i for i, col in enumerate(parquet_df.columns)}

    LHAND_indices = [col_to_index[col] for col in LHAND]
    RHAND_indices = [col_to_index[col] for col in RHAND]

    buffer_size = 1000  # Adjust as needed
    buffer = []

    with tf.io.TFRecordWriter(tf_file) as file_writer:
        for seq_id, phrase in zip(file_df["sequence_id"], file_df["phrase"]):
            frames = parquet_numpy[parquet_df.index == seq_id]
            progress_queue.put(
                f"Thread: {mp.current_process().name}, File: {file_id}, Sequence: {seq_id}"
            )

            # Calculate the number of NaN values in each hand landmark
            r_nonan = np.sum(np.sum(np.isnan(frames[:, RHAND_indices]), axis=1) == 0)
            l_nonan = np.sum(np.sum(np.isnan(frames[:, LHAND_indices]), axis=1) == 0)
            no_nan = max(r_nonan, l_nonan)

            if 2 * len(phrase) < no_nan:
                features = {
                    COL: tf.train.Feature(
                        float_list=tf.train.FloatList(
                            value=frames[:, col_to_index[COL]]
                        )
                    )
                    for COL in SEL_COLS
                }
                features["phrase"] = tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[bytes(phrase, "utf-8")])
                )

                example = tf.train.Example(features=tf.train.Features(feature=features))
                record_bytes = example.SerializeToString()

                buffer.append(record_bytes)
                if len(buffer) == buffer_size:
                    for record in buffer:
                        file_writer.write(record)
                    buffer = []

        if buffer:
            for record in buffer:
                file_writer.write(record)

        # gc.collect()


# cpu_count = int(mp.cpu_count() / 2)
cpu_count = 8

with Pool(cpu_count) as pool:
    progress_bars = [tqdm_notebook(desc=f"Thread {i + 1}", unit="seq") for i in range(cpu_count)]

    for result in pool.imap(
        process_file,
        dataset_df["file_id"].unique(),
    ):
        progress_updates = []
        while not progress_queue.empty():
            progress_updates.append(progress_queue.get())
        for update, bar in zip(progress_updates, progress_bars):
            bar.set_description(update)
            bar.update()

Thread 1: 0seq [00:00, ?seq/s]

Thread 2: 0seq [00:00, ?seq/s]

Thread 3: 0seq [00:00, ?seq/s]

Thread 4: 0seq [00:00, ?seq/s]

Thread 5: 0seq [00:00, ?seq/s]

Thread 6: 0seq [00:00, ?seq/s]

Thread 7: 0seq [00:00, ?seq/s]

Thread 8: 0seq [00:00, ?seq/s]

KeyboardInterrupt: 

In [None]:
import tensorflow as tf 
raw_dataset = tf.data.TFRecordDataset("preprocessed/5414471.tfrecord")

for raw_record in raw_dataset.take(10):
    print("im here!")
    print(repr(raw_record))

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchdata.datapipes.iter import FileLister, FileOpener, TFRecordLoader

tf_records_directory = 'preprocessed/'

file_lister = FileLister(tf_records_directory, "*.tfrecord") # Lists all tfrecords

file_opener = FileOpener(file_lister, mode="b") # Opens files, b for binary mode

tfrecord_loader = TFRecordLoader(file_opener) # Load and decodes tfrecords

# Process an example
def process_tf_examples(example):
    # Perform even more preprocessing
    # Extract things
    return example

from torch.utils.data import DataLoader

# Create a DataLoader
dataloader = DataLoader(tfrecord_loader, batch_size=32, num_workers=4)

# Now you can iterate over the dataloader
for batch in dataloader:
    # Each batch is a tuple of (data, target)
    data, target = batch
    # Now you can use the data and target for further steps

# Notes

import tensorflow_addons as tfa
pbar = tfa.callbacks.TQDMProgressBar()
model.fit(…,callbacks=[pbar])
# TQDMProgressBar() also works with evaluate()
model.evaluate(…,callbacks=[pb

# Multiprocessing

with Pool(workers) as pool:
    results = list(tqdm(pool.imap(worker,thread_list, total=len(thread_list))
                        ar])