# 1. 데이터 로드

In [2]:
# #  버전 설치
# !pip install -r requirements.txt

In [1]:
import os
import asyncio
import shutil
import pandas as pd
import pickle
from pytz import timezone
from datetime import datetime,timedelta
import nest_asyncio
from utils.api_set import APIClient
import utils.utils as utils
import utils.formatting as formatting
import models.crawling.trend as trend 
from models.crawling.collect_keywords import collect_keywords
from models.crawling.google_trend import collect_rising_keywords
from models.naver.news import main_news 
from models.anaysis import execute_analysis , process_results ,process_results_month

Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



# 2. API설정

In [2]:

# API 설정
BASE_URL = utils.get_secret("BASE_URL")
CUSTOMER_ID = utils.get_secret("CUSTOMER_ID")
API_KEY = utils.get_secret("API_KEY")
SECRET_KEY = utils.get_secret("SECRET_KEY")
URI = utils.get_secret("URI")
METHOD = utils.get_secret("METHOD")
# API 클라이언트 인스턴스 생성
api_client = APIClient(BASE_URL, CUSTOMER_ID, API_KEY, SECRET_KEY,URI,METHOD)
# 키 로드
keywords_data = utils.load_keywords('main_keyword.json')

# 오늘의 날짜 가져오기
formatted_today, day = utils.get_today_date()


utils.make_directory('./data')
utils.make_directory('./data/rl_srch')
utils.make_directory(f'./data/rl_srch/{day}')  # 키워드별 연관검색어 리스트 저장

# 검색어 리스트와 결과 저장 경로 설정
srch_keyword = ['keyword_final']  
save_path = './data/rl_srch/'  


# 3. 연관검색어 수집

In [3]:


nest_asyncio.apply()

async def main(srch_keyword, day):
    # 오늘 날짜로 폴더 경로 생성
    folder_path = './data/rl_srch/' + datetime.now().strftime('%y%m%d')
    file_path = f"{folder_path}/collected_keywords.csv"
    
    # 폴더가 존재하는지 확인
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    
    # 파일이 존재하는지 확인
    if os.path.isfile(file_path):
        # 파일이 존재하면, 데이터를 읽어옵니다.
        collected_keywords_data = pd.read_csv(file_path)
    else:
        # 파일이 없으면, collect_keywords 함수를 호출해서 데이터를 수집합니다.
        collected_keywords_data = await collect_keywords(srch_keyword, day)
        # 결과를 CSV로 저장
        collected_keywords_data.to_csv(file_path, index=False)
    
    return collected_keywords_data
collected_keywords_data=asyncio.run(main(srch_keyword, day))

collected_keywords_dat_copy=asyncio.run(main(srch_keyword, day))

Unexpected response structure for keyword '세금': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '외국인순매수': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '미국주식': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '달러환율': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '개인연금': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '채권': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '조각투자': {'code': 429, 'message': 'Too Many Requests'}
Unexpected response structure for keyword '건강보험': {'code': 429, 'message': 'Too Many Requests'}


In [4]:
## 중복검색어컬럼 생성
temp_df = utils.generate_unique_search_terms(collected_keywords_dat_copy)

## 중복제거하고 50개씩 뽑는 로직 
##  check_list: 2250개의 데이터(선별함수에 사용될 검색어별 50개의 연관키워드)를 담고있는 리스트 ,
##  collected_keywords_data: 사용할 df
collected_keywords_data,check_list = utils.get_top_50_unique_items(collected_keywords_data,temp_df)
collected_keywords_data= utils.add_client_info(collected_keywords_data)
new_columns = ['일별급상승', '주별급상승', '월별급상승', '주별지속상승', '월별지속상승', '월별규칙성']

for column in new_columns:
    collected_keywords_data[column] = 0
def groupped_df(name,collected_keywords_data):
    grouped = collected_keywords_data.groupby(name)
    df_list = [group for _, group in grouped]
    return df_list
df_list=groupped_df('id',collected_keywords_data)
n=len(df_list)


In [5]:

# 데이터를 로드하거나 크롤링하여 반환하는 비동기 함수
async def load_or_crawl_data(df_list, clients):
    today_date_str = datetime.now().strftime("%y%m%d")
    directory = f"./data/trend_data/{today_date_str}"
    save_path = f"{directory}/data_{today_date_str}.pkl"
    
    # 파일이 존재하면 데이터 로드
    if os.path.exists(save_path):
        with open(save_path, 'rb') as f:
            results = pickle.load(f)
    else:
        # 파일이 없으면 비동기 크롤링 시작
        results = await run_all(df_list, clients)
        # 결과 데이터 저장
        if not os.path.exists(directory):
            os.makedirs(directory)
        with open(save_path, 'wb') as f:
            pickle.dump(results, f)
    
    return results

# 비동기 크롤링 함수
async def trend_main(df, clients):
    params = {
        "search_keywords": list(df['연관키워드']),
        "id": df['id'].iloc[0],
        "pw": df['pw'].iloc[0],
        "api_url": "https://openapi.naver.com/v1/datalab/search",
        "name": '연관검색어'
    }
    api_url = "https://openapi.naver.com/v1/datalab/search"
    
    # trend_maincode 함수 실행
    results = await trend.trend_maincode(params, clients, api_url)
    return results

async def run_all(df_list, clients):
    tasks = [trend_main(df, clients) for df in df_list]
    results = await asyncio.gather(*tasks)
    return results

clients = utils.get_secret("clients")  # clients 정보를 로드

# 이벤트 루프 실행 및 데이터 로드 또는 크롤링
trend_main_data = asyncio.run(load_or_crawl_data(df_list, clients))
results = trend_main_data.copy()

병렬로 처리

In [6]:

# 전역 변수로 리스트 초기화
month_rule_list_a = []
rising_list_a = [[], []]  # 주별 상승, 월별 상승
select_list_a = [[], [], []]  # 일별 선택, 주별 선택, 월별 선택
execute_analysis(results,month_rule_list_a,rising_list_a,select_list_a)


# month_rule_list=[]
select_list=[[],[],[]]

rising_list=[[],[]]
rising_month_list=[]


# 각 리스트를 처리
select_list[0] = process_results(select_list_a[0])
select_list[1] = process_results(select_list_a[1])
select_list[2] = process_results(select_list_a[2])

rising_list[0] = process_results(rising_list_a[0])
rising_list[1] = process_results(rising_list_a[1])

 # month_rule_list_a를 처리하면서 추가 데이터 처리를 포함
month_rule_list = []  # 초기화가 필요할 수 있습니다.
for result in month_rule_list_a:
    if not all(value is None for value in result) and result[0] is not None:
        column_names = result[0].columns
        data_values_list = result[0][column_names].values.flatten()  # 데이터를 1D 배열로 변환
        additional_data = {
            'Indicator': data_values_list.tolist(),  # numpy 배열을 리스트로 변환
            'RisingMonth': result[3],
            '유형': '월별규칙성'  # 모든 결과에 대해 '유형'을 '월별규칙성'으로 설정
        }
        month_rule_list += process_results_month([result], additional_data=additional_data)


In [8]:

# 리스트와 유형을 매핑
lists_and_types = [
    (select_list[0], '일별급상승'),
    (select_list[1], '주별급상승'),
    (select_list[2], '월별급상승'),
    (rising_list[0], '주별지속상승'),
    (rising_list[1], '월별지속상승'),
    (month_rule_list, '월별규칙성')
]


# 모든 리스트를 처리하고 하나의 데이터프레임으로 병합
processed_dfs = [utils.process_and_concat(df_list, label) for df_list, label in lists_and_types]


# 비어 있지 않은 DataFrame들만 병합
graph_result = pd.concat([df for df in processed_dfs if not df.empty]).reset_index(drop=True)

graph_result.reset_index(drop=True, inplace=True)
# 불필요한 컬럼 삭제 및 '주간지속상승'을 '주별지속상승'으로 수정


graph_result['유형'].replace({'주간지속상승': '주별지속상승'}, inplace=True)
graph_result['RisingMonth'] = graph_result['RisingMonth'].replace({None: 0})
graph_result['RisingMonth'] = graph_result['RisingMonth'].fillna(' ')
# # 정렬
graph_result.sort_values(by=['연관검색어', '유형', '검색일자'], ascending=[True, True, True], inplace=True)

# # 최종 결과 출력 ( 그래프 함수  )
graph_result.reset_index(drop=True, inplace=True)
print(1)
############################################################################################################


