In [1]:
import sqlite3
import pandas as pd

In [2]:
conn = sqlite3.connect("data/sqlite/jobs.db")
cur = conn.cursor()


In [5]:
# show all tables name
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cur.fetchall()
print("Tables:", tables)

Tables: [('job_links',), ('sqlite_sequence',)]


In [6]:
cur.execute("""
INSERT INTO job_links (url, normalized_url, status, attempts, last_error, next_try_at)
VALUES 
('https://example.com/job/1', 'https://example.com/job/1', 'pending', 0, NULL, '2025-11-11 10:00:00'),
('https://example.com/job/2', 'https://example.com/job/2', 'failed', 3, 'Timeout error', '2025-11-11 11:00:00'),
('https://example.com/job/3', 'https://example.com/job/3', 'completed', 1, NULL, NULL),
('https://example.com/job/4', 'https://example.com/job/4', 'pending', 0, NULL, '2025-11-12 09:00:00'),
('https://example.com/job/5', 'https://example.com/job/5', 'failed', 2, 'Connection refused', '2025-11-12 10:30:00');
""")
conn.commit()

In [8]:
cur.execute("select * from job_links")
jobs = cur.fetchall()
print("Jobs:", jobs)


Jobs: [(1, 'https://example.com/job/1', 'https://example.com/job/1', 'pending', 0, None, '2025-11-11 10:00:00', '2025-11-10 13:21:20', '2025-11-10 13:21:20'), (2, 'https://example.com/job/2', 'https://example.com/job/2', 'failed', 3, 'Timeout error', '2025-11-11 11:00:00', '2025-11-10 13:21:20', '2025-11-10 13:21:20'), (3, 'https://example.com/job/3', 'https://example.com/job/3', 'completed', 1, None, None, '2025-11-10 13:21:20', '2025-11-10 13:21:20'), (4, 'https://example.com/job/4', 'https://example.com/job/4', 'pending', 0, None, '2025-11-12 09:00:00', '2025-11-10 13:21:20', '2025-11-10 13:21:20'), (5, 'https://example.com/job/5', 'https://example.com/job/5', 'failed', 2, 'Connection refused', '2025-11-12 10:30:00', '2025-11-10 13:21:20', '2025-11-10 13:21:20')]


In [11]:
jobs_df = pd.DataFrame(jobs)
print(jobs_df.head())

   0                          1                          2          3  4  \
0  1  https://example.com/job/1  https://example.com/job/1    pending  0   
1  2  https://example.com/job/2  https://example.com/job/2     failed  3   
2  3  https://example.com/job/3  https://example.com/job/3  completed  1   
3  4  https://example.com/job/4  https://example.com/job/4    pending  0   
4  5  https://example.com/job/5  https://example.com/job/5     failed  2   

                    5                    6                    7  \
0                None  2025-11-11 10:00:00  2025-11-10 13:21:20   
1       Timeout error  2025-11-11 11:00:00  2025-11-10 13:21:20   
2                None                 None  2025-11-10 13:21:20   
3                None  2025-11-12 09:00:00  2025-11-10 13:21:20   
4  Connection refused  2025-11-12 10:30:00  2025-11-10 13:21:20   

                     8  
0  2025-11-10 13:21:20  
1  2025-11-10 13:21:20  
2  2025-11-10 13:21:20  
3  2025-11-10 13:21:20  
4  2025-11-10 1

In [12]:
cur.close()
conn.close()

In [None]:
from datetime import timedelta
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# đường dẫn tới file chứa keywords (1 dòng = 1 keyword)
KEYWORDS_FILE = "/path/to/keywords.txt"  # <-- đổi path này

# timezone (theo dev metadata của bạn)
tz = pendulum.timezone("Asia/Bangkok")

# đọc keyword từ file txt
def read_keywords(path):
    with open(path, "r", encoding="utf-8") as f:
        # loại bỏ dòng rỗng và trim
        return [line.strip() for line in f if line.strip()]

keywords = read_keywords(KEYWORDS_FILE)

# hàm thực thi crawl cho 1 keyword (thay bằng hàm crawl thực của bạn)
def crawl_keyword(keyword, **context):
    # ví dụ: in ra, thực tế bạn gọi logic crawl ở đây
    print(f"Start crawling for keyword: {keyword}")
    # TODO: call your crawler logic (requests/bs4/scrapy)
    # raise Exception(...) để test retry
    return f"done: {keyword}"

# cấu hình mặc định DAG
default_args = {
    "owner": "you",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# base start date (ngày bắt đầu chung). Điều chỉnh nếu muốn bắt đầu từ ngày khác.
# lưu ý: minute offset sẽ cộng vào giờ này
base_start = tz.datetime(2025, 11, 11, 0, 0, 0)  # ví dụ bắt đầu từ 2025-11-11 00:00 Asia/Bangkok

# tạo nhiều DAG động
for idx, kw in enumerate(keywords):
    # offset 15 phút cho mỗi keyword
    offset_minutes = (idx * 15) % 60
    offset_hours = (idx * 15) // 60

    # start_date cho DAG này (shift bằng offset)
    start_date = base_start.add(hours=offset_hours, minutes=offset_minutes)

    dag_id = f"crawl_keyword_{idx+1}_{kw.replace(' ', '_').replace('/', '_')}".lower()

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description=f"Crawl for keyword: {kw}",
        schedule_interval=timedelta(days=1),  # chạy hàng ngày
        start_date=start_date,
        catchup=False,               # tránh chạy các dag runs cũ khi deploy lần đầu
        max_active_runs=1,
        tags=["crawler", "keywords"],
    )

    # mỗi DAG có 1 task hoặc nhiều task tuỳ nhu cầu
    with dag:
        crawl = PythonOperator(
            task_id="crawl_keyword",
            python_callable=crawl_keyword,
            op_kwargs={"keyword": kw},
            provide_context=True,
        )

    # bắt buộc: expose DAG vào globals để Airflow load được (pattern thường dùng)
    globals()[dag_id] = dag


In [11]:
link = "https://www.topcv.vn/viec-lam/data-analyst-hr/1933814.html?ta_source=JobSearchList_LinkDetail&u_sr_id=ZHh0k8QsiUUy9gvUe1LljByND22jhDhvPAolZ07R_1762749550"

print(link.split('?')[0].split('/')[-2: ])

['data-analyst-hr', '1933814.html']


In [None]:
CREATE TABLE job_links_topcv( 
id integer primary key autoincrement,
url text not null,
hash_value text not null,
status text not null default 'pending',
attempts integer not null default 0, 
last_error text,
next_try_at datetime,
created_at datetime default current_timestamp,
updated_at datetime default current_timestamp, source varchar(50),
unique(hash_value)
);