In [None]:
import os
import ijson
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm

# Input directory scanned (non-recursively) for `*.json` files, and the
# Parquet file the extracted label table is written to.
RAW_DIR, OUTPUT_FILE = "./data/raw", "./data/label.parquet"


def extract_labels_from_json(file_path):
    """Stream one raw JSON file and flatten every pricing into a label row.

    The file is parsed incrementally with ``ijson``. Each element under
    ``data.$values`` is treated as a flight, and each entry of that flight's
    ``pricings`` list produces one row.

    Args:
        file_path: Path to a ``.json`` file. Its base name without the
            extension is used as ``ranker_id`` for every row it produces.

    Returns:
        A list of dicts with keys ``ranker_id``, ``row_id`` (pricing
        ``ordinal`` + 1), ``totalPrice`` (float), ``category`` and ``labels``.
        A missing or null ``ordinal`` / ``totalPrice`` is treated as ``0`` /
        ``0.0``. Missing ``category`` / ``labels`` keys default to ``None`` /
        ``[]``.

        Best-effort: if the file cannot be opened or parsed, the error is
        printed together with ``file_path`` and the rows collected so far are
        returned (possibly an empty list).
    """
    ranker_id = os.path.splitext(os.path.basename(file_path))[0]
    results = []
    try:
        with open(file_path, "rb") as f:
            for flight in ijson.items(f, "data.$values.item"):
                # A null `pricings` is treated like an absent one instead of
                # raising and discarding the remainder of the file.
                for pricing in flight.get("pricings") or []:
                    ordinal = pricing.get("ordinal")
                    total_price = pricing.get("totalPrice")
                    results.append(
                        {
                            "ranker_id": ranker_id,
                            "row_id": (0 if ordinal is None else ordinal) + 1,
                            "totalPrice": float(
                                0.0 if total_price is None else total_price
                            ),
                            "category": pricing.get("category", None),
                            "labels": pricing.get("labels", []),
                        }
                    )
    except Exception as e:
        # Deliberately best-effort so one bad file does not abort the whole
        # multiprocess run; report *which* file failed so it can be inspected.
        print(f"Error processing {file_path}: {e}")
    return results


def main(raw_dir=None, max_workers=16):
    """Extract label rows from every ``.json`` file in a directory into one DataFrame.

    Files are parsed in parallel by ``extract_labels_from_json`` in a process
    pool. Results are gathered in sorted file-path order rather than completion
    order, so the row order of the returned frame is reproducible across runs.

    Args:
        raw_dir: Directory scanned (non-recursively) for ``*.json`` files.
            Defaults to ``RAW_DIR``.
        max_workers: Number of worker processes in the pool.

    Returns:
        A ``pandas.DataFrame`` with columns ``ranker_id``, ``row_id``,
        ``totalPrice``, ``category`` and ``labels``. If no JSON files are
        found, the frame is empty but still carries these columns.

    Raises:
        OSError: if ``raw_dir`` cannot be listed.
    """
    # Resolved at call time (not as a default value) so this function can be
    # defined independently of the module-level config.
    if raw_dir is None:
        raw_dir = RAW_DIR
    # Must match the keys of the dicts built by `extract_labels_from_json`.
    columns = ["ranker_id", "row_id", "totalPrice", "category", "labels"]

    json_files = sorted(
        os.path.join(raw_dir, name)
        for name in os.listdir(raw_dir)
        if name.endswith(".json")
    )
    if not json_files:
        # Nothing to parse: skip starting a process pool.
        return pd.DataFrame(columns=columns)

    all_results = []
    # NOTE(review): workers must be able to unpickle `extract_labels_from_json`.
    # Under the "spawn" start method (the default on Windows and macOS) a
    # function defined in a notebook cell usually cannot be resolved by the
    # workers; moving it into an importable .py module avoids that.
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # Unlike `as_completed`, `executor.map` yields results in input order.
        rows_per_file = executor.map(extract_labels_from_json, json_files)
        for rows in tqdm(rows_per_file, total=len(json_files)):
            all_results.extend(rows)

    return pd.DataFrame(all_results, columns=columns)


if __name__ == "__main__":
    extract_df = main()

In [11]:
print(extract_df.shape[0])  # number of extracted label rows

21347281


In [12]:
# Preview the first rows; a bare last expression uses the notebook's rich
# table display instead of wrapped, truncated plain text from print().
extract_df.head()

                          ranker_id  row_id  totalPrice  category  \
0  b960cbc710b64950b1d76d1f06ab7968       1      8150.0       283   
1  b960cbc710b64950b1d76d1f06ab7968       2     10205.0       256   
2  b960cbc710b64950b1d76d1f06ab7968       3     25660.0       256   
3  99d7fb3f49c642f0b4d45d2156271d56       1      3717.0        15   
4  99d7fb3f49c642f0b4d45d2156271d56       2      4317.0         0   

     legs0_departureAt      legs0_arrivalAt legs1_departureAt legs1_arrivalAt  \
0  2024-09-28T04:45:00  2024-09-28T08:10:00              None            None   
1  2024-09-28T04:45:00  2024-09-28T08:10:00              None            None   
2  2024-09-28T04:45:00  2024-09-28T08:10:00              None            None   
3  2024-09-23T23:55:00  2024-09-24T01:30:00              None            None   
4  2024-09-23T23:55:00  2024-09-24T01:30:00              None            None   

  legs0_segments0_flightNumber legs1_segments0_flightNumber  \
0                         5310     

In [None]:
# Persist the extracted label table to OUTPUT_FILE as Parquet.
extract_df.to_parquet(path=OUTPUT_FILE)

In [None]:
import polars as pl

# Re-load the label table written to OUTPUT_FILE, this time with polars.
df = pl.read_parquet(OUTPUT_FILE)
df

shape: (21_347_281, 4)
┌─────────────────────────────────┬────────┬──────────┬─────────────────────────────────┐
│ ranker_id                       ┆ row_id ┆ category ┆ labels                          │
│ ---                             ┆ ---    ┆ ---      ┆ ---                             │
│ str                             ┆ i64    ┆ i64      ┆ list[str]                       │
╞═════════════════════════════════╪════════╪══════════╪═════════════════════════════════╡
│ 8d114ce973cf41c4907f5f7f4fd6a6… ┆ 0      ┆ 23       ┆ ["BestPrice", "BestPriceTravel… │
│ 8d114ce973cf41c4907f5f7f4fd6a6… ┆ 1      ┆ 0        ┆ []                              │
│ 8d114ce973cf41c4907f5f7f4fd6a6… ┆ 2      ┆ 0        ┆ []                              │
│ 8d114ce973cf41c4907f5f7f4fd6a6… ┆ 3      ┆ 264      ┆ ["BestPriceCorporateTariff", "… │
│ 8d114ce973cf41c4907f5f7f4fd6a6… ┆ 4      ┆ 256      ┆ ["MinTime"]                     │
│ …                               ┆ …      ┆ …        ┆ …                    