flags_and_lists = [
    ("일별 급상승", select_list[0]),
    ("주별 급상승", select_list[1]),
    ("주별 지속상승", rising_list[0]),
    ("월별 급상승", select_list[2]),
    ("월별 지속상승", rising_list[1]),
    ("월별 규칙성", month_rule_list),
]
for flag_name, data_list in flags_and_lists:
    # data_list가 리스트인지 확인
    if not isinstance(data_list, list):
        print(f"{flag_name}: data_list가 리스트가 아닙니다.")
        continue
    
    # data_list 내의 각 요소가 DataFrame인지, '연관검색어' 컬럼이 있는지 확인
    for idx, df in enumerate(data_list):
        if not isinstance(df, pd.DataFrame):
            print(f"{flag_name}: 인덱스 {idx}에 DataFrame이 아닌 요소가 있습니다.")
        elif "연관검색어" not in df.columns:
            print(f"{flag_name}: 인덱스 {idx}의 DataFrame에 '연관검색어' 컬럼이 없습니다.")

# utils.update_keywords_flag 함수를 호출하기 전에 각 data_list의 유효성 검사
for flag_name, data_list in flags_and_lists:
    # 데이터 프레임으로 구성된 리스트만 유지
    valid_data_list = [df for df in data_list if isinstance(df, pd.DataFrame) and "연관검색어" in df.columns]
    
    # 유효한 데이터 리스트만을 사용하여 키워드 플래그 업데이트
    utils.update_keywords_flag(collected_keywords_data, valid_data_list, flag_name)
 # process_data : 지정된 조건에 따라 데이터를 필터링하고, 추가 처리를 통해 최종 데이터프레임을 반환하는 함수.
def safe_process_data(process_function, data, category1, category2, selection):
    """
    process_function: 데이터 처리 함수 (예: utils.process_data)
    data: 처리할 데이터프레임
    category1, category2: 데이터 처리 함수에 전달될 카테고리 인자
    selection: 데이터 처리 함수에 전달될 선택 리스트 또는 기타 인자
    
    반환값: 처리된 데이터프레임 또는 빈 데이터프레임
    """
    if data is not None and not data.empty:
        try:
            return process_function(data, category1, category2, selection)
        except Exception as e:
            print(f"Error processing data: {e}")
            # 처리 중 오류가 발생한 경우 빈 데이터프레임 반환
            return pd.DataFrame()
    else:
        print("No data available.")
        return pd.DataFrame()

info_result_daily_select = safe_process_data(utils.process_data, collected_keywords_data, '일별 급상승', '일별 급상승', select_list[0])

info_result_weekly_select = utils.process_data(collected_keywords_data, '주별 급상승', '주별 급상승', select_list[1])
info_result_monthly_select = utils.process_data(collected_keywords_data, '월별 급상승', '월별 급상승', select_list[2]) 

info_result_weekly_continuous = utils.process_data(collected_keywords_data, '주별 지속상승', '주별 지속상승', rising_list[0])

info_result_monthly_continuous = utils.process_data(collected_keywords_data, '월별 지속상승', '월별 지속상승', rising_list[1])

info_result_monthly_pattern = utils.process_data(collected_keywords_data, '월별 규칙성', '월별 규칙성', month_rule_list)

info_result_final = pd.concat([info_result_daily_select,info_result_weekly_select, info_result_monthly_select,\
                               info_result_weekly_continuous, info_result_monthly_continuous,\
                                  info_result_monthly_pattern]).reset_index(drop=True)


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  graph_result['유형'].replace({'주간지속상승': '주별지속상승'}, inplace=True)


1


# 구글/ 네이버 한꺼번에

##### 뉴스링크,제목 수집 (네이버)

In [9]:
async def collect_google_keywords(target_keywords):
    today_date = datetime.now().strftime("%y%m%d")
    directory_path = f"./data/trend_data/{today_date}"
    file_path = os.path.join(directory_path, f"google_data_{today_date}.pkl")
    
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
    
    if os.path.exists(file_path):
        with open(file_path, 'rb') as file:
            rising_keywords_results = pickle.load(file)
    else:
        rising_keywords_results = await collect_rising_keywords(target_keywords)
        with open(file_path, 'wb') as file:
            pickle.dump(rising_keywords_results, file)
    
    return rising_keywords_results

