# TGDataset (Zenodo 7640712)

This notebook provides a **streaming-friendly** overview of the TGDataset Telegram channels dataset (Zenodo record `7640712`).

Assumptions:
- The dataset is already **mounted** on the server.
- Your project has a `src/` folder and `src.core.config` exposes `SETTINGS.DATA_ROOT` (same pattern as your Higgs notebook).
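
For orientation, here is a minimal sketch of what such a config module might look like. The `Settings` dataclass, the `DEZINFO_DATA_ROOT` environment variable, and the fallback path are assumptions made for illustration; the real `src/core/config.py` may be organized differently.

```python
import os
from dataclasses import dataclass, field
from pathlib import Path


@dataclass(frozen=True)
class Settings:
    # Hypothetical: root folder under which datasets are mounted.
    # Read from an assumed environment variable, with a fallback default.
    DATA_ROOT: Path = field(
        default_factory=lambda: Path(
            os.environ.get("DEZINFO_DATA_ROOT", str(Path.home() / "dezinfo_data"))
        )
    )


SETTINGS = Settings()
print("DATA_ROOT:", SETTINGS.DATA_ROOT)
```

Only the attribute name `SETTINGS.DATA_ROOT` matters for the rest of this notebook.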

What you'll get:
- file discovery + disk footprint
- format sniffing (JSONL vs JSON array)
- safe sampling and optional full scan for counts/time ranges
- a quick schema/key summary to help downstream parsing


In [1]:
# ============================================================
# 0) Project setup (same pattern as Higgs notebook)
# ============================================================
from pathlib import Path
import sys

def find_repo_root(start: Path | None = None) -> Path:
    start = (start or Path.cwd()).resolve()
    for p in [start, *start.parents]:
        if (p / "src").is_dir():
            return p
    raise FileNotFoundError("Could not find repo root containing `src/`")

REPO_ROOT = find_repo_root()
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from src.core.config import SETTINGS  # noqa: E402

print("REPO_ROOT:", REPO_ROOT)
print("DATA_ROOT:", SETTINGS.DATA_ROOT)


REPO_ROOT: /mnt/c/Users/rescic/PycharmProjects/dezinfo-datasets
DATA_ROOT: /home/rescic/dezinfo_data


## 1) Point to the mounted dataset archive

No downloads here. You’ve mounted TGDataset on your server, and the dataset is stored as a **`.tar.gz` archive**.

In this section we:
1. point to the mounted folder under `SETTINGS.DATA_ROOT`, and
2. locate the `.tar.gz` file **without extracting it**.
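
If the mount folder may hold more than one archive, taking the first glob match can silently pick the wrong file. A small sketch of a stricter lookup follows; `pick_archive` and its `name_contains` parameter are hypothetical helpers, and the demo file names are made up.

```python
import tempfile
from pathlib import Path
from typing import Optional


def pick_archive(dataset_dir: Path, name_contains: Optional[str] = None) -> Path:
    """Return the single matching .tar.gz in dataset_dir, or raise if the choice is ambiguous."""
    archives = sorted(dataset_dir.glob("*.tar.gz"))
    if name_contains is not None:
        archives = [p for p in archives if name_contains in p.name]
    if not archives:
        raise FileNotFoundError(f"No matching .tar.gz files in {dataset_dir}")
    if len(archives) > 1:
        names = ", ".join(p.name for p in archives)
        raise ValueError(f"Several archives match, pass name_contains: {names}")
    return archives[0]


# Demo on a throwaway folder with made-up file names
with tempfile.TemporaryDirectory() as tmp:
    tmp_dir = Path(tmp)
    (tmp_dir / "demo_1.tar.gz").touch()
    (tmp_dir / "demo_2.tar.gz").touch()
    picked_name = pick_archive(tmp_dir, name_contains="_2").name

print(picked_name)
```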


In [2]:
# ============================================================
# 1) Mounted dataset path (NO downloading / NO extracting)
# ============================================================
DATASET_SUBDIR = "tgdataset"   # <-- mount folder name
DATASET_DIR = Path(SETTINGS.DATA_ROOT) / DATASET_SUBDIR
assert DATASET_DIR.exists(), f"Mounted dataset folder not found: {DATASET_DIR}"
print("DATASET_DIR:", DATASET_DIR)

# Find the .tar.gz archive in the mounted folder
ARCHIVES = sorted(DATASET_DIR.glob("*.tar.gz"))
assert ARCHIVES, f"No .tar.gz files found in {DATASET_DIR}"
TAR_PATH = ARCHIVES[0]  # if you have multiple archives, pick the right one here
print("TAR_PATH:", TAR_PATH)


DATASET_DIR: /home/rescic/dezinfo_data/tgdataset
TAR_PATH: /home/rescic/dezinfo_data/tgdataset/TGDataset_1.tar.gz


## 2) Discover files inside the `.tar.gz` and disk footprint

Because the dataset is archived, we can’t use `DATASET_DIR.rglob(...)`.

Instead, we list members **inside** the tarball and:
- filter for JSON-like payload files (`.json`, `.jsonl`, `.ndjson`)
- compute a size breakdown by top-level folder *inside the archive*

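This gives you a sense of scale without extracting anything to disk. Listing the members still has to decompress the archive once from start to end, so it can take a while on a large tarball.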
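
To sanity-check the size bookkeeping without waiting on the real archive, the filter-and-sum logic can be exercised on a tiny in-memory tarball. This is a sketch: the helper names, member names, and payloads are made up for the demo.

```python
import io
import tarfile
from collections import defaultdict
from pathlib import PurePosixPath


def build_demo_tar_gz(files: dict[str, bytes]) -> bytes:
    """Build a small .tar.gz in memory (illustrative data only)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tf:
        for name, payload in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tf.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def footprint_by_top_folder(tar_bytes: bytes, exts=(".json", ".jsonl", ".ndjson")) -> dict[str, int]:
    """Sum uncompressed sizes of JSON-like members per top-level folder."""
    totals = defaultdict(int)
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:gz") as tf:
        for member in tf:  # iterating a TarFile yields TarInfo objects one by one
            if not member.isfile():
                continue
            path = PurePosixPath(member.name)
            if path.suffix.lower() in exts:
                # member.size is the uncompressed size recorded in the tar header
                totals[path.parts[0]] += member.size
    return dict(totals)


demo_tar = build_demo_tar_gz({
    "part_a/ch1.json": b'{"id": 1}',
    "part_a/ch2.json": b'{"id": 22}',
    "part_b/ch3.jsonl": b'{"id": 3}\n',
    "README.txt": b"not counted",
})
totals = footprint_by_top_folder(demo_tar)
print(totals)
```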


In [None]:
from collections import defaultdict
from pathlib import Path
import tarfile

def sizeof_fmt(num: int, suffix="B") -> str:
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f} {unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f} Ei{suffix}"

json_exts = {".json", ".jsonl", ".ndjson"}

json_members = []
folder_bytes = defaultdict(int)
total_bytes = 0

with tarfile.open(TAR_PATH, mode="r:gz") as tf:
    for m in tf.getmembers():
        if not m.isfile():
            continue
        suffix = Path(m.name).suffix.lower()
        if suffix in json_exts:
            json_members.append(m)
            sz = m.size
            total_bytes += sz

            parts = Path(m.name).parts
            top = parts[0] if parts else "."
            folder_bytes[top] += sz

print("JSON-like files found (inside tar):", len(json_members))
assert json_members, "No JSON-like files were found inside the .tar.gz. Check your archive / mount path."

print("Total JSON footprint (inside tar):", sizeof_fmt(total_bytes))

print("\nTop-level folder breakdown (inside tar):")
for k, v in sorted(folder_bytes.items(), key=lambda kv: kv[1], reverse=True):
    print(f"  - {k:20s} {sizeof_fmt(v)}")

print("\nExample files (inside tar):")
for m in sorted(json_members, key=lambda x: x.name)[:12]:
    print(" ", m.name)


## 3) Peek at file format (JSON array vs JSONL) — inside the archive

TGDataset JSON payloads may be stored either as:
- **JSONL/NDJSON** (one object per line), or
- a **single JSON array** (file begins with `[`).

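Here we “sniff” the first few KB of a **file inside the tarball** to guess its format.
The check does not parse the file, but reaching a member inside a gzip-compressed tarball still means decompressing everything stored before it.
The result is only a heuristic: a pretty-printed single JSON object also starts with `{` and contains newlines, so it is reported as JSONL-like.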
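
A stricter variant of this heuristic can try to parse the first line, which separates JSONL from a pretty-printed single object. Below is a minimal sketch that works on bytes already read from the member; `guess_json_layout` and its return labels are illustrative names, not part of the notebook's helpers.

```python
import json


def guess_json_layout(head: bytes) -> str:
    """Guess 'array', 'jsonl', 'object', 'object_or_jsonl', or 'unknown' from leading bytes."""
    text = head.decode("utf-8", errors="replace").lstrip()
    if not text:
        return "unknown"
    if text[0] == "[":
        return "array"
    if text[0] == "{":
        first_line, newline, _ = text.partition("\n")
        if not newline:
            # The sample ended before the first line did: cannot tell the two apart.
            return "object_or_jsonl"
        try:
            json.loads(first_line)
            return "jsonl"  # the first line is a complete JSON value
        except json.JSONDecodeError:
            return "object"  # the first line is only a fragment, e.g. pretty-printed JSON
    return "unknown"


print(guess_json_layout(b'{"a": 1}\n{"a": 2}\n'))
print(guess_json_layout(b'{\n  "a": 1\n}'))
```

If records are longer than the sampled bytes, the function returns `object_or_jsonl`; increasing the sample size is the simplest way to resolve that case.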


In [None]:
from pathlib import Path
import tarfile
import io

def sniff_json_member(tar_path: Path, member_name: str, max_bytes: int = 4096) -> dict:
    with tarfile.open(tar_path, mode="r:gz") as tf:
        m = tf.getmember(member_name)
        f = tf.extractfile(m)
        assert f is not None, f"Could not extract member: {member_name}"
        raw = f.read(max_bytes)

    ws = b" \t\r\n"
    i = 0
    while i < len(raw) and raw[i:i+1] in ws:
        i += 1

    first = raw[i:i+1].decode("utf-8", errors="replace") if i < len(raw) else ""
    text = raw.decode("utf-8", errors="replace")

    looks_like_array = first == "["
    looks_like_object = first == "{"
    looks_like_jsonl = ("\n" in text and ("{" in text)) and not looks_like_array

    return {
        "member": member_name,
        "first_non_ws_char": first,
        "looks_like_array": looks_like_array,
        "looks_like_object": looks_like_object,
        "looks_like_jsonl": looks_like_jsonl,
    }

# Pick a representative file to sniff
sample_member = sorted(json_members, key=lambda x: x.name)[0].name
sniff = sniff_json_member(TAR_PATH, sample_member)
sniff


## 4) Streaming iteration over channel objects (from inside `.tar.gz`)

Goal: iterate channel objects without extracting the archive or loading huge files into memory.

We support two common encodings:
- **JSONL**: stream line-by-line with `json.loads`
- **JSON array**: stream items with `ijson` (recommended)

Everything below reads file members directly from the tar archive via `tarfile.extractfile(...)`.


In [None]:
import json
import tarfile
import io
from typing import Dict, Iterator, Any, Optional

def iter_jsonl_objects_from_tar(tar_path: Path, member_name: str) -> Iterator[Dict[str, Any]]:
    with tarfile.open(tar_path, mode="r:gz") as tf:
        m = tf.getmember(member_name)
        f = tf.extractfile(m)
        assert f is not None, f"Could not extract member: {member_name}"
        txt = io.TextIOWrapper(f, encoding="utf-8", errors="replace")
        for line in txt:
            line = line.strip()
            if not line:
                continue
            yield json.loads(line)

def iter_json_array_objects_with_ijson_from_tar(
    tar_path: Path,
    member_name: str,
    array_item_prefix: str = "item",
) -> Iterator[Dict[str, Any]]:
    try:
        import ijson  # type: ignore
    except Exception as e:
        raise ImportError(
            "This file looks like a JSON array. Install ijson to stream it: `pip install ijson`"
        ) from e

    with tarfile.open(tar_path, mode="r:gz") as tf:
        m = tf.getmember(member_name)
        f = tf.extractfile(m)
        assert f is not None, f"Could not extract member: {member_name}"
        for obj in ijson.items(f, array_item_prefix):
            yield obj

def iter_channel_objects_from_tar(tar_path: Path, member_name: str, sniff: Optional[dict] = None):
    sniff = sniff or sniff_json_member(tar_path, member_name)
    if sniff.get("looks_like_array"):
        return iter_json_array_objects_with_ijson_from_tar(tar_path, member_name)
    return iter_jsonl_objects_from_tar(tar_path, member_name)

# Quick demo: iterate a few objects from the sample member
it = iter_channel_objects_from_tar(TAR_PATH, sample_member, sniff=sniff)
demo = []
for _, obj in zip(range(3), it):
    demo.append(obj)
demo


## 5) Dataset stats (sample mode by default) — scanning members in the tarball

TGDataset can be very large. A full scan may take a while.

We provide two modes:
- **Sample scan** (default): scans only the first `n_files` members, capped at `n_channels_per_file` channels per member and `n_messages_per_channel` messages per channel.
- **Full scan**: set `full_scan=True` to scan all JSON-like members with no caps.

We try (best-effort) to estimate:
- number of channel objects scanned
- number of messages scanned (if a channel has a list field like `messages`)
- approximate min/max timestamps (if timestamps are present)

Because TGDataset schema variants exist, you may need to adjust `iter_messages_from_channel()`.
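
As an illustration of such an adjustment, the sketch below also accepts message containers stored as a dict keyed by message id, and checks several candidate field names. The names in `CANDIDATE_FIELDS` are assumptions; replace them with whatever keys your dump actually contains.

```python
from typing import Any, Dict, Iterator

# Hypothetical candidate field names; adjust to the keys actually present.
CANDIDATE_FIELDS = ("messages", "text_messages", "posts")


def iter_messages_flexible(channel: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield message dicts from a channel, whether stored as a list or an id -> message dict."""
    # Some dumps may nest the payload under a "channel" key.
    inner = channel.get("channel")
    containers = [channel, inner] if isinstance(inner, dict) else [channel]
    for container in containers:
        for field_name in CANDIDATE_FIELDS:
            value = container.get(field_name)
            if isinstance(value, list):
                items = value
            elif isinstance(value, dict):
                items = value.values()
            else:
                continue
            for msg in items:
                if isinstance(msg, dict):  # skip malformed entries
                    yield msg


print(list(iter_messages_flexible({"text_messages": {"10": {"id": 10}}})))
```

Because this is a generator, `itertools.islice(iter_messages_flexible(ch), n)` applies a per-channel cap without first copying all messages into a list.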


In [None]:
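from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Iterable, Optional, Any, Dict

@dataclass
class ScanConfig:
    full_scan: bool = False
    n_files: int = 5
    n_channels_per_file: int = 200
    n_messages_per_channel: int = 2000  # cap in sample mode

CFG = ScanConfig(full_scan=False)

def parse_any_datetime(x) -> Optional[datetime]:
    """Best-effort parse into a timezone-aware UTC datetime (naive inputs are assumed UTC)."""
    if x is None:
        return None
    # ijson yields Decimal for non-integer JSON numbers, so accept it as well
    if isinstance(x, (int, float, Decimal)):
        ts = float(x)
        if ts > 1e12:  # ms -> s
            ts /= 1000.0
        try:
            return datetime.fromtimestamp(ts, tz=timezone.utc)
        except Exception:
            return None
    if isinstance(x, str):
        # try python's ISO parser first
        try:
            dt = datetime.fromisoformat(x.replace("Z", "+00:00"))
        except Exception:
            return None
        # normalize so naive and aware values can be compared in the min/max scan
        if dt.tzinfo is None:
            return dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    return None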

def iter_messages_from_channel(channel: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
    if isinstance(channel.get("messages"), list):
        return channel["messages"]
    if isinstance(channel.get("channel"), dict) and isinstance(channel["channel"].get("messages"), list):
        return channel["channel"]["messages"]
    return []

def extract_message_timestamp(msg: Dict[str, Any]) -> Optional[datetime]:
    for k in ("date", "datetime", "timestamp", "ts", "time", "created_at"):
        if k in msg:
            dt = parse_any_datetime(msg.get(k))
            if dt:
                return dt
    return None

members_sorted = sorted(json_members, key=lambda x: x.name)
members_to_scan = members_sorted if CFG.full_scan else members_sorted[:CFG.n_files]

channels_scanned = 0
messages_scanned = 0
min_dt = None
max_dt = None

for m in members_to_scan:
    member_name = m.name
    sniff_m = sniff_json_member(TAR_PATH, member_name)
    it = iter_channel_objects_from_tar(TAR_PATH, member_name, sniff=sniff_m)

    for i, channel in enumerate(it):
        if not CFG.full_scan and i >= CFG.n_channels_per_file:
            break

        channels_scanned += 1

        msgs = list(iter_messages_from_channel(channel))
        if not CFG.full_scan:
            msgs = msgs[:CFG.n_messages_per_channel]

        messages_scanned += len(msgs)

        for msg in msgs:
            if not isinstance(msg, dict):
                continue
            dt = extract_message_timestamp(msg)
            if not dt:
                continue
            min_dt = dt if (min_dt is None or dt < min_dt) else min_dt
            max_dt = dt if (max_dt is None or dt > max_dt) else max_dt

print("Members scanned:", len(members_to_scan))
print("Channels scanned:", channels_scanned)
print("Messages scanned (best-effort):", messages_scanned)
print("Min timestamp (best-effort):", min_dt)
print("Max timestamp (best-effort):", max_dt)


## 6) Schema/key summary from sampled channels (tar-aware)

Different TGDataset dumps can vary slightly in which top-level fields are present per channel object.

We sample channel objects across a few archive members and summarize:
- most common top-level keys
- a few likely identifiers (username/id/title)

This helps you adapt parsing and feature extraction quickly.
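
Key presence alone does not show whether a field holds a list, a dict, or a scalar, which matters when adapting the message iterator. A small complementary sketch that counts value types per top-level key follows; `summarize_key_types` and the demo records are made up for illustration.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def summarize_key_types(objs: Iterable[Any]) -> Dict[str, Counter]:
    """Map each top-level key to a Counter of the Python type names seen for its values."""
    summary: Dict[str, Counter] = {}
    for obj in objs:
        if not isinstance(obj, dict):
            continue
        for key, value in obj.items():
            summary.setdefault(key, Counter())[type(value).__name__] += 1
    return summary


# Made-up records standing in for sampled channel objects
demo_channels = [
    {"id": 1, "title": "A", "messages": []},
    {"id": "2", "title": None, "messages": {}},
]
type_summary = summarize_key_types(demo_channels)
for key, types in type_summary.items():
    print(f"{key:10s} {dict(types)}")
```

The same function can be applied to the sampled channel objects; a key whose counter shows more than one type is a likely source of parsing surprises.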


In [None]:
from collections import Counter
from itertools import islice

def sample_channels_from_tar(members, n_total: int = 200, per_member: int = 50) -> list[dict]:
    out = []
    for m in members:
        member_name = m.name if hasattr(m, "name") else str(m)
        try:
            sniff_m = sniff_json_member(TAR_PATH, member_name)
            it = iter_channel_objects_from_tar(TAR_PATH, member_name, sniff=sniff_m)
            for ch in islice(it, per_member):
                out.append(ch)
                if len(out) >= n_total:
                    return out
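        except Exception as e:
            # best-effort sampling: report the failure instead of hiding it
            print(f"Skipping {member_name}: {type(e).__name__}: {e}")
            continue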
    return out

sample = sample_channels_from_tar(members_to_scan, n_total=200, per_member=50)
print("Sampled channels:", len(sample))

key_counts = Counter()
id_like = Counter()
title_like = Counter()

for ch in sample:
    if not isinstance(ch, dict):
        continue
    key_counts.update(ch.keys())

    for k in ("id", "channel_id", "chat_id", "username", "user", "name"):
        if k in ch and ch.get(k) not in (None, ""):
            id_like[k] += 1
    for k in ("title", "channel_title", "display_name", "about", "description"):
        if k in ch and ch.get(k) not in (None, ""):
            title_like[k] += 1

print("\nMost common top-level keys:")
for k, v in key_counts.most_common(25):
    print(f"  - {k:25s} {v}")

print("\nLikely identifier fields (presence counts):")
for k, v in id_like.most_common():
    print(f"  - {k:25s} {v}")

print("\nLikely title/description fields (presence counts):")
for k, v in title_like.most_common():
    print(f"  - {k:25s} {v}")
