In [3]:
import  requests
def events_getter ():
    url = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events"

    while True:
        response = requests.get(url)
        data = response.json()
        yield data

        if 'next' not in response.links:
            break
        url = response.links['next']['url']

In [4]:
all_data = []
pages = events_getter()
for page in pages:
    all_data.extend(page)
len(all_data)

299

In [5]:
from datetime import datetime
def process_events(event):
    result = {}
    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.fromisoformat(event['created_at'])
    result['create_at'] = parsed_timestamp.timestamp()

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('topics', [])

    return result, topics


In [None]:
processed_events = []
processed_topics = []
for event in all_data:
    processed_event, topics = process_events(event)
    processed_events.append(processed_event)
    processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

LOADING DATA INTO DUCKDB

Nếu SQLite là "vị vua" trong thế giới dữ liệu giao dịch (OLTP) cho ứng dụng nhỏ, thì DuckDB được mệnh danh là "SQLite dành cho phân tích dữ liệu" (OLAP).
Dưới đây là những đặc điểm cốt lõi giúp bạn hiểu rõ DuckDB là gì:
1. In-process Database (Cơ sở dữ liệu nhúng)
Giống như SQLite, DuckDB không cần server. Nó không chạy như một dịch vụ riêng biệt (như PostgreSQL hay MySQL). Nó tích hợp trực tiếp vào ứng dụng của bạn (ví dụ: chạy ngay bên trong script Python).
Ưu điểm: Không cần cài đặt server phức tạp, không tốn thời gian kết nối qua mạng, dữ liệu nằm ngay trong bộ nhớ hoặc file cục bộ.
2. Columnar Storage (Lưu trữ dạng cột)
Đây là điểm khác biệt lớn nhất giữa DuckDB và SQLite:
SQLite (Row-based): Lưu dữ liệu theo từng dòng. Rất nhanh khi bạn muốn tìm 1 dòng cụ thể, nhưng chậm khi tính toán trên hàng triệu dòng.
DuckDB (Column-based): Lưu dữ liệu theo từng cột. Khi bạn tính SUM(doanh_thu), DuckDB chỉ đọc đúng cột doanh_thu và bỏ qua các cột khác. Điều này giúp nó cực kỳ nhanh cho các tác vụ phân tích (Analytics).
3. Tối ưu cho OLAP (Xử lý phân tích trực tuyến)
DuckDB được thiết kế để chạy các câu lệnh SQL phức tạp trên các tập dữ liệu lớn (hàng triệu đến hàng trăm triệu dòng) ngay trên máy tính cá nhân. Nó sử dụng công nghệ Vectorized Query Execution (thực thi truy vấn theo vector), giúp tận dụng tối đa sức mạnh của CPU hiện đại.
4. Khả năng "đọc mọi thứ" (Swiss Army Knife)
DuckDB cực kỳ mạnh mẽ trong việc đọc và truy vấn trực tiếp các định dạng file dữ liệu phổ biến mà không cần nạp (import) vào database:
Đọc trực tiếp file Parquet, CSV, JSON.
Đọc dữ liệu trực tiếp từ Pandas DataFrame, Polars, hoặc Arrow.
Có thể kết nối và truy vấn file trên S3, HTTP, hoặc Hugging Face.
5. Tại sao Data Engineer lại yêu thích DuckDB?
Cực kỳ nhanh: Trên máy tính cá nhân, nó có thể xử lý dữ liệu nhanh ngang ngửa hoặc hơn cả các cụm Spark lớn nếu dữ liệu vừa phải (vài GB đến vài chục GB).
Cài đặt trong 1 giây: Chỉ cần pip install duckdb.
SQL chuẩn: Hỗ trợ SQL rất đầy đủ và hiện đại.
Tích hợp sâu với Python: Bạn có thể dùng SQL để truy vấn một biến Pandas DataFrame và trả về kết quả dưới dạng một DataFrame khác.

In [27]:
import duckdb

# create a connection to a DuckDB database
conn = duckdb.connect("github_events.db")

In [28]:
processed_events[0]

