In [2]:
import numpy as np
import h5py
import os

from datasets import load_dataset
import polars as pl

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Record the library versions this notebook was run with, one per line.
print(np.__version__, pl.__version__, sep="\n")

1.24.2
1.18.0


## Get list of files

In [4]:
# Root directory that is searched for the raw ".hdf5" files.
# Set the GAIA_RAW_DIR environment variable to point somewhere else without
# editing the notebook; when it is unset the original local path is used.
raw_dir = os.environ.get("GAIA_RAW_DIR", "/home/vikas/Desktop/Globus/gaia")

In [5]:
def get_files(raw_dir, ext):
    """Recursively list every file under ``raw_dir`` whose name ends with ``ext``.

    Parameters
    ----------
    raw_dir : str
        Directory tree to walk.
    ext : str
        Suffix each file name must end with (e.g. ``".hdf5"``).

    Returns
    -------
    list of str
        Directory-joined path of each matching file, in ``os.walk`` order.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(raw_dir)
        for name in names
        if name.endswith(ext)
    ]

In [6]:
# Define the sorting key function
def healpix_sort_key(path):
    """Return the integer that follows ``healpix=`` in ``path`` (used as a sort key).

    The digits are taken from the text right after the first ``healpix=``
    marker, up to the next ``/`` (or the end of that segment).
    Raises ``IndexError`` when ``path`` contains no ``healpix=`` marker.
    """
    segments = path.split('healpix=')
    digits = segments[1].split('/', 1)[0]
    return int(digits)

In [7]:
def sorted_files(start, end):
    """Return positions ``[start:end]`` of the healpix-ordered ``.hdf5`` file list.

    Every call searches ``raw_dir`` again via ``get_files`` and re-sorts the
    result with ``healpix_sort_key`` before slicing.
    """
    ordered = sorted(get_files(raw_dir, ".hdf5"), key=healpix_sort_key)
    return ordered[start:end]

## Load dataset

In [8]:
def load_data(start, end):
    """Load the shards at positions ``[start:end]`` of the sorted file list.

    The selected files are handed to ``load_dataset`` as the ``train`` split
    (non-streaming, ``num_proc = 24``) and the resulting dataset is returned.
    """
    # Total number of files/shards is 1882
    shard_paths = sorted_files(start, end)

    return load_dataset(
        raw_dir,
        data_files = {"train": shard_paths},
        split = 'train',
        streaming = False,
        num_proc = 24,
    )

## Convert to Polars DataFrame

In [9]:
def get_object_id(hdf_dset):
    """Return the dataset's ``object_id`` column as a Polars Series.

    Parameters
    ----------
    hdf_dset
        Dataset as returned by ``load_data``; must have an ``object_id`` column.

    Returns
    -------
    pl.Series
        Series named ``"object_id"`` holding every row's id.
    """
    # Narrow to the single column *before* slicing. ``hdf_dset[:]`` on the full
    # dataset converts every column to Python objects only to discard all but
    # one of them, which is needlessly slow and memory-hungry.
    object_id_all = hdf_dset.select_columns(["object_id"])[:]["object_id"]

    return pl.Series("object_id", object_id_all)

In [10]:
def dset_to_polars(hdf_dset, feature, object_series):
    """Build a Polars DataFrame from one dataset column, with ``object_id`` first.

    Parameters
    ----------
    hdf_dset
        Dataset as returned by ``load_data``.
    feature : str
        Name of the dataset column to export (e.g. ``"photometry"``).
    object_series : pl.Series
        Series inserted as column 0 of the result (see ``get_object_id``).

    Returns
    -------
    pl.DataFrame
        ``pl.DataFrame`` built from the feature's rows, with ``object_series``
        inserted at position 0.
    """
    # Select the one column before slicing so the other columns are not
    # converted to Python objects on every call (this runs once per feature).
    feature_all = hdf_dset.select_columns([feature])[:][feature]

    # NOTE(review): presumably each row is a mapping of sub-fields that Polars
    # expands into columns; confirm against the dataset schema.
    df_feature_all = pl.DataFrame(feature_all)

    # insert_column works in place on the freshly built frame.
    df_feature_all.insert_column(0, object_series)

    return df_feature_all

In [11]:
# hdf_dset = load_data(10, 58)

# object_series = get_object_id(hdf_dset)

# df_feature = dset_to_polars(hdf_dset, "gspphot", object_series)

## Convert to parquet

In [12]:
# Root directory for the exported parquet files (one sub-directory per feature).
# Set the GAIA_PARQUET_DIR environment variable to redirect the output without
# editing the notebook; when it is unset the original local path is used.
dest_dir = os.environ.get("GAIA_PARQUET_DIR", "/home/vikas/Desktop/Globus/gaia_parquet")

In [13]:
def hdf_to_parquet(start, end):
    """Export files ``[start:end]`` of the sorted file list to parquet, one file per feature.

    For each feature the output is written to
    ``<dest_dir>/<feature>/gaia_<feature>_<start>_<end>.parquet`` using zstd
    compression at level 22. The feature directory is created if missing.

    The export is best-effort per feature: if one feature fails, the error is
    printed and the remaining features are still attempted. Failures while
    loading the dataset or reading ``object_id`` are not caught.

    Parameters
    ----------
    start, end : int
        Slice bounds into the healpix-sorted file list (``end`` exclusive).

    Returns
    -------
    None
    """
    hdf_dset = load_data(start, end)

    object_series = get_object_id(hdf_dset)

    features_list = [
        "photometry",
        "astrometry",
        "radial_velocity",
        "gspphot",
    ]

    for feature in features_list:
        try:
            df_feature = dset_to_polars(hdf_dset, feature, object_series)

            # write_parquet fails if the target directory does not exist yet,
            # which would make every export fail on a fresh destination.
            feature_dir = os.path.join(dest_dir, feature)
            os.makedirs(feature_dir, exist_ok=True)

            dest_parquet = os.path.join(
                feature_dir,
                f"gaia_{feature}_{start}_{end}.parquet",
            )

            df_feature.write_parquet(
                dest_parquet,
                compression="zstd",
                compression_level=22,
            )

        # Catch Exception rather than everything, so that interrupting the
        # kernel (KeyboardInterrupt) actually stops a long-running export.
        except Exception as exc:
            print(f"Unable to create parquet output for {feature}!")
            print(f"  Reason: {exc!r}")

    return None

## Run export

In [14]:
def get_total_file_size(paths):
    """Return the combined on-disk size of ``paths`` in GB (bytes / 1024**3).

    Paths that do not exist are reported with a printed message and
    contribute nothing to the total.
    """
    total_bytes = 0

    for path in paths:
        try:
            total_bytes += os.path.getsize(path)
        except FileNotFoundError:
            # Missing file: report it and carry on with the rest
            print(f"{path} not found!")

    bytes_per_gb = 1024 ** 3
    return total_bytes / bytes_per_gb

In [27]:
def run_export(start, max_batch_gb=1.00):
    """Export all files from position ``start`` onward, in size-limited batches.

    Starting at ``start``, files from the healpix-sorted list are grouped into
    consecutive batches. Each batch begins with one file and grows one file at
    a time until its total on-disk size reaches ``max_batch_gb`` or the list is
    exhausted; the batch is then passed to ``hdf_to_parquet(start, end)``.

    Compared with the earlier version of this loop:

    * the file list is built once instead of re-walking the directory tree for
      every size probe;
    * the number of files comes from the list itself rather than a hard-coded
      1882;
    * the loop stops after the last file instead of issuing one extra export
      for the empty range ``[total, total)``;
    * the last file may join the preceding batch when that batch is still below
      the size limit (it was previously always exported on its own).

    Parameters
    ----------
    start : int
        Position in the sorted file list at which to begin.
    max_batch_gb : float, optional
        Size (as reported by ``get_total_file_size``) at which a batch stops
        growing. Defaults to 1.00.

    Returns
    -------
    None
    """
    # Snapshot of the complete sorted list; a slice end of None means "to the end".
    all_files = sorted_files(0, None)
    total_files = len(all_files)

    while start < total_files:

        # Every batch starts out as a single file
        end = start + 1

        print(f"File start: {start}")
        print(f"Initial file end: {end}")

        size_on_disk = get_total_file_size(all_files[start:end])

        # Keep growing the batch until it reaches max_batch_gb or runs out of files
        while size_on_disk < max_batch_gb and end < total_files:
            end += 1
            size_on_disk = get_total_file_size(all_files[start:end])

        print(f"New file end: {end}")

        hdf_to_parquet(start, end)

        # The next batch begins where this one stopped (end is exclusive)
        start = end

    return None

In [None]:
# Run the export beginning at position 466 of the healpix-sorted file list
# (presumably resuming after an earlier run handled the files before it; confirm).
run_export(466)

File start: 466
Initial file end: 467
New file end: 477


Setting num_proc from 24 to 11 for the train split as it only contains 11 shards.
Generating train split: 43533 examples [00:26, 1746.22 examples/s]

In [22]:
#get_total_file_size(sorted_files(466, 478))

## Test exported parquet files

In [18]:
# test_dir = "/home/vikas/Desktop/Globus/gaia_parquet/photometry/*.parquet"

# query = (pl
#          .scan_parquet(test_dir)
#          .filter(pl.col("phot_g_mean_flux") > 200000)
#         )

# query.collect(streaming=True)