In [1]:
from skt.gcp import (
    PROJECT_ID,
    bq_insert_overwrite,
    bq_to_df,
    bq_to_pandas,
    get_bigquery_client,
    bq_table_exists,
    get_max_part,
    load_query_result_to_table,
    pandas_to_bq,
    pandas_to_bq_table,
    load_bigquery_ipython_magic,
    get_bigquery_client,
    _print_query_job_results,
    load_query_result_to_partitions
    
)

from skt.ye import (
    get_hdfs_conn,
    get_spark,
    hive_execute,
    hive_to_pandas,
    pandas_to_parquet,
    slack_send
)
from skt.github_utils import GithubUtil
from skt.vault_utils import get_secrets


In [2]:
from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from dateutil.relativedelta import relativedelta
from pyhive import hive

from copy import deepcopy
from joblib import Parallel, delayed
import os
import sys
from git import Repo
from contextlib import contextmanager
from tqdm.notebook import tqdm
import warnings

In [3]:
secrets = get_secrets('github/sktaiflow')
token = secrets['token']

proxies = {
    'http': secrets['proxy'],
    'https': secrets['proxy'],
}

# Customer func for cloning git modules

In [4]:
@contextmanager
def proxy(proxies):
    env_backup = dict(os.environ)
    os.environ["HTTP_PROXY"] = proxies["http"]
    os.environ["HTTPS_PROXY"] = proxies["https"]
    yield
    os.environ.clear()
    os.environ.update(env_backup)

In [5]:
def slack_sending(channel_name:str, msg:str="test", is_adot:bool=True):
    if "#" not  in channel_name:
        channel_name += "#" + channel_name

    slack_send(
        text=msg,
        username="SKT",
        channel=channel_name,
        icon_emoji=":large_blue_circle:",
        blocks=None,
        dataframe=False,
        adot=is_adot
    )


In [6]:
class GithubUtil_custom(GithubUtil):
    def __init__(self, token, proxies, **kwargs):
        super().__init__(token, proxies)
    
    def clone_from_repo(self, git_url, branch="main", git_save_path="/temp"):
        try:
            if self._proxies:
                with proxy(self._proxies):
                    response = Repo.clone_from(git_url, git_save_path, branch=branch)
                    return {"code": "200", "response": response}
            else:
                msg = f"proxy must be passed"
                raise Exception(msg)    
        except Exception as e:
            msg = f"cloning git repo:{git_url} branch:{branch} failed {e}"
            slack_sending(msg=msg, channel_name=channel_name, is_adot=True)
            raise Exception(msg)

In [7]:
git_url ='https://github.com/sktaiflow/onemodelV3-opensearch-engine.git'
branch = 'develop'
git_save_path = '/home/x1112436/shared/1112436/git'

In [8]:
import shutil
shutil.rmtree(git_save_path)
os.makedirs(git_save_path, exist_ok=True)

In [9]:
gitobj= GithubUtil_custom(token=token, proxies=proxies)

In [10]:
response = gitobj.clone_from_repo(git_url=git_url, branch=branch, git_save_path=git_save_path)

In [11]:
module_path = os.path.join(git_save_path, "dags")
sys.path.append(module_path)
sys.path.append(git_save_path)

In [12]:
#!pip install datasets==2.19.1
#!pip install pydantic==2.7.1
#!pip install loguru==0.7.2


In [13]:
## import from module
from onemodelV3.opensearch_engine.indexing_engine.preprocessor import OpensearchPreprocessor
from onemodelV3.opensearch_engine.indexing_engine.func import *




In [14]:
# get file list
def get_gzip_files(directory='./temp/indexing/input'):
    from pathlib import Path
    gzip_files = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith(".gzip"):
                file_path = os.path.join(root, file)
                gzip_files.append(file_path)

    return gzip_files

file_list = get_gzip_files(directory="/home/x1112436/shared/1112436/indexing_data")

In [15]:
file_list[0]

'/home/x1112436/shared/1112436/indexing_data/emb_result_25.parquet.gzip'

In [16]:
from onemodelV3.opensearch_engine.indexing_engine.preprocessor import AbstractPreprocessor
from typing import List, Union
from datasets import (
    load_dataset, 
    Dataset, 
    DatasetDict,
    IterableDatasetDict
)
from torch.utils.data import (
    IterableDataset, 
    Dataset
)
class OpensearchPreprocessor(AbstractPreprocessor):
    index_name = "onemodelV3"
    
    def __init__(self, args, **kwargs):        
        super().__init__(args)
    
    @classmethod
    def load(cls, file_path:Union[str, List], split:str=None, keep_in_memory:bool=True, is_cache:bool=True) -> IterableDataset:        
        stream = True
        dataset = super(OpensearchPreprocessor, cls).load(
                file_path=file_path, 
                split=split, 
                stream=stream, 
                keep_in_memory=keep_in_memory,
                is_cache=is_cache
        )
        return dataset
        

In [17]:
data = file_list[0]

In [18]:
dataset = OpensearchPreprocessor.load(data)

In [160]:
data_list = []
for i, data in enumerate(dataset):
    data_list.append(data['adot_profile_feature'])    

In [43]:
#data = [datum next(iter(dataset))

In [20]:
from dags.onemodelV3.opensearch_engine.mapper import (
    DEFAULT_VALUES, 
    MnoprofileKeys, 
    select_default_value, 
    MnoprofileKeysKeysModel, 
    mno_profile_mappings,
    new_mno_profile_mappings
)

