In [None]:
symbols=["AAA","AAM","ABS","ABT","ACB","ACC","ACL","ADG","ADP","ADS","AGG","AGR","ANV","APG","APH","ASM","ASP","AST","BAF","BCE","BCM","BFC","BIC","BID","BKG","BMC","BMI","BMP","BRC","BSI","BTP","BVH","BWE","C32","CCL","CDC","CII","CLC","CLL","CMG","CMX","CNG","CRC","CRE","CSM","CSV","CTD","CTF","CTG","CTI","CTR","CTS","D2D","DAH","DBC","DBD","DBT","DC4","DCL","DCM","DGC","DGW","DHA","DHC","DHM","DIG","DMC","DPG","DPM","DPR","DRC","DRL","DSC","DSE","DSN","DTA","DVP","DXG","DXS","EIB","ELC","EVE","EVF","FCM","FCN","FIR","FIT","FMC","FPT","FRT","FTS","GAS","GDT","GEE","GEX","GIL","GMD","GSP","GVR","HAG","HAH","HAP","HAR","HAX","HCD","HCM","HDB","HDC","HDG","HHP","HHS","HHV","HID","HII","HMC","HPG","HPX","HQC","HSG","HSL","HT1","HTG","HTI","HTN","HUB","HVH","ICT","IDI","IJC","ILB","IMP","ITC","ITD","JVC","KBC","KDC","KDH","KHG","KHP","KMR","KOS","KSB","LAF","LBM","LCG","LHG","LIX","LPB","LSS","MBB","MCM","MCP","MHC","MIG","MSB","MSH","MSN","MWG","NAB","NAF","NBB","NCT","NHA","NHH","NKG","NLG","NNC","NO1","NSC","NT2","NTL","OCB","OGC","ORS","PAC","PAN","PC1","PDR","PET","PGC","PHC","PHR","PIT","PLP","PLX","PNJ","POW","PPC","PTB","PTC","PTL","PVD","PVP","PVT","QCG","RAL","REE","RYG","SAB","SAM","SAV","SBG","SBT","SCR","SCS","SFC","SFG","SGN","SGR","SGT","SHB","SHI","SIP","SJD","SJS","SKG","SMB","SSB","SSI","ST8","STB","STK","SVT","SZC","SZL","TCB","TCH","TCI","TCL","TCM","TCO","TCT","TDC","TDG","TDP","TEG","THG","TIP","TLD","TLG","TLH","TMT","TNH","TNI","TNT","TPB","TRC","TSC","TTA","TTF","TV2","TVS","TYA","UIC","VCA","VCB","VCG","VCI","VDS","VFG","VGC","VHC","VHM","VIB","VIC","VIP","VIX","VJC","VMD","VND","VNL","VNM","VNS","VOS","VPB","VPG","VPH","VPI","VRC","VRE","VSC","VTO","VTP","YBM","YEG"]
len(symbols)

In [None]:
import requests, time, threading
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

