In [1]:
!pip install -q duckdb
!pip install -q duckdb pyarrow

import os
import subprocess
import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import time

# Константы путей
INPUT_DIR = '/kaggle/input/kkbox-churn-prediction-challenge'
WORKING_DIR = '/kaggle/working'

In [2]:
# -----------------------------------------------------------
# 1. Простая функция (Распакуй -> Конвертируй -> Удали)
# Для файлов, которые влезают на диск (train, members, transactions)
# -----------------------------------------------------------
def process_simple(archive_name, csv_name, parquet_name):
    print(f"\nОБРАБОТКА: {parquet_name}")
    start = time.time()
    
    # 1. Распаковка
    print(f"Распаковка...")
    os.system(f"7z x {INPUT_DIR}/{archive_name} -o{WORKING_DIR} -y > /dev/null")
    
    # 2. Конвертация DuckDB
    print(f"SQL Convert...")
    con = duckdb.connect()
    con.execute(f"""
        COPY (SELECT * FROM '{WORKING_DIR}/{csv_name}') 
        TO '{WORKING_DIR}/{parquet_name}' 
        (FORMAT PARQUET, COMPRESSION 'ZSTD')
    """)
    con.close()
    
    # 3. Удаление CSV (чтобы освободить место для следующего)
    print(f"Удаление CSV...")
    os.remove(f"{WORKING_DIR}/{csv_name}")
    
    print(f"Готово за {(time.time() - start)/60:.1f} мин.")

# -----------------------------------------------------------
# 2. ПОТОКОВАЯ ФУНКЦИЯ (Streaming)
# Только для user_logs. Читает архив напрямую в память.
# -----------------------------------------------------------
def process_streaming(archive_name, parquet_name):
    print(f"\nОБРАБОТКА (STREAMING): {parquet_name}")
    start = time.time()
    
    archive_path = f"{INPUT_DIR}/{archive_name}"
    parquet_path = f"{WORKING_DIR}/{parquet_name}"
    
    print(f"Запуск трубы (7z -> Python -> Parquet)...")
    
    process = subprocess.Popen(
        ['7z', 'x', archive_path, '-so'], 
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    
    chunk_iter = pd.read_csv(process.stdout, chunksize=1_000_000)
    
    writer = None
    
    for i, chunk in enumerate(chunk_iter):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            writer = pq.ParquetWriter(parquet_path, table.schema, compression='ZSTD')
        
        writer.write_table(table)
        
        if i % 10 == 0:
            print(f"   ...обработано {i} млн строк", end='\r')

    if writer:
        writer.close()
    
    process.terminate()
    print(f"\nСтриминг завершен за {(time.time() - start)/60:.1f} мин.")

# ===========================================================
# ЗАПУСК (По порядку, чтобы экономить место)
# ===========================================================

# 1. Train
process_simple('train.csv.7z', 'train.csv', 'train.parquet')

# 2. Members
process_simple('members_v3.csv.7z', 'members_v3.csv', 'members.parquet')

# 3. Transactions
process_simple('transactions.csv.7z', 'transactions.csv', 'transactions.parquet')

# 4. User Logs
process_streaming('user_logs.csv.7z', 'user_logs.parquet')

print("\nВСЕ ГОТОВО!")
!ls -lh /kaggle/working/*.parquet


ОБРАБОТКА: train.parquet
Распаковка...
SQL Convert...
Удаление CSV...
Готово за 0.1 мин.

ОБРАБОТКА: members.parquet
Распаковка...
SQL Convert...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Удаление CSV...
Готово за 0.4 мин.

ОБРАБОТКА: transactions.parquet
Распаковка...
SQL Convert...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Удаление CSV...
Готово за 1.3 мин.

ОБРАБОТКА (STREAMING): user_logs.parquet
Запуск трубы (7z -> Python -> Parquet)...
   ...обработано 390 млн строк
Стриминг завершен за 22.7 мин.

ВСЕ ГОТОВО!
-rw-r--r-- 1 root root 226M Jan 27 16:22 /kaggle/working/members.parquet
-rw-r--r-- 1 root root  32M Jan 27 16:22 /kaggle/working/train.parquet
-rw-r--r-- 1 root root 727M Jan 27 16:23 /kaggle/working/transactions.parquet
-rw-r--r-- 1 root root 7.3G Jan 27 16:46 /kaggle/working/user_logs.parquet


In [3]:
files = ['train', 'members', 'transactions', 'user_logs']

print(f"{'ФАЙЛ':<15} | {'РАЗМЕР (MB)':<12} | {'СТРОК (ROWS)':<15}")
print("-" * 50)

for name in files:
    path = f"/kaggle/working/{name}.parquet"

    if os.path.exists(path):
        size_mb = os.path.getsize(path) / (1024 * 1024)

        try:
            row_count = duckdb.query(f"SELECT COUNT(*) FROM '{path}'").fetchone()[0]
            print(f"{name:<15} | {size_mb:<12.1f} | {row_count:<15}")
        except Exception as e:
            print(f"{name:<15} | {size_mb:<12.1f} | ОШИБКА ЧТЕНИЯ")
    else:
        print(f"{name:<15} | НЕ НАЙДЕН")

ФАЙЛ            | РАЗМЕР (MB)  | СТРОК (ROWS)   
--------------------------------------------------
train           | 31.5         | 992931         
members         | 225.2        | 6769473        
transactions    | 726.7        | 21547746       
user_logs       | 7381.7       | 392106543      
