Convert tif data to dask for training

In [None]:
import rioxarray
import os
import pandas as pd
import numpy as np
import glob
import time
import dask
import dask.dataframe as dd
from dask.delayed import delayed

# Record the start time
start_time = time.time()


# Output path
out = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files'
os.makedirs(out, exist_ok=True)

# Code to read in all training data
# all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/l8_sent_collection2_proj/*.tif')
# all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/l8_sent_collection2_proj_mtbs/*.tif')
# all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/mtbs_old/*.tif')
all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/Russia/anna_old/*.tif')

# Function to process each file
@delayed
def process_file(f):
    try:
        # Read in file and convert to numpy
        in_file = rioxarray.open_rasterio(f).to_numpy().astype(float).round(3)

        # Convert to band last
        in_file = np.moveaxis(in_file, 0, 2)

        x = in_file[:, :, :-1]
        x[x == 0] = np.nan
        x = np.round(x, 2)

        y = in_file[:, :, -1].astype(float)
        y[y < 0] = 0
        y[y > 1] = 0

        stacked = np.dstack([x, y])

        # Reshape the 3D matrix to 2D
        rows, cols, bands = stacked.shape
        reshaped_data = stacked.reshape(rows * cols, bands)

        # band_names = ['blue', 'green', 'red', 'NIR', 'SWIR1', 'SWIR2', 'dNBR', 'dNDVI', 'dNDII', 'y']
        band_names = ['dNBR', 'dNDVI', 'dNDII', 'y']

        # Create a DataFrame
        training = pd.DataFrame(reshaped_data, columns=band_names).dropna()

        training = training[['dNBR', 'dNDVI', 'dNDII', 'y']]
        
        return training

    except Exception as e:
        return pd.DataFrame()  # Return an empty DataFrame on error

# Process all files in parallel
delayed_results = [process_file(f) for f in all_files]
combined_training = dd.from_delayed(delayed_results)

# Write to Parquet in chunks
combined_training.to_parquet(
    os.path.join(out, 'all_training_anna_new.parquet'),
    write_index=False,
    engine='pyarrow',
    compression='snappy',
    write_metadata_file=True,
    append=True,
    overwrite=False,
    compute=True,
)

# Record the end time
end_time = time.time()

# Calculate the elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert seconds to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"NBAC Elapsed time: {elapsed_time_minutes:.2f} minutes")


In [4]:
end_time

1726587535.1589634

Memory efficient way

In [2]:
import rioxarray
import os
import pandas as pd
import numpy as np
import glob
import time
import dask
from dask.delayed import delayed

# Record the start time
start_time = time.time()

# Output path
out = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files'
os.makedirs(out, exist_ok=True)

# Code to read in all training data
all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/l8_sent_collection2_proj/*.tif')

# Function to process each file
@delayed
def process_file(f):
    try:
        # Read in file and convert to numpy
        in_file = rioxarray.open_rasterio(f).to_numpy().astype(float).round(3)

        # Convert to band last
        in_file = np.moveaxis(in_file, 0, 2)

        x = in_file[:, :, :-1]
        x[x == 0] = np.nan
        x = np.round(x, 2)

        y = in_file[:, :, -1].astype(float)
        y[y < 0] = 0
        y[y > 1] = 0

        stacked = np.dstack([x, y])

        # Reshape the 3D matrix to 2D
        rows, cols, bands = stacked.shape
        reshaped_data = stacked.reshape(rows * cols, bands)

        band_names = ['blue', 'green', 'red', 'NIR', 'SWIR1', 'SWIR2', 'dNBR', 'dNDVI', 'dNDII', 'y']

        # Create a DataFrame
        training = pd.DataFrame(reshaped_data, columns=band_names).dropna()
        training = training[['dNBR', 'dNDVI', 'dNDII', 'y']]

        return training

    except Exception as e:
        return pd.DataFrame()  # Return an empty DataFrame on error

# Process and save each file incrementally to avoid memory errors
for i, f in enumerate(all_files):
    delayed_training = process_file(f)
    df = delayed_training.compute()  # Compute the delayed result (process the file)
    
    # Save each DataFrame to a separate Parquet file
    df.to_parquet(
        os.path.join(out, f'training_nbac_part_{i}.parquet'),
        engine='pyarrow',
        compression='snappy',
        index=False
    )
    
# Record the end time
end_time = time.time()

# Calculate the elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert seconds to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"NBAC Elapsed time: {elapsed_time_minutes:.2f} minutes")


NBAC Elapsed time: 66.27 minutes


In [1]:
import rioxarray
import os
import pandas as pd
import numpy as np
import glob
import time
import dask
import dask.dataframe as dd
from dask.delayed import delayed

# Record the start time
start_time = time.time()

# Output path for the final Parquet file
output_parquet = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac_new.parquet'

# Code to read in all training data
# all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/l8_sent_collection2_proj/*.tif')
all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/nbac_old/*.tif')

# Function to process each file
@delayed
def process_file(f):
    try:
        # Read in file and convert to numpy
        in_file = rioxarray.open_rasterio(f).to_numpy().astype(float).round(3)

         # Convert to band last
        in_file = np.moveaxis(in_file, 0, 2)

        x = in_file[:, :, :-1]
        x[x == 0] = np.nan
        x = np.round(x, 2)

        y = in_file[:, :, -1].astype(float)
        y[y < 0] = 0
        y[y > 1] = 0

        stacked = np.dstack([x, y])

        # Reshape the 3D matrix to 2D
        rows, cols, bands = stacked.shape
        reshaped_data = stacked.reshape(rows * cols, bands)

        # band_names = ['blue', 'green', 'red', 'NIR', 'SWIR1', 'SWIR2', 'dNBR', 'dNDVI', 'dNDII', 'y']
        band_names = ['dNBR', 'dNDVI', 'dNDII', 'y']

        # Create a DataFrame
        training = pd.DataFrame(reshaped_data, columns=band_names).dropna()
        return training

    except Exception as e:
        return pd.DataFrame()  # Return an empty DataFrame on error

# Process all files in parallel
delayed_results = [process_file(f) for f in all_files]

# Create a Dask DataFrame from delayed objects
combined_training = dd.from_delayed(delayed_results)

# Compute and save the combined Dask DataFrame to a single Parquet file
combined_training.repartition(npartitions=1).to_parquet(
    output_parquet,
    write_index=False,
    engine='pyarrow',
    compression='snappy',
    compute=True,
    write_metadata_file=False
)

# Record the end time
end_time = time.time()

# Calculate the elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert seconds to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"NBAC Elapsed time: {elapsed_time_minutes:.2f} minutes")




MemoryError: Unable to allocate 307. GiB for an array with shape (4, 10300370236) and data type float64

In [None]:
import rioxarray
import os
import pandas as pd
import numpy as np
import glob
import time
import dask
import dask.dataframe as dd
from dask.delayed import delayed

# Record the start time
start_time = time.time()

# Output path for the final Parquet file
output_parquet = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac_new.parquet'

# Get a list of all TIF files
all_files = glob.glob('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/nbac_old/*.tif')

# Function to process each file
@delayed
def process_file(f):
    try:
        # Read in file and convert to numpy
        in_file = rioxarray.open_rasterio(f).to_numpy().astype(float).round(3)

        # Convert to band last
        in_file = np.moveaxis(in_file, 0, 2)

        x = in_file[:, :, :-1]
        x[x == 0] = np.nan
        x = np.round(x, 2)

        y = in_file[:, :, -1].astype(float)
        y[y < 0] = 0
        y[y > 1] = 0

        stacked = np.dstack([x, y])

        # Reshape the 3D matrix to 2D
        rows, cols, bands = stacked.shape
        reshaped_data = stacked.reshape(rows * cols, bands)

        # Define band names
        band_names = ['dNBR', 'dNDVI', 'dNDII', 'y']

        # Create a DataFrame
        training = pd.DataFrame(reshaped_data, columns=band_names).dropna()
        return training

    except Exception as e:
        return pd.DataFrame()  # Return an empty DataFrame on error

# Process files in smaller batches
batch_size = 100  # Adjust batch size depending on memory limitations
for i in range(0, len(all_files), batch_size):
    batch_files = all_files[i:i + batch_size]

    # Process the batch of files
    delayed_results = [process_file(f) for f in batch_files]

    # Create a Dask DataFrame from delayed objects
    combined_training = dd.from_delayed(delayed_results)

    # Save the batch to a Parquet file, append to existing files
    combined_training.to_parquet(
        output_parquet,
        write_index=False,
        engine='pyarrow',
        compression='snappy',
        compute=True,
        append=True  # Append to avoid overwriting
    )

# Record the end time
end_time = time.time()

# Calculate the elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert seconds to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"NBAC Elapsed time: {elapsed_time_minutes:.2f} minutes")




In [4]:
import dask.dataframe as dd
import pandas as pd
# Read the Parquet file in chunks using Dask
mtbs = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_mtbs.parquet', blocksize='100MB')

# Initialize a counter for the total number of rows
total_rows = 0

# Iterate through each chunk and calculate the number of rows
for chunk in mtbs.to_delayed():
    chunk_df = chunk.compute()  # Convert the chunk to a Pandas DataFrame
    total_rows += len(chunk_df)  # Add the number of rows in this chunk to the total

print(f"Total number of rows: {total_rows}")

#mtbs is 468203344


KeyboardInterrupt



In [None]:
import dask.dataframe as dd

# Read the large Parquet files with Dask
mtbs = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_mtbs.parquet')
nbac = dd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac.parquet')

# Concatenate the two DataFrames
combined_df = dd.concat([mtbs, nbac])

selected_columns = combined_df[['dNBR', 'dNDVI', 'dNDII', 'y']]

# Drop all rows containing NaN values
cleaned_df = selected_columns.dropna()

# Drop all rows containing NaN values
cleaned_df = combined_df.dropna()

# Save the cleaned, combined DataFrame to a new Parquet file
output_path = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_mtbs_nbac.parquet'
cleaned_df.to_parquet(output_path, write_index=False, engine='pyarrow', compression='snappy')


In [None]:
import pandas as pd
t1 = pd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_mtbs.parquet', columns = ['dNBR', 'dNDVI', 'dNDII', 'y'])
t2 = pd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac.parquet', columns = ['dNBR', 'dNDVI', 'dNDII', 'y'])


t.head()

In [1]:
import dask.dataframe as dd

# Define the file paths
file1 = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_mtbs.parquet'
file2 = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_nbac.parquet'
new_file = '/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet'

# Read the parquet files in chunks using Dask
df1 = dd.read_parquet(file1, columns=['dNBR', 'dNDVI', 'dNDII', 'y'], hunksize='100MB')
df2 = dd.read_parquet(file2, columns=['dNBR', 'dNDVI', 'dNDII', 'y'], hunksize='100MB')

# Concatenate the Dask dataframes
combined_df = dd.concat([df1, df2])

# Write the combined dataframe to a new parquet file
combined_df.to_parquet(new_file)

print(f'Combined file saved to {new_file}')



Combined file saved to /explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet


In [2]:
't'

't'

In [3]:
combined_df.shape

(<dask_expr.expr.Scalar: expr=(Concat(frames=[ReadParquetFSSpec(4c7ba11), ReadParquetFSSpec(0615f87)], )).size() / 4, dtype=float64>,
 4)

In [None]:
import pandas as pd
l = pd.read_parquet('/explore/nobackup/people/spotter5/cnn_mapping/nbac_training/parquet_files/all_training_na.parquet',  columns=['dNBR', 'dNDVI', 'dNDII', 'y']).reset_index(drop = True).head(1000000)
l['y'] = l['y'].astype(int)
l.shape

In [9]:
l['y'].unique()

array([0., 1.])

In [12]:
l.head().reset_index(drop = True)

Unnamed: 0,dNBR,dNDVI,dNDII,y
0,-101.0,-45.0,-104.0,0.0
1,-91.0,-54.0,-107.0,0.0
2,-66.0,-5.0,-74.0,0.0
3,-49.0,-105.0,-33.0,0.0
4,-101.0,-144.0,-94.0,0.0