class NewsCrawler:
    """Thread-pooled crawler for the Vietstock paged news listing.

    For every code in ``stock_codes`` the crawler pages through the listing
    endpoint, follows each article link, extracts either the article text or
    the link of an attached document, and prints one result dict per article.
    ``crawled_ids`` (guarded by ``lock``) de-duplicates articles across the
    worker threads.
    """

    # Listing endpoint queried by get_data.
    LISTING_URL = 'https://finance.vietstock.vn/View/PagingNewsContent'
    # Upper bound (seconds) for any single HTTP call, so a stalled connection
    # cannot block a worker thread forever.
    REQUEST_TIMEOUT = 15

    def __init__(self):
        """Set up the shared HTTP session, target codes and base query parameters."""
        self.session = requests.Session()
        self.session.headers.update({
            'Accept': 'text/html, */*;q=0.01', 'User-Agent': 'Mozilla/5.0',
            'X-Requested-With': 'XMLHttpRequest'
        })
        self.stock_codes = ['ACB','BCM','BID','CTG','DCG','FPT','GAS','GVR','HDB','HPG',
                            'LPB','MBB','MSN','MWG','PLX','SAB','SHB','SSB','SSI','STB',
                            'TCB','TPB','VCB','VHM','VIB','VIC','VJC','VNM','VPB','VRE']
        # NOTE(review): fromDate '01/01/2025' is ambiguous between dd/mm and
        # mm/dd while toDate is formatted as mm/dd/YYYY -- confirm which order
        # the endpoint expects.
        self.params = {
            'view':'1','type':'1','fromDate':'01/01/2025',
            'toDate':datetime.now().strftime('%m/%d/%Y'),
            'channelID':'-1','page':'1','pageSize':'20'
        }
        self.crawled_ids, self.lock = set(), threading.Lock()

    def _claim(self, aid):
        """Atomically reserve ``aid``; return False when it is already reserved/crawled."""
        with self.lock:
            if aid in self.crawled_ids:
                return False
            self.crawled_ids.add(aid)
            return True

    def _release(self, aid):
        """Give back a reservation so a later page/thread may retry the article."""
        with self.lock:
            self.crawled_ids.discard(aid)

    def get_content(self, url):
        """Fetch one article page and extract its payload.

        Returns:
            ``{'type': 'pdf', 'link': <href>, 'date': <str>}`` when the detail
            block holds an attachment table, ``{'type': 'text', 'content':
            <str>, 'date': <str>}`` otherwise, or ``None`` when the request
            fails or the date span / detail block / attachment link is missing.
        """
        try:
            page = self.session.get(url, timeout=self.REQUEST_TIMEOUT)
            soup = BeautifulSoup(page.text, 'html.parser')
            date_tag = soup.find('span', class_=['datenew','date hidden-xs'])
            content_div = soup.find('div', id='vst_detail')
            if date_tag is None or not content_div:
                return None
            date = date_tag.get_text(strip=True)
            table = content_div.find('table')
            # NOTE(review): this marker literal looks mis-encoded in this view
            # of the file; confirm the notebook's encoding before relying on it.
            if table and 'T√†i li·ªáu ƒë√≠nh k√®m:' in table.get_text(strip=True):
                link = table.find('a')
                href = link.get('href') if link else None
                return {'type':'pdf','link':href,'date':date} if href is not None else None
            content = ' '.join(p.get_text(" ", strip=True) for p in content_div.find_all('p', class_='pBody'))
            return {'type':'text','content':content,'date':date}
        except Exception as exc:
            # Best effort per article, but report the reason instead of hiding it
            # (and let KeyboardInterrupt / SystemExit propagate).
            print(f"[NewsCrawler] get_content failed for {url}: {exc}")
            return None

    def get_data(self, code, params):
        """Crawl one listing page for ``code``.

        Args:
            code: Stock code recorded in every result dict.
            params: Query parameters sent to the listing endpoint.

        Returns:
            True when the page contained table rows (the caller keeps paging),
            False when the request failed or the page had no rows.
        """
        try:
            page = self.session.get(self.LISTING_URL, params=params, timeout=self.REQUEST_TIMEOUT)
            soup = BeautifulSoup(page.text, 'html.parser')
        except requests.RequestException as exc:
            print(f"[NewsCrawler] listing request failed for {code} {params.get('page')}: {exc}")
            return False
        rows = soup.select('table.table-striped tr')
        if not rows:
            return False
        for r in rows:
            anchor = r.find('a')
            # Rows without a usable article anchor are skipped individually; they
            # must not abort the rest of the page or end pagination.
            if anchor is None or anchor.get('articleid') is None or anchor.get('href') is None:
                continue
            aid, link = anchor['articleid'], urljoin('https:', anchor['href'])
            # Reserve the id before the slow fetch so two threads never crawl
            # the same article concurrently.
            if not self._claim(aid):
                continue
            content = self.get_content(link)
            if not content:
                self._release(aid)
                continue
            res = {
                'stock_code': code,'article_id': aid,'title': anchor.get_text(strip=True),
                'link': link,'date': content['date'],'is_pdf': content['type']=='pdf',
                'content': None if content['type']=='pdf' else content['content'],
                'pdf_link': content['link'] if content['type']=='pdf' else None
            }
            print(res)
        return True

    def crawl_stock(self, code):
        """Page through the listing for ``code`` until get_data reports no more rows; return ``code``."""
        params = {**self.params, 'code':code, 'page':'1'}
        while self.get_data(code, params):
            params['page'] = str(int(params['page'])+1)
            time.sleep(0.5)  # pause between pages
        return code

    def crawl_all(self, max_workers=5):
        """Crawl every code in ``stock_codes`` on a thread pool and print per-code completion or failure."""
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = {ex.submit(self.crawl_stock, c):c for c in self.stock_codes}
            for f in as_completed(futures):
                c = futures[f]
                try:
                    f.result()
                    print(f"Ho√†n th√†nh crawl {c}")
                except Exception as e:
                    print(f"L·ªói crawl {c}: {e}")

# Start the full crawl only when this code runs as the main program
# (in a notebook kernel __name__ is "__main__", so the cell crawls when executed).
if __name__=="__main__":
    NewsCrawler().crawl_all()


In [None]:
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
import time

# Retry the connection a few times: Scylla can take several seconds to become ready.
MAX_CONNECT_ATTEMPTS = 5
RETRY_DELAY_SECONDS = 5

for i in range(MAX_CONNECT_ATTEMPTS):
    cluster = None
    try:
        print(f"üîå Attempt {i+1} connecting to Scylla...")
        cluster = Cluster(['localhost'], port=9042)
        session = cluster.connect()
        print("‚úÖ Connected successfully")
        break
    except Exception as e:
        print("‚ùå Failed:", e)
        # Release the failed cluster object before retrying so abandoned
        # driver resources do not pile up across attempts.
        if cluster is not None:
            cluster.shutdown()
        # No point in waiting after the final attempt.
        if i < MAX_CONNECT_ATTEMPTS - 1:
            time.sleep(RETRY_DELAY_SECONDS)
