In [102]:
import boto3
import os
from datetime import datetime
import pytz

import re
from tqdm import tqdm
import time
import wandb
from torch.utils.tensorboard import SummaryWriter
import shutil
import json
import base64
import fiftyone as fo
import numpy as np

In [103]:
start_date = datetime(2023, 11, 19)    # year, month, day (inclusive)
end_date = datetime(2023, 11, 25)      # 25 November 2023: Michigan vs Ohio (inclusive)
cameras_excluded = []                  # camera names to leave out of the download

# Root directory for downloaded data and logs. Defaults to the original workstation
# path; set DATA_ENGINE_ROOT to run the notebook elsewhere without editing code.
storage_target_root = os.environ.get(
    "DATA_ENGINE_ROOT", "/media/dbogdoll/Datasets/data_engine_rolling"
)
data_target = os.path.join(storage_target_root, "data")   # raw S3 files + decoded JPEGs
os.makedirs(data_target, exist_ok=True)

log_target = os.path.join(storage_target_root, "logs")    # JSON logs of completed downloads
os.makedirs(log_target, exist_ok=True)

In [104]:
# Connect to AWS.
# The secret file contains KEY=VALUE lines that are exported into the process
# environment; the AWS credentials among them are handed to the boto3 client.
# Set DATA_ENGINE_SECRET_FILE to read a different file.
secret_path = os.environ.get(
    "DATA_ENGINE_SECRET_FILE", "/home/dbogdoll/mcity_data_engine/.secret"
)
with open(secret_path, "r") as secret_file:
    for line_number, raw_line in enumerate(secret_file, start=1):
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and comments previously crashed the tuple unpacking
        if "=" not in line:
            # Do not echo the line itself: it may contain a secret.
            raise ValueError(f"Malformed line {line_number} in {secret_path}: expected KEY=VALUE")
        # Split on the first '=' only, so values that contain '=' are kept intact.
        key, value = line.split("=", 1)
        os.environ[key.strip()] = value.strip()

s3 = boto3.client(
    "s3",
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", None),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", None),
)

In [105]:
# Registry of the cameras whose data is looked up in the S3 buckets.
cameras = {
    "Geddes_Huron_1",
    "Geddes_Huron_2",
    "Huron_Plymouth_1",
    "Huron_Plymouth_2",
    "Main_stadium_1",
    "Main_stadium_2",
    "Plymouth_Beal",
    "Plymouth_Bishop",
    "Plymouth_EPA",
    "Plymouth_Georgetown",
    "State_Ellsworth_NE",
    "State_Ellsworth_NW",
    "State_Ellsworth_SE",
    "State_Ellsworth_SW",
    "Fuller_Fuller_CT",
    "Fuller_Glazier_1",
    "Fuller_Glazier_2",
    "Fuller_Glen",
    "Dexter_Maple_1",
    "Dexter_Maple_2",
    "Hubbard_Huron_1",
    "Hubbard_Huron_2",
    "Maple_Miller_1",
    "Maple_Miller_2",
}

# A set's iteration order depends on string hash randomisation and changes between
# interpreter runs, so enumerate in sorted order to keep camera ids reproducible.
# Ids are assigned over *all* cameras, so excluding one does not renumber the others.
excluded_lower = {name.lower() for name in cameras_excluded}
cameras_dict = {}
for camera_id, camera in enumerate(sorted(name.lower() for name in cameras)):
    if camera in excluded_lower:
        # Drop excluded cameras entirely; half-initialised entries without
        # "aws-sources" made later cells fail with a KeyError.
        continue
    cameras_dict[camera] = {"id": camera_id, "aws-sources": {}}

print(f"Processed {len(cameras_dict)} cameras")

Processed 24 cameras


