# **Data Engineering for ETL: Data Ingestion**


## **What is data ingestion?**  
Data ingestion is the process of **extracting** data from a source, transporting it to a suitable environment, and preparing it for use. This often includes **normalizing**, **cleaning**, and **adding metadata**.  






💡 **What is a schema?**  
A schema defines the expected format and structure of data, including field names, data types, and relationships.  



---

#### **3. When to use Batch vs. Streaming**  

| **Factor**        | **Batch processing**  | **Streaming processing** |
|------------------|------------------|-------------------|
| **Latency**      | High (minutes, hours) | Low (milliseconds, seconds) |
| **Data volume**  | Large batches | Continuous small events |
| **Use case**     | Reports, ETL, backups | Real-time analytics, event-driven apps |
| **Complexity**   | Easier to manage | Requires event-driven architecture |
| **Cost**         | Lower for periodic runs | Higher for always-on processing |


Working with APIs comes with a few limits to keep in mind:

1. **Hardware limits**: Be mindful of memory (RAM) and storage (disk space). Overloading these can crash your system.  
2. **Network reliability**: Networks can fail! Always account for retries to make your pipelines more robust.  
   - Tip: Use libraries like `dlt` that have built-in retry mechanisms.  
3. **API rate limits**: APIs often restrict the number of requests you can make in a given time.  
   - Tip: Check the API documentation to understand its limits (e.g., [Zendesk](https://developer.zendesk.com/api-reference/introduction/rate-limits/), [Shopify](https://shopify.dev/docs/api/usage/rate-limits)).  
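
To make the retry advice above concrete, here is a minimal sketch of retrying with exponential backoff. The helper name `call_with_retries`, its defaults, and the `flaky_fetch` stand-in are assumptions made for illustration, not part of any library. With `requests`, you would likely catch `requests.exceptions.RequestException` rather than the built-in `ConnectionError`.

```python
import time


def call_with_retries(fetch, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call `fetch()` and retry on ConnectionError with exponential backoff.

    Hypothetical helper: names and defaults are illustrative assumptions.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the error to the caller.
            # Wait 1s, 2s, 4s, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))


# Stand-in for a network call that fails twice before succeeding.
attempts = {"count": 0}


def flaky_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated network failure")
    return {"status": "ok"}


# Record the delays instead of sleeping so the example runs instantly.
delays = []
result = call_with_retries(flaky_fetch, sleep=delays.append)
print(result, delays)
```

Passing `sleep` as a parameter keeps the example fast and easy to test. In a real pipeline you would leave the default `time.sleep` in place.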

There are even more challenges to consider when working with APIs, such as **pagination and authentication**. Let's explore how to handle these effectively when working with **REST APIs**.


In [3]:
import requests

result = requests.get("https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events").json()
print(result)

[{'id': '52054879166', 'type': 'WatchEvent', 'actor': {'id': 212767187, 'login': 'reyaleman-tech', 'display_login': 'reyaleman-tech', 'gravatar_id': '', 'url': 'https://api.github.com/users/reyaleman-tech', 'avatar_url': 'https://avatars.githubusercontent.com/u/212767187?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2025-07-14T12:25:55Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '52051650046', 'type': 'ForkEvent', 'actor': {'id': 91944358, 'login': 'qkrwnscjf', 'display_login': 'qkrwnscjf', 'gravatar_id': '', 'url': 'https://api.github.com/users/qkrwnscjf', 'avatar_url': 'https://avatars.githubusercontent.com/u/91944358?'}, 'repo': {'id': 419661684, 'name'

#### **Common Challenges**

#### **1. Rate limits**  

To avoid hitting these limits, we can:  
- **Monitor API rate limits**: Some APIs provide headers that tell you how many requests you have left.  
- **Pause requests when needed**: If we're close to the limit, we wait before making more requests.  
- **Implement automatic retries**: If a request fails due to rate limiting, we can wait and retry after some time.  

Some APIs provide a **`Retry-After`** header, which tells you how long to wait before making another request. Always check the API documentation for best practices!


In [4]:
import time
import requests


response = requests.get("https://api.github.com/rate_limit")
rate_limit = response.json()["rate"]["remaining"]
print("The rate limit remains: ", rate_limit)

if rate_limit == 0:
    time.sleep(60)  # Wait before making more requests

The rate limit remains:  60
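
Instead of always sleeping a fixed 60 seconds, the wait can be derived from the response headers. The sketch below assumes a `Retry-After` value given in seconds (it can also be an HTTP date, which is not handled here) and GitHub-style `X-RateLimit-Remaining` / `X-RateLimit-Reset` headers. The function name `seconds_to_wait` is hypothetical, and other APIs may use different header names.

```python
import time


def seconds_to_wait(headers, now=None):
    """Return how long to pause before the next request, based on response headers."""
    now = time.time() if now is None else now

    # Retry-After may also be an HTTP date; this sketch only handles seconds.
    retry_after = headers.get("Retry-After")
    if retry_after is not None and retry_after.isdigit():
        return float(retry_after)

    # Assumed GitHub-style headers: remaining quota and reset time (epoch seconds).
    remaining = headers.get("X-RateLimit-Remaining")
    reset_at = headers.get("X-RateLimit-Reset")
    if remaining == "0" and reset_at is not None:
        return max(0.0, float(reset_at) - now)

    return 0.0  # No sign of rate limiting: no need to wait.


print(seconds_to_wait({"Retry-After": "30"}))
print(seconds_to_wait({"X-RateLimit-Remaining": "0", "X-RateLimit-Reset": "1060"}, now=1000))
print(seconds_to_wait({"X-RateLimit-Remaining": "42"}))
```

The examples use plain dictionaries, so header names must match exactly. `response.headers` from `requests` is case-insensitive and can be passed in directly.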


---

#### **2. Authentication**  
Many APIs require an **API key or token** to access data securely. Without authentication, requests may be limited or denied.  

**Types of Authentication in APIs:**  
- **API Keys**: A simple token included in the request header or URL.  
- **OAuth Tokens**: A more secure authentication method requiring user authorization.  
- **Basic Authentication**: Using a username and password (less common today).  
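
As a rough illustration of how these methods differ on the wire, the sketch below builds the request headers for each one using only the standard library. The function names are hypothetical, and `X-API-Key` is just a common convention: the actual header name depends on the API.

```python
import base64


def api_key_header(key, header_name="X-API-Key"):
    # Assumption: the header name varies per API; check its documentation.
    return {header_name: key}


def bearer_header(token):
    # OAuth access tokens are usually sent as a Bearer token.
    return {"Authorization": f"Bearer {token}"}


def basic_header(username, password):
    # Basic auth sends "username:password" encoded as base64.
    credentials = f"{username}:{password}".encode("utf-8")
    encoded = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {encoded}"}


print(api_key_header("my-key"))
print(bearer_header("my-token"))
print(basic_header("user", "pass"))
```

Note that base64 is an encoding, not encryption, so Basic authentication should only be used over HTTPS.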



In [5]:
import os
import requests
from google.colab import userdata

API_TOKEN = userdata.get('ACCESS_TOKEN')


headers = {"Authorization": f"Bearer {API_TOKEN}"}
response = requests.get("https://api.github.com/user", headers=headers)
print(response.json())

{'login': 'vedantparmar12', 'id': 122482472, 'node_id': 'U_kgDOB0zvKA', 'avatar_url': 'https://avatars.githubusercontent.com/u/122482472?v=4', 'gravatar_id': '', 'url': 'https://api.github.com/users/vedantparmar12', 'html_url': 'https://github.com/vedantparmar12', 'followers_url': 'https://api.github.com/users/vedantparmar12/followers', 'following_url': 'https://api.github.com/users/vedantparmar12/following{/other_user}', 'gists_url': 'https://api.github.com/users/vedantparmar12/gists{/gist_id}', 'starred_url': 'https://api.github.com/users/vedantparmar12/starred{/owner}{/repo}', 'subscriptions_url': 'https://api.github.com/users/vedantparmar12/subscriptions', 'organizations_url': 'https://api.github.com/users/vedantparmar12/orgs', 'repos_url': 'https://api.github.com/users/vedantparmar12/repos', 'events_url': 'https://api.github.com/users/vedantparmar12/events{/privacy}', 'received_events_url': 'https://api.github.com/users/vedantparmar12/received_events', 'type': 'User', 'user_view_t

----
#### **3. Pagination**

Many APIs return data in **chunks (or pages)** rather than sending everything at once. This prevents **overloading the server** and improves performance, especially for large datasets. To retrieve **all the data**, we need to make multiple requests and keep track of pages until we reach the last one.


In [6]:
import requests
from google.colab import userdata

API_TOKEN = userdata.get('ACCESS_TOKEN')
BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events"


page_number = 1

while True:
    response = requests.get(BASE_URL, headers={"Authorization": f"Bearer {API_TOKEN}"})
    page_data = response.json()

    next_page = response.links.get("next", {}).get("url")

    print(f'Got page {page_number} with {len(page_data)} records')
    print(page_data)

    if not next_page:
        break

    page_number += 1
    BASE_URL = next_page

Got page 1 with 30 records
[{'id': '52054879166', 'type': 'WatchEvent', 'actor': {'id': 212767187, 'login': 'reyaleman-tech', 'display_login': 'reyaleman-tech', 'gravatar_id': '', 'url': 'https://api.github.com/users/reyaleman-tech', 'avatar_url': 'https://avatars.githubusercontent.com/u/212767187?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2025-07-14T12:25:55Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '52051650046', 'type': 'ForkEvent', 'actor': {'id': 91944358, 'login': 'qkrwnscjf', 'display_login': 'qkrwnscjf', 'gravatar_id': '', 'url': 'https://api.github.com/users/qkrwnscjf', 'avatar_url': 'https://avatars.githubusercontent.com/u/91944358?'}, 'repo



**What happens here:**  

- Starts at **page 1** and makes a **GET request** to the API.  
- Retrieves **JSON data**.  
- Looks for the **"next" page URL** in the response headers.  
- If a **next page exists**, updates `BASE_URL` and requests more data.  
- If there's **no next page**, stops fetching and ends the loop.

💡 Different APIs handle pagination differently (some use offsets, cursors, page numbers, or tokens instead of links).
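
For comparison with the link-based loop above, here is a minimal sketch of offset-based pagination. `fetch_page` is a hypothetical stand-in for a real API call (it slices an in-memory list), and the `offset` / `limit` parameter names are assumptions. Real APIs name and bound these differently.

```python
def fetch_page(offset, limit):
    """Hypothetical stand-in for an API endpoint that accepts offset and limit."""
    records = [{"id": n} for n in range(25)]  # Pretend the server holds 25 rows.
    return records[offset:offset + limit]


def paginate_by_offset(fetch, limit=10):
    """Yield pages until the API returns an empty or short page."""
    offset = 0
    while True:
        page = fetch(offset, limit)
        if not page:
            break  # Empty page: nothing left to read.
        yield page
        if len(page) < limit:
            break  # Short page: this was the last one.
        offset += limit


page_sizes = [len(page) for page in paginate_by_offset(fetch_page)]
print(page_sizes)
```

Stopping on a short page saves one extra request, but it assumes the API always fills pages when more data exists. If that is not guaranteed, rely on the empty-page check alone.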

---

#### **4. Avoiding memory issues during extraction**  

To prevent your pipeline from crashing, we need to control memory usage.  

#### **Challenges with memory**  
- Many pipelines run on systems with limited memory, like serverless functions or shared clusters.  
- If you try to load all the data into memory at once, it can crash the entire system.  
- Even disk space can become an issue if you're storing large amounts of data.  




**The solution: batch processing / streaming data**  

**Streaming** means processing data in small chunks or events, rather than loading everything at once. This keeps memory usage low and ensures your pipeline remains efficient.

As data engineers, we use streaming to transfer data between buffers, such as:  
- from APIs to local files;  
- from Webhooks to event queues;  
- from Event queues (like Kafka) to storage buckets.


In [7]:
from itertools import islice


# This function creates a generator that produces limit number of dictionaries.
def get_rows(limit):
    # Instead of storing all rows in memory, it yields one row at a time (lazy evaluation).
    yield from map(lambda n: {"row": n}, range(limit))


def data_chunked():
    rows = get_rows(10000)

    # Uses islice(rows, 1000) to take chunks of 1,000 rows at a time.
    while item_slice := list(islice(rows, 1000)):
        print(f"Got chunk of length {len(item_slice)}")
        # Each chunk is yielded as a list for further processing.
        yield item_slice


data = next(data_chunked())

Got chunk of length 1000


How it works:
- Calls `get_rows(10000)`, generating 10,000 rows lazily.
- Uses `islice(rows, 1000)` to take chunks of 1,000 rows at a time.
- The `while` loop continues until all rows are processed.
- Each chunk is yielded as a list for further processing.
- Memory-efficient: Instead of keeping all 10,000 rows, it works with just 1,000 at a time.
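
The same chunking idea can be used to stream rows from a generator into a local file, one of the buffer-to-buffer transfers mentioned earlier. This is a minimal sketch: the JSON Lines output format, the `write_jsonl` helper, and the temporary file location are choices made for illustration.

```python
import json
import os
import tempfile
from itertools import islice


def get_rows(limit):
    # Lazily produce rows one at a time.
    for n in range(limit):
        yield {"row": n}


def chunked(rows, size):
    # Pull at most `size` rows from the iterator per chunk.
    while chunk := list(islice(rows, size)):
        yield chunk


def write_jsonl(chunks, path):
    """Write each chunk to disk as JSON Lines and return the number of rows written."""
    written = 0
    with open(path, "w", encoding="utf-8") as f:
        for chunk in chunks:
            for row in chunk:
                f.write(json.dumps(row) + "\n")
            written += len(chunk)
    return written


path = os.path.join(tempfile.mkdtemp(), "rows.jsonl")
total = write_jsonl(chunked(get_rows(2500), 1000), path)
print(total)
```

Only one chunk of up to 1,000 rows is held in memory at any time. The file grows on disk while memory usage stays flat.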

In [8]:
import requests
from google.colab import userdata

API_TOKEN = userdata.get('ACCESS_TOKEN')
BASE_URL = "https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events"


def events_getter():
    page_number = 1
    url = BASE_URL

    while True:
        response = requests.get(url, headers={"Authorization": f"Bearer {API_TOKEN}"})
        page_data = response.json()
        yield page_data

        next_page = response.links.get("next", {}).get("url")

        if not next_page:
            break

        page_number += 1
        url = next_page


for events_page in events_getter():
    print(events_page)

[{'id': '52054879166', 'type': 'WatchEvent', 'actor': {'id': 212767187, 'login': 'reyaleman-tech', 'display_login': 'reyaleman-tech', 'gravatar_id': '', 'url': 'https://api.github.com/users/reyaleman-tech', 'avatar_url': 'https://avatars.githubusercontent.com/u/212767187?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2025-07-14T12:25:55Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '52051650046', 'type': 'ForkEvent', 'actor': {'id': 91944358, 'login': 'qkrwnscjf', 'display_login': 'qkrwnscjf', 'gravatar_id': '', 'url': 'https://api.github.com/users/qkrwnscjf', 'avatar_url': 'https://avatars.githubusercontent.com/u/91944358?'}, 'repo': {'id': 419661684, 'name'

In this approach to grabbing data from APIs, there are both pros and cons:  

✅ Pros: **Easy memory management** since the API returns data in small pages or events.  
❌ Cons: **Low throughput** because data transfer is limited by API constraints (rate limits, response time).


## **Normalizing data**  

Data cleaning typically involves two key steps:  

1. **Normalizing data**: Structuring and standardizing data **without changing its meaning**.  
2. **Filtering data for a specific use case**: Selecting or modifying data **in a way that changes its meaning** to fit the analysis.






---

### **Why prepare data? Why not use JSON directly?**  

While JSON is a great format for **data transfer**, it's not ideal for analysis. Here's why:  

❌ **No enforced schema**: We don't always know what fields exist in a JSON document.  
❌ **Inconsistent data types**: A field like `age` might appear as `25`, `"twenty five"`, or `25.00`, which can break downstream applications.  
❌ **Hard to process**: If we need to group data by day, we must manually convert date strings to timestamps.  
❌ **Memory-heavy**: A JSON document usually has to be parsed in full before it can be used, unlike databases or columnar formats that allow scanning just the necessary fields.  
❌ **Slow for aggregation and search**: JSON is not optimized for quick lookups or aggregations like columnar formats (e.g., Parquet).  


JSON is great for **data exchange** but **not for direct analytical use**. To make data useful, we need to **normalize it**: flattening, typing, and structuring it for efficiency.
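
To illustrate the typing part, here is a minimal sketch that coerces the inconsistent `age` values from the list above into integers. The helper `to_int_or_none` and the rule "anything unparseable becomes `None`" are assumptions. A real pipeline might log or quarantine bad values instead.

```python
def to_int_or_none(value):
    """Coerce a loosely typed value to int, or return None when it cannot be parsed."""
    if isinstance(value, bool):
        return None  # bool is a subclass of int; treat True/False as invalid here.
    if isinstance(value, int):
        return value
    if isinstance(value, float):
        return int(value) if value.is_integer() else None
    if isinstance(value, str):
        try:
            number = float(value.strip())
        except ValueError:
            return None  # e.g. "twenty five"
        return int(number) if number.is_integer() else None
    return None


raw_records = [{"age": 25}, {"age": "25.00"}, {"age": "twenty five"}, {"age": 25.0}]
clean_ages = [to_int_or_none(record["age"]) for record in raw_records]
print(clean_ages)
```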


In [9]:
event = events_page[0]
event

{'id': '51586324900',
 'type': 'PullRequestEvent',
 'actor': {'id': 206166622,
  'login': 'chaiandmushrooms',
  'display_login': 'chaiandmushrooms',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/chaiandmushrooms',
  'avatar_url': 'https://avatars.githubusercontent.com/u/206166622?'},
 'repo': {'id': 419661684,
  'name': 'DataTalksClub/data-engineering-zoomcamp',
  'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload': {'action': 'opened',
  'number': 734,
  'pull_request': {'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/pulls/734',
   'id': 2634176404,
   'node_id': 'PR_kwDOGQOHdM6dAleU',
   'html_url': 'https://github.com/DataTalksClub/data-engineering-zoomcamp/pull/734',
   'diff_url': 'https://github.com/DataTalksClub/data-engineering-zoomcamp/pull/734.diff',
   'patch_url': 'https://github.com/DataTalksClub/data-engineering-zoomcamp/pull/734.patch',
   'issue_url': 'https://api.github.com/repos/DataTal

In [10]:
event.keys()

dict_keys(['id', 'type', 'actor', 'repo', 'payload', 'public', 'created_at', 'org'])

The data we retrieved from the API has **a nested JSON format**. Let's unnest it!

> Unnesting means that any **nested structures** (like dictionaries and lists) are flattened, which makes the data easier to store and query in a database or a dataframe.  


---

### **How to process this data?**  

1️⃣ **Flatten nested fields:**  
   - For example, fields `actor`, `repo`, `payload` and `org` are nested and we should extract all the necessary data:  
     ```json
     'actor': {
        'id': 198386041,
        'login': 'Anqi0607',
        ...
     ```
     to:
     ```json
     'actor__id': 198386041,
     'actor__login': 'Anqi0607',
     ...
     ```
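
The flattening rule shown above can also be written generically. The following is a minimal sketch of a recursive `flatten` helper (a hypothetical name) that joins nested keys with `__`. Lists are left untouched, since they need separate handling. The sample record is a hand-written, trimmed example, not real API output.

```python
def flatten(record, parent_key="", sep="__"):
    """Flatten nested dictionaries into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dictionaries, carrying the key prefix along.
            flat.update(flatten(value, new_key, sep))
        else:
            # Scalars (and lists) are kept as they are.
            flat[new_key] = value
    return flat


sample_event = {
    "id": "1",
    "actor": {"id": 198386041, "login": "Anqi0607"},
    "payload": {"action": "opened", "pull_request": {"number": 734}},
}

flat_event = flatten(sample_event)
print(flat_event)
```

A generic helper flattens every field, which can produce very wide records. Picking fields by hand keeps only what is needed, at the cost of more code.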

In [12]:
def process_event(event):
  result = {}

  result['id'] = event['id']
  result['type'] = event['type']
  result['public'] = event['public']
  result['created_at'] = event['created_at']

  result['actor__id'] = event['actor']['id']
  result['actor__login'] = event['actor']['login']

  return result

In [13]:
processed_events = []

for event in events_page:
  processed_event = process_event(event)
  processed_events.append(processed_event)

processed_events

[{'id': '51586324900',
  'type': 'PullRequestEvent',
  'public': True,
  'created_at': '2025-07-02T04:04:13Z',
  'actor__id': 206166622,
  'actor__login': 'chaiandmushrooms'},
 {'id': '51586034442',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-02T03:51:53Z',
  'actor__id': 74387989,
  'actor__login': 'marcosetm'},
 {'id': '51584237360',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-02T02:28:26Z',
  'actor__id': 46380150,
  'actor__login': 'rdhwiya'},
 {'id': '51584123999',
  'type': 'ForkEvent',
  'public': True,
  'created_at': '2025-07-02T02:23:04Z',
  'actor__id': 15181448,
  'actor__login': 'jickingbackup'},
 {'id': '51583055421',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-02T01:32:40Z',
  'actor__id': 29785251,
  'actor__login': 'bmall0323'},
 {'id': '51583045809',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-02T01:32:21Z',
  'actor__id': 91744159,
  'actor__login': 'edilaine-as'},
 {'id': '51


2️⃣ **Convert timestamps:**  
   - The API returns timestamps as ISO 8601 datetime strings:  
     ```json
     "created_at": "2024-06-12T14:28:46Z"
     ```
     You can store them as they are, but in some cases you may need to **convert** them to numeric timestamps.
   - After conversion, the same moment is stored as a **Unix timestamp** (seconds since 1970-01-01 UTC):  
     ```json
     "created_at": 1718202526
     ```


In [14]:
from datetime import datetime

def process_event(event):
  result = {}

  result['id'] = event['id']
  result['type'] = event['type']
  result['public'] = event['public']

  parsed_timestamp = datetime.fromisoformat(event['created_at'])
  result['created_at'] = parsed_timestamp.timestamp()

  result['actor__id'] = event['actor']['id']
  result['actor__login'] = event['actor']['login']

  return result


process_event(event)

{'id': '51546749269',
 'type': 'WatchEvent',
 'public': True,
 'created_at': 1751361680.0,
 'actor__id': 166155895,
 'actor__login': 'yajeddig'}
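
One caveat: `datetime.fromisoformat` only accepts the trailing `Z` from Python 3.11 onwards. A minimal sketch of a more portable conversion is shown below. The helper name `iso_to_unix` is hypothetical, and treating naive timestamps as UTC is an assumption that may not hold for every data source.

```python
from datetime import datetime, timezone


def iso_to_unix(value):
    """Convert an ISO 8601 string to a Unix timestamp in seconds."""
    # Python versions before 3.11 reject a trailing "Z", so use an explicit UTC offset.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are in UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.timestamp()


print(iso_to_unix("2024-06-12T14:28:46Z"))
```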


3️⃣ **Unnest lists:**  
   - The original structure might include a nested list:  
     ```json
      'is_template': False,
      'web_commit_signoff_required': False,
      'topics': ['data-engineering', 'dbt', 'docker', 'kafka', 'kestra', 'spark'],
      'visibility': 'public',
      'forks': 6093,
     ```
   - Many relational databases **cannot store lists in a plain column**, so list items are typically **moved to a separate table** that references the parent record (here, by event ID).  





In [16]:
all_data = []

pages = events_getter()

for page in pages:
  all_data.extend(page)

len(all_data)

296

In [17]:
def process_event(event):
  result = {}

  result['id'] = event['id']
  result['type'] = event['type']
  result['public'] = event['public']

  parsed_timestamp = datetime.fromisoformat(event['created_at'])
  result['created_at'] = parsed_timestamp.timestamp()

  result['actor__id'] = event['actor']['id']
  result['actor__login'] = event['actor']['login']

  topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

  processed_topics = []
  for topic in topics:
    processed_topic = {
        'event_id': event['id'],
        'topic_name': topic
    }
    processed_topics.append(processed_topic)

  return result, processed_topics

In [18]:
processed_events = []
processed_topics = []

for event in all_data:
  processed_event, topics = process_event(event)
  processed_events.append(processed_event)
  processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

[{'id': '52054879166', 'type': 'WatchEvent', 'public': True, 'created_at': 1752495955.0, 'actor__id': 212767187, 'actor__login': 'reyaleman-tech'}, {'id': '52051650046', 'type': 'ForkEvent', 'public': True, 'created_at': 1752491421.0, 'actor__id': 91944358, 'actor__login': 'qkrwnscjf'}, {'id': '52050279855', 'type': 'WatchEvent', 'public': True, 'created_at': 1752489484.0, 'actor__id': 216563235, 'actor__login': 'stewart-h13'}, {'id': '52046949622', 'type': 'WatchEvent', 'public': True, 'created_at': 1752485009.0, 'actor__id': 141754322, 'actor__login': 'Vital-Ahishakiye'}, {'id': '52040127738', 'type': 'WatchEvent', 'public': True, 'created_at': 1752475068.0, 'actor__id': 173776489, 'actor__login': 'Kaelthas200'}]
[{'event_id': '51586324900', 'topic_name': 'data-engineering'}, {'event_id': '51586324900', 'topic_name': 'dbt'}, {'event_id': '51586324900', 'topic_name': 'docker'}, {'event_id': '51586324900', 'topic_name': 'kafka'}, {'event_id': '51586324900', 'topic_name': 'kestra'}]


### **Example: Loading data into database**  


`watch_events` → Stores WatchEvent data

`pull_request_events` → Stores PullRequestEvent data



In [19]:
import duckdb

# 1. Create a connection to a DuckDB database
conn = duckdb.connect("github_events.db")


# 2. Create the `github_events` Table
conn.execute("""
CREATE TABLE IF NOT EXISTS github_events (
    id TEXT PRIMARY KEY,
    type TEXT,
    public BOOLEAN,
    created_at DOUBLE,
    actor__id BIGINT,
    actor__login TEXT
);
""")

flattened_data = [
    (
        record["id"],
        record["type"],
        record["public"],
        record["created_at"],
        record["actor__id"],
        record["actor__login"]
    )
    for record in processed_events
]

# 3. Insert Data into the `github_events` Table
conn.executemany("""
INSERT INTO github_events (id, type, public, created_at, actor__id, actor__login)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING;
""", flattened_data)

print("Data successfully loaded into DuckDB!")


# Query and Print Data
df = conn.execute("SELECT * FROM github_events").df()

conn.close()

print("\nGitHub Events Data:")
df

Data successfully loaded into DuckDB!

GitHub Events Data:


Unnamed: 0,id,type,public,created_at,actor__id,actor__login
0,52054879166,WatchEvent,True,1.752496e+09,212767187,reyaleman-tech
1,52051650046,ForkEvent,True,1.752491e+09,91944358,qkrwnscjf
2,52050279855,WatchEvent,True,1.752489e+09,216563235,stewart-h13
3,52046949622,WatchEvent,True,1.752485e+09,141754322,Vital-Ahishakiye
4,52040127738,WatchEvent,True,1.752475e+09,173776489,Kaelthas200
...,...,...,...,...,...,...
291,51564153103,ForkEvent,True,1.751383e+09,193060242,AnassamzilUE
292,51562759595,ForkEvent,True,1.751381e+09,206166622,chaiandmushrooms
293,51558379325,WatchEvent,True,1.751376e+09,181355935,ramm14
294,51557019247,WatchEvent,True,1.751375e+09,218780560,genius-mushroom


Problems:

❌ **Schema management is manual**: If the schema changes, you need to update table structures manually.  
❌ **No automatic retries**: If the network fails, data may be lost.  
❌ **No incremental loading**: Every run reloads everything, making it slow and expensive.  
❌ **More code to maintain**: A simple pipeline quickly becomes complex.  
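
To give a feel for what incremental loading involves, here is a minimal sketch that keeps only events newer than the last loaded timestamp and then advances that cursor. The helper names and the idea of using `created_at` as the cursor are assumptions. The sample values are taken from the processed events printed earlier.

```python
def select_new_events(events, last_loaded_at):
    """Keep only events created strictly after the last loaded timestamp."""
    return [event for event in events if event["created_at"] > last_loaded_at]


def next_cursor(events, last_loaded_at):
    """Return the newest timestamp seen, or the old cursor if there were no events."""
    return max((event["created_at"] for event in events), default=last_loaded_at)


events = [
    {"id": "52054879166", "created_at": 1752495955.0},
    {"id": "52051650046", "created_at": 1752491421.0},
    {"id": "52050279855", "created_at": 1752489484.0},
]

# Pretend the previous run loaded everything up to this point in time.
last_loaded_at = 1752490000.0

new_events = select_new_events(events, last_loaded_at)
last_loaded_at = next_cursor(new_events, last_loaded_at)

print([event["id"] for event in new_events])
print(last_loaded_at)
```

In a real pipeline the cursor would have to be persisted between runs, for example in the destination database. That state management is one more piece of code to maintain.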


In [20]:
def process_event(event):
  result = {}

  result['id'] = event['id']
  result['type'] = event['type']
  result['public'] = event['public']

  parsed_timestamp = datetime.fromisoformat(event['created_at'])
  result['created_at'] = parsed_timestamp.timestamp()

  result['actor__id'] = event['actor']['id']
  result['actor__login'] = event['actor']['login']

  result['repo__id'] = event['repo']['id']

  topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

  processed_topics = []
  for topic in topics:
    processed_topic = {
        'event_id': event['id'],
        'topic_name': topic
    }
    processed_topics.append(processed_topic)

  return result, processed_topics



In [21]:
processed_events = []
processed_topics = []

for event in all_data:
  processed_event, topics = process_event(event)
  processed_events.append(processed_event)
  processed_topics.extend(topics)

print(processed_events[:5])
print(processed_topics[:5])

[{'id': '52054879166', 'type': 'WatchEvent', 'public': True, 'created_at': 1752495955.0, 'actor__id': 212767187, 'actor__login': 'reyaleman-tech', 'repo__id': 419661684}, {'id': '52051650046', 'type': 'ForkEvent', 'public': True, 'created_at': 1752491421.0, 'actor__id': 91944358, 'actor__login': 'qkrwnscjf', 'repo__id': 419661684}, {'id': '52050279855', 'type': 'WatchEvent', 'public': True, 'created_at': 1752489484.0, 'actor__id': 216563235, 'actor__login': 'stewart-h13', 'repo__id': 419661684}, {'id': '52046949622', 'type': 'WatchEvent', 'public': True, 'created_at': 1752485009.0, 'actor__id': 141754322, 'actor__login': 'Vital-Ahishakiye', 'repo__id': 419661684}, {'id': '52040127738', 'type': 'WatchEvent', 'public': True, 'created_at': 1752475068.0, 'actor__id': 173776489, 'actor__login': 'Kaelthas200', 'repo__id': 419661684}]
[{'event_id': '51586324900', 'topic_name': 'data-engineering'}, {'event_id': '51586324900', 'topic_name': 'dbt'}, {'event_id': '51586324900', 'topic_name': 'doc

In [22]:
import duckdb

# 1. Create a connection to DuckDB
conn = duckdb.connect("github_events.db")


# 2. Fetch current table schema
current_columns = {row[1] for row in conn.execute("PRAGMA table_info(github_events)").fetchall()}
print(current_columns)

# 3. Detect and add new columns dynamically
for record in processed_events:
    for key in record.keys():
        if key not in current_columns:
            col_type = "TEXT"  # Default type
            if isinstance(record[key], bool):
                col_type = "BOOLEAN"
            elif isinstance(record[key], int):
                col_type = "BIGINT"
            elif isinstance(record[key], float):
                col_type = "DOUBLE"
            alter_query = f"ALTER TABLE github_events ADD COLUMN {key} {col_type};"
            print(alter_query)
            conn.execute(alter_query)
            print(f"Added new column: {key} ({col_type})")
            current_columns.add(key)  # Update schema tracking

# 4. Prepare data for insertion (handle missing fields)
columns = sorted(current_columns)  # Maintain consistent order
flattened_data = [
    tuple(record.get(col, None) for col in columns)  # Fill missing values with NULL
    for record in processed_events
]

# 5. Construct dynamic SQL for insertion
placeholders = ", ".join(["?" for _ in columns])
columns_str = ", ".join(columns)

insert_query = f"""
INSERT INTO github_events ({columns_str})
VALUES ({placeholders})
ON CONFLICT (id) DO UPDATE SET {", ".join(f"{col}=excluded.{col}" for col in columns if col != "id")};
"""

# 6. Insert data into DuckDB
conn.executemany(insert_query, flattened_data)
print("Data successfully loaded into DuckDB with schema updates!")

# 7. Query the table
df = conn.execute("""SELECT * FROM github_events""").df()

conn.close()

print("\nGitHub Events Data:")
df

{'id', 'actor__login', 'created_at', 'type', 'public', 'actor__id'}
ALTER TABLE github_events ADD COLUMN repo__id BIGINT;
Added new column: repo__id (BIGINT)
Data successfully loaded into DuckDB with schema updates!

GitHub Events Data:


Unnamed: 0,id,type,public,created_at,actor__id,actor__login,repo__id
0,52054879166,WatchEvent,True,1.752496e+09,212767187,reyaleman-tech,419661684
1,52051650046,ForkEvent,True,1.752491e+09,91944358,qkrwnscjf,419661684
2,52050279855,WatchEvent,True,1.752489e+09,216563235,stewart-h13,419661684
3,52046949622,WatchEvent,True,1.752485e+09,141754322,Vital-Ahishakiye,419661684
4,52040127738,WatchEvent,True,1.752475e+09,173776489,Kaelthas200,419661684
...,...,...,...,...,...,...,...
291,51564153103,ForkEvent,True,1.751383e+09,193060242,AnassamzilUE,419661684
292,51562759595,ForkEvent,True,1.751381e+09,206166622,chaiandmushrooms,419661684
293,51558379325,WatchEvent,True,1.751376e+09,181355935,ramm14,419661684
294,51557019247,WatchEvent,True,1.751375e+09,218780560,genius-mushroom,419661684



#### **What's happening here?**
| Step | Action |
|----------|-------------|
| 🛠 **1** | Connects to DuckDB |
| 📜 **2** | Fetches current schema from DuckDB |
| 🏗 **3** | Detects and **adds new columns dynamically** |
| 🔄 **4** | Fills missing values with `NULL` to ensure smooth inserts |
| 📌 **5** | Dynamically constructs an **INSERT or UPDATE (`UPSERT`)** query |
| 📝 **6** | Inserts new data while updating existing records |
| 📊 **7** | Queries the data |
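
The type detection in step 3 can be isolated into a small function, sketched below. The one subtle point is ordering: in Python, `bool` is a subclass of `int`, so the boolean check has to come first or `True` would be typed as `BIGINT`. The helper names `infer_sql_type` and `missing_columns` are hypothetical, and the sample record reuses values from the output above.

```python
def infer_sql_type(value):
    """Map a Python value to a SQL column type, defaulting to TEXT."""
    if isinstance(value, bool):  # Must come before the int check.
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    return "TEXT"


def missing_columns(record, existing_columns):
    """Return {column_name: sql_type} for keys not yet present in the table."""
    return {
        key: infer_sql_type(value)
        for key, value in record.items()
        if key not in existing_columns
    }


existing = {"id", "type", "public", "created_at", "actor__id", "actor__login"}
record = {
    "id": "52054879166",
    "type": "WatchEvent",
    "public": True,
    "created_at": 1752495955.0,
    "actor__id": 212767187,
    "actor__login": "reyaleman-tech",
    "repo__id": 419661684,
}

print(missing_columns(record, existing))
```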

---