{'id': '5803927209',
 'type': 'WatchEvent',
 'public': True,
 'create_at': 1768390569.0,
 'actor__id': 104862743,
 'actor__login': 'felixjunious',
 'repo__id': 419661684}

In [29]:
# create table
conn.execute("""
CREATE TABLE IF NOT EXISTS github_events (
    id TEXT PRIMARY KEY,
    type TEXT,
    public BOOLEAN,
    create_at DOUBLE,
    actor__id BIGINT,
    actor__login TEXT
);
""")

<_duckdb.DuckDBPyConnection at 0x16b94741630>

In [30]:
fattened_data = [
    (
        record["id"],
        record["type"],
        record["public"],
        record["create_at"],
        record["actor__id"],
        record["actor__login"]
    )
    for record in processed_events
]


# insert data into the "github_events" table
# Tại sao dùng ?: Thay vì viết trực tiếp dữ liệu vào câu lệnh SQL, bạn dùng dấu hỏi để thư viện (như duckdb hoặc sqlite) tự động điền dữ liệu từ danh sách Python vào. Cách này giúp bảo mật (chống SQL Injection) và xử lý đúng các kiểu dữ liệu (chuỗi, số, ngày tháng).
# ON CONFLICT (id) DO NOTHING: nếu có id (khóa chính) trùng thì bỏ qua dòng đó
conn.executemany("""
INSERT INTO github_events (id, type, public, create_at, actor__id, actor__login)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING
""", fattened_data)

<_duckdb.DuckDBPyConnection at 0x16b94741630>

In [31]:
df = conn.execute("""SELECT * FROM github_events""").df()
df.head()

Unnamed: 0,id,type,public,create_at,actor__id,actor__login
0,5803927209,WatchEvent,True,1768391000.0,104862743,felixjunious
1,5802916513,WatchEvent,True,1768388000.0,40302117,Benjaminmoukan
2,5802634022,WatchEvent,True,1768387000.0,80692412,MGmahmoud
3,5802338067,WatchEvent,True,1768386000.0,254880454,chaitras8970
4,5802085931,WatchEvent,True,1768386000.0,153520240,mouaad-here


In [22]:
conn.close()

# DYNAMIC SCHEMA MANAGEMENT

In [32]:
from datetime import datetime
def process_events(event):
    result = {}
    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.fromisoformat(event['created_at'])
    result['create_at'] = parsed_timestamp.timestamp()

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    result['repo__id'] = event['repo']['id']

    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('topics', [])

    return result, topics


In [33]:
processed_events = []
processed_topics = []
for event in all_data:
    processed_event, topics = process_events(event)
    processed_events.append(processed_event)
    processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

[{'id': '5803927209', 'type': 'WatchEvent', 'public': True, 'create_at': 1768390569.0, 'actor__id': 104862743, 'actor__login': 'felixjunious', 'repo__id': 419661684}, {'id': '5802916513', 'type': 'WatchEvent', 'public': True, 'create_at': 1768387747.0, 'actor__id': 40302117, 'actor__login': 'Benjaminmoukan', 'repo__id': 419661684}, {'id': '5802634022', 'type': 'WatchEvent', 'public': True, 'create_at': 1768387002.0, 'actor__id': 80692412, 'actor__login': 'MGmahmoud', 'repo__id': 419661684}, {'id': '5802338067', 'type': 'WatchEvent', 'public': True, 'create_at': 1768386201.0, 'actor__id': 254880454, 'actor__login': 'chaitras8970', 'repo__id': 419661684}, {'id': '5802085931', 'type': 'WatchEvent', 'public': True, 'create_at': 1768385570.0, 'actor__id': 153520240, 'actor__login': 'mouaad-here', 'repo__id': 419661684}]
[]


In [34]:
import duckdb

# create a connection to a DuckDB database
conn = duckdb.connect("github_events.db")

1. PRAGMA table_info(github_events)
Đây là một câu lệnh SQL đặc biệt (gọi là PRAGMA) dùng để truy vấn siêu dữ liệu (metadata) của một bảng. Thay vì trả về dữ liệu bên trong bảng, nó trả về thông tin về cấu trúc của bảng đó.
Kết quả của lệnh này thường trả về các cột như sau:
cid: ID của cột.
name: Tên của cột (Đây là cái chúng ta cần).
type: Kiểu dữ liệu (INTEGER, VARCHAR, v.v.).
notnull: Cột có bắt buộc (NOT NULL) hay không.
dflt_value: Giá trị mặc định.
pk: Có phải là Khóa chính (Primary Key) không.