In [106]:
def process_aws_result(result):
    """Extract the 'folder' prefixes from an S3 ``list_objects_v2`` response.

    With ``Delimiter='/'`` the response lists each sub-prefix under ``CommonPrefixes``.
    Only the single response passed in is read; a truncated (paginated) listing
    is not followed here.

    Args:
        result: Response dict returned by ``s3.list_objects_v2``.

    Returns:
        List of prefix strings in response order. Returns an empty list when the
        response has no ``CommonPrefixes``. It used to return ``None``, which
        crashed callers that iterate the result directly.
    """
    return [prefix["Prefix"] for prefix in result.get("CommonPrefixes", [])]

In [107]:
# S3 buckets, and the prefixes inside them, whose sub-folders hold camera data.
aws_sources = {
    "sip-sensor-data": [""],
    "sip-sensor-data2": ["wheeler1/", "wheeler2/"],
}


def _align_folder_name(folder_name):
    """Normalise an S3 folder name so camera names can be substring-matched against it."""
    aligned = folder_name.lower().rstrip("/")
    aligned = re.sub(r"(?<!_)(\d)", r"_\1", aligned)   # e.g. "huron1" -> "huron_1"
    aligned = aligned.replace("fullerct", "fuller_ct")  # align "fullerct" with camera names
    aligned = aligned.replace("fullser", "fuller")      # fix "fullser" typo in AWS
    return aligned


for bucket in tqdm(aws_sources, desc="Processing AWS sources"):
    for source_prefix in aws_sources[bucket]:
        result = s3.list_objects_v2(Bucket=bucket, Prefix=source_prefix, Delimiter="/")
        folders = process_aws_result(result)

        # Check for an empty listing *before* aligning names: aligning first iterated
        # a possible None and raised a TypeError, so this branch was never reached.
        if not folders:
            print(f"AWS did not return a list of folders for {bucket}/{source_prefix}")
            print(result)
            continue

        for folder_name in folders:
            folder_name_aligned = _align_folder_name(folder_name)
            if "gs_" not in folder_name_aligned:  # gs_ is the prefix used in AWS
                continue
            aws_source = f"{bucket}/{folder_name}"
            for camera_name, camera_info in cameras_dict.items():
                if "aws-sources" not in camera_info:
                    continue  # excluded camera without a source registry
                if camera_name in folder_name_aligned:
                    camera_info["aws-sources"].setdefault(aws_source, {})



Processing AWS sources: 100%|██████████| 2/2 [00:00<00:00,  6.36it/s]


In [108]:
def _list_prefixes(bucket, prefix):
    """Return every immediate sub-'folder' prefix of ``prefix`` in ``bucket``, across all result pages."""
    paginator = s3.get_paginator("list_objects_v2")
    return [
        entry["Prefix"]
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/")
        for entry in page.get("CommonPrefixes", [])
    ]


def _list_objects(bucket, prefix):
    """Return every object directly under ``prefix`` in ``bucket``, across all result pages."""
    paginator = s3.get_paginator("list_objects_v2")
    return [
        obj
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/")
        for obj in page.get("Contents", [])
    ]


def select_data(file_limit=10):
    """Crawl each camera's S3 sources and record the files inside the date range.

    Expected layout below each AWS source: ``<YYYY-MM-DD>/<hour folder>/<files>``.
    Every selected file is stored as
    ``cameras_dict[camera]["aws-sources"][aws_source][date][file_name] = {"key", "size", "date"}``.
    Listings are paginated, so folders holding more than 1000 entries are not truncated.

    Args:
        file_limit: Stop as soon as more than this many files are selected. This is a
            testing aid; the default of 10 reproduces the previous hard-coded limit.
            Pass ``None`` to select everything in range.

    Returns:
        Tuple ``(n_files_to_download, download_size_bytes)``.
    """
    n_files_to_download = 0
    download_size_bytes = 0
    for camera in tqdm(cameras_dict, desc="Looking for data entries in range"):
        # .get(): tolerate excluded cameras that were never given an "aws-sources" registry
        for aws_source, days in cameras_dict[camera].get("aws-sources", {}).items():
            bucket, _, prefix_camera = aws_source.partition("/")
            for folder_day in _list_prefixes(bucket, prefix_camera):   # each folder is one day
                date = folder_day.split("/")[-2]
                try:
                    timestamp = datetime.strptime(date, "%Y-%m-%d")
                except ValueError:
                    print(f"Skipping non-date folder {bucket}/{folder_day}")
                    continue
                if not (start_date <= timestamp <= end_date):
                    continue  # only collect data within the date range

                day_files = {}
                days[date] = day_files
                for folder_hour in _list_prefixes(bucket, folder_day):   # each folder is one hour
                    for obj in _list_objects(bucket, folder_hour):
                        n_files_to_download += 1
                        download_size_bytes += obj["Size"]
                        # NOTE(review): files are keyed by base name, so equal names in
                        # different hour folders of the same day overwrite each other here.
                        day_files[os.path.basename(obj["Key"])] = {
                            "key": obj["Key"],
                            "size": obj["Size"],
                            "date": obj["LastModified"].strftime("%Y-%m-%d %H:%M:%S"),
                        }
                        if file_limit is not None and n_files_to_download > file_limit:
                            return n_files_to_download, download_size_bytes
    print(f"Found {n_files_to_download} files to download")
    return n_files_to_download, download_size_bytes

