- Здесь непосредственно загружаем внешнюю информацию об SKU.
- Тут также можно запускать весь ноутбук целиком, но есть несколько нюансов (о них ниже).

In [2]:
import pickle
import pandas as pd
import numpy as np
import requests
from requests.exceptions import ConnectTimeout, ChunkedEncodingError, ConnectionError, ReadTimeout, JSONDecodeError
# from tqdm import tqdm
from tqdm.notebook import tqdm
import sys
from multiprocessing.dummy import Pool as ThreadPool
from time import sleep as time_sleep
from pathlib import Path
from datetime import datetime
from pytz import timezone
from pandarallel import pandarallel

# Hook tqdm into pandas.
tqdm.pandas()
# Turn off urllib3 warnings (the requests below are sent with verify=False).
requests.packages.urllib3.disable_warnings()
# Initialise pandarallel; `parallel_apply` is used later when converting the downloaded data.
pandarallel.initialize()

INFO: Pandarallel will run on 16 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


In [3]:
# Moment the run started (Europe/Moscow), kept as text for the report printed in the last cell.
start = datetime.now(timezone('Europe/Moscow')).strftime('%Y-%b-%d %H:%M:%S')

In [4]:
def _save_df_to_zip(df_: pd.DataFrame, archive_name: str = 'archive', folder: str='data', replace: bool=False) -> None:
    """Write ``df_`` as ``<archive_name>.csv`` inside ``<folder>/<archive_name>.zip``.

    Args:
        df_: Frame to store (written without the index, UTF-8 encoded).
        archive_name: Base name of both the zip file and the CSV inside it.
        folder: Target directory; created together with missing parents.
        replace: When ``False`` an already existing archive is kept: it is
            renamed to ``"<archive_name> <YYYY-MM-DD HH-MM>.zip"`` before the
            new one is written. When ``True`` the existing archive is
            overwritten.
    """
    target_dir = Path(folder)
    file_path = target_dir.joinpath(archive_name + '.zip')
    # parents=True so that nested folders such as 'data/run1' also work.
    target_dir.mkdir(parents=True, exist_ok=True)

    if file_path.exists() and not replace:
        # Label the backup with the time the archive was last written.
        # (Access time changes on every read and says nothing about the data.)
        stamp = datetime.fromtimestamp(file_path.stat().st_mtime).strftime('%Y-%m-%d %H-%M')
        backup_path = file_path.with_name(f'{file_path.stem} {stamp}{file_path.suffix}')

        # The stamp has minute resolution: never clobber an earlier backup
        # made within the same minute, add a counter instead.
        attempt = 1
        while backup_path.exists():
            backup_path = file_path.with_name(
                f'{file_path.stem} {stamp} ({attempt}){file_path.suffix}'
            )
            attempt += 1

        file_path.rename(backup_path)

    # pandas writes the CSV straight into the zip under the given member name.
    compression_opts = dict(method='zip', archive_name=f'{archive_name}.csv')
    df_.to_csv(file_path, index=False, compression=compression_opts, encoding='utf-8')

In [5]:
# Input tables read from the data/ folder.
seg_df = pd.read_csv('data/segments.zip')
ret_df = pd.read_csv('data/retailers.zip')
loc_df = pd.read_csv('data/located_list.zip')

# From the segment id table only the distinct (uuid, name) pairs are kept.
seg_id_df = pd.read_csv('data/segments_id.zip')
seg_id_df = seg_id_df[['uuid', 'name']].drop_duplicates(ignore_index=True)

# Function

## Запрос

