In [1]:
!nvcc -V

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2023 NVIDIA Corporation
Built on Wed_Nov_22_10:17:15_PST_2023
Cuda compilation tools, release 12.3, V12.3.107
Build cuda_12.3.r12.3/compiler.33567101_0


In [2]:
#!pip install --extra-index-url=https://pypi.nvidia.com polars cudf-polars-cu11
!pip install polars[gpu] --extra-index-url=https://pypi.nvidia.com 

Looking in indexes: https://pypi.org/simple, https://pypi.nvidia.com
Collecting cudf-polars-cu12 (from polars[gpu])
  Downloading https://pypi.nvidia.com/cudf-polars-cu12/cudf_polars_cu12-24.10.1-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.4/53.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hINFO: pip is looking at multiple versions of cudf-polars-cu12 to determine which version is compatible with other requirements. This could take a while.
  Downloading https://pypi.nvidia.com/cudf-polars-cu12/cudf_polars_cu12-24.8.3-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.8/53.8 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting cudf-cu12==24.8.* (from cudf-polars-cu12->polars[gpu])
  Downloading https://pypi.nvidia.com/cudf-cu12/cudf_cu12-24.8.3-cp310-cp310-manylinux_2_28_x86_64.whl (517.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m517.8/517.8 MB[0m

In [3]:
import gc
import json
from itertools import islice
from pathlib import Path

import polars as pl