In [109]:
# Crawl S3 once; the totals feed the safety checks and the download progress bar.
(n_files_to_download, download_size_bytes) = select_data()

Looking for data entries in range:   0%|          | 0/24 [00:00<?, ?it/s]


In [110]:
# Safety checks prior to downloading: download_data() only runs if all of them pass.
MAX_SIZE_TB = 1.5   # upper bound for the total download, in units of 1024**4 bytes
MAX_DAYS = 7        # upper bound for the selected date range, end date inclusive

failed_checks = []

# Each camera must have been assigned at least one AWS source
# (entries without an "aws-sources" key were excluded from the download).
for camera, camera_info in cameras_dict.items():
    if "aws-sources" in camera_info and not camera_info["aws-sources"]:
        failed_checks.append(f"Camera {camera} has no AWS sources")

# The total size of the data to download must be within the limit
total_size_tb = download_size_bytes / (1024 ** 4)
if total_size_tb > MAX_SIZE_TB:
    failed_checks.append(f"Total size {total_size_tb} TB exceeds {MAX_SIZE_TB} TB")

# The date range must be non-empty and within the limit
days_delta = (end_date - start_date).days + 1  # +1 to include the end date
if days_delta < 1:
    # Previously an inverted range produced a negative delta and passed the check.
    failed_checks.append(f"Start date {start_date:%Y-%m-%d} is after end date {end_date:%Y-%m-%d}")
elif days_delta > MAX_DAYS:
    failed_checks.append(f"Date range exceeds {MAX_DAYS} days")

# There must be something to download
if n_files_to_download == 0:
    failed_checks.append("No files found in the selected date range")

for message in failed_checks:
    print(message)
passed_checks = not failed_checks

In [111]:
# Download data
def _iter_selected_files():
    """Yield ``(bucket, file_name, file_info)`` for every file recorded by select_data()."""
    for camera_info in cameras_dict.values():
        for aws_source, days in camera_info.get("aws-sources", {}).items():
            bucket = aws_source.split("/", 1)[0]
            for files in days.values():
                for file_name, file_info in files.items():
                    yield bucket, file_name, file_info


