In [1]:
import numpy as np
import pandas as pd
import polars as pl
import os
from pathlib import Path
from tqdm.notebook import tqdm
import math

print(f'numpy={np.__version__}')
print(f'pandas={pd.__version__}')
print(f'polars={pl.__version__}')

numpy=1.23.5
pandas=1.5.3
polars=0.16.16


In [4]:
# Расположение папок с исходными данными
CONFIG_ORIG_DATA_PATH = 'data/competition_data_final_pqt'

# Расположеение рабочих папок
CONFIG_DATA_ENCODED_LIGHT_PARQUET_PATH = 'data_encoded_light_parquet'
CONFIG_DICT_PATH = 'dicts'

In [None]:
os.makedirs(CONFIG_DICT_PATH, exist_ok=True)
os.makedirs(CONFIG_DATA_ENCODED_LIGHT_PARQUET_PATH, exist_ok=True)

# Подготовим справочник категорий

In [6]:
category_columns = ['region_name', 'city_name', 'cpe_manufacturer_name', 'cpe_model_name', 'url_host', 'cpe_type_cd', 'cpe_model_os_type', 'part_of_day', 'date']

for category in category_columns:
    print(f'process "{category}"')
    
    data = []
    for i, file in enumerate(Path(CONFIG_ORIG_DATA_PATH).glob('*.parquet')):
        print(f'\treading {file}')
        q = pl.scan_parquet(file).with_columns(pl.lit(i).alias('file_id')).select(pl.col(['user_id', category, 'file_id']))
        data.append(q)

    data = pl.concat(data).groupby(category).agg(
        [
            pl.count().alias('count'),
            pl.col('user_id').n_unique().alias('user_id_count'),
            pl.col('file_id').n_unique().alias('file_id_count'),
        ]).sort('count', descending=True).with_columns([pl.arange(low=0, high=pl.count()).cast(pl.Int32).alias(f'{category}_id')])
    
    print('collecting data')                                         
    data = data.collect()
    print(data)
    
    data.write_parquet(str(Path(CONFIG_DICT_PATH) / f'category_dict_{category}.parquet'))
    del data

process "region_name"
	reading data/competition_data_final_pqt/part-00000-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00001-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00002-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00003-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00004-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00005-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00006-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00007-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
	reading data/competition_data_final_pqt/part-00008-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parqu



# Исправляем cpe_model_os_type. Заменяеем Apple iOS на iOS

In [7]:
dict_path = str(Path(CONFIG_DICT_PATH) / f'category_dict_cpe_model_os_type.parquet')
cpe_model_os_type_dict = pl.read_parquet(dict_path)
print(cpe_model_os_type_dict)
cpe_model_os_type_dict = cpe_model_os_type_dict.with_columns(pl.when(pl.col('cpe_model_os_type_id') == 2).then(1).otherwise(pl.col('cpe_model_os_type_id')).alias('cpe_model_os_type_id'))
print(cpe_model_os_type_dict)
cpe_model_os_type_dict.write_parquet(dict_path)

shape: (3, 5)
┌───────────────────┬───────────┬───────────────┬───────────────┬──────────────────────┐
│ cpe_model_os_type ┆ count     ┆ user_id_count ┆ file_id_count ┆ cpe_model_os_type_id │
│ ---               ┆ ---       ┆ ---           ┆ ---           ┆ ---                  │
│ str               ┆ u32       ┆ u32           ┆ u32           ┆ i32                  │
╞═══════════════════╪═══════════╪═══════════════╪═══════════════╪══════════════════════╡
│ Android           ┆ 197859185 ┆ 281650        ┆ 10            ┆ 0                    │
│ iOS               ┆ 122764510 ┆ 132183        ┆ 10            ┆ 1                    │
│ Apple iOS         ┆ 2275740   ┆ 1489          ┆ 10            ┆ 2                    │
└───────────────────┴───────────┴───────────────┴───────────────┴──────────────────────┘
shape: (3, 5)
┌───────────────────┬───────────┬───────────────┬───────────────┬──────────────────────┐
│ cpe_model_os_type ┆ count     ┆ user_id_count ┆ file_id_count ┆ cpe_model_os_typ

# Кодирование исходных данных - заменяем текст на категории

In [9]:
category_columns = ['region_name', 'city_name', 'cpe_manufacturer_name', 'cpe_model_name', 'url_host', 'cpe_type_cd', 'cpe_model_os_type', 'part_of_day']

for i, file in enumerate(Path(CONFIG_ORIG_DATA_PATH).glob('*.parquet')):
    print(f'reading {file}')
    data_item = pl.read_parquet(file)
    
    category_list = []
    category_id_list = []
    for category in category_columns:
        print(f'process "{category}"')
        category_id = f'{category}_id'
        
        category_list.append(category)
        category_id_list.append(category_id)
        
        category_data = pl.read_parquet(str(Path(CONFIG_DICT_PATH) / f'category_dict_{category}.parquet'))
        data_item = data_item.join(category_data.select(pl.col([category_id, category])), on=category)
        del category_data
        
    data_item = data_item.with_columns([
        pl.col('user_id').cast(pl.Int32()).alias('user_id'),        
        pl.col('request_cnt').cast(pl.Int32()).alias('request_cnt'),                
        pl.col('price').fill_null(pl.lit(0)).cast(pl.Int32()).alias('price'),
        pl.col('date').cast(pl.Int32()).alias('date_int'),        
    ])
        
    data_item = data_item.sample(frac=1., shuffle=True, seed=10)
    data_item.select(pl.exclude(category_list)).write_parquet(Path(CONFIG_DATA_ENCODED_LIGHT_PARQUET_PATH) / file.name)   

    del data_item

reading data/competition_data_final_pqt/part-00000-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
process "region_name"
process "city_name"
process "cpe_manufacturer_name"
process "cpe_model_name"
process "url_host"
process "cpe_type_cd"
process "cpe_model_os_type"
process "part_of_day"
reading data/competition_data_final_pqt/part-00001-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
process "region_name"
process "city_name"
process "cpe_manufacturer_name"
process "cpe_model_name"
process "url_host"
process "cpe_type_cd"
process "cpe_model_os_type"
process "part_of_day"
reading data/competition_data_final_pqt/part-00002-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parquet
process "region_name"
process "city_name"
process "cpe_manufacturer_name"
process "cpe_model_name"
process "url_host"
process "cpe_type_cd"
process "cpe_model_os_type"
process "part_of_day"
reading data/competition_data_final_pqt/part-00003-aba60f69-2b63-4cc1-95ca-542598094698-c000.snappy.parque