async def collect_news_keywords(target_keywords):
    today_date = datetime.now().strftime("%y%m%d")
    directory_path = f"./data/trend_data/{today_date}"
    file_path = os.path.join(directory_path, f"news_data_{today_date}.pkl")
    
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
    
    if os.path.exists(file_path):
        with open(file_path, 'rb') as file:
            news_data = pickle.load(file)
    else:
        news_data = await main_news(target_keywords)
        with open(file_path, 'wb') as file:
            pickle.dump(news_data, file)
    
    return news_data

# 메인 비동기 실행 함수
async def main(target_keywords):
    google_keywords_results, news_keywords_results = await asyncio.gather(
        collect_google_keywords(target_keywords),
        collect_news_keywords(target_keywords)
    )
    
    return google_keywords_results, news_keywords_results



target_keywords = list(set(info_result_final['연관키워드']))
rising_keywords_results,news_data=asyncio.run(main(target_keywords))

In [10]:

################################
#활동성 분석
################################
import subprocess
today_date = datetime.now().strftime("%y%m%d")
directory_path = f"./data/target_keywords/{today_date}"
file_path = os.path.join(directory_path, "target_keywords.txt")

if not os.path.exists(directory_path):
    # 디렉토리가 존재하지 않는 경우, 디렉토리 생성
    os.makedirs(directory_path)

# 파일이 존재하는지 확인
if not os.path.exists(file_path):
    # 키워드를 파일에 작성
    with open(file_path, 'w') as file:
        for keyword in target_keywords:
            file.write("%s\n" % keyword)
    result = f"{file_path}에 키워드 저장됨"
else:
    result = f"{file_path} 파일이 이미 존재합니다. 작업을 건너뜁니다."
print(result)

# 파일이 존재하지 않는 경우, blog_data_collector.py 스크립트를 실행
file_path = f"./data/target_keywords/{today_date}/keyword_activity_rates.csv"
if not os.path.exists(file_path):
    command = f"python  models/naver/blog.py"
    process = subprocess.run(command, shell=True, check=True)
    result = f"{file_path}에 작업 결과가 저장될 것입니다."
else:
    result = f"{file_path} 파일이 이미 존재합니다. 작업을 건너뜁니다."

print(result)

./data/target_keywords/240625\target_keywords.txt에 키워드 저장됨
./data/target_keywords/240625/keyword_activity_rates.csv에 작업 결과가 저장될 것입니다.


# merge 

In [11]:
merged_keyword_activity_rates=formatting.merge_data(news_data,directory_path)
final_merged_df_copy=formatting.merge_result(collected_keywords_dat_copy,info_result_final,merged_keyword_activity_rates,rising_keywords_results)
info_data,combined_df=formatting.reults_formatted(info_result_final,final_merged_df_copy,graph_result)

# 결과 저장 

In [16]:
result_csv = utils.make_csv(info_data)

# 현재 날짜를 'yyMMdd' 형식으로 포맷팅
today = datetime.now(timezone('Asia/Seoul'))
formatted_today = today.strftime('%y%m%d')

# 저장할 경로
save_path = f'./data/result_out/{formatted_today}'





# 해당 경로가 존재하지 않으면 생성
if not os.path.exists(save_path):
    os.makedirs(save_path)

# CSV 파일 저장
result_csv.to_csv(f'{save_path}/info_{formatted_today}.csv', encoding='utf-8-sig', index=False, header=False)

# 구분자 ,
combined_df.to_csv(f'{save_path}/graph_{formatted_today}_in.csv', encoding='utf-8-sig', index=False, header=False)
info_data.to_csv(f'{save_path}/info_{formatted_today}_in.csv', encoding='utf-8-sig', index=False, header=False)

In [14]:
result_graph = utils.make_csv(combined_df)


# 현재 날짜를 'yyMMdd' 형식으로 포맷팅
today = datetime.now(timezone('Asia/Seoul'))
formatted_today = today.strftime('%y%m%d')

# 저장할 경로
save_path = f'./data/result_out/{formatted_today}'

# 해당 경로가 존재하지 않으면 생성
if not os.path.exists(save_path):
    os.makedirs(save_path)

# CSV 파일 저장
result_graph.to_csv(f'{save_path}/graph_{formatted_today}.csv', encoding='utf-8-sig', index=False, header=False)
