In [1]:
%pip install requests aria2 netCDF4 numpy xarray scikit-learn tqdm

Collecting aria2
  Downloading aria2-0.0.1b0-py3-none-manylinux_2_17_x86_64.whl.metadata (28 kB)
Collecting netCDF4
  Downloading netCDF4-1.7.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.8 kB)
Collecting xarray
  Downloading xarray-2024.11.0-py3-none-any.whl.metadata (11 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.5.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting cftime (from netCDF4)
  Downloading cftime-1.6.4.post1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.7 kB)
Collecting pandas>=2.1 (from xarray)
  Downloading pandas-2.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
Collecting scipy>=1.6.0 (from scikit-learn)
  Downloading scipy-1.14.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (60 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloadin

In [6]:
import os

# Constants
DOWNLOAD_DATA = True
DATA_DIR = './data'  # Directory containing .tar.gz files
EXTRACT_DIR = os.path.join(DATA_DIR, 'extracted')

In [None]:
import logging
import subprocess
import tarfile

# Setup logging
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(levelname)s - %(message)s")

# Bucket and endpoint configuration
CUSTOM_ENDPOINT = "bbproxy.meyerstk.com/file"
APP = "TorNetBecauseZenodoSlow"
TMP_FILE = os.path.join(DATA_DIR, "tmp.txt")

# Ensure directories exist
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(EXTRACT_DIR, exist_ok=True)


def download_links(links):
    """
    Download files from the provided links using aria2c.
    Uses a file named tmp.txt in DATA_DIR for links.
    """
    try:
        # Write links to tmp.txt
        with open(TMP_FILE, 'w') as file:
            file.writelines(link + '\n' for link in links)
        logging.info(f"Temporary file created: {TMP_FILE}")

        # Run aria2c to download files
        logging.info(f"Starting downloads for links: {', '.join(links)}")
        command = [
            "aria2c",
            "-j", "5",                # Download up to 3 files concurrently
            "-x", "16",               # Use up to 16 connections per file
            # "--console-log-level=info",
            "-s", "16",               # Split each file into 16 segments
            "--dir", DATA_DIR,        # Specify the download directory
            "-i", TMP_FILE            # Input file with download links
        ]
        subprocess.run(command, check=True)
        logging.info("Downloads completed successfully.")
    except Exception as e:
        logging.error(f"Error during download: {e}")
        exit(1)
    finally:
        if os.path.exists(TMP_FILE):
            os.remove(TMP_FILE)
            logging.info(f"Temporary file deleted: {TMP_FILE}")


def download_files_with_aria():
    """
    Download files from a public Backblaze B2 bucket served via a custom endpoint using aria2c.
    """
    logging.info("Starting download process with aria2c...")

    # # List of files to download
    file_list = [
        "tornet_2013.tar.gz",
        "tornet_2014.tar.gz",
        "tornet_2015.tar.gz",
        "tornet_2016.tar.gz",
        "tornet_2017.tar.gz",
        "tornet_2018.tar.gz",
        "tornet_2019.tar.gz",
        "tornet_2020.tar.gz",
        "tornet_2021.tar.gz",
        "tornet_2022.tar.gz",
        "catalog.csv"
    ]

    # Construct the public URLs
    links = [f"https://{CUSTOM_ENDPOINT}/{APP}/{file_name}" for file_name in file_list]
    
    # Filter out already downloaded files
    links_to_download = [
        link for link in links
        if not os.path.exists(os.path.join(DATA_DIR, os.path.basename(link)))
    ]

    if links_to_download:
        download_links(links_to_download)
    else:
        logging.info("All files already downloaded.")


def extract_local_tar_files():
    """
    Extract all .tar.gz files from the local DATA_DIR to EXTRACT_DIR.
    """
    logging.info("Starting extraction process...")
    for file_name in os.listdir(DATA_DIR):
        if file_name.endswith('.tar.gz'):
            file_path = os.path.join(DATA_DIR, file_name)
            logging.info(f'Extracting {file_path}...')
            with tarfile.open(file_path, 'r:gz') as tar:
                tar.extractall(path=EXTRACT_DIR)
            logging.info(f'Extracted {file_path} to {EXTRACT_DIR}')

            os.remove(file_path)

if DOWNLOAD_DATA:
    download_files_with_aria()
    extract_local_tar_files()

2024-12-05 03:31:54,497 - INFO - Starting download process with aria2c...
2024-12-05 03:31:54,498 - INFO - Temporary file created: ./data/tmp.txt
2024-12-05 03:31:54,498 - INFO - Starting downloads for links: https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2014.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2015.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2016.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2017.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2018.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2019.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2020.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2021.tar.gz, https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2022.tar.gz



12/05 03:31:54 [[1;32mNOTICE[0m] Downloading 9 item(s)

12/05 03:31:55 [[1;31mERROR[0m] CUID#17 - Download aborted. URI=https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2018.tar.gz
Exception: [AbstractCommand.cc:351] errorCode=8 URI=https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2018.tar.gz
  -> [HttpResponse.cc:81] errorCode=8 Invalid range header. Request: 4692377600-5474615295/12510393928, Response: 0-12510393927/12510393928

12/05 03:31:55 [[1;31mERROR[0m] CUID#41 - Download aborted. URI=https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2014.tar.gz
Exception: [AbstractCommand.cc:351] errorCode=8 URI=https://bbproxy.meyerstk.com/file/TorNetBecauseZenodoSlow/tornet_2014.tar.gz
  -> [HttpResponse.cc:81] errorCode=8 Invalid range header. Request: 10358882304-11300503551/15066513529, Response: 0-15066513528/15066513529

12/05 03:31:55 [[1;31mERROR[0m] CUID#65 - Download aborted. URI=https://bbproxy.meyerstk.com/file/TorNetBecause

In [7]:
import numpy as np
import xarray as xr
import tensorflow as tf
from pathlib import Path
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
from collections import defaultdict

# Constants for normalization
CHANNEL_MIN_MAX = {
    'DBZ': [-20., 60.],
    'VEL': [-60., 60.],
    'KDP': [-2., 5.],
    'RHOHV': [0.2, 1.04],
    'ZDR': [-1., 8.],
    'WIDTH': [0., 9.]
}

VARIABLES = ['DBZ', 'VEL', 'KDP', 'RHOHV', 'ZDR', 'WIDTH']

def parse_nc_file(file_path):
    """
    Parse and preprocess a single .nc file.
    Output: features (4D array), label (int)
    """
    try:
        with xr.open_dataset(file_path, engine="netcdf4") as ds:
            data_list = []

            # Process radar variables
            for var in VARIABLES:
                if var not in ds:
                    raise ValueError(f"Variable {var} not found in dataset.")

                var_data = ds[var].values  # Shape: [time, azimuth, range, sweep]
                var_min, var_max = CHANNEL_MIN_MAX[var]

                # Handle missing data and normalize
                var_data = np.nan_to_num(var_data, nan=0, posinf=0, neginf=0)
                var_data[var_data == ds.attrs.get('MissingDataFlag', -999.0)] = 0
                var_data = np.clip(var_data, var_min, var_max)
                var_data = (var_data - var_min) / (var_max - var_min)
                var_data = (var_data * 255).astype(np.uint8)  # Scale to [0, 255] and convert to uint8

                data_list.append(var_data)

            # Combine variables into the channel dimension
            data = np.stack(data_list, axis=-1)  # Shape: [time, azimuth, range, sweep, variables]
            data = data.transpose(0, 1, 2, 4, 3)  # [time, azimuth, range, variables, sweep]
            data = data.reshape(data.shape[0], data.shape[1], data.shape[2], -1)  # [time, azimuth, range, channels]

            # Ensure correct time dimension
            if data.shape[0] < 4:
                raise ValueError(f"File {file_path} has fewer than 4 time steps.")

            # Extract label from category attribute
            label = ds.attrs.get("category", "NUL")
            label = 1 if label == "TOR" else 0

            return data[:4], label  # Return first 4 time steps

    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        return None, None

def serialize_example(features, label):
    """
    Serialize features and labels into a TFRecord-compatible format.
    """
    feature = {
        "features": tf.train.Feature(bytes_list=tf.train.BytesList(value=[features.tobytes()])),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        "shape": tf.train.Feature(int64_list=tf.train.Int64List(value=list(features.shape)))
    }
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()

def group_files_by_year(input_dir):
    """
    Group `.nc` files by year.
    """
    files_by_year = defaultdict(list)
    for file in Path(input_dir).rglob("*.nc"):
        year = file.parent.name  # Assuming year is the folder name
        files_by_year[year].append(file)
    return files_by_year

def process_year(year, files, output_dir):
    """
    Process files for a given year and save them as a TFRecord file.
    """
    output_path = str(Path(output_dir) / f"{year}.tfrecord")  # Convert Path to string
    with tf.io.TFRecordWriter(output_path) as writer:
        for file in tqdm(files, desc=f"Processing year {year}"):
            features, label = parse_nc_file(file)
            if features is not None:
                example = serialize_example(features, label)
                writer.write(example)
    return f"Completed {year}: {len(files)} files"

def create_tfrecords(input_dir, output_dir, num_workers=4):
    """
    Create TFRecords for all years in train/test directories in parallel.
    """
    os.makedirs(output_dir, exist_ok=True)
    files_by_year = group_files_by_year(input_dir)

    # Process each year in parallel
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(process_year, year, files, output_dir)
            for year, files in files_by_year.items()
        ]
        for future in tqdm(futures, desc="Processing all years"):
            print(future.result())

# Paths
TRAIN_DIR = "./data/extracted/train"
TEST_DIR = "./data/extracted/test"
TRAIN_OUTPUT_DIR = "./data/tfrecords/train"
TEST_OUTPUT_DIR = "./data/tfrecords/test"

# Create TFRecords
print("Creating training TFRecords...")
create_tfrecords(TRAIN_DIR, TRAIN_OUTPUT_DIR, num_workers=8)

print("Creating testing TFRecords...")
create_tfrecords(TEST_DIR, TEST_OUTPUT_DIR, num_workers=8)

Creating training TFRecords...


Processing year 2013: 100%|██████████| 3498/3498 [01:19<00:00, 43.79it/s]
Processing all years: 100%|██████████| 1/1 [01:19<00:00, 79.89s/it]

Completed 2013: 3498 files
Creating testing TFRecords...



Processing year 2013: 100%|██████████| 573/573 [00:11<00:00, 49.98it/s]
Processing all years: 100%|██████████| 1/1 [00:11<00:00, 11.61s/it]

Completed 2013: 573 files





In [8]:
def parse_tfrecord(example):
    feature_description = {
        "features": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.int64)
    }
    parsed_example = tf.io.parse_single_example(example, feature_description)

    # Decode features and reshape directly to the known fixed shape
    features = tf.io.decode_raw(parsed_example["features"], tf.uint8)
    features = tf.reshape(features, [4, 120, 240, 12])  # Directly use the fixed shape
    features = tf.cast(features, tf.float32) / 255.0  # Scale back to [0, 1]

    # Parse label
    label = tf.cast(parsed_example["label"], tf.float32)
    label = tf.reshape(label, (1,))  # Ensure label has shape [1]

    return features, label

def create_tf_dataset_with_count(tfrecord_dir, batch_size, shuffle=True):
    """
    Create a tf.data.Dataset from TFRecord files and count total samples.
    """
    tfrecord_files = list(Path(tfrecord_dir).glob("*.tfrecord"))
    dataset = tf.data.TFRecordDataset(tfrecord_files)

    sample_count = 0
    for record in dataset:
        sample_count += 1

    dataset = dataset.map(parse_tfrecord)
    if shuffle:
        dataset = dataset.shuffle(1000)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    return dataset, sample_count

train_dataset, train_sample_count = create_tf_dataset_with_count(TRAIN_OUTPUT_DIR, batch_size=32)
test_dataset, test_sample_count = create_tf_dataset_with_count(TEST_OUTPUT_DIR, batch_size=32, shuffle=False)

train_steps_per_epoch = train_sample_count // 32
validation_steps = test_sample_count // 32

for features, labels in train_dataset.take(1):
    print(f"Feature shape: {features.shape}, Label shape: {labels.shape}")

2024-12-05 03:29:53.359088: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


Feature shape: (32, 4, 120, 240, 12), Label shape: (32, 1)


In [9]:
import tensorflow as tf

def analyze_tfrecord_contents(tfrecord_path):
    feature_sizes = []
    label_sizes = []
    shape_sizes = []

    for raw_record in tf.data.TFRecordDataset([tfrecord_path]):
        example = tf.train.Example()
        example.ParseFromString(raw_record.numpy())

        # Extract each field and calculate its size
        feature = example.features.feature["features"].bytes_list.value[0]
        label = example.features.feature["label"].int64_list.value[0]
        shape = example.features.feature["shape"].int64_list.value

        feature_sizes.append(len(feature))
        label_sizes.append(len(str(label).encode("utf-8")))
        shape_sizes.append(len(str(shape).encode("utf-8")))

    # Summarize field sizes
    def summarize(field_name, sizes):
        total = sum(sizes)
        avg = total / len(sizes) if sizes else 0
        min_size = min(sizes) if sizes else 0
        max_size = max(sizes) if sizes else 0
        print(f"{field_name} Sizes:")
        print(f"  Total: {total / (1024 ** 2):.2f} MB")
        print(f"  Average: {avg / 1024:.2f} KB")
        print(f"  Min: {min_size / 1024:.2f} KB")
        print(f"  Max: {max_size / 1024:.2f} KB\n")

    print(f"Analyzing {len(feature_sizes)} records in TFRecord: {tfrecord_path}")
    summarize("Feature", feature_sizes)
    summarize("Label", label_sizes)
    summarize("Shape", shape_sizes)

# Example usage
tfrecord_path = "./data/tfrecords/test/2013.tfrecord"  # Replace with your TFRecord file path
analyze_tfrecord_contents(tfrecord_path)


Analyzing 573 records in TFRecord: ./data/tfrecords/test/2013.tfrecord
Feature Sizes:
  Total: 755.42 MB
  Average: 1350.00 KB
  Min: 1350.00 KB
  Max: 1350.00 KB

Label Sizes:
  Total: 0.00 MB
  Average: 0.00 KB
  Min: 0.00 KB
  Max: 0.00 KB

Shape Sizes:
  Total: 0.01 MB
  Average: 0.02 KB
  Min: 0.02 KB
  Max: 0.02 KB



In [2]:
import tensorflow as tf

print(tf.config.list_physical_devices('GPU'))

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [10]:
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.regularizers import l2
from tensorflow.keras import models, layers

def create_3d_torcnn(input_shape=(4, 120, 240, 12), dropout_rate=0.3):
    """
    Define a 3D CNN model for tornado detection with adjusted pooling.
    """
    model = models.Sequential(
        [
            # Input Layer
            layers.Input(shape=input_shape),
            
            # Block 1
            layers.Conv3D(32, (3, 3, 3), activation="relu", padding="same"),
            layers.Conv3D(32, (3, 3, 3), activation="relu", padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling3D((1, 2, 2)),  # Pool spatial dimensions only
            layers.Dropout(dropout_rate),

            # Block 2
            layers.Conv3D(64, (3, 3, 3), activation="relu", padding="same"),
            layers.Conv3D(64, (3, 3, 3), activation="relu", padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling3D((1, 2, 2)),  # Pool spatial dimensions only
            layers.Dropout(dropout_rate),

            # Block 3
            layers.Conv3D(128, (3, 3, 3), activation="relu", padding="same"),
            layers.Conv3D(128, (3, 3, 3), activation="relu", padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling3D((2, 2, 2)),  # Pool across all dimensions
            layers.Dropout(dropout_rate),

            # Fully Connected Layers
            layers.Flatten(),
            layers.Dense(256, activation="relu", kernel_regularizer=l2(0.01)),
            layers.Dropout(0.4),
            layers.Dense(128, activation="relu", kernel_regularizer=l2(0.01)),
            layers.Dropout(0.4),
            layers.Dense(1, activation="sigmoid")  # Binary classification output
        ]
    )

    # Compile the model
    model.compile(
        optimizer=Adam(learning_rate=0.0005),
        loss=BinaryCrossentropy(),
        metrics=["accuracy", Precision(), Recall(), AUC()],
    )
    return model


In [11]:
# Train the model
model = create_3d_torcnn()

history = model.fit(
    train_dataset,
    epochs=50,
    steps_per_epoch=train_steps_per_epoch,
    validation_data=test_dataset,
    validation_steps=validation_steps,
    callbacks=[
        tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
        tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=3, min_lr=1e-6)
    ]
)

Epoch 1/50


2024-12-05 03:30:16.235725: W tensorflow/core/kernels/data/prefetch_autotuner.cc:52] Prefetch autotuner tried to allocate 176947328 bytes after encountering the first element of size 176947328 bytes.This already causes the autotune ram budget to be exceeded. To stay within the ram budget, either increase the ram budget or reduce element size
I0000 00:00:1733369416.262919   27013 service.cc:148] XLA service 0x771954018740 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1733369416.263036   27013 service.cc:156]   StreamExecutor device (0): NVIDIA GeForce RTX 4090, Compute Capability 8.9
2024-12-05 03:30:16.313627: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
I0000 00:00:1733369416.514963   27013 cuda_dnn.cc:529] Loaded cuDNN version 90300
I0000 00:00:1733369431.479072   27013 device_compiler.h:188] Compiled cluster using XLA! 

[1m 88/109[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m3s[0m 143ms/step - accuracy: 0.7957 - auc: 0.5151 - loss: 14.7857 - precision: 0.1110 - recall: 0.1344

KeyboardInterrupt: 