def download_data(delete_old_data=False):
    """Download every file selected by ``select_data()`` into ``data_target``.

    Nothing is downloaded unless the global ``passed_checks`` is true. Per-file
    throughput is written to TensorBoard as ``download/mb_per_second``. Afterwards
    every selected file is checked for existence and for the size reported by S3.
    Only if all files pass is a JSON log of the run written to ``log_target``.

    Files are stored flat under their base name (``data_target/<file_name>``).
    NOTE(review): equal base names from different cameras/sources would overwrite
    each other; the size check below catches some but not all such cases.

    Args:
        delete_old_data: If True, delete ``data_target`` and its contents first.

    Returns:
        True if every selected file was downloaded and verified, else False.
    """
    if not passed_checks:
        print("Safety checks failed. Not downloading data")
        return False

    if delete_old_data:
        # Check for the directory explicitly instead of a bare ``except: pass``,
        # so real failures (e.g. permissions) surface instead of being swallowed.
        if os.path.isdir(data_target):
            shutil.rmtree(data_target)
        os.makedirs(data_target, exist_ok=True)

    mb_per_s_list = []
    writer = SummaryWriter(log_dir="logs/download/s3")
    download_started = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        with tqdm(desc="Downloading data", total=n_files_to_download) as pbar:
            for step, (bucket, file_name, file_info) in enumerate(_iter_selected_files(), start=1):
                time_start = time.perf_counter()
                target = os.path.join(data_target, file_name)
                s3.download_file(bucket, file_info["key"], target)
                duration = time.perf_counter() - time_start

                # Throughput in MB/s (1024**2 bytes); skip a degenerate zero duration.
                if duration > 0:
                    mb_per_s = file_info["size"] / (1024 ** 2) / duration
                    mb_per_s_list.append(mb_per_s)
                    writer.add_scalar("download/mb_per_second", mb_per_s, step)
                pbar.update(1)
    finally:
        writer.close()  # flush TensorBoard events even if a download raises

    # Check that all files were downloaded completely
    download_successful = True
    for _, file_name, file_info in tqdm(_iter_selected_files(), desc="Checking downloaded data"):
        target = os.path.join(data_target, file_name)
        if not os.path.isfile(target) or os.path.getsize(target) != file_info["size"]:
            download_successful = False
            print(f"File {file_name} was not downloaded properly")

    if not download_successful:
        print("Download failed")
        return False

    # Save the log only if the data is fully downloaded
    log = {
        "selection_start_date": start_date.strftime("%Y-%m-%d"),
        "selection_end_date": end_date.strftime("%Y-%m-%d"),
        "old_files_deleted": delete_old_data,
        "n_files_to_download": n_files_to_download,
        "download_size_tb": total_size_tb,
        "download_started": download_started,
        "download_ended": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        # None (JSON null) when no speed sample exists, instead of a ZeroDivisionError
        "download_speed_avg_mbs": (
            sum(mb_per_s_list) / len(mb_per_s_list) if mb_per_s_list else None
        ),
        "data": cameras_dict,
    }
    log_name = download_started.replace(" ", "_").replace(":", "_") + ".json"
    log_file_path = os.path.join(log_target, log_name)
    with open(log_file_path, "w") as json_file:
        json.dump(log, json_file, indent=4)
    return True

# Track the download in Weights & Biases; sync_tensorboard=True mirrors the
# TensorBoard scalars that download_data() writes.
run = wandb.init(
    name=f"{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}",
    sync_tensorboard=True,
    group="S3",
    job_type="download",
    project="Data Engine Download",
)

download_successful = False
try:
    download_successful = download_data(delete_old_data=True)
    print(download_successful)
finally:
    # Previously the run was always marked successful (exit_code=0) and was left
    # open if download_data() raised; report the real outcome and always close it.
    run.finish(exit_code=0 if download_successful else 1)


Downloading data: 100%|██████████| 11/11 [00:30<00:00,  2.81s/it]
Checking downloaded data: 100%|██████████| 24/24 [00:00<00:00, 535443.06it/s]


True


0,1
download/mb_per_second,▁▅▅▄█▆▅▃▄▆▃
global_step,▁▂▂▃▄▅▅▆▇▇█

0,1
download/mb_per_second,36.74863
global_step,11.0


In [112]:
# Unpack data
# Support V51 dataset structure https://docs.voxel51.com/user_guide/dataset_creation/datasets.html#fiftyonedataset

