In [4]:
# we'd like the 'timestamp' column to be imported into pandas as a 'datetime' object
# but it's getting imported as an 'object'

# using pd.to_datetime() is slow and memory intensive
# we can do this in a chunked approach to avoid memory issues

# first check how many 'row groups' are in the parquet file
# if they are small enough, we can use a native pyarrow function to import the data
# if not, we can use a custom function to import the data in chunks
import pyarrow.parquet as pq


def check_row_groups(file_path):
    parquet_file = pq.ParquetFile(file_path)
    num_row_groups = parquet_file.num_row_groups
    print(f'Total Row Groups: {num_row_groups}')
    for i in range(num_row_groups):
        row_group = parquet_file.metadata.row_group(i)
        print(f'Row Group {i} Rows: {row_group.num_rows}')

# Check the row groups in your Parquet file
check_row_groups('../data/train_series.parquet')

Total Row Groups: 488
Row Group 0 Rows: 262185
Row Group 1 Rows: 262185
Row Group 2 Rows: 262185
Row Group 3 Rows: 262185
Row Group 4 Rows: 262185
Row Group 5 Rows: 262185
Row Group 6 Rows: 262185
Row Group 7 Rows: 262185
Row Group 8 Rows: 262185
Row Group 9 Rows: 262185
Row Group 10 Rows: 262185
Row Group 11 Rows: 262185
Row Group 12 Rows: 262185
Row Group 13 Rows: 262185
Row Group 14 Rows: 262185
Row Group 15 Rows: 262185
Row Group 16 Rows: 262185
Row Group 17 Rows: 262185
Row Group 18 Rows: 262185
Row Group 19 Rows: 262185
Row Group 20 Rows: 262185
Row Group 21 Rows: 262185
Row Group 22 Rows: 262185
Row Group 23 Rows: 262185
Row Group 24 Rows: 262185
Row Group 25 Rows: 262185
Row Group 26 Rows: 262185
Row Group 27 Rows: 262185
Row Group 28 Rows: 262185
Row Group 29 Rows: 262185
Row Group 30 Rows: 262185
Row Group 31 Rows: 262185
Row Group 32 Rows: 262185
Row Group 33 Rows: 262185
Row Group 34 Rows: 262185
Row Group 35 Rows: 262185
Row Group 36 Rows: 262185
Row Group 37 Rows: 262185


In [None]:
import os
import fastparquet as fp
import pandas as pd

# there are 488 symmetrical row groups in the train_series.parquet file
# so we can use the native pyarrow function to import the data
def process_and_save_in_chunks(input_file_path, output_file_path):
    parquet_file = pq.ParquetFile(input_file_path)
    num_row_groups = parquet_file.num_row_groups

    temp_files = []  # List to hold temporary file paths

    # Process each row group
    for i in range(num_row_groups):
        table = parquet_file.read_row_group(i)
        df = table.to_pandas()

        # Convert to datetime and handle errors
        df['timestamp'] = pd.to_datetime(df['timestamp'], format='%Y-%m-%dT%H:%M:%S%z', errors='coerce', utc=True)

        # Check for NaT values
        nat_rows = df[df['timestamp'].isna()]
        if not nat_rows.empty:
            print(f"NaT values found in chunk {i}:")
            print(nat_rows)

        # Convert to UTC and remove timezone information if dtype is correct
        if isinstance(df['timestamp'].dtype, pd.DatetimeTZDtype):
            df['timestamp'] = df['timestamp'].dt.tz_convert('UTC').dt.tz_localize(None)
        else:
            print(f"Skipped timezone conversion for chunk {i} due to incorrect dtype.")
        
        # Check for an empty DataFrame and exit early if found
        if df.empty:
            print(f"Empty DataFrame detected in chunk {i}. Exiting.")
            return
        
        # Write the chunk to a temporary Parquet file
        temp_file_path = f'temp_file_{i}.parquet'
        temp_files.append(temp_file_path)
        df.to_parquet(temp_file_path, index=False)

    # Combine the temporary Parquet files into a single Parquet dataset
    # list_of_dfs = [pd.read_parquet(temp_file) for temp_file in temp_files]
    # final_df = pd.concat(list_of_dfs, axis=0)
    # final_df.to_parquet(output_file_path, index=False)

    # fp.writer.merge(temp_files, output_file_path)
    
    import pyarrow.parquet as pq
    import shutil
    
    # Initialize with the first file
    shutil.copy(temp_files[0], output_file_path)
    # Open the output file in append mode
    output_file = pq.ParquetWriter(
        output_file_path,
        schema=pq.ParquetFile(temp_files[0]).schema
        )
    # Append data from subsequent temporary files
    for temp_file in temp_files[1:]:
        table = pq.read_table(temp_file)
        output_file.write_table(table)

    # Close the output file
    output_file.close()
    
    # check the output
    merged_df = pd.read_parquet(output_file_path)
    print(merged_df)

    # remove the temporary Parquet files
    for temp_file_path in temp_files:
        os.remove(temp_file_path)


