In [None]:
def get_char_dict():
    """Build the token -> integer id vocabulary used to encode phrases.

    Single characters receive consecutive ids starting at 0, in the order
    they appear in ``symbols``. Three multi-character special tokens follow:
    ``"P"`` (59), ``"SOS"`` (60) and ``"EOS"`` (61).

    Returns:
        dict: a freshly built mapping with 62 entries on every call.
    """
    # Order matters: the position of a character in this string is its id.
    symbols = " !#$%&'()*+,-./0123456789:;=?@[_abcdefghijklmnopqrstuvwxyz~"
    vocabulary = {symbol: index for index, symbol in enumerate(symbols)}
    # Special tokens are appended after the 59 plain characters.
    vocabulary.update(P=59, SOS=60, EOS=61)
    return vocabulary


class Constants:
    """Static configuration shared by the preprocessing code.

    Bundles the character vocabulary, the padding values, the landmark index
    groups and the position of every group member inside ``POINT_LANDMARKS``.

    NOTE(review): the index values look like MediaPipe Holistic landmark ids
    (face first, left hand 468-488, pose, right hand 522-542, 543 rows in
    total) - confirm against the dataset description.
    """

    ROWS_PER_FRAME = 543
    MAX_STRING_LEN = 50
    INPUT_PAD = -100.0

    # Vocabulary, the id used to pad labels, and the id -> token reverse map.
    char_dict = get_char_dict()
    LABEL_PAD = char_dict["P"]
    inv_dict = {index: token for token, index in char_dict.items()}

    NOSE = [1, 2, 98, 327]
    LIP = [
        0, 61, 185, 40, 39, 37, 267, 269, 270, 409,
        291, 146, 91, 181, 84, 17, 314, 405, 321, 375,
        78, 191, 80, 81, 82, 13, 312, 311, 310, 415,
        95, 88, 178, 87, 14, 317, 402, 318, 324, 308,
    ]

    REYE = [
        33, 7, 163, 144, 145, 153, 154, 155,
        133, 246, 161, 160, 159, 158, 157, 173,
    ]
    LEYE = [
        263, 249, 390, 373, 374, 380, 381, 382,
        362, 466, 388, 387, 386, 385, 384, 398,
    ]

    # Both hands are contiguous runs of 21 indices.
    LHAND = [*range(468, 489)]
    RHAND = [*range(522, 543)]

    LNOSE = [98]
    RNOSE = [327]

    LLIP = [
        84, 181, 91, 146, 61, 185, 40, 39, 37,
        87, 178, 88, 95, 78, 191, 80, 81, 82,
    ]
    RLIP = [
        314, 405, 321, 375, 291, 409, 270, 269, 267,
        317, 402, 318, 324, 308, 415, 310, 311, 312,
    ]
    POSE = [500, 502, 504, 501, 503, 505, 512, 513]
    LPOSE = [513, 505, 503, 501]
    RPOSE = [512, 504, 502, 500]

    # Groups that are actually selected, in the order their columns are laid
    # out. A smaller alternative selection is [LHAND, RHAND, NOSE].
    POINT_LANDMARKS_PARTS = [LHAND, RHAND, LLIP, RLIP, LPOSE, RPOSE, NOSE, REYE, LEYE]
    POINT_LANDMARKS = [index for group in POINT_LANDMARKS_PARTS for index in group]
    parts = dict(
        LLIP=LLIP,
        RLIP=RLIP,
        LHAND=LHAND,
        RHAND=RHAND,
        LPOSE=LPOSE,
        RPOSE=RPOSE,
        LNOSE=LNOSE,
        RNOSE=RNOSE,
        REYE=REYE,
        LEYE=LEYE,
    )

    # For every named part, the positions of its landmarks within
    # POINT_LANDMARKS. Landmarks that are not selected are skipped.
    # The loop runs at class level, so `part` and `landmark` remain available
    # as class attributes afterwards.
    LANDMARK_INDICES = {}  # type: ignore
    for part in parts:
        LANDMARK_INDICES[part] = []
        for landmark in parts[part]:
            if landmark not in POINT_LANDMARKS:
                continue
            LANDMARK_INDICES[part].append(POINT_LANDMARKS.index(landmark))

    # The two nose landmarks, as raw ids and as positions in POINT_LANDMARKS.
    CENTER_LANDMARKS = [*LNOSE, *RNOSE]
    CENTER_INDICES = [*LANDMARK_INDICES["LNOSE"], *LANDMARK_INDICES["RNOSE"]]

    # Sizes derived from the number of selected landmarks.
    NUM_NODES = len(POINT_LANDMARKS)
    NUM_INPUT_FEATURES = NUM_NODES * 2
    CHANNELS = NUM_NODES * 6


In [None]:

def selected_columns(file_example):
    """Return the parquet column names of the selected landmarks.

    The names are read from ``file_example``. For every index in
    ``Constants.POINT_LANDMARKS`` the column at position ``index + 1`` is
    taken as the x column, and the matching y column name is derived by
    replacing every ``"x"`` in that name with ``"y"``.

    NOTE(review): the ``+ 1`` offset presumably skips a leading non-coordinate
    column (e.g. the frame number), and the x columns are assumed to come
    first in the file - confirm against the dataset schema.

    Args:
        file_example: path of a parquet file whose columns serve as reference.

    Returns:
        list: column names interleaved as ``[x1, y1, x2, y2, ...]``.
    """
    reference = pd.read_parquet(file_example)
    positions = [index + 1 for index in Constants.POINT_LANDMARKS]
    x_names = reference.columns[positions].tolist()
    y_names = [name.replace("x", "y") for name in x_names]
    # Interleave so that each landmark's x is directly followed by its y.
    return [name for pair in zip(x_names, y_names) for name in pair]


In [None]:
import glob
import pandas as pd
import os
import numpy as np
import tensorflow as tf


# Directory the TFRecord output is written below (a "records/" sub-directory
# is created inside it by preprocess()).
output_path = "/kaggle/working/"
# Directory holding the landmark parquet folders and the metadata CSV files.
input_path = "/kaggle/input/asl-fingerspelling/"

def _float_array_feature(value):
    """Wrap a sequence of floats in a ``tf.train.Feature`` holding a ``FloatList``."""
    float_list = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=float_list)


def _int_array_feature(value):
    """Wrap a sequence of integers in a ``tf.train.Feature`` holding an ``Int64List``."""
    int64_list = tf.train.Int64List(value=value)
    return tf.train.Feature(int64_list=int64_list)


def preprocess():
    """Convert the landmark parquet files into GZIP-compressed TFRecord files.

    Reads every ``*.parquet`` file under ``train_landmarks/`` and
    ``supplemental_landmarks/`` in ``input_path``, looks up the phrase of each
    sequence in the two metadata CSV files and writes one TFRecord file per
    parquet file into ``<output_path>/records/``. Output files that originate
    from the supplemental directory get a ``supp_`` prefix.

    Every serialized example holds two features:
        * ``coordinates`` - the selected columns of all frames of the
          sequence, flattened row by row and cast to float32.
        * ``label`` - the phrase encoded character by character with
          ``Constants.char_dict``.

    Raises:
        FileNotFoundError: if no parquet file is found under ``input_path``.
        KeyError: if a phrase contains a character missing from the vocabulary.
        ValueError: if a sequence does not have exactly one metadata row.
    """
    train_files = glob.glob(os.path.join(input_path, "train_landmarks", "*.parquet"))
    supp_files = glob.glob(os.path.join(input_path, "supplemental_landmarks", "*.parquet"))
    files = train_files + supp_files
    if not files:
        # Fail early with a readable message instead of an IndexError below.
        raise FileNotFoundError(f"no landmark parquet files found under {input_path!r}")
    # Track the origin explicitly so that a "supp" substring elsewhere in the
    # path cannot mislabel a training file.
    supplemental = set(supp_files)

    dtrain = pd.concat(
        [
            pd.read_csv(os.path.join(input_path, "train.csv")),
            pd.read_csv(os.path.join(input_path, "supplemental_metadata.csv")),
        ]
    )
    # File ids are compared as strings against the parquet file stem; convert
    # the column once instead of once per file.
    file_ids = dtrain["file_id"].astype(str)

    records_dir = os.path.join(output_path, "records")
    os.makedirs(records_dir, exist_ok=True)
    options = tf.io.TFRecordOptions(compression_type="GZIP")
    # The column selection is derived from the first file and reused for all.
    columns = selected_columns(files[0])

    for file_name in files:
        print(file_name)
        base_name = os.path.basename(file_name)
        file_id = base_name.split(".")[0]
        df = pd.read_parquet(file_name, columns=columns)
        labels = dtrain[file_ids == file_id]
        # The frame index identifies the sequence each row belongs to.
        unique_seqs = df.index.unique()

        output_name = base_name.replace("parquet", "tfrecord")
        if file_name in supplemental:
            output_name = "supp_" + output_name
        output_file = os.path.join(records_dir, output_name)

        with tf.io.TFRecordWriter(output_file, options=options) as writer:
            for seq in unique_seqs:
                # .item() insists on exactly one metadata row per sequence.
                phrase = labels[labels["sequence_id"] == seq]["phrase"].item()
                label = [Constants.char_dict[x] for x in phrase]
                frames = df.loc[seq]
                if frames.empty:
                    continue
                frames_numpy = frames.to_numpy().flatten().astype(np.float32)
                features_dict = {
                    "coordinates": _float_array_feature(frames_numpy),
                    "label": _int_array_feature(label),
                }
                features = tf.train.Features(feature=features_dict)
                example_proto = tf.train.Example(features=features)
                writer.write(example_proto.SerializeToString())


In [None]:
# Run the conversion: writes one TFRecord file per landmark parquet file.
preprocess()