def _parse_record(data):
    """Return ``(image_base64, formatted_time)`` for a supported record, else None.

    Supported layouts:
    * ``{"time": "YYYY-mm-dd HH:MM:SS.ffffff", "data": <base64>}``,
      formatted as ``YYYY-mm-dd_HH-MM-SS``.
    * ``{"image": <base64>, "sensor_name": ..., "event_timestamp": <number>}``,
      converted to America/Detroit time and formatted as ``YYYYmmdd-HHMMSS-mmm``.

    Raises:
        ValueError / TypeError: on an unparsable or invalid timestamp.
    """
    if not isinstance(data, dict):
        return None
    if "time" in data and "data" in data:
        time_obj = datetime.strptime(data["time"], "%Y-%m-%d %H:%M:%S.%f")
        return data["data"], time_obj.strftime("%Y-%m-%d_%H-%M-%S")
    if "image" in data and "sensor_name" in data and "event_timestamp" in data:
        # TODO Check if 'camera' == sensor_name (or at least similar)
        # `datetime` is the class imported from the datetime module, so the former
        # `datetime.timezone.utc` raised AttributeError; use pytz's UTC instead.
        # NOTE(review): event_timestamp is interpreted as Unix seconds — confirm it is not milliseconds.
        utc_time = datetime.fromtimestamp(data["event_timestamp"], tz=pytz.utc)
        michigan_time = utc_time.astimezone(pytz.timezone("America/Detroit"))
        return data["image"], michigan_time.strftime("%Y%m%d-%H%M%S-%f")[:-3]
    return None


def _save_image(image_base64, camera, formatted_time):
    """Decode a base64 image, write it as ``<data_target>/<camera>_<formatted_time>.jpg``
    and return the matching V51 sample dict.

    Raises:
        binascii.Error (a ValueError): if the payload is not valid base64.
    """
    image_data = base64.b64decode(image_base64)
    output_path = os.path.join(data_target, f"{camera}_{formatted_time}.jpg")
    # NOTE(review): frames whose formatted timestamps coincide (the first layout drops
    # sub-second precision) map to the same file, and the later frame overwrites it.
    with open(output_path, "wb") as image_file:
        image_file.write(image_data)
    return {"filepath": output_path, "sensor": camera, "timestamp": formatted_time}


def unpack_data():
    """Decode the downloaded JSON-lines files into JPEGs and write a V51 dataset index.

    Each line of every selected file in ``data_target`` is parsed as JSON, and its
    image is saved next to it. A source file is deleted only if all of its lines
    unpacked successfully. ``metadata.json`` and ``samples.json`` are then written
    to ``storage_target_root`` in FiftyOneDataset layout.

    Does nothing when the download failed. Existing index files are left untouched
    instead of being overwritten with an empty sample list.
    """
    if not download_successful:
        print("Download was not successful. Not unpacking data")
        return

    v51_samples_array = []
    for camera in tqdm(cameras_dict, desc="Unpacking data"):
        for days in cameras_dict[camera].get("aws-sources", {}).values():
            for files in days.values():
                for file_name in files:
                    file_path = os.path.join(data_target, file_name)
                    unpacking_successful = True
                    with open(file_path, "r") as source_file:
                        for line in source_file:
                            if not line.strip():
                                continue  # tolerate blank lines, e.g. a trailing newline
                            try:
                                data = json.loads(line)
                                record = _parse_record(data)
                                if record is None:
                                    unpacking_successful = False
                                    # Print only the keys: the full record carries a large base64 image.
                                    shape = sorted(data) if isinstance(data, dict) else type(data).__name__
                                    print(f"Format cannot be processed: {shape}")
                                    continue
                                image_base64, formatted_time = record
                                v51_samples_array.append(_save_image(image_base64, camera, formatted_time))
                            except json.JSONDecodeError as e:
                                unpacking_successful = False
                                print(f"Error decoding JSON: {e}")
                            except (ValueError, TypeError) as e:
                                # Bad base64 or timestamp: skip this line but keep the source
                                # file, instead of aborting the whole unpack.
                                unpacking_successful = False
                                print(f"Error unpacking record in {file_name}: {e}")

                    # Delete the original file if unpacking was successful
                    if unpacking_successful:
                        os.remove(file_path)

    # Store V51 metadata files
    v51_metadata = {"name": "Data Engine Rolling Dataset"}
    v51_samples = {"samples": v51_samples_array}
    with open(os.path.join(storage_target_root, "metadata.json"), "w") as json_file:
        json.dump(v51_metadata, json_file, indent=4)
    with open(os.path.join(storage_target_root, "samples.json"), "w") as json_file:
        json.dump(v51_samples, json_file, indent=4)