In [4]:
def transform_and_save_to_parquet(file, output_dir, output_name="test.parquet", engine="gpu"):
    """Flatten an NDJSON session file to one row per event and save it as Parquet.

    Each input line is read as an object with a ``session`` value and an ``events``
    list whose items are structs with ``aid``, ``ts`` and ``type`` fields.

    Output columns:
        session (UInt32), aid (UInt32),
        ts (UInt32, the input timestamp divided by 1000, i.e. milliseconds -> seconds),
        type (UInt8: "clicks"=0, "carts"=1, "orders"=2, anything else=3).

    Args:
        file: Path to the NDJSON input file.
        output_dir: Directory that receives the Parquet file (not created here).
        output_name: File name of the Parquet output; defaults to "test.parquet".
        engine: Engine passed to ``LazyFrame.collect``; "gpu" (default) uses
            cudf-polars, "cpu" uses the regular Polars engine.
    """
    output_path = Path(output_dir) / output_name

    events = (
        pl.scan_ndjson(file)
        # One row per event: explode each session's event list, then unpack the struct.
        .explode("events")
        .select(
            pl.col("session"),
            pl.col("events").struct.field("aid").alias("aid"),
            pl.col("events").struct.field("ts").alias("ts"),
            pl.col("events").struct.field("type").alias("type"),
        )
        # Encode the event type and shrink dtypes. Expressions inside a single
        # with_columns all read the input columns, so "type" is still the string here.
        .with_columns(
            pl.col("session").cast(pl.UInt32),
            pl.col("aid").cast(pl.UInt32),
            # milliseconds -> seconds
            (pl.col("ts") // 1000).cast(pl.UInt32),
            pl.when(pl.col("type") == "clicks").then(0)
            .when(pl.col("type") == "carts").then(1)
            .when(pl.col("type") == "orders").then(2)
            .otherwise(3)
            .cast(pl.UInt8)
            .alias("type"),
        )
    )

    # Materialise the lazy query and write it out.
    collected = events.collect(engine=engine)
    collected.write_parquet(output_path, compression="snappy")

    # Release the collected frame before returning.
    del collected
    gc.collect()

# NDJSON 파일을 읽어오기
file = Path("/kaggle/input/recsys-dataset/otto-recsys-test.jsonl")

# Parquet 파일로 저장할 디렉토리 설정
output_dir = "/kaggle/working"
Path(output_dir).mkdir(exist_ok=True)

# Parquet 파일로 저장
%time transform_and_save_to_parquet(file, output_dir)

CPU times: user 47.8 s, sys: 7.19 s, total: 55 s
Wall time: 20.3 s


In [5]:
def process_large_ndjson(file, output_dir, chunk_size):
    """Convert a large NDJSON session file into a series of Parquet chunk files.

    The file is streamed and parsed ``chunk_size`` JSON records at a time, so only
    one chunk of parsed sessions is held in memory. Each chunk is flattened to one
    row per event and written to ``<output_dir>/transformed_chunk_<i>.parquet``,
    with ``i`` counting up from 0.

    Output columns match ``transform_and_save_to_parquet``: session (UInt32),
    aid (UInt32), ts (UInt32, input ts // 1000), type (UInt8: clicks=0, carts=1,
    orders=2, anything else=3).

    Args:
        file: Path to the NDJSON input; each non-blank line is a JSON object with
            ``session`` and ``events`` keys.
        output_dir: Existing directory that receives the chunk files.
        chunk_size: Maximum number of JSON records per output file; must be >= 1.

    Raises:
        ValueError: if ``chunk_size`` is less than 1.
        json.JSONDecodeError: if a non-blank line is not valid JSON.

    NOTE(review): chunk files left over from an earlier run that produced more
    chunks are not removed, so a later ``transformed_chunk_*`` glob can pick them up.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")

    out_dir = Path(output_dir)
    file_counter = 0

    # NDJSON is UTF-8; don't depend on the platform's default text encoding.
    with open(file, "r", encoding="utf-8") as f:
        # Lazily parse one line at a time; whitespace-only lines (e.g. a stray blank
        # line) are skipped instead of crashing json.loads.
        records = (json.loads(line) for line in f if line.strip())

        while True:
            chunk = list(islice(records, chunk_size))
            if not chunk:
                break  # no records left

            df = pl.DataFrame(chunk)
            expanded_df = (
                # One row per event: explode each session's event list, then unpack the struct.
                df.explode("events")
                .select(
                    pl.col("session"),
                    pl.col("events").struct.field("aid").alias("aid"),
                    pl.col("events").struct.field("ts").alias("ts"),
                    pl.col("events").struct.field("type").alias("type"),
                )
                # Encode the event type and shrink dtypes in one pass; every expression
                # here reads the input columns, so "type" is still the string column.
                .with_columns(
                    pl.col("session").cast(pl.UInt32),
                    pl.col("aid").cast(pl.UInt32),
                    # milliseconds -> seconds
                    (pl.col("ts") // 1000).cast(pl.UInt32),
                    pl.when(pl.col("type") == "clicks").then(0)
                    .when(pl.col("type") == "carts").then(1)
                    .when(pl.col("type") == "orders").then(2)
                    .otherwise(3)
                    .cast(pl.UInt8)
                    .alias("type"),
                )
            )

            output_file = out_dir / f"transformed_chunk_{file_counter}.parquet"
            expanded_df.write_parquet(output_file, compression="snappy")

            # Drop this chunk's objects before reading the next one to keep peak memory bounded.
            del chunk, df, expanded_df
            gc.collect()

            file_counter += 1

# NDJSON 파일을 읽어오기
file = Path("/kaggle/input/recsys-dataset/otto-recsys-train.jsonl")

# Parquet 파일로 저장할 디렉토리 설정
output_dir = "/kaggle/working"
Path(output_dir).mkdir(exist_ok=True)

# JSON 처리
%time process_large_ndjson(file, output_dir, chunk_size=1300000)


CPU times: user 15min 48s, sys: 1min 14s, total: 17min 3s
Wall time: 16min 54s


In [6]:
# Reassemble the training chunks in the order they were written. A plain
# "transformed_chunk_*" glob is expanded in lexicographic, not numeric, order
# (chunk_10 sorts before chunk_2), which would shuffle sessions once there are
# more than 10 chunks -- so sort explicitly on the numeric suffix.
chunk_paths = sorted(
    Path(output_dir).glob("transformed_chunk_*.parquet"),
    key=lambda p: int(p.stem.rsplit("_", 1)[-1]),
)
if not chunk_paths:
    raise FileNotFoundError(f"no transformed_chunk_*.parquet files found in {output_dir}")

train_df = pl.read_parquet(chunk_paths)
train_df.write_parquet(f"{output_dir}/train.parquet", compression="snappy")
train = train_df.to_pandas()

In [7]:
# Preview the first rows of the combined training events
train.head(n=5)

Unnamed: 0,session,aid,ts,type
0,0,1517085,1659304800,0
1,0,1563459,1659304904,0
2,0,1309446,1659367439,0
3,0,16246,1659367719,0
4,0,1781822,1659367871,0
