In [None]:
# !pip install transformers torch

## 통계 결과:
## 1. 원본 → Remove Short:
- 최소 감소폭: 4
- 최대 감소폭: 133
- 평균 감소폭: 66.1
- 최소 감소율: 1.94%
- 최대 감소율: 8.22%
- 평균 감소율: 4.32%
## 2. Remove Short → DBSCAN:
- 최소 감소폭: 0 (변화 없음)
- 최대 감소폭: 413
- 평균 감소폭: 219.35
- 최소 감소율: 0%
- 최대 감소율: 22.88%
- 평균 감소율: 14.07%
## 3. 원본 → DBSCAN:
- 최소 감소폭: 4
- 최대 감소폭: 482
- 평균 감소폭: 285.45
- 최소 감소율: 1.94%
- 최대 감소율: 26.83%
- 평균 감소율: 17.76%
- 요약:
  - Remove Short 단계에서 평균적으로 약 4.32%의 뉴스가 제거되었고, 최대 8.22%까지 감소했습니다.
  - DBSCAN 단계에서 더 많은 뉴스가 제거되며, 평균적으로 14.07%가 감소했고, 최대 22.88%까지 감소했습니다.
  - **전체 과정(원본 → DBSCAN)**에서는 평균적으로 17.76%의 뉴스가 제거되었으며, 최대 26.83%까지 감소했습니다.

In [72]:
import os
import json
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
import numpy as np
from tqdm import tqdm

# 모델 및 토크나이저 로드
from transformers import BertTokenizer, BertForSequenceClassification
import torch

