In [1]:
import os
import glob
import csv
import json
import pandas as pd
import dask.dataframe as dd

In [None]:
# Merging all JSONL files into multiple chunks of 1000 files each

def merge_jsonl_files(input_folder, output_folder, chunk_size=1000):
    """Concatenate the ``.jsonl`` files of a folder into numbered chunk files.

    The input files are taken in sorted name order, grouped ``chunk_size`` at a
    time, and each group is written to ``merged_chunk_<n>.jsonl`` (n starting
    at 1) inside ``output_folder``. Lines are copied verbatim; the only text
    ever added is a newline after a file whose last line lacks one, so that
    the last record of one file is never fused with the first record of the
    next.

    Args:
        input_folder: Directory scanned (non-recursively) for ``*.jsonl`` files.
        output_folder: Directory receiving the merged chunks; created if missing.
        chunk_size: Maximum number of input files per merged chunk (>= 1).

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        OSError: If a folder or file cannot be read or written.

    Note:
        Inputs are selected purely by the ``.jsonl`` suffix, so if
        ``output_folder`` is the same directory as ``input_folder``, chunk
        files from an earlier run will be picked up as inputs on a re-run.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    # os.listdir returns entries in arbitrary order; sorting makes the
    # file-to-chunk assignment reproducible between runs and machines.
    files = sorted(f for f in os.listdir(input_folder) if f.endswith(".jsonl"))

    os.makedirs(output_folder, exist_ok=True)

    for chunk_counter, start in enumerate(range(0, len(files), chunk_size), start=1):
        chunk_files = files[start:start + chunk_size]
        output_file = os.path.join(output_folder, f'merged_chunk_{chunk_counter}.jsonl')

        with open(output_file, 'w', encoding='utf-8') as outfile:
            for filename in chunk_files:
                file_path = os.path.join(input_folder, filename)
                with open(file_path, 'r', encoding='utf-8') as infile:
                    line = ''
                    for line in infile:
                        outfile.write(line)
                    # `line` now holds the last line read (or '' for an empty
                    # file); terminate it so the next file starts on a new line.
                    if line and not line.endswith('\n'):
                        outfile.write('\n')

        print(f"Chunk {chunk_counter} created: {output_file}")

# Source and destination directories for the merge step
input_folder = '/path/to/input_folder'  # Update with the actual input folder path
output_folder = '/path/to/output_folder'  # Update with the actual output folder path

# Run the merge; chunk_size is not passed, so the function's default applies
merge_jsonl_files(input_folder=input_folder, output_folder=output_folder)

In [None]:
# Get the total number of lines in JSONL files

def count_lines_in_jsonl_files(directory_path):
    """Return the combined line count of every ``.jsonl`` file in a directory.

    Only entries directly inside ``directory_path`` whose name ends with
    ``.jsonl`` are read; everything else is ignored.

    Args:
        directory_path: Directory to scan (non-recursively).

    Returns:
        The sum of the line counts of all matching files (0 if none match).
    """
    jsonl_paths = (
        os.path.join(directory_path, entry)
        for entry in os.listdir(directory_path)
        if entry.endswith('.jsonl')
    )

    line_total = 0
    for jsonl_path in jsonl_paths:
        with open(jsonl_path, 'r') as handle:
            # Iterating the handle yields one item per line
            for _ in handle:
                line_total += 1

    return line_total

# Folder whose JSONL files should be counted
directory_path = '/path/to/jsonl_directory'  # Update with the actual directory path

# Count the lines and report the total
result = count_lines_in_jsonl_files(directory_path=directory_path)
print(f"Total number of lines in all JSONL files: {result}")

In [None]:
# Convert JSONL files into CSV files

def same_length(flattened: dict):
    """Pad every list value of ``flattened`` with ``None`` up to the longest list.

    Non-list values are left untouched. The lists are extended in place and
    the same dictionary object is returned.

    Args:
        flattened: Mapping whose values may be lists of differing lengths.

    Returns:
        ``flattened`` itself, with all list values sharing one length.
    """
    list_columns = [column for column in flattened.values() if isinstance(column, list)]
    if not list_columns:
        # Nothing to equalise
        return flattened

    target_len = max(map(len, list_columns))
    for column in list_columns:
        shortfall = target_len - len(column)
        if shortfall > 0:
            column.extend([None] * shortfall)
    return flattened

def process_value(keys, value, flattened):
    """Recursively record ``value`` in ``flattened`` under a ``__``-joined key path.

    Dictionaries are descended into, extending the key path with each of their
    keys. Lists are descended into without extending the path, so all of their
    elements land under the same key. Any other value is stored at
    ``'__'.join(keys)``: the first value for a key is stored as-is, and further
    values for the same key turn the entry into a list that is appended to.

    Args:
        keys: List of path components leading to ``value``.
        value: The (possibly nested) value to flatten.
        flattened: Output dictionary, mutated in place.
    """
    if isinstance(value, dict):
        for child_key, child_value in value.items():
            process_value(keys + [child_key], child_value, flattened)
        return

    if isinstance(value, list):
        for element in value:
            process_value(keys, element, flattened)
        return

    # Leaf value: store it under the joined key path
    column = '__'.join(keys)
    if column not in flattened:
        flattened[column] = value
        return

    existing = flattened[column]
    if isinstance(existing, list):
        existing.append(value)
    else:
        # Second value seen for this key: promote the entry to a list
        flattened[column] = [existing, value]

def flatten_json(json_data):
    """Flatten one JSON record, or a list of records, into a single flat dict.

    A non-list input is treated as a list containing just that record. Every
    top-level key of every record is passed to ``process_value``, and all
    records accumulate into the same result dictionary.

    Args:
        json_data: A record exposing ``.items()``, or a list of such records.

    Returns:
        The flat dictionary populated by ``process_value``.
    """
    records = json_data if isinstance(json_data, list) else [json_data]

    flat_row = {}
    for record in records:
        for field, field_value in record.items():
            process_value([field], field_value, flat_row)
    return flat_row

# Folder containing JSONL files
input_folder = "/path/to/input_folder/"  # Update with the actual input folder path

# Folder to store the output files
output_folder = "/path/to/output_folder/"  # Update with the actual output folder path

# Create the output folder if it is missing; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test
os.makedirs(output_folder, exist_ok=True)

# Get all .jsonl files in the folder (sorted so every run processes them in the same order)
input_files = sorted(
    os.path.join(input_folder, f) for f in os.listdir(input_folder) if f.endswith('.jsonl')
)

for input_file in input_files:
    flattened_list = []

    try:
        # Read as UTF-8 explicitly so decoding does not depend on the platform
        # default and matches the UTF-8 encoding used for the CSV output below
        with open(input_file, "r", encoding="utf-8") as f:
            for line_number, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue  # Blank lines carry no record; do not report them as invalid JSON

                try:
                    json_obj = json.loads(line)  # Load each JSON object
                except json.JSONDecodeError:
                    print(f"Skipping invalid JSON line in {input_file}")
                    continue

                # flatten_json calls .keys() on every record, so a line holding a
                # bare scalar (or a list containing non-objects) would raise
                # AttributeError and abort the whole run; skip such lines instead.
                records = json_obj if isinstance(json_obj, list) else [json_obj]
                if not all(isinstance(record, dict) for record in records):
                    print(f"Skipping non-object JSON on line {line_number} of {input_file}")
                    continue

                flat = flatten_json(json_obj)  # Flatten using the flatten_json function
                flattened_list.append(same_length(flat))  # Ensure same-length columns

    except (FileNotFoundError, PermissionError, OSError):
        print(f"Error opening file: {input_file}")
        continue

    # Without any rows there are no columns to write; skip the file rather
    # than producing an empty CSV
    if not flattened_list:
        print(f"No valid records found in {input_file}; no CSV written")
        continue

    # Convert list of dicts to DataFrame using pandas
    df = pd.DataFrame(flattened_list)

    # Adjust partitioning as needed based on dataset size
    # NOTE(review): df is already fully materialised in memory at this point, so
    # Dask gives no out-of-core benefit here; consider df.to_csv(...) directly.
    ddf = dd.from_pandas(df, npartitions=50)

    # Output file path with the same name as the input file but with .csv extension
    output_file = os.path.join(output_folder, os.path.splitext(os.path.basename(input_file))[0] + ".csv")

    # Write the DataFrame to a CSV file using Dask
    ddf.to_csv(output_file, index=False, encoding='utf-8', single_file=True)
    print(f"Successfully written to {output_file}")

In [None]:
# Merge all csv files

files = sorted(glob.glob("/path/to/input_folder/*.csv")) # Update with actual path
output_file = "/path/to/output_folder/output.csv" # Update with actual path

# Never treat the merge target as an input: it is truncated when opened for
# writing below and would be matched by the glob if it lives in the input folder.
files = [file for file in files if os.path.abspath(file) != os.path.abspath(output_file)]

# First pass: read only the header row of each file and build the union of all
# column names in first-seen order. Appending chunks positionally under the
# first file's header would misalign data when files differ in columns or
# column order.
merged_columns = []
seen_columns = set()
mergeable_files = []
for file in files:
    try:
        file_columns = pd.read_csv(file, dtype=str, nrows=0).columns
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header to parse
        print(f"Skipping empty CSV file: {file}")
        continue
    mergeable_files.append(file)
    for column in file_columns:
        if column not in seen_columns:
            seen_columns.add(column)
            merged_columns.append(column)

# Second pass: stream every file in chunks, aligned to the merged column list
with open(output_file, "w", encoding="utf-8", newline='') as outfile:
    # Write the header exactly once, up front
    if merged_columns:
        pd.DataFrame(columns=merged_columns).to_csv(outfile, index=False)

    for file in mergeable_files:
        for chunk in pd.read_csv(file, dtype=str, low_memory=False, chunksize=100000):  # Process 100K rows at a time
            # reindex puts columns in the merged order and adds missing ones as empty
            chunk.reindex(columns=merged_columns).to_csv(outfile, index=False, header=False)

print("Merging completed successfully")

In [None]:
# Get the total number of rows in the CSV files

csv_files = glob.glob("/path/to/csv_folder/*.csv") # Update with actual path

row_count = 0

for file in csv_files:
    # newline="" hands line-ending handling to the csv module, which its
    # documentation requires for quoted fields containing embedded newlines.
    # errors="ignore" drops undecodable bytes instead of raising.
    with open(file, "r", encoding="utf-8", errors="ignore", newline="") as f:
        reader = csv.reader(f)  # Parses multi-line (quoted) rows as single records
        header = next(reader, None)  # Skip header if present; None for an empty file
        row_count += sum(1 for _ in reader)  # Count the remaining records

print(f"Total number of rows: {row_count}")