In [3]:
import os
import json
import gzip
import time
import pandas as pd
import re
from pathlib import Path
from tqdm import tqdm
from memory_profiler import memory_usage


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [7]:
import json

def count_records_in_raw_files(raw_dir):
    """Count the records stored in every file one level below ``raw_dir``.

    Each immediate sub-directory of ``raw_dir`` is scanned; entries of
    ``raw_dir`` that are not directories are skipped.

    * ``.csv`` files: the count is the number of rows ``pd.read_csv`` returns.
    * ``.json`` files: read as JSON Lines; only lines that parse as JSON are
      counted, anything else (including blank lines) is ignored.
    * any other suffix: recorded with a count of ``0``.

    Args:
        raw_dir: Directory (string or path-like) holding one sub-directory
            per dataset.

    Returns:
        dict mapping each file's ``Path`` to its record count.
    """

    def _count_json_lines(path):
        # Number of lines in ``path`` that decode as JSON.
        valid = 0
        with open(path, "r") as handle:
            for raw_line in handle:
                try:
                    json.loads(raw_line.strip())
                except json.JSONDecodeError:
                    continue  # not a JSON document: do not count it
                valid += 1
        return valid

    totals = {}
    for folder in Path(raw_dir).iterdir():
        if folder.is_dir():
            for entry in folder.iterdir():
                if entry.suffix == ".csv":
                    totals[entry] = len(pd.read_csv(entry))
                elif entry.suffix == ".json":
                    totals[entry] = _count_json_lines(entry)
                else:
                    totals[entry] = 0  # unknown file types are still listed

    return totals

