In [1]:
import os
import json
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Root folder holding one sub-directory of JSON Lines files per site.
data_root = '/Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data'

# Site to process ('Atla' or 'Dallas'). The input directory and the output file
# name are both derived from this single value so they can never disagree.
site = 'Atla'
base_path = os.path.join(data_root, site)

# Output Parquet file, e.g. .../Atla/combined_internet_traffic_data_atla.parquet
output_parquet_path = os.path.join(
    base_path, f'combined_internet_traffic_data_{site.lower()}.parquet'
)

# Input files: regular, non-hidden, non-Parquet files in base_path. Parquet
# files (including this script's own output on a re-run) and hidden files such
# as macOS .DS_Store are binary and must not reach the JSON reader. Sorted so
# batches follow file-name order -- assumes names sort chronologically (TODO confirm).
file_names = sorted(
    name
    for name in os.listdir(base_path)
    if not name.startswith('.')
    and not name.endswith('.parquet')
    and os.path.isfile(os.path.join(base_path, name))
)

# Generate full paths to the files
file_paths = [os.path.join(base_path, file_name) for file_name in file_names]

# Number of files handled per batch (each file presumably holds one day of data).
days_to_process = 3

# Module-level Parquet writer used by process_and_append_data() and
# close_parquet_writer(); it is created lazily when the first batch is written.
parquet_writer = None

# Helpers and entry point that convert JSON Lines files into one Parquet file,
# a batch of files at a time.


def _read_json_lines(file_path):
    """Return the JSON values decoded from each non-blank line of *file_path*.

    Lines that fail to decode are reported and skipped, so one malformed
    record does not abort the whole file.
    """
    records = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if not line.strip():
                # Blank lines carry no record; don't report them as decode errors.
                continue
            try:
                records.append(json.loads(line))
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON on this line: {line}")
                print(e)
    return records


def _conform_to_schema(table, schema):
    """Return *table* rebuilt so its schema matches *schema* exactly.

    ParquetWriter.write_table rejects tables whose schema differs from the one
    the writer was opened with, but per-batch type inference from JSON drifts
    easily (a key absent from every record in a batch, an int field that is a
    float elsewhere, a column that is all null in one batch). Columns are put
    in schema order, missing ones are filled with nulls, and each is cast to
    the schema's type. Columns not in the schema are dropped with a warning.
    Casts that are not possible (e.g. text into a numeric column) still raise.
    """
    extra = [name for name in table.column_names if name not in schema.names]
    if extra:
        print(f"Warning: dropping columns not in the Parquet schema: {extra}")
    columns = [
        table.column(field.name).cast(field.type)
        if field.name in table.column_names
        else pa.nulls(table.num_rows, type=field.type)
        for field in schema
    ]
    return pa.Table.from_arrays(columns, schema=schema)


def process_and_append_data(file_paths, parquet_path, days_to_process=3):
    """Write JSON Lines files into a single Parquet file, batch by batch.

    Files are read in groups of *days_to_process*. Each group is concatenated
    into one DataFrame and written through the module-level ``parquet_writer``.
    The writer is opened on the first non-empty batch, with that batch's
    schema. Opening it creates *parquet_path* from scratch, replacing any file
    already there; it does not append to an existing file. Later batches are
    conformed to that first schema before writing.

    On success the writer is left open; call close_parquet_writer() to
    finalize the file. If a still-open writer from an earlier call exists,
    batches go to it and *parquet_path* is not used. On any error the writer
    is closed, so rows written so far stay readable, and the error is re-raised.

    Args:
        file_paths: Paths of JSON Lines files, processed in the given order.
        parquet_path: Destination Parquet file.
        days_to_process: Number of files per batch (presumably one file per
            day -- TODO confirm). Must be positive.

    Raises:
        ValueError: if *days_to_process* is not positive.
    """
    global parquet_writer

    if days_to_process < 1:
        raise ValueError(f"days_to_process must be positive, got {days_to_process}")

    # A writer closed by an earlier run cannot accept more data; start afresh.
    if parquet_writer is not None and not getattr(parquet_writer, 'is_open', True):
        parquet_writer = None

    try:
        for start in range(0, len(file_paths), days_to_process):
            batch_files = file_paths[start:start + days_to_process]
            frames = [pd.DataFrame(_read_json_lines(path)) for path in batch_files]
            combined_df = pd.concat(frames, ignore_index=True)

            if combined_df.empty:
                # Nothing decoded: skip instead of locking the file to an empty schema.
                print(f"No records in batch starting with {batch_files[0]}; skipped.")
                continue

            # The index is just 0..n-1 from ignore_index=True; no need to store it.
            table = pa.Table.from_pandas(combined_df, preserve_index=False)

            if parquet_writer is None:
                parquet_writer = pq.ParquetWriter(parquet_path, table.schema, compression='snappy')
            else:
                table = _conform_to_schema(table, parquet_writer.schema)

            parquet_writer.write_table(table)
            print(f"Appended {len(batch_files)} days of data to {parquet_path}")
    except BaseException:
        # Finalize what was written so far rather than leaving an open handle
        # and an unfinished, unreadable file behind (also on KeyboardInterrupt).
        if parquet_writer is not None:
            parquet_writer.close()
            parquet_writer = None
        raise

    print("All data processed and written incrementally.")

# Close the Parquet writer once the processing is done
def close_parquet_writer():
    """Close the module-level Parquet writer, if one exists, and forget it.

    Must be called before the Parquet file is read back. The global is reset
    to None so a later process_and_append_data() call opens a fresh writer
    instead of writing to a closed one. Calling this again is a no-op.
    """
    global parquet_writer
    if parquet_writer is not None:
        parquet_writer.close()
        parquet_writer = None
        print("Parquet writer closed.")

# Incrementally process the input files in batches of `days_to_process` files
# and write them to the Parquet file. The writer is closed even if a batch
# fails, so whatever was written is finalized.
try:
    process_and_append_data(file_paths, output_parquet_path, days_to_process=days_to_process)
finally:
    close_parquet_writer()

# Load the combined data back as a sanity check and show the first rows.
df_combined_parquet = pd.read_parquet(output_parquet_path)
print(df_combined_parquet.head())


Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet_traffic_data_dallas.parquet
Appended 3 days of data to /Users/maushariff/Downloads/Local_Disk_E/Byrav/Internet2_Data/Data/Dallas/combined_internet