In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib widget

In [2]:
import math
import os
import random
import time
from io import BytesIO
from pathlib import Path

import torch
import zstandard as zstd
from torch import Tensor
from tqdm import tqdm
from PIL import Image
import pandas as pd
import numpy as np
from torchvision import transforms as T
import pyarrow as pa
import pyarrow.parquet as pq

from dataloader import XPlaneVideoDataset
from utils import frame2bytes

In [4]:
# Arrow schema of the Parquet dataset written by this notebook.
# "state" is a fixed-size list of 11 float64 values; the two cloud columns
# are fixed-size lists of 3 float64 values each.
schema = pa.schema(
    [
        pa.field("video_name", pa.string()),
        pa.field("frame_bytes", pa.binary()),
        pa.field("frame_id", pa.int64()),
        pa.field("sim_time", pa.float64()),
        pa.field("state", pa.list_(pa.float64(), 11)),
        pa.field("time_of_day", pa.string()),
        pa.field("time_since_midnight", pa.float64()),
        pa.field("weather_cloud_cover", pa.list_(pa.float64(), 3)),
        pa.field("weather_cloud_type", pa.list_(pa.float64(), 3)),
        pa.field("weather_rain_snow_none", pa.string()),
        pa.field("weather_rain_percent", pa.float64()),
    ]
)

In [21]:
# Disabled alternative that also resized every frame to 360x640:
# transform = T.Compose([T.Lambda(lambda x: x.transpose(-1, -3)), T.Resize((360, 640))])

# Reorder each frame's axes to (2, 0, 1), i.e. move the last axis to the front.
# Presumably this converts HWC frames to CHW -- TODO confirm against the dataset.
transform = T.Compose(
    [
        T.Lambda(lambda frame: frame.permute(2, 0, 1)),
    ]
)
ds = XPlaneVideoDataset(
    Path("~/datasets/xplane_recording").expanduser(),
    transform=transform,
    output_full_data=True,
)

def custom_collate_fn(batch):
    """Collate (frame, data, weather) samples into column-oriented Python containers.

    Samples in which the frame, the data dict or the weather dict is ``None``
    are dropped before collation.

    Args:
        batch: iterable of ``(frame, data, weather)`` triples. ``data`` and
            ``weather`` are dicts; any of the three entries may be ``None``.

    Returns:
        A ``(frames, data, weather)`` tuple where

        * ``frames`` is a list with ``frame2bytes(frame)`` for every kept sample,
        * ``data`` maps each key of the first kept data dict to the list of
          that key's values across all kept samples,
        * ``weather`` does the same for the weather dicts.

        If no sample survives the filtering (or ``batch`` is empty) the result
        is ``([], {}, {})`` instead of an ``IndexError`` / ``ValueError``.
    """
    # Filter in a single pass so the three columns can never get out of sync.
    valid = [
        (frame, data, weather)
        for frame, data, weather in batch
        if frame is not None and data is not None and weather is not None
    ]
    if not valid:
        # Nothing usable in this batch: the key sets are unknown, so return
        # empty containers and let the caller skip the batch.
        return [], {}, {}

    frames = [frame2bytes(frame) for frame, _, _ in valid]
    data_rows = [data for _, data, _ in valid]
    weather_rows = [weather for _, _, weather in valid]

    # Turn the list of per-sample dicts into one dict of per-key lists; the
    # key set is taken from the first kept sample.
    data = {key: [row[key] for row in data_rows] for key in data_rows[0].keys()}
    weather = {key: [row[key] for row in weather_rows] for key in weather_rows[0].keys()}
    return frames, data, weather


# Loader used for the Parquet export: batches of 128 samples collated by
# custom_collate_fn, produced by 8 worker processes. No shuffle argument is
# passed, so the DataLoader's default ordering is used.
dl = torch.utils.data.DataLoader(ds, batch_size=128, collate_fn=custom_collate_fn, num_workers=8)

[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719e1d6ec0] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719866cd00] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719e1d6ec0] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719ea08600] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719de3f280] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719e1d6ec0] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719e1d6ec0] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x55719e2f4dc0] moov atom not found


Done initializing XPlaneVideoDataset.


In [6]:
# Export every batch produced by `dl` to a single Parquet file.
db_file = Path("/mnt/Storage2/xplane_dataset_gs1.parquet")

# ParquetWriter is a context manager: the file is closed (and the Parquet
# footer written) on exit, even if a batch fails part-way through.
with pq.ParquetWriter(db_file, schema) as writer:
    for frames, datas, weathers in tqdm(dl, total=len(dl)):
        if not frames:
            # A batch without any rows has nothing to write, and its dicts may
            # not contain the expected keys -- skip it.
            continue
        # Column names on the left must match the field names in `schema`.
        df = pd.DataFrame.from_dict(
            {
                "video_name": datas["video_name"],
                "frame_bytes": frames,
                "frame_id": datas["frame_id"],
                "sim_time": datas["sim_time"],
                "state": datas["state"],
                "time_of_day": weathers["time_of_day"],
                "time_since_midnight": weathers["time_since_midnight"],
                "weather_cloud_cover": weathers["cloud_cover"],
                "weather_cloud_type": weathers["cloud_type"],
                "weather_rain_snow_none": weathers["rain_snow_none"],
                "weather_rain_percent": weathers["rain_percent"],
            }
        )
        datum = pa.Table.from_pandas(df, schema=schema)
        # row_group_size=1 stores every row in its own row group.
        # NOTE(review): presumably so single frames can be read independently
        # later -- confirm against the reader before changing this value.
        writer.write_table(datum, row_group_size=1)

  0%|          | 0/1949 [00:00<?, ?it/s]