In [162]:
from collections import defaultdict
def profile_normalize(profile:str, delimiter='<|n|>'):
        """성별, 나이"""
        mno_profile = profile["mno_profile_feature"]
        adot_profile = profile["adot_profile_feature"]
        ##

        mno_profiles = mno_profile.split(delimiter)
        mno_profile_dict = dict()
        for profile in mno_profiles:
            key, val = profile.split(':')
            null_values = mno_profile_mappings[key]
            if val in select_default_value(field_name=null_values):
                continue
            elif val.strip() =="있음":
                mno_profile_dict[key] = key.split("이력")[0].strip()
            else:
                mno_profile_dict[key] = val

        mno_template_dict = defaultdict(list)
        for key, val in new_mno_profile_mappings.items():
            mno_template_dict[val] = []

        for key, val in mno_profile_dict.items():
            new_feature = new_mno_profile_mappings[key]
            mno_template_dict[new_feature].append(val)
        
        mno_preferences = mno_template_dict.get('preference', [])
        mno_preference_template = ''
        if mno_preferences:
            mno_preference_dict = defaultdict(set)
            mno_preference = mno_preferences[0]
            mnopreference_list = mno_preference.split(',')
            for mno_prefernce in mnopreference_list:
                split_mno_preference = mno_prefernce.split('_')
                if len(split_mno_preference) == 2:
                    upper_cate, lower_cate = split_mno_preference
                else:
                    upper_cate = split_mno_preference[0]
                mno_preference_dict[upper_cate].add(lower_cate)
                
            for key, val in mno_preference_dict.items():
                val_str = ','.join(val)
                if mno_preference_template == '': mno_preference_template = f"{key}: {val_str}"
                else: mno_preference_template += '\n' + f"{key}: {val_str}"
        else:
            mno_preference_template = ''
        mno_template_dict['preference'] = mno_preference_template
        return mno_template_dict

In [152]:
from collections import defaultdict
def profile_normalize(profile:str, delimiter='<|n|>'):
        """성별, 나이"""
        mno_profile = profile["mno_profile_feature"]
        adot_profile = profile["adot_profile_feature"]
        ##

        adot_profiles = adot_profile.split(delimiter)
        adot_profile_dict = dict()

In [163]:
dict(profile_normalize(next(iter(dataset))))

{'preference': '쇼핑: 오픈마켓,해외직구\n미디어/엔터테인먼트: OTT',
 'gender': ['여자'],
 'age': ['42세'],
 'service_duration': ['11년'],
 'days_after_change': ['1355일'],
 'mno_status': ['베이직플러스', '고가', 'samsung', 'vip', '가족 결합'],
 'plan_price': ['59000원'],
 'avg_data_usage': ['13gb']}

In [40]:
adot_profile = data["adot_profile_feature"]
adot_profiles = adot_profile.split('<|n|>')

In [164]:
data_list[0]

'선호 도메인 : 없음<|n|>선호 카테고리 : 없음<|n|>선호 아이템 : 없음<|n|>성별 : 여성<|n|>나이 : 42<|n|>활성 상태 : 복귀<|n|>다중 도메인 성향 : 없음<|n|>인기 컨텐츠 선호도 : 없음<|n|>사용성 기준 : 라이트유저<|n|>헤비유저인 도메인 : T 서비스'

In [157]:
preferred_category = set()
preferred_item = set()
preferred_domain =set()
for data in data_list:
    preferred_domain.add(data.split('<|n|>')[0])
    preferred_category.add(data.split('<|n|>')[1])
    preferred_item.add(data.split('<|n|>')[2])
    

In [159]:
preferred_domain

{'선호 도메인 : 게임, 전화',
 '선호 도메인 : 날씨',
 '선호 도메인 : 없음',
 '선호 도메인 : 운세',
 '선호 도메인 : 음악',
 '선호 도메인 : 전화',
 '선호 도메인 : 포토',
 '선호 도메인 : 포토, 게임'}

In [158]:
preferred_category

{'선호 카테고리 : 게임(하이퍼캐쥬얼, 심리테스트) ',
 '선호 카테고리 : 없음',
 '선호 카테고리 : 음악(장르 : 국내 댄스/일렉, OST/BGM, 국내 팝/어쿠스틱 & 가수 : 르세라핌, 악뮤, (여자)아이들) ',
 '선호 카테고리 : 음악(장르 : 국내 알앤비, 국내 발라드, 국내 댄스/일렉 & 가수 : 비비, 아이유, 트와이스) ',
 '선호 카테고리 : 음악(장르 : 해외 팝, 해외 락,  & 가수 : 스콜피온스, 산타나, 신디 로퍼) '}

In [100]:
preferred_item

{'선호 아이템 : 게임(미니펫, 스택폴, 진짜 모습 테스트) ',
 '선호 아이템 : 없음',
 "선호 아이템 : 음악(Perfect Night, Love Lee, Turn Up The Sunshine (PNAU Remix / From 'Minions: The Rise of Gru' Soundtrack)) ",
 '선호 아이템 : 음악(Wind Of Change, Love of My Life (Live), Smooth) ',
 '선호 아이템 : 음악(밤양갱, Discord, Love wins all) '}

In [30]:
mno_profile_mappings

{'관심사': 'interests',
 '성별': 'gender',
 '나이': 'age',
 '서비스 사용 기간': 'service_duration',
 '기변 후 경과일': 'days_after_change',
 '요금제 이름': 'plan_name',
 '요금제 가격': 'plan_price',
 '3개월 평균 데이터 사용량': 'avg_data_usage',
 '단말기 가격': 'device_price',
 '단말기 제조사': 'device_manufacturer',
 '멤버십 등급': 'membership_level',
 '멤버십 사용 이력': 'membership_history',
 '가족 결합 이력': 'family_bundle_history',
 '로밍 사용 이력': 'roaming_history',
 '세컨디바이스 보유 여부': 'second_device',
 '소액 및 DCB 결제 이력': 'micropayment_history'}