In [35]:
current_columns = conn.execute("""PRAGMA table_info(github_events)""").fetchall()
print(current_columns)

[(0, 'id', 'VARCHAR', True, None, True), (1, 'type', 'VARCHAR', False, None, False), (2, 'public', 'BOOLEAN', False, None, False), (3, 'create_at', 'DOUBLE', False, None, False), (4, 'actor__id', 'BIGINT', False, None, False), (5, 'actor__login', 'VARCHAR', False, None, False)]


In [36]:
current_columns = {row[1] for row in conn.execute("""PRAGMA table_info(github_events)""").fetchall()}
print(current_columns)

{'actor__id', 'actor__login', 'public', 'create_at', 'type', 'id'}


Giải thích ngắn gọn từng bước:
1. Duyệt dữ liệu: Lặp qua các bản ghi trong processed_events (bắt đầu từ bản ghi thứ 11).
Kiểm tra cột mới: Với mỗi khóa (key) trong bản ghi, nó kiểm tra xem khóa đó đã tồn tại trong bảng dữ liệu (current_columns) hay chưa.
2. Xác định kiểu dữ liệu: Nếu là cột mới, nó kiểm tra giá trị của dữ liệu để chọn kiểu SQL phù hợp:
True/False → BOOLEAN
Số nguyên → BIGINT
Số thập phân → DOUBLE
Còn lại → TEXT (mặc định)
3. Cập nhật bảng: Chạy lệnh ALTER TABLE ... ADD COLUMN để thêm cột mới đó vào cơ sở dữ liệu DuckDB ngay lập tức.

4. Mục đích: Giúp hệ thống của bạn "linh hoạt", tự động thích ứng khi nguồn dữ liệu (API) thay đổi cấu trúc mà không cần bạn phải sửa code thủ công.

In [38]:
# 3. Detect and add new columns dynamically
# chạy qua 10 dòng đầu tiên
for record in processed_events[10:]:
    for key in record.keys():
        if key not in current_columns:
            col_type = "TEXT" # Default type
            if isinstance(record[key], bool):
                col_type = "BOOLEAN"
            elif isinstance(record[key], int):
                col_type = "BIGINT"
            elif isinstance(record[key], float):
                col_type = "DOUBLE"
            print(f"ALTER TABLE github_events ADD COLUMN {key} {col_type};")
            alter_query = f"ALTER TABLE github_events ADD COLUMN {key} {col_type};"
            conn.execute(alter_query)
            print(f"Added new column: {key} ({col_type})")
            current_columns.add(key) # update shema tracking

ALTER TABLE github_events ADD COLUMN repo__id BIGINT;
Added new column: repo__id (BIGINT)


1. columns = sorted(current_columns): Sắp xếp tên các cột theo thứ tự bảng chữ cái. Điều này cực kỳ quan trọng để đảm bảo dữ liệu của mọi dòng luôn khớp đúng thứ tự cột trong câu lệnh INSERT.
2. record.get(col, None): Với mỗi cột, nó sẽ lấy giá trị từ bản ghi. Nếu bản ghi đó thiếu cột này (do API không trả về), nó sẽ tự động điền None (tương đương với NULL trong SQL) thay vì báo lỗi.
3. tuple(...): Chuyển dữ liệu của một dòng từ dạng Dictionary (từ điển) sang Tuple (bộ giá trị). DuckDB cần dạng Tuple để thực hiện việc chèn dữ liệu hàng loạt.
4. flattened_data = [...]: Kết quả cuối cùng là một danh sách các Tuple, sẵn sàng để nạp vào hàm executemany.

5. Mục đích: Đưa các bản ghi có thể "lồi lõm" (thiếu cột này, thừa cột kia) về một ma trận dữ liệu đồng nhất (giống như một bảng Excel) để nạp vào database một cách an toàn.