100%|██████████| 1949/1949 [29:43<00:00,  1.09it/s] 


# Reading

In [12]:
# Open the Parquet dataset for reading.
# NOTE(review): this path ("xplane_datset.parquet" under ~/datasets) differs from
# the file written above (/mnt/Storage2/xplane_dataset_gs1.parquet) -- confirm
# that this is the intended file and that the "datset" spelling is deliberate.
db_file = Path("~/datasets/xplane_datset.parquet").expanduser()
reader = pq.ParquetFile(db_file)


In [13]:
from parquet_dataloader import ParquetXPlaneVideoDataset, ParquetXPlaneVideoDataLoader

In [14]:
# Parquet-backed dataset over db_file; no transform is applied to the samples.
ds2 = ParquetXPlaneVideoDataset(db_file, transform=None)

In [19]:
# Benchmark: mean time to fetch one randomly chosen sample from ds2.
num_frames = 249438  # hard-coded index bound; presumably the dataset's row count -- TODO confirm
n_reads = 1000

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; time.time() can jump if the system clock is adjusted.
t = time.perf_counter()
for _ in range(n_reads):
    idx = random.randrange(num_frames)  # uniform over 0 .. num_frames - 1
    a = ds2[idx]
# Divide by the actual number of reads so the average stays correct if n_reads changes.
print(f"{(time.perf_counter() - t) / n_reads:.4e} s")

5.1722e-03 s


In [20]:
# Parquet-backed loader over the same file; used below through its get_range() method.
dl2 = ParquetXPlaneVideoDataLoader(db_file)

In [22]:
# Benchmark: mean time to read a contiguous window of 128 rows via dl2.get_range().
num_frames = 249438  # hard-coded index bound; presumably the dataset's row count -- TODO confirm
window = 128
n_reads = 100

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; time.time() can jump if the system clock is adjusted.
t = time.perf_counter()
for _ in range(n_reads):
    # Start indices leave a 200-row margin, so ridx + window stays below num_frames.
    ridx = random.randint(0, num_frames - 200)
    X, y = dl2.get_range(ridx, ridx + window)
# Divide by the actual number of reads so the average stays correct if n_reads changes.
print(f"{(time.perf_counter() - t) / n_reads:.4e} s")

5.0520e-01 s


In [3]:
# Video-backed dataset without a transform and without output_full_data; the
# collate function below unpacks each sample as a (frame, state) pair.
ds = XPlaneVideoDataset(Path("~/datasets/xplane_recording").expanduser(), transform=None)


def custom_collate_fn(batch):
    """Stack the usable (frame, state) pairs of a batch into two tensors.

    A pair is dropped when its frame or its state is ``None``.

    Args:
        batch: sequence of ``(frame, state)`` pairs of tensors (or ``None``).

    Returns:
        ``(frames, states)``: the kept frames stacked along a new first
        dimension, and the kept states stacked the same way and converted to
        ``torch.float32``.

    Note:
        If no pair survives the filtering, ``torch.stack`` is called on an
        empty list and raises.
    """
    frames, states = zip(*batch)
    # Positions at which both the frame and the state are present.
    keep = [
        pos
        for pos, (frame, state) in enumerate(zip(frames, states))
        if frame is not None and state is not None
    ]
    frame_batch = torch.stack([frames[pos] for pos in keep])
    state_batch = torch.stack([states[pos] for pos in keep]).to(torch.float32)
    return frame_batch, state_batch


# Benchmark: mean time per shuffled batch of 128 samples read from the video-backed dataset.
dl = torch.utils.data.DataLoader(
    ds, batch_size=128, collate_fn=custom_collate_fn, num_workers=8, shuffle=True
)
# The iterator is created before the timer starts, so only the next() calls are measured.
it = iter(dl)
n_batches = 100

# perf_counter() is a monotonic, high-resolution clock intended for measuring
# short durations; time.time() can jump if the system clock is adjusted.
t = time.perf_counter()
for _ in range(n_batches):
    batch = next(it)
# Divide by the actual number of batches so the average stays correct if n_batches changes.
print(f"{(time.perf_counter() - t) / n_batches:.4e} s")

[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b93e7dac0] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b8dc57e40] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b93e93140] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b93ea8880] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b8e419440] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b8e419440] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b8e419440] moov atom not found
[mov,mp4,m4a,3gp,3g2,mj2 @ 0x560b93edeb40] moov atom not found


Done initializing XPlaneVideoDataset.
4.2104e-01 s