unpack_data()

Unpacking data: 100%|██████████| 24/24 [00:02<00:00,  8.78it/s]


In [None]:
# Load the FiftyOneDataset index (metadata.json + samples.json) written by unpack_data().
name = "my-dataset"
dataset_dir = storage_target_root
print(dataset_dir)

dataset = fo.Dataset.from_dir(
    name=name,
    dataset_type=fo.types.FiftyOneDataset,
    dataset_dir=dataset_dir,
)

print(dataset)          # summary: name, media type, sample count, field schema
print(dataset.head())   # the first few samples

/media/dbogdoll/Datasets/data_engine_rolling
 100% |█████████████| 16483/16483 [1.5s elapsed, 0s remaining, 11.4K samples/s]         
Name:        my-dataset
Media type:  image
Num samples: 16483
Persistent:  False
Tags:        []
Sample fields:
    id:               fiftyone.core.fields.ObjectIdField
    filepath:         fiftyone.core.fields.StringField
    tags:             fiftyone.core.fields.ListField(fiftyone.core.fields.StringField)
    metadata:         fiftyone.core.fields.EmbeddedDocumentField(fiftyone.core.metadata.ImageMetadata)
    created_at:       fiftyone.core.fields.DateTimeField
    last_modified_at: fiftyone.core.fields.DateTimeField
    sensor:           fiftyone.core.fields.StringField
    timestamp:        fiftyone.core.fields.StringField
[<Sample: {
    'id': '67351c527e94e68f6ace14e3',
    'media_type': 'image',
    'filepath': '/media/dbogdoll/Datasets/data_engine_rolling/data/geddes_huron_1_2023-11-18_19-00-00.jpg',
    'tags': [],
    'metadata': None,
    '

In [None]:
# Fill in the `metadata` field of every sample, using parallel workers.
NUM_WORKERS = 32
dataset.compute_metadata(progress=True, num_workers=NUM_WORKERS)

ServerSelectionTimeoutError: localhost:46813: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 30s, Topology Description: <TopologyDescription id: 673517ce7e94e68f6ace14db, topology_type: Single, servers: [<ServerDescription ('localhost', 46813) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:46813: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>

In [115]:
# Open the FiftyOne App on the dataset imported above; called without an
# argument, the App starts with no dataset loaded.
session = fo.launch_app(dataset)

Connected to FiftyOne on port 5151 at localhost.
If you are not connecting to a remote session, you may need to start a new session and specify a port
Server version (1.0.1) does not match client version (1.0.2)



Welcome to

███████╗██╗███████╗████████╗██╗   ██╗ ██████╗ ███╗   ██╗███████╗
██╔════╝██║██╔════╝╚══██╔══╝╚██╗ ██╔╝██╔═══██╗████╗  ██║██╔════╝
█████╗  ██║█████╗     ██║    ╚████╔╝ ██║   ██║██╔██╗ ██║█████╗
██╔══╝  ██║██╔══╝     ██║     ╚██╔╝  ██║   ██║██║╚██╗██║██╔══╝
██║     ██║██║        ██║      ██║   ╚██████╔╝██║ ╚████║███████╗
╚═╝     ╚═╝╚═╝        ╚═╝      ╚═╝    ╚═════╝ ╚═╝  ╚═══╝╚══════╝ v1.0.2

If you're finding FiftyOne helpful, here's how you can get involved:

|
|  ⭐⭐⭐ Give the project a star on GitHub ⭐⭐⭐
|  https://github.com/voxel51/fiftyone
|
|  🚀🚀🚀 Join the FiftyOne Slack community 🚀🚀🚀
|  https://slack.voxel51.com
|