In [42]:
# 4. prepare data for insertion (handle missing fields)
columns = sorted(current_columns) # Maintain consistent order
print(columns)
flattened_data = [
    tuple(record.get(col, None) for col in columns) for record in processed_events
]


['actor__id', 'actor__login', 'create_at', 'id', 'public', 'repo__id', 'type']


[(104862743,
  'felixjunious',
  1768390569.0,
  '5803927209',
  True,
  419661684,
  'WatchEvent'),
 (40302117,
  'Benjaminmoukan',
  1768387747.0,
  '5802916513',
  True,
  419661684,
  'WatchEvent'),
 (80692412,
  'MGmahmoud',
  1768387002.0,
  '5802634022',
  True,
  419661684,
  'WatchEvent'),
 (254880454,
  'chaitras8970',
  1768386201.0,
  '5802338067',
  True,
  419661684,
  'WatchEvent'),
 (153520240,
  'mouaad-here',
  1768385570.0,
  '5802085931',
  True,
  419661684,
  'WatchEvent'),
 (116809492,
  'zhyj900519',
  1768385317.0,
  '5801978555',
  True,
  419661684,
  'WatchEvent'),
 (110913416,
  'yogesh5121',
  1768383235.0,
  '5801190875',
  True,
  419661684,
  'ForkEvent'),
 (110913416,
  'yogesh5121',
  1768383220.0,
  '5801185153',
  True,
  419661684,
  'WatchEvent'),
 (113888475,
  'Aykutofficial',
  1768383033.0,
  '5801112536',
  True,
  419661684,
  'WatchEvent'),
 (32595233,
  'angelo96m',
  1768382988.0,
  '5801094694',
  True,
  419661684,
  'WatchEvent'),
 (96

In [50]:
# 5. Construct dynamic SQL for insertion
# Tạo ra chuỗi các dấu hỏi ?, ?, ? tương ứng với số lượng cột.
# nó sẽ chèn dấu , cách vào giũa các phần tử trong mảng
placeholders = ", ".join(["?" for _ in columns])

# tương tự chèn dấu , cách vào các cột
columns_str = ", ".join(columns)


insert_query = f"""
INSERT INTO github_events ({columns_str})
Values ({placeholders})
ON CONFLICT (id) DO UPDATE SET {", ".join(f"{col}=excluded.{col}" for col in columns if col != "id")}
"""

     - ON CONFLICT (id): Nếu ID đã tồn tại trong bảng.
     - DO UPDATE SET: Thay vì bỏ qua, nó sẽ cập nhật dữ liệu mới vào các dòng cũ.
     - f"{col}=excluded.{col}":
        excluded là một từ khóa đặc biệt trong SQL. Nó đại diện cho dữ liệu mới mà bạn đang định chèn vào.
        Câu này có nghĩa là: "Lấy giá trị mới (excluded.col) đè lên giá trị cũ đang có trong bảng (col)".
     - if col != "id": Điều kiện này để đảm bảo chúng ta không cập nhật cột ID (vì ID là khóa chính, dùng để so khớp nên không cần và không nên ghi đè chính nó).

    # Ví dụ câu lệnh hoàn chỉnh mà nó sinh ra sẽ trông như thế này:
    INSERT INTO github_events (id, type, public)
    VALUES (?, ?, ?)
    ON CONFLICT (id) DO UPDATE SET
        type = excluded.type,
        public = excluded.public

In [51]:
conn.executemany(insert_query, flattened_data)

<_duckdb.DuckDBPyConnection at 0x16b804253f0>

In [52]:
df = conn.execute("""SELECT * FROM github_events""").df()
df.head()

Unnamed: 0,id,type,public,create_at,actor__id,actor__login,repo__id
0,5803927209,WatchEvent,True,1768391000.0,104862743,felixjunious,419661684
1,5802916513,WatchEvent,True,1768388000.0,40302117,Benjaminmoukan,419661684
2,5802634022,WatchEvent,True,1768387000.0,80692412,MGmahmoud,419661684
3,5802338067,WatchEvent,True,1768386000.0,254880454,chaitras8970,419661684
4,5802085931,WatchEvent,True,1768386000.0,153520240,mouaad-here,419661684