else:
    raise SystemExit("Could not connect to ScyllaDB")

# Smoke-test the connection with a system-table query; always shut the
# cluster down afterwards, even when the query raises.
try:
    rows = session.execute(SimpleStatement("SELECT release_version FROM system.local"))
    for row in rows:
        print("ScyllaDB version:", row.release_version)
finally:
    cluster.shutdown()


In [None]:
import os
from confluent_kafka.admin import AdminClient, NewTopic

def create_topic(topic, partitions=12, replication=3, bootstrap=None):
    """Create a Kafka topic unless it already exists.

    Args:
        topic: Name of the topic to create.
        partitions: Partition count passed to ``NewTopic``.
        replication: Replication factor passed to ``NewTopic``.
        bootstrap: Bootstrap servers string; when falsy, the
            ``KAFKA_BOOTSTRAP_SERVERS`` environment variable is used, falling
            back to the three localhost brokers below.

    Returns:
        None. The outcome (already exists / created / failed) is printed.
    """
    # Resolve the broker list: explicit argument, then environment, then default.
    if not bootstrap:
        bootstrap = os.getenv(
            'KAFKA_BOOTSTRAP_SERVERS',
            'localhost:29092,localhost:39092,localhost:49092'
        )

    client = AdminClient({'bootstrap.servers': bootstrap})

    # Nothing to do when the cluster metadata already lists the topic.
    if topic in client.list_topics(timeout=5).topics:
        print(f"‚úÖ Topic '{topic}' ƒë√£ t·ªìn t·∫°i.")
        return

    spec = NewTopic(
        topic=topic,
        num_partitions=partitions,
        replication_factor=replication
    )

    # create_topics returns one future per requested topic; result() raises on failure.
    for t, pending in client.create_topics([spec]).items():
        try:
            pending.result()
            print(f"üéâ T·∫°o topic '{t}' th√†nh c√¥ng.")
        except Exception as e:
            print(f"‚ùå L·ªói khi t·∫°o topic '{t}': {e}")

# When run as the main program, ensure the "yfinance" topic exists with
# 12 partitions and a replication factor of 3.
if __name__ == "__main__":
    create_topic("yfinance", partitions=12, replication=3)


In [None]:
!pip install docling

In [None]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import datetime
import time

SCYLLA_NODES = ['localhost']  
SCYLLA_KEYSPACE = 'stock_data'
SCYLLA_PORT = 9042  

def connect_to_scylla():
    """Open a session against ``SCYLLA_NODES`` and switch it to ``SCYLLA_KEYSPACE``.

    Returns:
        The connected driver session with the keyspace already selected.
    """
    scylla_session = Cluster(SCYLLA_NODES, port=SCYLLA_PORT).connect()
    scylla_session.set_keyspace(SCYLLA_KEYSPACE)
    return scylla_session

def date_now():
    """Return the current local date formatted as ``YYYY-MM-DD``."""
    moment = datetime.datetime.now()
    return f"{moment:%Y-%m-%d}"

def current_timestamp_ms():
    """Return the current time as whole milliseconds since the Unix epoch."""
    epoch_seconds = time.time()
    return int(epoch_seconds * 1000)