# Call the function
file_bases = ['train_series']

for base in file_bases:
    input_file = f'../data/{base}.parquet'
    output_file = f'../data/{base}_datecorrected.parquet'
    process_and_save_in_chunks(input_file, output_file)

In [1]:
import os
import pyarrow.parquet as pq

files = []
input_folder = r"../notebooks"
target_path = r"../data/train_series_datecorrected.parquet"

for file_name in os.listdir(input_folder):
    if os.path.splitext(file_name)[1] == ".parquet":
        files.append(pq.read_table(os.path.join(input_folder, file_name)))

with pq.ParquetWriter(target_path, files[0].schema, version='2.0', compression='gzip', use_dictionary=True, data_page_size=2097152) as writer:
    for f in files:
        writer.write_table(f)

  with pq.ParquetWriter(target_path, files[0].schema, version='2.0', compression='gzip', use_dictionary=True, data_page_size=2097152) as writer:


In [1]:
# check that the files are the same
import pandas as pd

def compare_parquet_files(file1: str, file2: str) -> bool:
    """
    Compare two Parquet files to determine if they are identical.
    
    Parameters:
    - file1: Path to the first Parquet file.
    - file2: Path to the second Parquet file.
    """
    
    # Read the Parquet files into DataFrames
    df1 = pd.read_parquet(file1)
    df2 = pd.read_parquet(file2)

    # Check row differences
    if df1.shape[0] != df2.shape[0]:
        print(f"Row count difference: File1 has {df1.shape[0]} rows, while File2 has {df2.shape[0]} rows.")
        
    # Check column differences
    diff_columns = set(df1.columns) ^ set(df2.columns)
    if diff_columns:
        print(f"Different columns found: {', '.join(diff_columns)}")

    # Check for row data differences and display a sample
    if not df1.equals(df2):
        differing_rows = (df1 != df2).any(axis=1)
        sample_differing_rows = df1[differing_rows].head()  # Adjust this to display more rows if needed
        sample_differing_rows2 = df2[differing_rows].head()  # Adjust this to display more rows if needed
        print("Sample rows that differ between files:")
        print(sample_differing_rows)
        print("Versus:")
        print(sample_differing_rows2)

# Example usage:
file_path_original = '../data/train_series.parquet'
file_path_processed = '../data/train_series_datecorrected.parquet'

compare_parquet_files(file_path_original, file_path_processed)


  differing_rows = (df1 != df2).any(axis=1)


Sample rows that differ between files:
      series_id  step                 timestamp  anglez    enmo
0  038441c925bb     0  2018-08-14T15:30:00-0400  2.6367  0.0217
1  038441c925bb     1  2018-08-14T15:30:05-0400  2.6368  0.0215
2  038441c925bb     2  2018-08-14T15:30:10-0400  2.6370  0.0216
3  038441c925bb     3  2018-08-14T15:30:15-0400  2.6368  0.0213
4  038441c925bb     4  2018-08-14T15:30:20-0400  2.6368  0.0215
Versus:
      series_id  step           timestamp  anglez    enmo
0  038441c925bb     0 2018-08-14 19:30:00  2.6367  0.0217
1  038441c925bb     1 2018-08-14 19:30:05  2.6368  0.0215
2  038441c925bb     2 2018-08-14 19:30:10  2.6370  0.0216
3  038441c925bb     3 2018-08-14 19:30:15  2.6368  0.0213
4  038441c925bb     4 2018-08-14 19:30:20  2.6368  0.0215