class NewsPreprocess:
    
    def __init__(self, type) -> None:
        # 경로 설정
        self.type = type
        self.industry_news_directory = os.path.join(os.getcwd(), 'data', 'newstock_industry_news')
        self.stock_news_directory = os.path.join(os.getcwd(), 'data', 'newstock_stock_news')
        self.news = []
        self.limit_page = 200
        self.device = None
        self.batch_size = -1
    
    # 로컬에서 데이터를 가져오는 함수
    # date ex : 20230101
    def get_news(self, start_date, end_date, mode="local"):
        if mode == 'local':
            # 'industry' 또는 'news'에 따라 적절한 디렉토리 선택
            if self.type == 'industry':
                news_directory = self.industry_news_directory
            elif self.type == 'news':
                news_directory = self.stock_news_directory
            else:
                raise ValueError("Invalid type. Available options: 'industry', 'news'")
            
            # 경로에서 시작일과 종료일에 맞는 파일을 가져오는 로직
            news_files = []
            
            # 시작일과 종료일 범위에 맞는 파일만 검색
            for file_name in os.listdir(news_directory):
                file_date = file_name.split('.')[0]  # 파일명에서 날짜 추출
                if start_date <= file_date <= end_date:
                    file_path = os.path.join(news_directory, file_name)
                    news_files.append(file_path)
                    
                    # JSON 파일 로드
                    with open(file_path, 'r', encoding='utf-8') as json_file:
                        try:
                            data = json.load(json_file)
                            self.news.append(data)  # JSON 데이터를 리스트에 저장
                        except json.JSONDecodeError:
                            print(f"Error decoding JSON from {file_name}")
        
        elif mode == 's3':
            pass  # 추후 구현
        
        elif mode == 'volume':
            pass  # 추후 구현
        
        else:
            raise ValueError("Invalid mode. Available options: 'local', 's3', 'volume'")
    # 본문이 너무 짧은 데이터 처리!
    def remove_short(self):
        # 한 달 분량 뉴스 처리
        for idx, news_day in tqdm(enumerate(self.news), total=len(self.news), desc="Processing News for Remove Short"):
            # 하루 단위 뉴스 처리
            filteded_news = []
            for news in news_day['data']:
                # 글자 수가 특정 글자수보다 크거나 같으면
                if len(news['article']) >= self.limit_page:
                    filteded_news.append(news)
            
            # print(f"삭제 전 : {len(news_day['data'])}, 삭제 후 : {len(filteded_news)}")
            self.news[idx] = filteded_news
            
    
    # title을 기준으로 처리
    def remove_with_dbscan(self):
        # 한 달 분량 뉴스 처리
        for idx, news_day in tqdm(enumerate(self.news), total=len(self.news), desc="Processing News for DBSCAN"):
            # 뉴스에서 우선 타이틀만을 뽑자.
            titles = [news['title'] for news in news_day]  # 타이틀만 추출

            # 타이틀을 기준으로 진행
            tfidf_matrix = self.get_tfidf_vector(titles)

            # DBSCAN 클러스터링 실행
            dbscan = DBSCAN(eps=0.5, metric='cosine')
            clusters = dbscan.fit_predict(tfidf_matrix)

            # 클러스터 번호를 저장할 SET
            cluster_set = set()
            filteded_news = []

            # 클러스터 결과 처리
            for news, cluster in zip(news_day, clusters):
                if cluster == -1:
                    # 클러스터가 -1 (노이즈)이면 바로 추가
                    filteded_news.append(news)
                else:
                    # 해당 클러스터가 이미 SET에 있는지 확인
                    if cluster not in cluster_set:
                        # SET에 없으면 추가하고 해당 뉴스도 필터 리스트에 추가
                        cluster_set.add(cluster)
                        filteded_news.append(news)

            # 일 단위로 self.news 갱신
            # print(f"삭제 전 : {len(news_day)}, 삭제 후 : {len(filteded_news)}")
            self.news[idx] = filteded_news



    def get_tfidf_vector(self, titles: list):
        vectorizer = TfidfVectorizer(stop_words='english')
        tfidf_matrix = vectorizer.fit_transform(titles)

        return tfidf_matrix

    def set_model_config(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = BertTokenizer.from_pretrained('snunlp/KR-FinBert-SC')
        self.model = BertForSequenceClassification.from_pretrained('snunlp/KR-FinBert-SC')
        self.model.to(self.device)
        self.batch_size = 64

    # 감정 분석 처리
    def sentimental_analysis(self):
        for idx, news_day in tqdm(enumerate(self.news), total=len(self.news), desc="Processing Sentimental Analysis"):
            news_with_prediction = []
            # 뉴스에서 타이틀만을 뽑자.
            titles = [news['title'] for news in news_day]  # 타이틀만 추출
            predictions = self.analyze_sentiment_batch(titles)  # 타이틀을 가지고 예측

            for prediction, news in zip(predictions, news_day):
                # 예측 결과를 뉴스에 추가
                news['sentiment'] = prediction
                news_with_prediction.append(news)

            self.news[idx] = news_with_prediction

    def analyze_sentiment_batch(self, titles):
        predictions = []
        # confidences = []

        # 텍스트를 배치로 나누어 처리
        for i in tqdm(range(0, len(titles), self.batch_size)):
            batch_texts = titles[i:i + self.batch_size]
            
            # 배치 텍스트를 토큰화
            inputs = self.tokenizer(batch_texts, padding=True, truncation=True, max_length=512, return_tensors='pt').to(self.device)
            
            with torch.no_grad():
                outputs = self.model(**inputs)
            
            logits = outputs.logits
            probabilities = torch.softmax(logits, dim=1)
            
            batch_predictions = torch.argmax(probabilities, dim=1).tolist()
            # batch_confidences = probabilities.max(dim=1).values.tolist()
            
            predictions.extend(batch_predictions)
            # confidences.extend(batch_confidences)
        
        return predictions



In [93]:
# 사용 예시
preprocess = NewsPreprocess(type="industry")

# Industry 뉴스 가져오기
preprocess.get_news("20240801", "20240831", mode="local")

# print(len(preprocess.news[0]))
# industry 뉴스 중에서 필요 없는 뉴스 처리
preprocess.remove_short()
print("걸러내기 완료")

# 분류처리!
preprocess.remove_with_dbscan()
print("분류 완료")
# 모델 설정
preprocess.set_model_config()
print("모델 로드 완료")
preprocess.sentimental_analysis()
print("감정분석")



# Stock 뉴스 가져오기
# stock_news = preprocess.get_news("20240701", "20240731", type="news", mode="local")

Processing News for Remove Short: 100%|██████████| 31/31 [00:00<00:00, 6199.86it/s]


걸러내기 완료


Processing News for DBSCAN: 100%|██████████| 31/31 [00:00<00:00, 43.91it/s]


분류 완료
모델 로드 완료


100%|██████████| 25/25 [00:19<00:00,  1.27it/s] | 0/31 [00:00<?, ?it/s]
100%|██████████| 16/16 [00:12<00:00,  1.27it/s] | 1/31 [00:19<09:49, 19.64s/it]
100%|██████████| 3/3 [00:02<00:00,  1.25it/s]   | 2/31 [00:32<07:29, 15.50s/it]
100%|██████████| 8/8 [00:05<00:00,  1.37it/s]   | 3/31 [00:34<04:26,  9.51s/it]
100%|██████████| 23/23 [00:19<00:00,  1.18it/s] | 4/31 [00:40<03:37,  8.07s/it]
100%|██████████| 23/23 [00:17<00:00,  1.31it/s] | 5/31 [00:59<05:16, 12.18s/it]
100%|██████████| 23/23 [00:17<00:00,  1.28it/s] | 6/31 [01:17<05:50, 14.03s/it]
100%|██████████| 26/26 [00:20<00:00,  1.29it/s] | 7/31 [01:35<06:07, 15.32s/it]
100%|██████████| 19/19 [00:15<00:00,  1.26it/s] | 8/31 [01:55<06:27, 16.85s/it]
100%|██████████| 3/3 [00:02<00:00,  1.41it/s]   | 9/31 [02:10<05:58, 16.31s/it]
100%|██████████| 8/8 [00:06<00:00,  1.31it/s]   | 10/31 [02:12<04:10, 11.93s/it]
100%|██████████| 21/21 [00:17<00:00,  1.23it/s] | 11/31 [02:19<03:22, 10.14s/it]
100%|██████████| 26/26 [00:22<00:00,  1.16it/s

감정분석





In [79]:
print(len(preprocess.news[0][0]))

10


# 2. Hbase 진행

In [94]:
tmp_news_data = preprocess.news # 하루치만
len(tmp_news_data)

31

In [95]:
tmp_news_data[2][11]

{'title': '상반기 경상수지 성적표 공개...“전망치 초과 달성 확실시”[주간 금융 돋보기]',
 'subtitle': '한국은행, 오는 7일 ‘6월 국제수지’ 발표\n5월까지 254억7000만달러 흑자 기록\n6월에도 호조 지속 전망..."전망치 넘어선다"',
 'media': '파이낸셜뉴스',
 'description': '3일 한은에 따르면 상반기 경상수지 흑자폭이 담긴 ‘2024년 6월 국제수지’ 잠정치가 오는 7일 공개된다. 지난 5월 경상수지는 89억2000만달러 흑자를 나타내며 지난',
 'thumbnail': 'https://img2.daumcdn.net/thumb/R658x0.q70/?fname=https://t1.daumcdn.net/news/202408/03/fnnewsi/20240803060031930hcbc.jpg',
 'uploadDatetime': '2024-08-03 06:00:00',
 'article': '<ImageTag>https://img2.daumcdn.net/thumb/R658x0.q70/?fname=https://t1.daumcdn.net/news/202408/03/fnnewsi/20240803060031930hcbc.jpg</ImageTag>\n상반기 경상수지 성적표 공개...“전망치 초과 달성\n\n3일 한은에 따르면 상반기 경상수지 흑자폭이 담긴 ‘2024년 6월 국제수지’ 잠정치가 오는 7일 공개된다. 지난 5월 경상수지는 89억2000만달러 흑자를 나타내며 지난 2021년 9월(95억1000만달러) 이후 32개월 만에 최대 흑자를 나타냈다.\n\n특히 경상수지에서 가장 큰 비중을 차지하는 상품수지가 87억5000만달러 흑자를 기록하며 2021년 9월(95억4000만달러) 이후 최대 흑자를 기록했다. 14개월 연속 흑자폭을 기록한 상품수지는 흑자폭이 전월(51억1000만달러)과 전년 동월(18억8000만달러)과 비교할 때 모두 늘었다.\n\n본원소득수지도 배당소득을 중심으로 17억6000만달러 흑자를 나

In [96]:
len(preprocess.news[0])

1557

In [10]:
# news_id_hash = hashlib.md5("20240701193118874".encode()).hexdigest()
# news_id_hash

## 시황 테이블

In [102]:
# 시황 테이블 고고고여기다 시발

import requests
import json
import base64
import time
import hashlib
from datetime import datetime, timezone
import pytz

class HBaseConnection:
    
    type_mapping = {
        "finance": "T00001",
        "industry": "T00002",
        "employ": "T00003",
        "autos": "T00004",
        "stock": "T00005",
        "estate": "T00006",
        "consumer": "T00007",
        "worldeconomy": "T00008",
        "coin": "T00009",
        "pension": "T00010",
        "policy": "T00011",
        "startup": "T00012"
    }

    def __init__(self, table_name: str, key_table_name: str):
        # self.rest_api_url = 'http://localhost:9090'
        self.rest_api_url = 'http://j11c207a.p.ssafy.io:9090'

        self.table_name = table_name
        self.key_table_name = key_table_name
    
    def table_exists(self, table_name: str) -> bool:
        """Check if the table exists in HBase."""
        url = f"{self.rest_api_url}/{table_name}/schema"
        response = requests.get(url)
        
        if response.status_code == 200:
            print(f"Table '{table_name}' exists.")
            return True
        elif response.status_code == 404:
            print(f"Table '{table_name}' does not exist.")
            return False
        else:
            print(f"Error occurred: {response.status_code} - {response.text}")
            return False
    
    def get_table_schema(self, table_name):
        """Get the schema of the specified table."""
        url = f"{self.rest_api_url}/{table_name}/schema"
        headers = {'Accept': 'application/json', 
                   'Content-Type': 'application/json'}
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            schema = response.json()  # JSON으로 변환
            print(f"Schema for table '{table_name}': {schema}")
            return schema
        else:
            print(f"Error occurred while fetching schema: {response.status_code} - {response.text}")
            return None
    
    def create_table(self, table_name) -> bool:
        """Create a new table with the provided schema."""
        url = f"{self.rest_api_url}/{table_name}/schema"
        headers = {
            "Accept": "text/xml",
            "Content-Type": "text/xml"
        }

        table_schema = f'''<?xml version="1.0" encoding="UTF-8"?>
        <TableSchema name="{table_name}">
            <ColumnSchema name="cf" KEEP_DELETED_CELLS="true" />
        </TableSchema>'''
        response = requests.post(url, headers=headers, data=table_schema)

        if response.status_code == 201 or response.status_code == 200:
            print(f"Table '{table_name}' created successfully.")
            return True
        elif response.status_code == 400:
            print("Error: Bad Request -", response.text)
            return False
        else:
            print(f"Error: {response.status_code} -", response.text)
            return False

    def get_row_key(self, article_id):
        """Generate a row key based on user ID (예시로 단순한 변환 사용)."""
        return f"{article_id}"  # 필요한 경우 적절한 변환 로직으로 수정

    def convert_to_utc_timestamp(self, datetime_str: str) -> int:
        """
        주어진 datetime 문자열을 UTC timestamp로 변환.
        :param datetime_str: '%Y-%m-%d %H:%M:%S' 형식의 문자열
        :return: UTC timestamp
        """
        naive_datetime = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
        return int(naive_datetime.timestamp())
    
    def hash_news_id(self, news_id: str) -> str:
        """Generate a 16-byte hash from the news ID."""
        return hashlib.md5(news_id.encode()).hexdigest()
    
    def get_industry_news_table_key(self, news_type, datetime_str, news_id):
        # type 매핑
        mapped_type = self.type_mapping.get(news_type, "UnknownType")

        # UTC timestamp 변환 메소드 호출
        new_timestamp = self.convert_to_utc_timestamp(datetime_str)

        # newsId를 16바이트 크기의 해시로 변환
        news_id_hash = hashlib.md5(news_id.encode()).hexdigest()

        # salt 계산 (news_id_hash의 mod 10)
        salt = str(int(news_id_hash, 16) % 10).zfill(2)
        
        return f"{salt}{mapped_type}{new_timestamp}{news_id_hash}"
    
    def get_industry_news_key_table_key(self, news_type, news_id):        
        # type 매핑
        mapped_type = self.type_mapping.get(news_type, "UnknownType")
        # newsId를 16바이트 크기의 해시로 변환
        news_id_hash = hashlib.md5(news_id.encode()).hexdigest()

        return f"{news_id_hash}{mapped_type}"
    
    def encode_base64(self, value: str) -> str:
        """Base64 인코딩을 수행하는 메서드."""
        return base64.b64encode(value.encode()).decode()

    def add_cf_prefix(self, column: str) -> str:
        """열 이름에 'cf:' 접두사를 추가하는 메서드."""
        return f"cf:{column}"


    def insert_key_data(self, schema, data):
        """
        Random Access를 위해 별도의 키 저장
        :param schema: 삽입할 데이터의 스키마 (dict 형식, keys가 HBase의 columns)
        :param data: 삽입할 데이터 (dict 형식)
        """
        # HBase에 데이터 삽입
        row_key = self.get_industry_news_key_table_key(
                                                    datetime_str=data['uploadDatetime'],
                                                    news_id = data['newsId'])
        encoded_row_key = self.encode_base64(row_key)  # Base64 인코딩된 행 키
        
        # HBase에 데이터 삽입을 위한 URL
        url = f"{self.rest_api_url}/{self.key_table_name}/{row_key}"  # 인코딩된 키 사용 X
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

        # 삽입할 데이터 준비 (Base64 인코딩 및 timestamp 포함)
        cells = []
        for key in schema:  # schema를 순회하며 Cell 객체 생성
            column = self.encode_base64(self.add_cf_prefix(key))  # "cf:" 접두사를 붙여서 Base64 인코딩
            value = self.encode_base64(str(data[key]))  # Base64 인코딩된 데이터

            cells.append({
                'column': column,
                '$': value,
            })

        # 최종 데이터 구조 생성
        payload = {
            'Row': [
                {
                    'key': encoded_row_key,  # Base64 인코딩된 행 키
                    'Cell': cells
                }
            ]
        }


        # PUT 요청 전송
        response = requests.put(url, headers=headers, data=json.dumps(payload))


        # 응답 처리
        if response.status_code == 200 or response.status_code == 201:  # 201 Created
            pass
            # print(f"Data inserted successfully for news ID: {data['newsId']}")
        else:
            print(f"Error inserting data for news ID {data['newsId']}: {response.status_code}, {response.text}")
        return payload


    def insert_data(self, schema, data, table_name):
        """
        HBase에 데이터를 삽입하는 메소드
        :param schema: 삽입할 데이터의 스키마 (dict 형식, keys가 HBase의 columns)
        :param data: 삽입할 데이터 (dict 형식)
        """
        # HBase에 데이터 삽입
        if table_name == self.table_name:
            row_key = self.get_industry_news_table_key(news_type=data['industry'],
                                                    datetime_str=data['uploadDatetime'],
                                                    news_id = data['newsId'])  # newsId를 사용하여 행 키 생성
        else:
            row_key = self.get_industry_news_key_table_key(news_type=data['industry'],
                                                             news_id = data['newsId'])  # newsId를 사용하여 행 키 생성
        encoded_row_key = self.encode_base64(row_key)  # Base64 인코딩된 행 키

        # HBase에 데이터 삽입을 위한 URL
        url = f"{self.rest_api_url}/{table_name}/{row_key}"  # 인코딩된 키 사용 X
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

        # 삽입할 데이터 준비 (Base64 인코딩 및 timestamp 포함)
        cells = []
        for key in schema:  # schema를 순회하며 Cell 객체 생성
            column = self.encode_base64(self.add_cf_prefix(key))  # "cf:" 접두사를 붙여서 Base64 인코딩
            value = self.encode_base64(str(data[key]))  # Base64 인코딩된 데이터

            cells.append({
                'column': column,
                '$': value,
            })

        # 최종 데이터 구조 생성
        payload = {
            'Row': [
                {
                    'key': encoded_row_key,  # Base64 인코딩된 행 키
                    'Cell': cells
                }
            ]
        }


        # PUT 요청 전송
        response = requests.put(url, headers=headers, data=json.dumps(payload))


        # 응답 처리
        if response.status_code == 200 or response.status_code == 201:  # 201 Created
            pass
            # print(f"Data inserted successfully for news ID: {data['newsId']}")
        else:
            print(f"Error inserting data for news ID {data['newsId']}: {response.status_code}, {response.text}")
        return payload


In [103]:
hbase_connection = HBaseConnection(table_name='industry_news', key_table_name='industry_news_keys')

print(hbase_connection.rest_api_url)
table_exists = hbase_connection.table_exists(hbase_connection.table_name)

# 메인 테이블 존재 확인
if table_exists:
    hbase_connection.get_table_schema(hbase_connection.table_name)
else:
    hbase_connection.create_table(hbase_connection.table_name)

# 키 테이블 존재 확인
key_table_exists = hbase_connection.table_exists(hbase_connection.key_table_name)
if key_table_exists:
    hbase_connection.get_table_schema(hbase_connection.key_table_name)
else:
    hbase_connection.create_table(hbase_connection.key_table_name)


if len(tmp_news_data) > 0:
    table_schema = tmp_news_data[0][0].keys()
    key_table_schema = ['uploadDatetime']

    # 데이터
    for day_data in tqdm(tmp_news_data):
        for data in day_data:
            hbase_connection.insert_data(table_schema, data, hbase_connection.table_name)
            hbase_connection.insert_data(key_table_schema, data, hbase_connection.key_table_name)


http://j11c207a.p.ssafy.io:9090
Table 'industry_news' exists.
Schema for table 'industry_news': {'name': 'industry_news', 'ColumnSchema': [{'name': 'cf', 'BLOOMFILTER': 'ROW', 'VERSIONS': '1', 'IN_MEMORY': 'false', 'KEEP_DELETED_CELLS': 'true', 'DATA_BLOCK_ENCODING': 'NONE', 'TTL': '2147483647', 'COMPRESSION': 'NONE', 'MIN_VERSIONS': '0', 'BLOCKCACHE': 'true', 'BLOCKSIZE': '65536', 'REPLICATION_SCOPE': '0'}], 'IS_META': 'false'}
Table 'industry_news_keys' exists.
Schema for table 'industry_news_keys': {'name': 'industry_news_keys', 'ColumnSchema': [{'name': 'cf', 'BLOOMFILTER': 'ROW', 'VERSIONS': '1', 'IN_MEMORY': 'false', 'KEEP_DELETED_CELLS': 'true', 'DATA_BLOCK_ENCODING': 'NONE', 'TTL': '2147483647', 'COMPRESSION': 'NONE', 'MIN_VERSIONS': '0', 'BLOCKCACHE': 'true', 'BLOCKSIZE': '65536', 'REPLICATION_SCOPE': '0'}], 'IS_META': 'false'}


100%|██████████| 31/31 [27:53<00:00, 53.97s/it]


## Random Access 접근하기

In [305]:
import hashlib

type_mapping = {
    "finance": "T00001",
    "industry": "T00002",
    "employ": "T00003",
    "autos": "T00004",
    "stock": "T00005",
    "estate": "T00006",
    "consumer": "T00007",
    "worldeconomy": "T00008",
    "coin": "T00009",
    "pension": "T00010",
    "policy": "T00011",
    "startup": "T00012"
}

def convert_to_utc_timestamp(datetime_str: str) -> int:
    """
    Convert the given datetime string to a UTC timestamp.
    :param datetime_str: String in the format '%Y-%m-%d %H:%M:%S'
    :return: UTC timestamp
    """
    naive_datetime = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    return int(naive_datetime.timestamp())

def hash_news_id(news_id: str) -> str:
    """Generate a 16-byte hash from the news ID."""
    return hashlib.md5(news_id.encode()).hexdigest()

def get_salt(news_id_hash):
    return str(int(news_id_hash, 16) % 10).zfill(2)

def get_mapped_type(news_type):
    """Generate the industry news table key."""
    # Type mapping
    return type_mapping.get(news_type, "UnknownType")

def get_timestamp_by_key(row_key: str):
    """Get a row from HBase by its key."""
    hbase_url = 'http://j11c207a.p.ssafy.io:9090'
    url = f"{hbase_url}/industry_news_keys/{row_key}"
    headers = {
        'Accept': 'application/json'
    }
    
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        # Extracting the base64 encoded value
        base64_value = data['Row'][0]['Cell'][0]['$']
        # Decoding the base64 value => 여기서 우선 datetime str나옴('2024-07-01 15:53:00')
        datetime_str = decode_base64(base64_value)
        timestamps = convert_to_utc_timestamp(datetime_str)
        return timestamps
    else:
        print(f"Error fetching data for row key {row_key}: {response.status_code}, {response.text}")
        return None

def decode_base64(encoded_value: str) -> str:
    """Decode a base64 encoded string."""
    return base64.b64decode(encoded_value).decode('utf-8')

def get_news_by_key(news_type: str, timestamps: str, news_id_hash: str, column_family: str):
    """Get a row from the industry_news table by constructing the row key."""
    salt = get_salt(news_id_hash)
    
    # Constructing the row key
    row_key = f"{salt}{news_type}{timestamps}{news_id_hash}"

    # HBase REST API endpoint
    hbase_url = 'http://j11c207a.p.ssafy.io:9090'
    url = f"{hbase_url}/industry_news/{row_key}"
    headers = {
        'Accept': 'application/json'
    }
    
    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        data = response.json()
        result = {}
        
        if 'Row' in data and len(data['Row']) > 0:
            row_data = data['Row'][0]
            
            for cell in row_data['Cell']:
                column = cell['column']  # Get the column
                decoded_column = base64.b64decode(column).decode('utf-8')  # Decode the column
                value = cell['$']  # Get the value
                decoded_value = base64.b64decode(value).decode('utf-8')  # Decode the value
                
                # Remove 'cf:' from the column name for the result dictionary
                clean_column_name = decoded_column.replace(f"{column_family}:", "")
                result[clean_column_name] = decoded_value

        return result


In [306]:
# Random Access

# Client한테 들어왔다 가정하기
news_id = "20240701112610199"
news_type = 'estate'

# 1. 우선 클라이언트로부터 받은 정보를 가지고 timestamp 추출
news_id_hash = hash_news_id(news_id)
mapped_type = get_mapped_type(news_type)


row_key = news_id_hash + mapped_type

# 2. industry_news_keys 테이블 찌름
timestamps = get_timestamp_by_key(row_key)

# 2. industry_news찌름
column_family = 'cf'
article = get_news_by_key(mapped_type, timestamps, news_id_hash, column_family)
article

{'article': '<ImageTag>https://img4.daumcdn.net/thumb/R658x0.q70/?fname=https://t1.daumcdn.net/news/202407/01/ned/20240701112611630psrx.jpg</ImageTag>\n상도푸르지오클라베뉴 투시도 [대우건설 제공]\n\n분양가가 오르면서 골칫거리로 취급되던 미분양도 순차적으로 계약이 완료되고 있다. 특히 수도권은 고분양가 논란이 일며 오랜 기간 판매가 지지부진했던 단지들도 ‘완판’에 한 걸음 다가서는 모양새다.\n\n1일 분양업계에 따르면 경기 옥정신도시 양주옥정LH엘리프는 지난 5월부터 선착순 계약을 실시했다. 이 단지는 지난해 7월 입주를 시작했지만 선착순 계약을 시작할 당시 전체 70% 상당이 미분양 상태였다. 이에 한국토지주택공사(LH)는 발코니 확장을 옵션을 무상으로 지원하고 잔금 일부에 대해 무이자 금융 혜택을 제공하며 계약률을 높여 왔다. 그러다 최근 상황이 급변했다. 계약 상담이 늘어나면서 총 1409가구 가운데 900가구에 달하는 전용 59㎡가 모두 계약이 완료된 것이다. 이 평형 분양가는 2억7130만~2억9497만원대다. 전용 51㎡ 또한 70여가구만 남았다. LH 관계자는 “이달 계약 문의가 급격하게 늘었고, 상담 예약도 꾸준히 잡히고 있다”면서 “현재 계약 진행 중으로 잔여물량은 향후 더 줄어들 예정”이라고 설명했다.\n\n고분양가 논란이 일며 임의공급 7차까지 간 서울시 동작구 상도동 상도푸르지오클라베뉴도 ‘완판’을 앞두고 있다. 업계에 따르면 시행사 물량까지 총 30여가구만 남은 상황으로 전해진다. 단지는 지난해 9월 700가구 규모로 분양을 시작했고 10월부터 선착순 분양에 돌입했다. 후분양이라 자금 계획이 빠듯하고, 전용 84㎡가 12억원을 웃돌면서 부정적인 시각도 나왔지만, 결국 전 물량 계약에 가까워지고 있는 것이다.\n\n임의공급 12차까지 간 구로구 가리봉동 남구로역 동일 센타시아도 미분양 물량이 한 자릿수(6가구) 대로 

In [88]:
news_id_hash

'56213c8806308805ae0e9013a7e58510'

##  Timestamp

In [71]:
from datetime import datetime, timezone

def convert_to_utc_timestamps(start, end):
    # 문자열을 UTC datetime 객체로 변환
    start_dt = datetime.strptime(start, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    end_dt = datetime.strptime(end, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    
    # Unix timestamp로 변환
    start_timestamp = int(start_dt.timestamp())
    end_timestamp = int(end_dt.timestamp())
    
    return start_timestamp, end_timestamp

# 사용 예시
start = '2024-07-01 02:31:00'  # UTC 시간
end = '2024-07-01 05:41:00'    # UTC 시간
timestamps = convert_to_utc_timestamps(start, end)

print(timestamps)


(1719801060, 1719812460)


In [302]:
import requests
import base64
from tqdm import tqdm

# HBase REST API 설정
# base_url = 'http://localhost:9090'
base_url = 'http://j11c207a.p.ssafy.io:9090'
table_name = 'industry_news'

# SALT 범위 및 시간 설정
max_salt = 10
start_row_prefix = 'T00001'
stop_row_prefix = 'T00001'
start_time = '1719801060'
stop_time = '1722442203' # 24/7/31

# 결과를 저장할 리스트
results = []

# 스캔 요청을 위한 함수
def scan_hbase(salt):
    start_row = f"{salt:02d}{start_row_prefix}{start_time}"
    stop_row = f"{salt:02d}{stop_row_prefix}{stop_time}"
    
    # HBase REST API URL
    scan_url = f"{base_url}/{table_name}/scanner"
    
    # 요청 본문 생성 시 base64로 인코딩
    scan_body = {
        "startRow": base64.b64encode(start_row.encode()).decode(),
        "endRow": base64.b64encode(stop_row.encode()).decode(),
    }
    
    # 스캔 요청
    headers = {
        "Accept": "application/json",  # JSON 응답을 요청
        "Content-Type": "application/json"  # JSON 데이터 형식으로 전송
    }
    response = requests.post(scan_url, json=scan_body)
    
    if response.status_code == 201:
        # JSON 응답에서 Location 헤더를 통해 scanner_id 추출
        location = response.headers['Location']
        results.append(location)
        print(f"SALT {salt}: Data retrieved.")
    else:
        print(f"Error scanning SALT {salt}: {response.status_code}, {response.text}")

# 스캔 요청 수행
for salt in tqdm(range(max_salt + 1)):
    scan_hbase(salt)




 45%|████▌     | 5/11 [00:00<00:00, 40.86it/s]

SALT 0: Data retrieved.
SALT 1: Data retrieved.
SALT 2: Data retrieved.
SALT 3: Data retrieved.
SALT 4: Data retrieved.
SALT 5: Data retrieved.
SALT 6: Data retrieved.
SALT 7: Data retrieved.


100%|██████████| 11/11 [00:00<00:00, 41.11it/s]

SALT 8: Data retrieved.
SALT 9: Data retrieved.
SALT 10: Data retrieved.





In [298]:
# 각 scanner_id에 대해 GET 요청하여 결과 저장
final_results = []

# JSON 응답을 요청하는 헤더
headers = {
    "Accept": "application/json",  # JSON 응답을 요청
}

for scanner_id in results:
    while True:
        # GET 요청을 통해 데이터 가져오기
        get_url = scanner_id
        response = requests.get(get_url, headers=headers)
        
        if response.status_code == 200:
            # 성공적으로 데이터를 가져온 경우
            data = response.json()  # 응답을 JSON으로 파싱
            final_results.append(data)  # 결과를 저장
            # print(f"Data retrieved from {scanner_id}: {data}")
        elif response.status_code == 204:
            # 스캐너가 모두 소진된 경우
            print(f"No more data for {scanner_id}. Deleting scanner...")
            # DELETE 요청으로 자원 해제
            delete_response = requests.delete(scanner_id)
            if delete_response.status_code == 200:
                print(f"Scanner {scanner_id} deleted successfully.")
            else:
                print(f"Error deleting scanner {scanner_id}: {delete_response.status_code}, {delete_response.text}")
            break
        else:
            # 오류 발생 시 응답 출력
            print(f"Error retrieving data from {scanner_id}: {response.status_code}, {response.text}")
            break

# 최종 결과 출력
print("Final results:")
# print(final_results)


No more data for http://j11c207a.p.ssafy.io:9090/industry_news/scanner/172728448510561d7a7a. Deleting scanner...
Scanner http://j11c207a.p.ssafy.io:9090/industry_news/scanner/172728448510561d7a7a deleted successfully.
No more data for http://j11c207a.p.ssafy.io:9090/industry_news/scanner/172728448534776801cf7. Deleting scanner...
Scanner http://j11c207a.p.ssafy.io:9090/industry_news/scanner/172728448534776801cf7 deleted successfully.
No more data for http://j11c207a.p.ssafy.io:9090/industry_news/scanner/17272844853702fadb62d. Deleting scanner...
Scanner http://j11c207a.p.ssafy.io:9090/industry_news/scanner/17272844853702fadb62d deleted successfully.
No more data for http://j11c207a.p.ssafy.io:9090/industry_news/scanner/17272844853925e8757df. Deleting scanner...
Scanner http://j11c207a.p.ssafy.io:9090/industry_news/scanner/17272844853925e8757df deleted successfully.
No more data for http://j11c207a.p.ssafy.io:9090/industry_news/scanner/17272844854153e673493. Deleting scanner...
Scanner 

In [299]:
print(len(results))

11


In [300]:
print(len(final_results))

10


In [301]:
for result in final_results:
    print(len(result['Row']))

356
359
381
374
348
333
356
369
359
370


In [272]:
final_results[0]['Row']

[{'key': 'MDJUMDAwMDExNzE5ODExODAwZTNkNTgwNTQ0ODcxOWNlMzFiNGRlMmNlMjIyYWRiYjA=',
  'Cell': [{'column': 'Y2Y6YXJ0aWNsZQ==',
    'timestamp': 1727274257686,
    '$': 'PEltYWdlVGFnPmh0dHBzOi8vaW1nNC5kYXVtY2RuLm5ldC90aHVtYi9SNjU4eDAucTcwLz9mbmFtZT1odHRwczovL3QxLmRhdW1jZG4ubmV0L25ld3MvMjAyNDA3LzAxL3Nlb3VsZWNvbm9teS8yMDI0MDcwMTA1MzAyNzY5OGdwcW8uanBnPC9JbWFnZVRhZz4KCu2VmOuCmOq4iOycteq3uOujueydgCDsp4DrgpztlbQgRVNHIO2ZnOuPmSDrsI8g7ISx6rO866W8IOuLtOydgCAnMjAyMyDsp4Dsho3qsIDriqXqsr3smIHrs7Tqs6DshJwn66W8IOuwnOqwhO2WiOuLpOqzoCAzMOydvCDrsJ3tmJTri6QuCgrtlZjrgpjquIjsnLXsnYAg7Jis7ZW066GcIOyXtOydvOqzsSDrsojsp7gg67Cc6rCE7J2EIOunnuydgCDsnbTrsogg67O06rOg7ISc7JeQICfrqqjrkZDsnZgg6riw7IGoLCDqt7gg7ZWY64KY66W8IOychO2VmOyXrCfrnbzripQg6rK97JiB7JuQ7LmZ7J2EIOq4sOuwmOycvOuhnCDsgqztmozqs7Xrj5nssrTsmYDsnZgg64+Z67CY7ISx7J6l7J2EIOy2lOq1rO2VmOuKlCDtlZjrgpjquIjsnLXqt7jro7kg6rOE7Je07IKs7J2YIOuLpOyWke2VnCBFU0cg6rK97JiB7ISx6rO87JmAIOuFuOugpeydhCDri7TslZjri6QuCgrtirntnogg7KeA7Jet7IKs7ZqMIEVTRyDsnoTtjKntirgg7LC97Lac7J2EIOychO2

In [182]:
resp = requests.get(results[5])
resp.content

b'<ImageTag>https://img1.daumcdn.net/thumb/R658x0.q70/?fname=https://t1.daumcdn.net/news/202407/01/chosunbiz/20240701060153208hcam.jpg</ImageTag>\n\xec\xa1\xb0\xec\x84\xa0DB\n\n\xec\x97\xac\xec\x95\xbc \xec\x9d\x98\xec\x9b\x90\xeb\x93\xa4\xec\x9d\xb4 \xec\x98\x88\xea\xb8\x88\xec\x9e\x90 \xeb\xb3\xb4\xed\x98\xb8 \xed\x95\x9c\xeb\x8f\x84\xeb\xa5\xbc 1\xec\x96\xb5\xec\x9b\x90 \xec\x9d\xb4\xec\x83\x81\xec\x9c\xbc\xeb\xa1\x9c \xec\x98\xac\xeb\xa6\xac\xeb\x8a\x94 \xeb\x82\xb4\xec\x9a\xa9\xec\x9d\x98 \xec\x98\x88\xea\xb8\x88\xec\x9e\x90\xeb\xb3\xb4\xed\x98\xb8\xeb\xb2\x95 \xea\xb0\x9c\xec\xa0\x95\xec\x95\x88\xec\x9d\x84 \xec\x9e\x87\xeb\x8b\xac\xec\x95\x84 \xeb\xb0\x9c\xec\x9d\x98\xed\x95\x98\xea\xb3\xa0 \xec\x9e\x88\xeb\x8b\xa4. \xea\xb5\xad\xeb\x82\xb4 \xea\xb8\x88\xec\x9c\xb5\xec\x86\x8c\xeb\xb9\x84\xec\x9e\x90\xeb\x8a\x94 \xec\x98\x88\xea\xb8\x88\xec\x9d\x84 \xeb\xa7\xa1\xea\xb8\xb4 \xea\xb8\x88\xec\x9c\xb5\xed\x9a\x8c\xec\x82\xac\xea\xb0\x80 \xec\x98\x81\xec\x97\x85\xec\x9d\x84 \xec\xa4\

In [192]:
resp = requests.get(results[0])
resp.content

b'finance'

In [193]:
for resp in results:
    a = requests.delete(resp)
    print(a.status_code)

200
200
200
200
200
200
200
200
200
200
200