# Count the records in every raw file and print the per-file totals.
# RAW_DIR is not defined in this cell; it must already exist in the kernel namespace.
raw_counts = count_records_in_raw_files(RAW_DIR)
print(f" Raw File Counts: {raw_counts}")


 Raw File Counts: {PosixPath('raw_data/tracks/tracks_1.json'): 350300, PosixPath('raw_data/tracks/tracks_0.csv'): 350300, PosixPath('raw_data/tracks/tracks_2.csv'): 350300, PosixPath('raw_data/tracks/tracks_4.csv'): 350300, PosixPath('raw_data/tracks/tracks_3.json'): 350300, PosixPath('raw_data/playlist_track/playlist_track_0.csv'): 871500, PosixPath('raw_data/playlist_track/playlist_track_2.csv'): 871500, PosixPath('raw_data/playlist_track/playlist_track_4.csv'): 871500, PosixPath('raw_data/playlist_track/playlist_track_1.json'): 871500, PosixPath('raw_data/playlist_track/playlist_track_3.json'): 871500, PosixPath('raw_data/orders/orders_3.json'): 224000, PosixPath('raw_data/orders/orders_2.csv'): 224000, PosixPath('raw_data/orders/orders_0.csv'): 224000, PosixPath('raw_data/orders/orders_4.csv'): 224000, PosixPath('raw_data/orders/orders_1.json'): 224000, PosixPath('raw_data/track_facts/track_facts_0.csv'): 350300, PosixPath('raw_data/track_facts/track_facts_2.csv'): 350300, PosixPat

In [9]:
import pandas as pd
import json

def _load_records_frame(path):
    """Load one raw data file into a DataFrame.

    ``.csv`` files are read with ``pd.read_csv``. Every other suffix is
    treated as JSON Lines: each line that parses as JSON becomes one row and
    lines that do not parse are skipped.
    """
    if path.suffix == ".csv":
        return pd.read_csv(path)

    json_records = []
    with open(path, "r") as f:
        for line in f:
            try:
                json_records.append(json.loads(line.strip()))
            except json.JSONDecodeError:
                continue  # ignore lines that are not valid JSON
    return pd.DataFrame(json_records)


def compare_files(file1, file2):
    """Check whether two raw files load to identical DataFrames.

    Both files are loaded the same way (CSV via pandas, anything else as
    JSON Lines) and compared with ``DataFrame.equals``. That comparison is
    strict: row order, column order, and dtypes must all match, so two files
    holding the same records in a different order are reported as different.

    Args:
        file1: ``Path`` of the first file.
        file2: ``Path`` of the second file.

    Returns:
        True when the two loaded frames are equal, False otherwise. The
        verdict is also printed.
    """
    identical = _load_records_frame(file1).equals(_load_records_frame(file2))
    print(f" Comparing {file1.name} vs. {file2.name}: {' Identical' if identical else ' Different'}")
    return identical

# Compare each file in the "tracks" dataset with the next one in listing order;
# if every adjacent pair matches, all files in the folder hold the same records.
tracks_files = list(Path("raw_data/tracks").iterdir())
for current_file, next_file in zip(tracks_files, tracks_files[1:]):
    compare_files(current_file, next_file)


 Comparing tracks_1.json vs. tracks_0.csv:  Identical
 Comparing tracks_0.csv vs. tracks_2.csv:  Identical
 Comparing tracks_2.csv vs. tracks_4.csv:  Identical
 Comparing tracks_4.csv vs. tracks_3.json:  Identical


In [20]:
# Same adjacent-pair comparison as for "tracks", applied to "playlist_track".
playlist_track_files = list(Path("raw_data/playlist_track").iterdir())
for current_file, next_file in zip(playlist_track_files, playlist_track_files[1:]):
    compare_files(current_file, next_file)

 Comparing playlist_track_0.csv vs. playlist_track_2.csv:  Identical
 Comparing playlist_track_2.csv vs. playlist_track_4.csv:  Identical
 Comparing playlist_track_4.csv vs. playlist_track_1.json:  Identical
 Comparing playlist_track_1.json vs. playlist_track_3.json:  Identical


We process only one file per subfolder because, as the comparisons above show, every file in a subfolder contains the same records. For datasets that have no identified primary key, we deduplicate on a composite key built from several columns instead.

In [21]:
from pathlib import Path
import gzip
import json
import pandas as pd

def normalize_column_names(record):
    """Strip dotted prefixes from the keys of ``record``.

    Only the text after the last ``.`` of each key is kept, so
    ``"tracks.TrackId"`` becomes ``"TrackId"``; keys without a dot are left
    as they are. If two keys shorten to the same name, the value of the
    later one wins.

    Returns:
        A new dict; ``record`` itself is not modified.
    """
    renamed = {}
    for column, value in record.items():
        _, _, short_name = column.rpartition(".")  # text after the last dot
        renamed[short_name] = value
    return renamed

def _dedup_key(subfolder_name, record):
    """Return the hashable key that identifies duplicates of ``record``.

    Known datasets use a fixed list of columns; any other dataset falls back
    to the full record (its items sorted by column name).
    """
    key_columns = {
        "tracks": ("TrackId",),  # unique per track
        "playlist_track": ("PlaylistId", "TrackId"),  # unique per playlist-track pair
        "track_facts": ("genre", "minutes", "price"),  # composite key, no primary key
        "orders": ("invoice_date", "UnitPrice", "genre", "media_type", "minutes"),  # composite key
    }
    columns = key_columns.get(subfolder_name)
    if columns is None:
        return tuple(sorted(record.items()))
    return tuple(record[column] for column in columns)


def _iter_raw_records(file_to_process, log):
    """Yield the raw records of a ``.csv`` or ``.json`` (JSON Lines) file one at a time.

    CSV files are read in chunks of 1000 rows so the whole file is never held
    in memory. JSON lines that fail to parse are reported to ``log`` and
    skipped. Files with any other suffix yield nothing.
    """
    if file_to_process.suffix == ".csv":
        for chunk in pd.read_csv(file_to_process, chunksize=1000):
            yield from chunk.to_dict(orient="records")
    elif file_to_process.suffix == ".json":
        with open(file_to_process, "r") as json_file:
            for line in json_file:
                try:
                    record = json.loads(line.strip())
                except json.JSONDecodeError:
                    log.write(f" Skipping malformed line in {file_to_process}\n")
                    continue
                yield record


def process_subfolder_fixed(subfolder_path, output_dir, error_log_file):
    """Deduplicate one dataset folder into ``<output_dir>/<folder name>.json.gz``.

    Only one file of the folder is processed: the first ``.csv`` in sorted
    order, or the first ``.json`` when there is no CSV. Column names are
    normalized with ``normalize_column_names``, and a record is written (one
    JSON document per line, gzip-compressed) only the first time its
    deduplication key is seen.

    Args:
        subfolder_path: ``Path`` of the dataset folder; its name selects the
            deduplication key.
        output_dir: ``Path`` of the output directory (created if missing).
        error_log_file: Path of a text file to which skipped malformed JSON
            lines are appended.

    Returns:
        None. Progress and the number of unique records are printed.

    Raises:
        KeyError: if a record of a known dataset lacks one of its key columns.
    """
    subfolder_name = subfolder_path.name
    output_file = output_dir / f"{subfolder_name}.json.gz"

    output_dir.mkdir(parents=True, exist_ok=True)

    # Pick one representative file (CSV preferred over JSON if both exist)
    files = sorted(subfolder_path.iterdir())
    file_to_process = next((f for f in files if f.suffix == ".csv"), None)
    if file_to_process is None:
        file_to_process = next((f for f in files if f.suffix == ".json"), None)

    if not file_to_process:
        print(f"No valid files found in {subfolder_name}, skipping...")
        return

    print(f" Processing only: {file_to_process.name}")

    processed_count = 0
    seen_records = set()

    with gzip.open(output_file, "wt") as gz_file, open(error_log_file, "a") as log:
        for raw_record in _iter_raw_records(file_to_process, log):
            record = normalize_column_names(raw_record)  # Standardize column names
            record_key = _dedup_key(subfolder_name, record)

            if record_key in seen_records:
                continue  # Skip duplicate records
            seen_records.add(record_key)

            # NOTE(review): json.dumps writes float NaN as the non-standard
            # token ``NaN``; if CSV cells can be empty, confirm that
            # downstream readers accept it.
            gz_file.write(json.dumps(record) + "\n")
            processed_count += 1

    print(f"Processed {processed_count} unique records from {subfolder_name}")


In [22]:
# Process every dataset folder found directly under RAW_DIR; plain files are skipped.
for subfolder in RAW_DIR.iterdir():
    if not subfolder.is_dir():
        continue
    process_subfolder_fixed(subfolder, PROCESSED_DIR, ERROR_LOG)


 Processing only: tracks_0.csv
Processed 3503 unique records from tracks
 Processing only: playlist_track_0.csv
Processed 8715 unique records from playlist_track
 Processing only: orders_0.csv
Processed 2142 unique records from orders
 Processing only: track_facts_0.csv
Processed 782 unique records from track_facts


In [23]:
from collections import defaultdict

def count_unique_records_per_dataset(raw_dir):
    """Count the distinct full records of each raw dataset before processing.

    For every sub-directory of ``raw_dir`` one file is inspected: the first
    ``.csv`` in sorted order, or the first ``.json`` (JSON Lines) when there
    is no CSV. Column names are reduced to the text after the last ``.`` and
    two records count as the same only when all of their columns match.

    JSON lines that do not parse (including blank lines) are skipped, just as
    ``process_subfolder_fixed`` skips them, so this check does not crash on
    input that the processing step accepts.

    Args:
        raw_dir: ``Path`` of the directory holding one sub-directory per dataset.

    Returns:
        ``(total_unique_records, dataset_counts)`` where ``dataset_counts``
        maps each sub-directory name to its number of distinct records and
        the total is the sum of those numbers. Sub-directories without a
        ``.csv`` or ``.json`` file are left out.
    """
    dataset_counts = {}

    for subfolder in raw_dir.iterdir():
        if not subfolder.is_dir():
            continue

        files = sorted(subfolder.iterdir())
        file_to_process = next((f for f in files if f.suffix == ".csv"), None)  # Prefer CSV
        if file_to_process is None:
            file_to_process = next((f for f in files if f.suffix == ".json"), None)  # Fallback to JSON

        if not file_to_process:
            continue  # Skip empty subfolders

        seen_records = set()
        print(f" Checking unique records in {file_to_process.name}")

        if file_to_process.suffix == ".csv":
            for chunk in pd.read_csv(file_to_process, chunksize=1000):
                for record in chunk.to_dict(orient="records"):
                    record = {k.split(".")[-1]: v for k, v in record.items()}  # Normalize column names
                    seen_records.add(tuple(record.items()))

        elif file_to_process.suffix == ".json":
            with open(file_to_process, "r") as json_file:
                for line in json_file:
                    try:
                        record = json.loads(line.strip())
                    except json.JSONDecodeError:
                        continue  # Malformed or blank line: not a record
                    record = {k.split(".")[-1]: v for k, v in record.items()}  # Normalize column names
                    seen_records.add(tuple(record.items()))

        dataset_counts[subfolder.name] = len(seen_records)

    total_unique_records = sum(dataset_counts.values())
    return total_unique_records, dataset_counts


def count_output_records(processed_dir):
    """Count the distinct records in each processed ``*.json.gz`` file.

    Every line of an archive is parsed as one JSON object, its keys are
    reduced to the text after the last ``.``, and records whose columns all
    match are counted once.

    Args:
        processed_dir: ``Path`` of the directory holding the ``.json.gz`` outputs.

    Returns:
        ``(total_output_records, output_counts)`` where ``output_counts`` is
        a ``defaultdict(int)`` mapping archive file names to their number of
        distinct records and the total is the sum of those numbers.
    """
    per_file = defaultdict(int)

    for archive in processed_dir.glob("*.json.gz"):
        distinct = set()
        with gzip.open(archive, "rt") as handle:
            for raw_line in handle:
                parsed = json.loads(raw_line.strip())
                # Normalize column names before comparing records
                normalized = {name.split(".")[-1]: value for name, value in parsed.items()}
                distinct.add(tuple(normalized.items()))
        per_file[archive.name] = len(distinct)

    return sum(per_file.values()), per_file


In [24]:
# Validation: the number of distinct full records in the raw inputs should equal
# the number of distinct records written to the processed archives.
# RAW_DIR and PROCESSED_DIR are not defined in this cell; they must already
# exist in the kernel namespace.
unique_input_count, dataset_counts = count_unique_records_per_dataset(RAW_DIR)
output_total, output_details = count_output_records(PROCESSED_DIR)

# Report the totals, the per-dataset breakdowns, and whether the totals agree.
print(f"> Unique Input Records (before processing): {unique_input_count}")
print(f"> Total Output Records (after processing): {output_total}")
print(f"> Per Dataset Input Counts: {dataset_counts}")
print(f"> Per Processed File Counts: {output_details}")
print(f">> Records Matched: {unique_input_count == output_total}")


 Checking unique records in tracks_0.csv
 Checking unique records in playlist_track_0.csv
 Checking unique records in orders_0.csv
 Checking unique records in track_facts_0.csv
> Unique Input Records (before processing): 15142
> Total Output Records (after processing): 15142
> Per Dataset Input Counts: {'tracks': 3503, 'playlist_track': 8715, 'orders': 2142, 'track_facts': 782}
> Per Processed File Counts: defaultdict(<class 'int'>, {'playlist_track.json.gz': 8715, 'orders.json.gz': 2142, 'track_facts.json.gz': 782, 'tracks.json.gz': 3503})
>> Records Matched: True
