In [None]:
from pyspark.sql import SparkSession
# Initialize (or reuse an already running) Spark session for the cells below.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [None]:
import pyspark.sql.functions as F
from re import sub

In [None]:
# Metadata table from train.csv; the first CSV row is treated as the header.
# Its `path` column is used as the join key against the landmark files.
index = spark.read.csv("s3://w251-asl-data/raw-data/train.csv", header=True)

index.show(3, truncate=False)

In [None]:
# Read in raw landmark data, tag each row with its source file as the join key,
# join to the index table, and drop any rows with missing values.
# The key is the portion of the file URI starting at "train_landmark_files/".
# Extracting it by pattern rather than a fixed character offset keeps the key
# correct if the scheme or bucket prefix changes; a wrong key would otherwise
# fail the join and the na.drop() below would silently discard every row.
data = (
    spark.read.parquet("s3://w251-asl-data/raw-data/train_landmark_files/*/*")
    .withColumn("path", F.regexp_extract(F.input_file_name(), r"(train_landmark_files/.*)$", 1))
    .join(index, ["path"], "left")
    .na.drop()
    .drop(F.col("z"))
    .cache()
)

data.show(3, truncate=False)

In [None]:
# Largest `frame` value across all loaded rows.
# NOTE(review): this is the maximum frame index, not a per-sequence frame count;
# if the goal is the longest sequence, count frames per sequence_id instead.
data.agg(F.max("frame")).show()

In [None]:
# List the unique source-file keys that survived the join and null filtering.
data.select("path").dropDuplicates().show()

In [None]:
# Small two-file sample for prototyping the pivot: tag rows with their source
# file key, join the metadata table, drop rows with missing values, drop `z`.
sample = (
    spark.read.parquet(
        "s3://w251-asl-data/raw-data/train_landmark_files/4718/2725590426.parquet",
        "s3://w251-asl-data/raw-data/train_landmark_files/16069/100015657.parquet",
    )
    # Key = file URI from "train_landmark_files/" onward; pattern-based so it
    # does not depend on the exact length of the scheme/bucket prefix.
    .withColumn("path", F.regexp_extract(F.input_file_name(), r"(train_landmark_files/.*)$", 1))
    .join(index, ["path"], "left")
    .na.drop()
    .drop(F.col("z"))
    .cache()
)

# Concat type and landmark index into a single "<type>-<landmark_index>" label,
# then pivot so each (participant_id, sequence_id, frame) becomes one wide row
# holding the summed x and y value for every landmark label.
# NOTE(review): columns not listed in groupBy (anything else carried over from
# `index`) are discarded by the aggregation — confirm that is intended.
test = (
    sample
    .withColumn("landmark", F.concat(F.col("type"), F.lit("-"), F.col("landmark_index")))
    .drop("path", "row_id", "type", "landmark_index")
    .groupBy(["participant_id", "sequence_id", "frame"])
    .pivot("landmark")
    .sum("x", "y")
)

In [None]:
# Replace the "_sum" fragment in the pivoted column names with "-".
# A single toDF() call renames all columns in one projection; calling
# withColumnRenamed once per column stacks one projection per column
# (over a thousand here), which bloats the query plan.
# NOTE(review): Spark appears to name these columns like "<value>_sum(x)",
# so results keep a "(x)"/"(y)" suffix — confirm that naming is intended.
test = test.toDF(*[name.replace("_sum", "-") for name in test.columns])

In [None]:
# Preview the first 20 rows of the wide pivoted frame (values truncated).
test.show(n=20, truncate=True)

In [None]:
# Number of columns in the pivoted frame (one per schema field).
len(test.schema)