In [6]:
def separation_frame(input_df: pd.DataFrame, sep: int=5) -> list[pd.DataFrame]:
    """Split ``input_df`` into consecutive row chunks for batch processing.

    Args:
        input_df: Frame to split.
        sep: Approximate share of the frame, in percent, that one chunk
            should hold. The frame is cut into ``int(100 / sep)`` equal parts;
            leftover rows form one extra, shorter chunk.

    Returns:
        The chunks in original row order. Every chunk went through
        ``reset_index()``, so the previous index is kept as an ``index``
        column. An empty frame yields an empty list.

    Raises:
        ValueError: If ``sep`` is not positive.
    """
    if sep <= 0:
        raise ValueError(f'sep must be a positive percentage, got {sep!r}')

    total_rows = input_df.shape[0]
    # Guard both divisions: sep > 100 would give zero parts, and a frame with
    # fewer rows than parts would give a zero step, which range() rejects.
    parts = max(1, int(100 / sep))
    chunk_size = max(1, total_rows // parts)

    return [
        input_df.iloc[first:first + chunk_size].reset_index()
        for first in range(0, total_rows, chunk_size)
    ]

def get_data(pool_input: tuple[tqdm, tuple[int, pd.Series]]):
    """Download the items of one retailer row, walking down the segment levels.

    For every top-level segment uuid in the global ``seg_df`` the retailer's
    items are requested. If that request returns items, the function retries
    with narrower segments (second level, then third level) and keeps the
    most detailed responses that returned anything.

    Args:
        pool_input: ``(pbar, (index, item))`` where ``pbar`` is the shared
            progress bar and ``item`` is a retailer row; its ``sity`` and
            ``data_uuid`` fields are read here.

    Returns:
        ``(item, out_list)`` where ``out_list`` holds
        ``(segment_uuid, items)`` tuples.
    """
    # Transient network failures: wait a second and try again.
    retryable_errors = (ConnectTimeout, ChunkedEncodingError, ConnectionError, ReadTimeout)

    def get_response(url, params, headers):
        """Return the ``items`` entry of one response, or ``None`` on failure."""
        for _ in range(5):
            try:
                # NOTE(review): verify=False disables TLS certificate checks.
                respons = requests.get(url, params=params, headers=headers, timeout=(10, 30), verify=False)
            except retryable_errors:
                time_sleep(1)
                continue
            except Exception as error:
                # Unknown failure: report its type and give up on this request.
                print(type(error).__name__)
                return None

            try:
                payload = respons.json()
            except JSONDecodeError:
                return None
            # A payload that is not a JSON object has no 'items'; treat it as
            # "no data" instead of letting AttributeError abort the whole pool.
            if not isinstance(payload, dict):
                return None
            return payload.get('items', None)
        return None

    def child_segments(items):
        """Distinct last ``segmentUuids`` entry of the items, ``None`` if unusable."""
        try:
            return list({x.get('segmentUuids', [])[-1] for x in items})
        except (AttributeError, IndexError, KeyError, TypeError):
            return None

    pbar, (index, item) = pool_input

    out_list = list()

    # Column-wise minimum collapses the locality rows of this city to one record.
    res = loc_df[loc_df['slug'] == item.sity].min()
    headers = {
        'x-locality-geoid': str(res.geoId),
        'x-position-latitude': f'{res.lat:.5f}',
        'x-position-longitude': f'{res.lng:.5f}',
    }

    url = f'https://search.edadeal.io/api/v4/retailer/{item.data_uuid}/items'

    params = {
        'addContent': ['true'],
        'checkAdult': ['true'],
        'excludeSegmentSlug': ['alcohol', 'pt_alcool', 'en_alcohol', 'es_alcohol', 'tr_alkol'],
        'groupBy': ['sku_or_meta'],
        'numdoc': ['599'],
        'page': ['0'],
        'segmentUuid': [],
    }

    for su1 in seg_df['uuid'].unique():
        params['segmentUuid'] = [su1]
        respons_1 = get_response(url, params, headers)
        if not respons_1:
            continue

        level_1_rows = list()

        # NOTE(review): the 550 limit is compared with the number of distinct
        # child segments, not with the number of returned items. If it was
        # meant to detect a truncated page (numdoc=599), confirm and adjust.
        su2_list = child_segments(respons_1)
        if not su2_list or len(su2_list) > 550:
            su2_list = seg_df[seg_df['uuid'] == su1]['uuid level 02'].unique()

        for su2 in su2_list:
            params['segmentUuid'] = [su2]
            respons_2 = get_response(url, params, headers)
            if not respons_2:
                continue

            level_2_rows = list()

            su3_list = child_segments(respons_2)
            if not su3_list or len(su3_list) > 550:
                su3_list = seg_df[(seg_df['uuid'] == su1) & (seg_df['uuid level 02'] == su2)]['uuid level 03'].unique()

            for su3 in su3_list:
                params['segmentUuid'] = [su3]
                respons_3 = get_response(url, params, headers)
                if respons_3:
                    level_2_rows.append((su3, respons_3))

            # Prefer the third-level responses; fall back to the second-level one.
            if level_2_rows:
                level_1_rows.extend(level_2_rows)
            else:
                level_1_rows.append((su2, respons_2))

        # Same rule one level up: the detailed results win over the broad one.
        if level_1_rows:
            out_list.extend(level_1_rows)
        else:
            out_list.append((su1, respons_1))

    pbar.update(1)
    return (item, out_list)

def run_get_price(list_input, cores: int=2):
    """Run ``get_data`` over ``list_input`` in a thread pool.

    First caveat when running the whole notebook: this is where the work is
    spread over threads, and ``cores`` is the number of threads.

    Args:
        list_input: Sequence of tasks; each one is paired with the shared
            progress bar and handed to ``get_data``.
        cores: Number of worker threads.

    Returns:
        The list of ``get_data`` results, in the order of ``list_input``.
    """
    jobs_total = len(list_input)
    with tqdm(total=jobs_total, desc='Download', leave=False) as pbar:
        # Every task carries the same progress bar object.
        list_to_pool = [(pbar, job) for job in list_input]
        with ThreadPool(cores) as pool:
            work_return = pool.map(get_data, list_to_pool)
    return work_return

In [7]:
def convert_data_to_df(input_list: list, seg_id_df) -> pd.DataFrame:
    """Flatten downloaded ``(retailer_row, [(segment_uuid, items), ...])`` pairs.

    Every item becomes one output row that carries the retailer fields, the
    item fields and up to three segment levels. Segment uuids are replaced by
    names from ``seg_id_df``; the city slug is replaced by ``localityName``
    from the global ``loc_df``.

    Args:
        input_list: Results as returned by ``run_get_price``.
        seg_id_df: Lookup table with ``uuid`` and ``name`` columns.

    Returns:
        De-duplicated frame with the columns listed in ``output_columns``.
    """
    # NOTE(review): imported locally, presumably so that the nested function
    # carries pd/np into the pandarallel workers - confirm before removing.
    import pandas as pd
    import numpy as np

    def convert_to_df(x: dict[str,dict[str,dict]]):
        """Turn one item dict (or a missing value) into a flat Series."""
        record = x if isinstance(x, dict) else {}
        # 'priceData' / 'new' may be absent or null; both yield an empty dict.
        new_price = (record.get('priceData') or {}).get('new') or {}
        return pd.Series({
            'title': record.get('title', np.nan),
            'sku_id': record.get('uuid', np.nan),
            'dateStart': pd.to_datetime(record.get('dateStart', np.nan), unit='ms'),
            'dateEnd': pd.to_datetime(record.get('dateEnd', np.nan), unit='ms'),
            # Each column reads the key its name refers to (previously rotated).
            'price': new_price.get('value', np.nan),
            'price_from': new_price.get('from', np.nan),
            'price_to': new_price.get('to', np.nan),
            'discountPercent': record.get('discountPercent', np.nan),
            'quantity': record.get('quantity', np.nan),
            'quantityUnit': record.get('quantityUnit', np.nan),
            'segmentUuids': record.get('segmentUuids', np.nan),
        })

    def first_three(segments):
        """Pad or cut a segment list to exactly three entries."""
        values = list(segments) if isinstance(segments, (list, tuple)) else []
        return (values + [np.nan] * 3)[:3]

    # One row per retailer: expand the retailer Series into columns and keep
    # its list of (segment_uuid, items) tuples in 'tmp'.
    tmp_df = pd.Series(input_list, name='tmp').to_frame()
    tmp_df[['market', 'tmp']] = tmp_df['tmp'].apply(lambda x: pd.Series(x))
    tmp_df = pd.concat((tmp_df['market'].apply(lambda x: x), tmp_df['tmp']), axis=1)
    tmp_df['date'] = pd.to_datetime(tmp_df['date'])

    # One row per (segment_uuid, items) tuple, then one row per item.
    tmp_df = tmp_df.explode('tmp', ignore_index=True)
    tmp_df[['uuid', 'tmp']] = tmp_df['tmp'].apply(lambda x: pd.Series(x))
    tmp_df = tmp_df.explode('tmp', ignore_index=True)

    # Second caveat when running the whole notebook: the per-item conversion
    # runs in parallel through pandarallel to speed it up.
    tmp_df = pd.concat((tmp_df, tmp_df['tmp'].parallel_apply(convert_to_df)), axis=1)

    # Always exactly three level columns, whatever the list lengths are.
    level_columns = ['su1', 'su2', 'su3']
    tmp_df[level_columns] = pd.DataFrame(
        tmp_df['segmentUuids'].map(first_three).tolist(),
        index=tmp_df.index,
        columns=level_columns,
    )
    tmp_df['title'] = tmp_df['title'].str.replace('\n', ' ')

    # Helper columns (index, tmp, uuid, segmentUuids) are dropped here.
    output_columns = [
        'date',
        'sity',
        'market',
        'discounts',
        'data_uuid',
        'href',
        'su1',
        'su2',
        'su3',
        'title',
        'sku_id',
        'dateStart',
        'dateEnd',
        'price',
        'price_from',
        'price_to',
        'discountPercent',
        'quantity',
        'quantityUnit',
    ]
    tmp_df = tmp_df[output_columns].drop_duplicates(ignore_index=True)

    # Look names up with map() on a unique key. A left merge followed by
    # column assignment misaligns rows as soon as the lookup table has a
    # repeated key; with repeated keys the first entry is used.
    name_by_uuid = seg_id_df.drop_duplicates(subset='uuid').set_index('uuid')['name']
    for level in level_columns:
        tmp_df[level] = tmp_df[level].map(name_by_uuid)

    locality_by_slug = loc_df.drop_duplicates(subset='slug').set_index('slug')['localityName']
    tmp_df['sity'] = tmp_df['sity'].map(locality_by_slug)

    return tmp_df

In [8]:
%%time
# Cut the retailer table into chunks (sep=3) and handle them one after another:
# download, convert to a flat frame, save to its own archive.
frame_list = separation_frame(ret_df, 3)
with tqdm(total=len(frame_list), desc='Total') as pbar:
    for index, df in enumerate(frame_list):
        # One task per retailer row, downloaded with 200 worker threads.
        out = run_get_price(list(df.iterrows()), cores=200)
        out = convert_data_to_df(out, seg_id_df)
        # The archive is named after the chunk number: 'result 0000', 'result 0001', ...
        _save_df_to_zip(out, f'result {index:04d}')
        pbar.update(1)

Total:   0%|          | 0/34 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/162 [00:00<?, ?it/s]

Download:   0%|          | 0/8 [00:00<?, ?it/s]

CPU times: total: 1h 18min
Wall time: 4h 21min 56s


## Time Info

In [9]:
# Moment the run finished (Europe/Moscow), reported together with the start time.
end = datetime.now(timezone('Europe/Moscow')).strftime('%Y-%b-%d %H:%M:%S')
print(f'Начало: {start}', f'Конец: {end}', sep='\n')

Начало: 2023-Oct-03 10:26:17
Конец: 2023-Oct-03 14:48:14
