In [10]:
import pandas as pd
import modin.pandas as mpd
import json
import os
import gc
import time
import logging
import ast
import warnings

input_file = "processed_open_library_data_dump.csv"
output_file = "cleaned_open_library_data.csv"

chunk_size = 10000
column_names = ["book_id", "revision", "timestamp", "json_data"]

first_chunk = not os.path.exists(output_file)
chunk_count = 0

logging.getLogger("distributed.utils_perf").setLevel(logging.ERROR)

Normalize the data

In [2]:
for chunk in pd.read_csv(input_file, chunksize=chunk_size, names=column_names, header=None):
    chunk_count += 1
    print(f"Processing chunk {chunk_count}...", end="\r")

    chunk["json_data"] = chunk["json_data"].apply(lambda x: json.loads(x) if isinstance(x, str) else {})
    normalized_df = pd.json_normalize(chunk["json_data"])
    final_df = pd.concat([chunk.drop(columns=["json_data"]), normalized_df], axis=1)

    final_df.to_csv(output_file, mode="w" if first_chunk else "a", index=False, header=first_chunk)
    first_chunk = False

    del chunk, final_df, normalized_df
    gc.collect()
    time.sleep(0.2)

print(f"\nFinished processing! {chunk_count} chunks processed. Data saved to {output_file}.")

Processing chunk 5315...
Finished processing! 5315 chunks processed. Data saved to cleaned_open_library_data.csv.


Convert list columns to strings

In [11]:
warnings.filterwarnings("ignore", category=pd.errors.DtypeWarning)

input_file = "cleaned_open_library_data.csv"
output_file = "converted_open_library_data.csv"

chunk_size = 10000
first_chunk = not os.path.exists(output_file)
chunk_count = 0
total_rows = 0

def safe_eval_list(x):
    try:
        val = ast.literal_eval(x)
        return ", ".join(map(str, val)) if isinstance(val, list) else x
    except:
        return x

def safe_extract_keys(x):
    try:
        val = ast.literal_eval(x)
        return ", ".join(d["key"] for d in val if isinstance(d, dict) and "key" in d) if isinstance(val, list) else x
    except:
        return x

In [12]:
for chunk in pd.read_csv(input_file, chunksize=chunk_size, low_memory=False, on_bad_lines='skip'):
    chunk_count += 1
    total_rows += len(chunk)
    print(f"Post-processing chunk {chunk_count} (Total rows: {total_rows})...", end="\r")

    list_columns = ["isbn_13", "isbn_10", "publishers", "oclc_numbers"]
    for col in list_columns:
        if col in chunk.columns:
            chunk[col] = chunk[col].apply(safe_eval_list)

    dict_list_columns = ["authors", "works"]
    for col in dict_list_columns:
        if col in chunk.columns:
            chunk[col] = chunk[col].apply(safe_extract_keys)

    chunk.to_csv(output_file, mode="w" if first_chunk else "a", index=False, header=first_chunk)
    first_chunk = False

    del chunk
    gc.collect()
    time.sleep(0.2)

print(f"\nPost-processing complete! {chunk_count} chunks and {total_rows} total rows saved to {output_file}.")


Post-processing chunk 10623 (Total rows: 106226248)...
Post-processing complete! 10623 chunks and 106226248 total rows saved to final_open_library_data.csv.


Drop unnecessary columns and convert isbn_10 to isbn_13

In [15]:
import pandas as pd
import gc
import time

input_file = 'final_open_library_data.csv'
output_file = 'simplified_open_library_data.csv'

chunk_size = 10000
first_chunk = True
chunk_count = 0
total_rows = 0

main_columns = [
    'book_id', 'title', 'full_title', 'isbn_13', 'isbn_10', 'key', 'works',
    'publishers', 'publish_date', 'publish_country', 'edition_name',
    'authors', 'by_statement',
    'subjects', 'genres', 'identifiers.goodreads',
    'identifiers.google', 'identifiers.doi',
    'identifiers.wikidata', 'identifiers.librarything', 'identifiers.better_world_books',
    'url', 'first_sentence', 'description'
]

def isbn10_to_isbn13(isbn10):
    isbn10 = str(isbn10)
    if len(isbn10) != 10 or not isbn10[:-1].isdigit():
        return ''
    isbn13 = '978' + isbn10[:-1]
    total = sum((3 if i % 2 else 1) * int(num) for i, num in enumerate(isbn13))
    check_digit = (10 - (total % 10)) % 10
    return isbn13 + str(check_digit)

for chunk in pd.read_csv(input_file, chunksize=chunk_size, low_memory=False, on_bad_lines='skip'):
    chunk_count += 1
    total_rows += len(chunk)
    print(f'Processing chunk {chunk_count} (Total rows: {total_rows})...', end='\r')

    chunk['isbn_13'] = chunk['isbn_10'].apply(isbn10_to_isbn13)
    final_df = chunk[main_columns]
    final_df.to_csv(output_file, mode='w' if first_chunk else 'a', index=False, header=first_chunk)
    first_chunk = False

    del chunk, final_df
    gc.collect()
    time.sleep(0.2)

print(f'\nFinished processing! {chunk_count} chunks processed. Data saved to {output_file}.')

Processing chunk 10623 (Total rows: 106226248)...
Finished processing! 10623 chunks processed. Data saved to simplified_open_library_data.csv.


Keep one book for each work


In [16]:
import pandas as pd
import gc
import time

input_file = 'simplified_open_library_data.csv'
output_file = 'unique_works_open_library_data.csv'

chunk_size = 10000
first_chunk = True
chunk_count = 0

def keep_one_book_per_work(chunk):
    return chunk.drop_duplicates(subset=['works'], keep='first')

for chunk in pd.read_csv(input_file, chunksize=chunk_size, low_memory=False, on_bad_lines='skip'):
    chunk_count += 1
    print(f'Processing chunk {chunk_count}...', end='\r')

    unique_chunk = keep_one_book_per_work(chunk)
    unique_chunk.to_csv(output_file, mode='w' if first_chunk else 'a', index=False, header=first_chunk)
    first_chunk = False

    del chunk, unique_chunk
    gc.collect()
    time.sleep(0.2)

print(f'\nFinished processing! {chunk_count} chunks processed. Data saved to {output_file}.')

Processing chunk 10623...
Finished processing! 10623 chunks processed. Data saved to unique_works_open_library_data.csv.