# Insert one sample quote into stock_prices and read it back.
if __name__ == "__main__":
    session = connect_to_scylla()

    # Sample data
    symbol = 'AAPL'
    # NOTE(review): this stores an ISO-8601 string, whereas the stock_prices
    # rows queried later in this notebook carry epoch-millisecond strings in
    # `timestamp` -- confirm which format the table expects.
    timestamp = datetime.datetime.now().isoformat()
    price = 186.45
    exchange = 'NASDAQ'
    quote_type = 1
    market_hours = 1
    change_percent = 0.75
    day_volume = 15200000
    change = 1.38
    last_size = 100
    price_hint = '2'
    producer_timestamp = current_timestamp_ms()

    # Insert the sample row into stock_prices using bound parameters.
    session.execute("""
        INSERT INTO stock_prices (
            symbol, timestamp, price, exchange, quote_type, market_hours,
            change_percent, day_volume, change, last_size, price_hint, producer_timestamp
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        symbol, timestamp, price, exchange, quote_type, market_hours,
        change_percent, day_volume, change, last_size, price_hint, producer_timestamp
    ))

    print("‚úÖ D·ªØ li·ªáu ƒë√£ ƒë∆∞·ª£c ch√®n th√†nh c√¥ng v√†o b·∫£ng stock_prices.")

    # Verification query: bind the symbol as a parameter (like the INSERT above)
    # instead of splicing it into the CQL text.
    rows = session.execute(
        "SELECT * FROM stock_prices WHERE symbol=%s LIMIT 5;",
        (symbol,),
    )
    for row in rows:
        print(row)


In [None]:
import pandas as pd

df = pd.read_csv('/home/obito/main/scylla-service/stock_daily_summary.csv')

df.info()

In [None]:
df.head()

In [None]:
df_ = df[df['trade_date'] == '2025-10-06']

In [None]:
import requests
symbols=["AAA","AAM","ABS","ABT","ACB","ACC","ACL","ADG","ADP","ADS","AGG","AGR","ANV","APG","APH","ASM","ASP","AST","BAF","BCE","BCM","BFC","BIC","BID","BKG","BMC","BMI","BMP","BRC","BSI","BTP","BVH","BWE","C32","CCL","CDC","CII","CLC","CLL","CMG","CMX","CNG","CRC","CRE","CSM","CSV","CTD","CTF","CTG","CTI","CTR","CTS","D2D","DAH","DBC","DBD","DBT","DC4","DCL","DCM","DGC","DGW","DHA","DHC","DHM","DIG","DMC","DPG","DPM","DPR","DRC","DRL","DSC","DSE","DSN","DTA","DVP","DXG","DXS","EIB","ELC","EVE","EVF","FCM","FCN","FIR","FIT","FMC","FPT","FRT","FTS","GAS","GDT","GEE","GEX","GIL","GMD","GSP","GVR","HAG","HAH","HAP","HAR","HAX","HCD","HCM","HDB","HDC","HDG","HHP","HHS","HHV","HID","HII","HMC","HPG","HPX","HQC","HSG","HSL","HT1","HTG","HTI","HTN","HUB","HVH","ICT","IDI","IJC","ILB","IMP","ITC","ITD","JVC","KBC","KDC","KDH","KHG","KHP","KMR","KOS","KSB","LAF","LBM","LCG","LHG","LIX","LPB","LSS","MBB","MCM","MCP","MHC","MIG","MSB","MSH","MSN","MWG","NAB","NAF","NBB","NCT","NHA","NHH","NKG","NLG","NNC","NO1","NSC","NT2","NTL","OCB","OGC","ORS","PAC","PAN","PC1","PDR","PET","PGC","PHC","PHR","PIT","PLP","PLX","PNJ","POW","PPC","PTB","PTC","PTL","PVD","PVP","PVT","QCG","RAL","REE","RYG","SAB","SAM","SAV","SBG","SBT","SCR","SCS","SFC","SFG","SGN","SGR","SGT","SHB","SHI","SIP","SJD","SJS","SKG","SMB","SSB","SSI","ST8","STB","STK","SVT","SZC","SZL","TCB","TCH","TCI","TCL","TCM","TCO","TCT","TDC","TDG","TDP","TEG","THG","TIP","TLD","TLG","TLH","TMT","TNH","TNI","TNT","TPB","TRC","TSC","TTA","TTF","TV2","TVS","TYA","UIC","VCA","VCB","VCG","VCI","VDS","VFG","VGC","VHC","VHM","VIB","VIC","VIP","VIX","VJC","VMD","VND","VNL","VNM","VNS","VOS","VPB","VPG","VPH","VPI","VRC","VRE","VSC","VTO","VTP","YBM","YEG"]

cookies = {
    '_ga': 'GA1.1.596312964.1755739379',
    '_gcl_au': '1.1.55003797.1755739380',
    'WORKING_DATE': '07-10-2025',
    'STOP_SERVICE_POPUP': 'OFF',
    '_ga_TC717PZXF6': 'GS2.1.s1759843233$o52$g1$t1759843250$j43$l0$h0',
}

headers = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'en-US,en;q=0.9,vi;q=0.8',
    'Connection': 'keep-alive',
    'Referer': 'https://kbbuddywts.kbsec.com.vn/HOSE',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-origin',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
    'sec-ch-ua': '"Chromium";v="140", "Not=A?Brand";v="24", "Google Chrome";v="140"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    # 'Cookie': '_ga=GA1.1.596312964.1755739379; _gcl_au=1.1.55003797.1755739380; WORKING_DATE=07-10-2025; STOP_SERVICE_POPUP=OFF; _ga_TC717PZXF6=GS2.1.s1759843233$o52$g1$t1759843250$j43$l0$h0',
}

params = {
    'sdate': '07-10-2025',
    'edate': '07-10-2025',
}

def _daily_record(symbol, bar, prev_close=None):
    """Turn one ``data_day`` bar into a daily-summary row dict.

    Args:
        symbol: Bare ticker (e.g. ``"ACB"``); ``.VN`` is appended in the row.
        bar: Mapping providing the keys read below: ``t``, ``o``, ``h``,
            ``l``, ``c`` and ``v``.
        prev_close: Close of the previous session, or None when unknown.

    Returns:
        A dict with the same keys, in the same order, as the rows this cell
        has always appended.
    """
    close_raw = float(bar['c'])
    if prev_close is None or pd.isna(prev_close) or float(prev_close) == 0:
        # No usable reference price: report a flat day rather than dividing by zero.
        change, change_percent = 0, 0
    else:
        prev_close = float(prev_close)
        # Change is "today minus previous close" (the earlier expression had
        # the operands swapped, flipping the sign).
        change = round(close_raw - prev_close, 2)
        # Expressed in percent, matching the change / change_percent pairs
        # shown in this notebook's stock_prices output.
        change_percent = round(change / prev_close * 100, 2)

    record = {}
    record['symbol'] = f'{symbol}.VN'
    record['trade_date'] = bar['t'].split(' ')[0]
    record['change'] = change
    record['change_percent'] = change_percent
    record['close'] = round(close_raw, 2)
    # NOTE(review): 'NASDAQ' is kept as-is although the request targets the
    # HOSE board -- confirm the intended exchange label.
    record['exchange'] = 'NASDAQ'
    record['high'] = round(float(bar['h']), 2)
    record['low'] = round(float(bar['l']), 2)
    record['market_hours'] = 1
    record['open'] = round(float(bar['o']), 2)
    record['quote_type'] = 1
    record['volume'] = int(bar['v'])
    # Stored under "vwap" but computed as the mean of high, low and close.
    record['vwap'] = round(float((record['high'] + record['low'] + record['close']) / 3), 2)
    return record


# Previous-session close per symbol (first occurrence wins, as before).
prev_close_by_symbol = (
    df_.drop_duplicates(subset='symbol', keep='first')
       .set_index('symbol')['close']
       .to_dict()
)

records, failed_symbols = [], []
for symbol in symbols:
    try:
        response = requests.get(
            'https://kbbuddywts.kbsec.com.vn/iis-server/investment/stocks/' + symbol + '/data_day',
            params=params,
            cookies=cookies,
            headers=headers,
            timeout=15,  # never hang the whole loop on one stalled request
        )
        response.raise_for_status()
        bars = response.json().get('data_day')  # parse the body once
    except (requests.RequestException, ValueError) as exc:
        print(f"Skipping {symbol}: {exc}")
        failed_symbols.append(symbol)
        continue
    if not bars:
        continue
    records.append(_daily_record(symbol, bars[0], prev_close_by_symbol.get(f'{symbol}.VN')))

# Append all new rows in one concat instead of growing the frame row by row.
# NOTE: re-running this cell without reloading `df` appends the rows again.
if records:
    df = pd.concat([df, pd.DataFrame(records)], ignore_index=True)


df.to_csv('/home/obito/main/scylla-service/stock_daily_summary_updated.csv', index=False)


In [None]:
/home/obito/main/scylla-cdc-printer/target/release/scylla-cdc-printer -k stock_data -t stock_prices -h localhost --window-size 120 --safety-interval 60 --sleep-interval 5

In [None]:
import pandas as pd 

df = pd.read_csv('/home/obito/main/scylla-service/stock_news.csv',on_bad_lines='skip')
df.info()


In [None]:
import pandas as pd

# Input CSV locations
csv1_path = "/home/obito/main/scylla-service/pdf_analysis_results.csv"
csv2_path = "/home/obito/main/scylla-service/stock_news copy.csv"

# Load both files, skipping malformed lines
df1 = pd.read_csv(csv1_path, on_bad_lines='skip')
df2 = pd.read_csv(csv2_path, on_bad_lines='skip')

# Columns that identify a news item in both files
keys = ['stock_code', 'date', 'article_id']

# Anti-join: keep only the df2 rows whose key triple does not occur in df1.
# The merge indicator marks such rows as 'left_only'.
df2_filtered = (
    df2.merge(df1[keys], on=keys, how='left', indicator=True)
       .loc[lambda merged: merged['_merge'] == 'left_only']
       .drop(columns=['_merge'])
)

# Write the remaining rows to a new file
output_path = "/home/obito/main/scylla-service/filtered_stock_news.csv"
df2_filtered.to_csv(output_path, index=False)

print(f"ƒê√£ l∆∞u k·∫øt qu·∫£ v√†o: {output_path}")


In [None]:
import pandas as pd 
df  = pd.read_csv('/home/obito/main/scylla-service/pdf_analysis_results.csv',on_bad_lines='skip')
df.info()

In [4]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('stock_data')

codes = ["AAA","AAM","ABS","ABT","ACB","ACC","ACL","ADG","ADP","ADS","AGG","AGR","ANV","APG","APH","ASM","ASP","AST","BAF","BCE","BCM","BFC","BIC","BID","BKG","BMC","BMI","BMP","BRC","BSI","BTP","BVH","BWE","C32","CCL","CDC","CII","CLC","CLL","CMG","CMX","CNG","CRC","CRE","CSM","CSV","CTD","CTF","CTG","CTI","CTR","CTS","D2D","DAH","DBC","DBD","DBT","DC4","DCL","DCM","DGC","DGW","DHA","DHC","DHM","DIG","DMC","DPG","DPM","DPR","DRC","DRL","DSC","DSE","DSN","DTA","DVP","DXG","DXS","EIB","ELC","EVE","EVF","FCM","FCN","FIR","FIT","FMC","FPT","FRT","FTS","GAS","GDT","GEE","GEX","GIL","GMD","GSP","GVR","HAG","HAH","HAP","HAR","HAX","HCD","HCM","HDB","HDC","HDG","HHP","HHS","HHV","HID","HII","HMC","HPG","HPX","HQC","HSG","HSL","HT1","HTG","HTI","HTN","HUB","HVH","ICT","IDI","IJC","ILB","IMP","ITC","ITD","JVC","KBC","KDC","KDH","KHG","KHP","KMR","KOS","KSB","LAF","LBM","LCG","LHG","LIX","LPB","LSS","MBB","MCM","MCP","MHC","MIG","MSB","MSH","MSN","MWG","NAB","NAF","NBB","NCT","NHA","NHH","NKG","NLG","NNC","NO1","NSC","NT2","NTL","OCB","OGC","ORS","PAC","PAN","PC1","PDR","PET","PGC","PHC","PHR","PIT","PLP","PLX","PNJ","POW","PPC","PTB","PTC","PTL","PVD","PVP","PVT","QCG","RAL","REE","RYG","SAB","SAM","SAV","SBG","SBT","SCR","SCS","SFC","SFG","SGN","SGR","SGT","SHB","SHI","SIP","SJD","SJS","SKG","SMB","SSB","SSI","ST8","STB","STK","SVT","SZC","SZL","TCB","TCH","TCI","TCL","TCM","TCO","TCT","TDC","TDG","TDP","TEG","THG","TIP","TLD","TLG","TLH","TMT","TNH","TNI","TNT","TPB","TRC","TSC","TTA","TTF","TV2","TVS","TYA","UIC","VCA","VCB","VCG","VCI","VDS","VFG","VGC","VHC","VHM","VIB","VIC","VIP","VIX","VJC","VMD","VND","VNL","VNM","VNS","VOS","VPB","VPG","VPH","VPI","VRC","VRE","VSC","VTO","VTP","YBM","YEG"]
# Prepare the statement once; it is executed for every code below.
daily_summary_stmt = session.prepare("""
         SELECT * FROM stock_daily_summary
        WHERE symbol = ?
          AND trade_date >= '2025-09-28';
    """)

# Keep the rows per code instead of discarding every result.
# NOTE(review): other cells in this notebook address this table with a
# '.VN'-suffixed symbol (e.g. 'ACB.VN'); the bare codes used here may match
# nothing -- confirm the stored symbol format.
daily_summary_rows = {}
for code in codes:
    daily_summary_rows[code] = list(session.execute(daily_summary_stmt, [code]))

len(codes)

285

In [None]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('stock_data')

# Fetch every stock_news row dated on or after 2025-09-29 and print each one.
# NOTE(review): the filter has no stock_code restriction, unlike the per-code
# stock_news query elsewhere in this notebook; depending on the table's key
# layout the server may reject it without ALLOW FILTERING -- confirm against
# the schema.
rows = session.execute("""
    SELECT * FROM stock_news
    WHERE date >= '2025-09-29T00:00:00';
""")

# Prints one row per line; the amount of output is unbounded.
for row in rows:
    print(row)

In [None]:
from cassandra.cluster import Cluster
from datetime import datetime, timedelta

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('stock_data')

codes = ["AAA","AAM","ABS","ABT","ACB","ACC","ACL","ADG","ADP","ADS","AGG","AGR","ANV","APG","APH","ASM","ASP","AST","BAF","BCE","BCM","BFC","BIC","BID","BKG","BMC","BMI","BMP","BRC","BSI","BTP","BVH","BWE","C32","CCL","CDC","CII","CLC","CLL","CMG","CMX","CNG","CRC","CRE","CSM","CSV","CTD","CTF","CTG","CTI","CTR","CTS","D2D","DAH","DBC","DBD","DBT","DC4","DCL","DCM","DGC","DGW","DHA","DHC","DHM","DIG","DMC","DPG","DPM","DPR","DRC","DRL","DSC","DSE","DSN","DTA","DVP","DXG","DXS","EIB","ELC","EVE","EVF","FCM","FCN","FIR","FIT","FMC","FPT","FRT","FTS","GAS","GDT","GEE","GEX","GIL","GMD","GSP","GVR","HAG","HAH","HAP","HAR","HAX","HCD","HCM","HDB","HDC","HDG","HHP","HHS","HHV","HID","HII","HMC","HPG","HPX","HQC","HSG","HSL","HT1","HTG","HTI","HTN","HUB","HVH","ICT","IDI","IJC","ILB","IMP","ITC","ITD","JVC","KBC","KDC","KDH","KHG","KHP","KMR","KOS","KSB","LAF","LBM","LCG","LHG","LIX","LPB","LSS","MBB","MCM","MCP","MHC","MIG","MSB","MSH","MSN","MWG","NAB","NAF","NBB","NCT","NHA","NHH","NKG","NLG","NNC","NO1","NSC","NT2","NTL","OCB","OGC","ORS","PAC","PAN","PC1","PDR","PET","PGC","PHC","PHR","PIT","PLP","PLX","PNJ","POW","PPC","PTB","PTC","PTL","PVD","PVP","PVT","QCG","RAL","REE","RYG","SAB","SAM","SAV","SBG","SBT","SCR","SCS","SFC","SFG","SGN","SGR","SGT","SHB","SHI","SIP","SJD","SJS","SKG","SMB","SSB","SSI","ST8","STB","STK","SVT","SZC","SZL","TCB","TCH","TCI","TCL","TCM","TCO","TCT","TDC","TDG","TDP","TEG","THG","TIP","TLD","TLG","TLH","TMT","TNH","TNI","TNT","TPB","TRC","TSC","TTA","TTF","TV2","TVS","TYA","UIC","VCA","VCB","VCG","VCI","VDS","VFG","VGC","VHC","VHM","VIB","VIC","VIP","VIX","VJC","VMD","VND","VNL","VNM","VNS","VOS","VPB","VPG","VPH","VPI","VRC","VRE","VSC","VTO","VTP","YBM","YEG"]
# Upper bound for the lookup: this time tomorrow (local clock).
date_limit = datetime.now() + timedelta(days=1)
print(date_limit)

# Track the greatest `date` among the first row returned for each code.
# NOTE(review): LIMIT 1 yields the first row in the table's stored order;
# this is the newest item only if rows are ordered by date descending -- confirm.
date_range = None
for code in codes:
    rows = session.execute("""
        SELECT date FROM stock_news
        WHERE stock_code = %s
        AND date <= %s
        LIMIT 1;
""", [code, date_limit])
    first_row = rows.one()
    if first_row is None:
        # No news stored for this code.
        continue
    if date_range is None:
        date_range = first_row.date
    else:
        date_range = max(date_range, first_row.date)

print(date_range)


In [7]:
import re
from cassandra.cluster import Cluster
import datetime

def latest_trading_date(moment):
    """Pick the session date whose 09:00-15:00 window should be queried.

    Rules:
      * Monday-Friday, between 09:00 and 14:59: that same day.
      * Monday-Friday, before 09:00 or from 15:00 on: the previous weekday.
      * Saturday/Sunday, at any hour: the preceding Friday.

    The hour check is applied only on weekdays. Applying it on a weekend as
    well (as this cell used to) stepped back twice and returned Thursday for
    e.g. a Saturday evening.

    Args:
        moment: A ``datetime.datetime``; only its date, hour and weekday are used.

    Returns:
        A ``datetime.date`` that falls on Monday-Friday.
    """
    one_day = datetime.timedelta(days=1)
    day = moment.date()
    # NOTE(review): after 15:00 on a weekday the previous session is selected
    # even though today's session has closed -- kept as originally written.
    if day.weekday() < 5 and (moment.hour < 9 or moment.hour >= 15):
        day -= one_day
    # Skip back over Saturday/Sunday.
    while day.weekday() >= 5:
        day -= one_day
    return day


now = datetime.datetime.now()

# Most recent trading date (weekends are skipped; public holidays are not considered).
trading_date = latest_trading_date(now)

# Query window: 09:00 to 15:00 on the trading date (naive local time).
start_dt = datetime.datetime.combine(trading_date, datetime.time(9, 0, 0))
end_dt = datetime.datetime.combine(trading_date, datetime.time(15, 0, 0))

# Epoch milliseconds rendered as strings, which is how they are bound in the query below.
start_timestamp_ms = str(int(start_dt.timestamp() * 1000))
end_timestamp_ms = str(int(end_dt.timestamp() * 1000))
print(f"start_timestamp_ms: {start_timestamp_ms}, end_timestamp_ms: {end_timestamp_ms}")
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('stock_data')
rows = session.execute(f"""
    SELECT * FROM stock_prices
    WHERE symbol = 'VIC.VN'
    AND timestamp >= %s AND timestamp <= %s;
""", [start_timestamp_ms, end_timestamp_ms])

def row_to_dict(row):
    """Project a result row onto a plain dict with a fixed set of fields.

    Args:
        row: Any object exposing the attributes listed in ``fields`` below.

    Returns:
        A dict mapping each field name to the row's attribute value, in the
        order the fields are listed.
    """
    fields = (
        "symbol",
        "timestamp",
        "price",
        "change",
        "change_percent",
        "day_volume",
        "last_size",
    )
    return {field: getattr(row, field) for field in fields}
res = [row_to_dict(row) for row in rows.all()]
res[:10]






start_timestamp_ms: 1761271200000, end_timestamp_ms: 1761292800000


[{'symbol': 'VIC.VN',
  'timestamp': '1761291924000',
  'price': 219000.0,
  'change': 4000.0,
  'change_percent': 1.86,
  'day_volume': 3428419,
  'last_size': 1000},
 {'symbol': 'VIC.VN',
  'timestamp': '1761291923000',
  'price': 219000.0,
  'change': 4000.0,
  'change_percent': 1.86,
  'day_volume': 3284719,
  'last_size': 300},
 {'symbol': 'VIC.VN',
  'timestamp': '1761290997000',
  'price': 218900.0,
  'change': 3900.0,
  'change_percent': 1.81,
  'day_volume': 3282303,
  'last_size': 1000},
 {'symbol': 'VIC.VN',
  'timestamp': '1761290990000',
  'price': 218800.0,
  'change': 3800.0,
  'change_percent': 1.77,
  'day_volume': 3281303,
  'last_size': 700},
 {'symbol': 'VIC.VN',
  'timestamp': '1761290985000',
  'price': 218800.0,
  'change': 3800.0,
  'change_percent': 1.77,
  'day_volume': 3280603,
  'last_size': 0},
 {'symbol': 'VIC.VN',
  'timestamp': '1761290983000',
  'price': 218800.0,
  'change': 3800.0,
  'change_percent': 1.77,
  'day_volume': 3278991,
  'last_size': 100}

In [None]:
import psycopg2
from psycopg2.extras import execute_values
import os


# Connection settings come from the environment when set; the defaults keep
# the previous local-development values so existing setups behave the same.
WAREHOUSE_CONFIG = {
    'host': os.getenv('WAREHOUSE_HOST', 'localhost'),
    'database': os.getenv('WAREHOUSE_DB', 'warehouse'),
    'user': os.getenv('WAREHOUSE_USER', 'warehouse_user'),
    'password': os.getenv('WAREHOUSE_PASSWORD', 'warehouse_pass'),
    'port': int(os.getenv('WAREHOUSE_PORT', '5433'))
}
conn = psycopg2.connect(**WAREHOUSE_CONFIG)
cursor = conn.cursor()

# NOTE(review): this takes the first column of whichever row comes back first.
# There is no ORDER BY / MAX, so the value is not guaranteed to be the latest
# date -- confirm the intended column and ordering.
cursor.execute("SELECT * FROM fact_news LIMIT 10")
first_row = cursor.fetchone()
# fetchone() returns None on an empty table; avoid indexing into None.
latest_date_in_warehouse = first_row[0] if first_row is not None else None
print(latest_date_in_warehouse)

In [None]:
# Convert latest_date_in_warehouse to a datetime and load all news from that point on.
# NOTE: this import rebinds the names `datetime` and `time` to the class and
# the time-of-day type; cells that use `datetime.datetime...` or `time.time()`
# need their own imports re-run afterwards.
from datetime import datetime, time
# Fixed starting point; this replaces any value assigned to the name earlier.
latest_date_in_warehouse = datetime(2014, 1, 1)
# Midnight at the start of that day.
query_timestamp = datetime.combine(latest_date_in_warehouse, time.min)
print(query_timestamp)  
# `session` must already exist from an earlier cell. The f-prefix has no
# placeholders; the %s is bound by the driver from the parameter list.
# NOTE(review): the filter has no stock_code restriction -- confirm the table
# accepts a date-only range without ALLOW FILTERING.
rows = session.execute(f"""
    SELECT article_id, stock_code, "date", content, sentiment_score, crawled_at
    FROM stock_news
    WHERE "date" >= %s
""", [query_timestamp]) 

# Materialise the result set so it can be inspected more than once.
rows_list = list(rows)
# rows_list

In [None]:
rows = session.execute("""
    SELECT symbol, trade_date, open, high, low, close, volume, 
            change, change_percent, vwap, exchange, quote_type, market_hours
    FROM stock_daily_summary
    WHERE symbol = 'ACB.VN' AND trade_date >= '2025-10-17'
""")

rows_list = list(rows)
rows_list[:10]

In [None]:
from datetime import datetime, timedelta

# Reference point: the Unix epoch (1970-01-01 00:00)
epoch_start = datetime(1970, 1, 1)

# Convert a day count since the epoch into a calendar date
days_since_epoch = timedelta(days=20355)
date_1 = epoch_start + days_since_epoch
date_1


In [3]:
from datetime import datetime
now = datetime.now()
weekday = now.weekday()
weekday


6