# DBLP Data Engineering & ETL Pipeline

**Team Member:** Truc Le  
**Task:** Data Engineering & Infrastructure  
**Date:** December 1, 2025

## Overview

This notebook implements the ETL (Extract, Transform, Load) pipeline for the DBLP research dataset:

1. **Extract:** Ingest line-delimited JSON shards from the DBLP dataset
2. **Transform:** Normalize schema, build citation and coauthor edges
3. **Load:** Store cleaned data as Parquet files

## Deliverables

- ✅ Cleaned datasets (papers, authorships, citations, coauthorships)
- ✅ Schema normalization and data quality checks
- ✅ Citation network edges (directed)
- ✅ Coauthor network edges (undirected)
- ✅ Parquet storage in chunked part files (one directory per table)
- ✅ Data dictionary and profiling

---

## 1. Setup and Imports

In [1]:
# Import required libraries
import json
import pandas as pd
import numpy as np
from pathlib import Path
import unicodedata
import re
from typing import Dict, List, Tuple
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Display settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.width', None)

print("✓ Libraries imported successfully")
print(f"Pandas version: {pd.__version__}")

ModuleNotFoundError: No module named 'pandas'

## 2. Configuration and Paths

In [None]:
# Configuration
INPUT_DIR = Path("../dblp-ref")
OUTPUT_DIR = Path("../data/parquet")
CHUNK_SIZE = 50000  # Records per output part file
MAX_RECORDS = 0  # Set to 0 for full dataset, or limit for testing

# Create output directories
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
for table in ['papers', 'authorships', 'citations', 'coauthorships']:
    (OUTPUT_DIR / table).mkdir(parents=True, exist_ok=True)

print(f"Input directory: {INPUT_DIR.absolute()}")
print(f"Output directory: {OUTPUT_DIR.absolute()}")
print(f"Chunk size: {CHUNK_SIZE:,} records")
print(f"Max records: {MAX_RECORDS:,}" if MAX_RECORDS > 0 else "Max records: ALL")

Input directory: /Users/julio/Library/CloudStorage/OneDrive-UniversityOfHouston/0. Fall 2025/COSC3337/GitProject/COSC-3337-Project/notebooks/../dblp-ref
Output directory: /Users/julio/Library/CloudStorage/OneDrive-UniversityOfHouston/0. Fall 2025/COSC3337/GitProject/COSC-3337-Project/notebooks/../data/parquet
Chunk size: 50,000 records
Max records: ALL
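Each run numbers part files from `part-00001` upward and never deletes existing ones, so a shorter rerun (for example with a positive `MAX_RECORDS`) would leave higher-numbered part files from an earlier run in place, and reading the directory would pick them up. A minimal sketch of a hypothetical `reset_output_dirs` helper that clears each table directory before a run, assuming stale outputs are safe to delete:

```python
import shutil
import tempfile
from pathlib import Path
from typing import Iterable


def reset_output_dirs(output_dir: Path, tables: Iterable[str]) -> None:
    """Hypothetical helper: delete and recreate one directory per table."""
    for table in tables:
        table_dir = output_dir / table
        if table_dir.exists():
            shutil.rmtree(table_dir)  # removes part files left by earlier runs
        table_dir.mkdir(parents=True, exist_ok=True)


# Demonstrate on a throwaway directory instead of the real OUTPUT_DIR
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    (out / "papers").mkdir()
    (out / "papers" / "part-00099.parquet").write_bytes(b"stale")
    reset_output_dirs(out, ["papers", "citations"])
    table_dirs = sorted(p.name for p in out.iterdir())
    leftover = list((out / "papers").iterdir())

print(table_dirs)  # ['citations', 'papers']
print(leftover)    # []
```

In the notebook this would replace the `mkdir` loop with `reset_output_dirs(OUTPUT_DIR, ['papers', 'authorships', 'citations', 'coauthorships'])`.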


## 3. Data Normalization Functions

This function normalizes author names so that spelling variants map to the same deduplication key:

In [None]:
def normalize_author(name: str) -> str:
    """
    Normalize author names for deduplication:
    - Lowercase
    - Strip whitespace
    - Collapse multiple spaces
    - Remove Unicode accents
    """
    if name is None:
        return None
    
    # Strip and lowercase
    s = name.strip().lower()
    
    # Collapse spaces
    s = re.sub(r"\s+", " ", s)
    
    # Remove accents using Unicode normalization
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    
    return s

# Test the function
test_names = ["José García-López  ", "JOHN SMITH", "Marie-Claire Dubois"]
for name in test_names:
    print(f"{name:30} → {normalize_author(name)}")

José García-López              → jose garcia-lopez
JOHN SMITH                     → john smith
Marie-Claire Dubois            → marie-claire dubois
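The normalized name acts as a deduplication key. The sketch below re-implements the same steps as `normalize_author` so it runs on its own, and groups raw spellings by key to show which variants merge (`group_name_variants` is a hypothetical helper, not part of the pipeline):

```python
import re
import unicodedata
from collections import defaultdict


def normalize_author(name):
    # Same steps as the notebook: strip, lowercase, collapse whitespace,
    # then drop combining marks after NFKD decomposition.
    if name is None:
        return None
    s = re.sub(r"\s+", " ", name.strip().lower())
    s = unicodedata.normalize("NFKD", s)
    return "".join(ch for ch in s if not unicodedata.combining(ch))


def group_name_variants(raw_names):
    """Map each normalized key to the set of raw spellings that collapse onto it."""
    groups = defaultdict(set)
    for raw in raw_names:
        key = normalize_author(raw)
        if key:
            groups[key].add(raw)
    return dict(groups)


variants = group_name_variants(["José García", "JOSE GARCIA", "Jose  Garcia ", "J. Garcia"])
for key, spellings in variants.items():
    print(key, sorted(spellings))
```

The first three spellings share the key `jose garcia`, while `J. Garcia` keeps its own key. Initials, and letters that have no Unicode decomposition such as `ø` or `ł`, are not merged by this scheme, and distinct people who share a name collapse onto one key.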


## 4. ETL Processing Function

This function processes batches of JSON records and generates four tables:

In [None]:
from itertools import combinations


def _to_int(value):
    """Return value as an int if it is an int or a digit-only string, otherwise None."""
    if isinstance(value, int):
        return int(value)
    if isinstance(value, str) and value.isdigit():
        return int(value)
    return None


def process_records(records: List[Dict]) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Process a batch of JSON records into four tables:
    1. papers - publication metadata
    2. authorships - author-paper relationships
    3. citations - citation edges (directed)
    4. coauthorships - coauthor edges (undirected)
    """
    papers_rows = []
    authorships_rows = []
    citations_rows = []
    coauthor_rows = []

    for rec in records:
        pid = rec.get("id")
        if not pid:
            continue  # records without an id cannot be linked to anything

        # Extract fields (year and n_citation are coerced to int or None)
        title = rec.get("title")
        venue = rec.get("venue")
        year = _to_int(rec.get("year"))
        n_citation = _to_int(rec.get("n_citation"))
        abstract = rec.get("abstract")
        if not isinstance(abstract, str):
            abstract = None
        refs = rec.get("references") or []
        authors = rec.get("authors") or []

        # Papers table: metadata plus derived counts
        papers_rows.append({
            "id": pid,
            "title": title,
            "venue": venue,
            "year": year,
            "n_citation": n_citation,
            "abstract": abstract,
            "abstract_len": len(abstract) if abstract is not None else None,
            "ref_count": len(refs) if isinstance(refs, list) else 0,
            "author_count": len(authors) if isinstance(authors, list) else 0,
        })

        # Authorships table: one row per (paper, author position)
        normalized_authors = []
        for pos, a in enumerate(authors):
            norm_name = normalize_author(a) if isinstance(a, str) else None
            authorships_rows.append({
                "paper_id": pid,
                "author_name": a,
                "author_position": pos,
                "author_norm": norm_name,
            })
            if norm_name:
                normalized_authors.append(norm_name)

        # Coauthor edges (undirected): every author pair, stored in sorted order
        for a1, a2 in combinations(normalized_authors, 2):
            if a1 > a2:
                a1, a2 = a2, a1
            coauthor_rows.append({
                "author1_norm": a1,
                "author2_norm": a2,
                "paper_id": pid,
                "year": year,
                "venue": venue,
            })

        # Citations table: directed edges from this paper to each referenced id
        for dst in refs:
            if dst:
                citations_rows.append({
                    "src_id": pid,
                    "dst_id": dst,
                    "src_year": year,
                    "src_venue": venue,
                })

    return (
        pd.DataFrame.from_records(papers_rows),
        pd.DataFrame.from_records(authorships_rows),
        pd.DataFrame.from_records(citations_rows),
        pd.DataFrame.from_records(coauthor_rows),
    )

print("✓ Processing function defined")

✓ Processing function defined
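A paper with k normalized authors contributes k(k-1)/2 coauthor rows, so the coauthorships table grows quadratically with team size: a single 100-author paper yields 4,950 rows. A standalone sketch of pair construction equivalent to the pair loop in `process_records` (`coauthor_pairs` is a hypothetical name):

```python
from itertools import combinations
from typing import List, Tuple


def coauthor_pairs(authors: List[str]) -> List[Tuple[str, ...]]:
    """All unordered author pairs for one paper, each stored in sorted order."""
    return [tuple(sorted(pair)) for pair in combinations(authors, 2)]


pairs = coauthor_pairs(["carol", "alice", "bob"])
print(pairs)  # [('alice', 'carol'), ('bob', 'carol'), ('alice', 'bob')]

big = coauthor_pairs([f"a{i}" for i in range(100)])
print(len(big))  # 4950
```

Sorting each pair means `('alice', 'bob')` and `('bob', 'alice')` map to the same edge key. If one normalized name appears twice on a paper, `combinations` also yields a self-pair such as `('x', 'x')`, which downstream network code may want to drop.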


## 5. Run ETL Pipeline

Stream the JSON shards line by line, buffer records into chunks of `CHUNK_SIZE`, and write each chunk as Parquet part files:

In [None]:
# Find input files
input_files = sorted(INPUT_DIR.glob("dblp-ref-*.json"))
print(f"Found {len(input_files)} input files:")
for f in input_files:
    print(f"  - {f.name}")

# Initialize counters
total_records = 0
part_num = {'papers': 0, 'authorships': 0, 'citations': 0, 'coauthorships': 0}
buffer = []


def flush_buffer(batch: List[Dict]) -> None:
    """Transform one batch and write a numbered part file for each non-empty table."""
    papers_df, auth_df, cite_df, coauth_df = process_records(batch)
    tables = {'papers': papers_df, 'authorships': auth_df,
              'citations': cite_df, 'coauthorships': coauth_df}
    for table, df in tables.items():
        if df.empty:
            continue  # e.g., a batch in which no paper has references
        part_num[table] += 1
        df.to_parquet(OUTPUT_DIR / table / f'part-{part_num[table]:05d}.parquet', index=False)


print(f"\nProcessing with chunk size: {CHUNK_SIZE:,}")
print("="*60)

# Stream each shard line by line
for file_path in input_files:
    with open(file_path, 'r', encoding='utf-8') as fh:
        for line in tqdm(fh, desc=f"Processing {file_path.name}"):
            line = line.strip()
            if not line:
                continue

            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed lines

            buffer.append(rec)
            total_records += 1

            # Write a full chunk to disk
            if len(buffer) >= CHUNK_SIZE:
                flush_buffer(buffer)
                buffer.clear()

            # Stop if max records reached
            if MAX_RECORDS > 0 and total_records >= MAX_RECORDS:
                break

    if MAX_RECORDS > 0 and total_records >= MAX_RECORDS:
        break

# Write the final partial chunk
if buffer:
    flush_buffer(buffer)
    buffer.clear()

print(f"\n{'='*60}")
print("✓ ETL Complete!")
print(f"Total records processed: {total_records:,}")
print("\nPart files created:")
for table, count in part_num.items():
    print(f"  {table}: {count} parts")

Found 4 input files:
  - dblp-ref-0.json
  - dblp-ref-1.json
  - dblp-ref-2.json
  - dblp-ref-3.json

Processing with chunk size: 50,000


Processing dblp-ref-0.json: 1000000it [00:29, 34117.97it/s]
Processing dblp-ref-1.json: 1000000it [00:31, 31809.71it/s]
Processing dblp-ref-2.json: 1000000it [00:35, 28139.72it/s]
Processing dblp-ref-3.json: 79007it [00:01, 40196.57it/s] 



✓ ETL Complete!
Total records processed: 3,079,007

Part files created:
  papers: 62 parts
  authorships: 62 parts
  citations: 62 parts
  coauthorships: 62 parts
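The loop above is a streaming batch pattern: memory is bounded by `CHUNK_SIZE` records, and each flushed batch yields at most one part file per table, so 3,079,007 records at 50,000 per chunk give ceil(3,079,007 / 50,000) = 62 parts, matching the output. The same pattern can be factored into a generator; `iter_json_batches` is a hypothetical helper, not part of the pipeline:

```python
import io
import json
import math
from typing import Dict, Iterable, Iterator, List


def iter_json_batches(lines: Iterable[str], batch_size: int) -> Iterator[List[Dict]]:
    """Yield lists of parsed records, skipping blank and malformed lines."""
    batch: List[Dict] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            batch.append(json.loads(line))
        except json.JSONDecodeError:
            continue
        if len(batch) >= batch_size:
            yield batch
            batch = []  # start a new list so the yielded batch is never mutated
    if batch:
        yield batch  # final partial batch


sample = io.StringIO('{"id": "a"}\n\nnot json\n{"id": "b"}\n{"id": "c"}\n')
batches = list(iter_json_batches(sample, batch_size=2))
print([len(b) for b in batches])  # [2, 1]

# Expected part count for the full run: one part per batch
part_files = math.ceil(3_079_007 / 50_000)
print(part_files)  # 62
```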


## 6. Verify Output and Data Quality

Load the generated datasets and perform basic validation:

In [None]:
# Load all generated tables
papers = pd.read_parquet(OUTPUT_DIR / 'papers')
authorships = pd.read_parquet(OUTPUT_DIR / 'authorships')
citations = pd.read_parquet(OUTPUT_DIR / 'citations')
coauthorships = pd.read_parquet(OUTPUT_DIR / 'coauthorships')

print("Dataset Statistics:")
print("="*60)
print(f"Papers:         {len(papers):>10,} records")
print(f"Authorships:    {len(authorships):>10,} records")
print(f"Citations:      {len(citations):>10,} records")
print(f"Coauthorships:  {len(coauthorships):>10,} records")
print(f"\nUnique authors: {authorships['author_norm'].nunique():>10,}")
print(f"Unique venues:  {papers['venue'].nunique():>10,}")

# Display sample from each table
print("\n" + "="*60)
print("Sample from Papers table:")
print(papers.head(3))

Dataset Statistics:
Papers:          3,079,007 records
Authorships:     9,476,165 records
Citations:      25,166,994 records
Coauthorships:  14,724,453 records

Unique authors:  1,751,941
Unique venues:       5,079

Sample from Papers table:
                                     id  \
0  00127ee2-cb05-48ce-bc49-9de556b93346   
1  001c58d3-26ad-46b3-ab3a-c1e557d16821   
2  001c8744-73c4-4b04-9364-22d31a10dbf1   

                                               title  \
0  Preliminary Design of a Network Protocol Learn...   
1  A methodology for the physically accurate visu...   
2  Comparison of GARCH, Neural Network and Suppor...   

                                               venue  year  n_citation  \
0  international conference on human-computer int...  2013           0   
1            visual analytics science and technology  2011          50   
2       pattern recognition and machine intelligence  2009          50   

                                            abstract  abstract_

In [None]:
# Quality checks
print("\nData Quality Checks:")
print("="*60)

# Check 1: Referential integrity (every authorship paper_id exists in papers)
auth_paper_ids = set(authorships['paper_id'])
paper_ids = set(papers['id'])
orphaned_auth = auth_paper_ids - paper_ids
print(f"✓ Authorships referential integrity: {len(orphaned_auth) == 0}")
if orphaned_auth:
    print(f"  Warning: {len(orphaned_auth):,} authorship paper IDs not found in papers")

# Check 2: Citation source integrity (every src_id exists in papers; dst_id is not checked here)
cite_src_ids = set(citations['src_id'])
orphaned_cite = cite_src_ids - paper_ids
print(f"✓ Citations source integrity: {len(orphaned_cite) == 0}")
if orphaned_cite:
    print(f"  Warning: {len(orphaned_cite):,} citation source IDs not found in papers")

# Check 3: Missing values (isna() counts nulls only, not empty strings)
print("\nMissing Values:")
for col, label in [('abstract', 'Abstracts'), ('venue', 'Venues'), ('year', 'Years')]:
    n_missing = papers[col].isna().sum()
    print(f"  {label}: {n_missing:,} ({n_missing / len(papers) * 100:.1f}%)")

# Check 4: Data ranges
print("\nData Ranges:")
print(f"  Year range: {papers['year'].min():.0f} - {papers['year'].max():.0f}")
# n_citation is the citation count reported in the source record (citations received)
print(f"  Citations per paper (mean): {papers['n_citation'].mean():.2f}")
print(f"  Authors per paper (mean): {papers['author_count'].mean():.2f}")
# ref_count is the number of outgoing references (citations made)
print(f"  References per paper (mean): {papers['ref_count'].mean():.2f}")


Data Quality Checks:
✓ Authorships referential integrity: True
✓ Citations source integrity: True

Missing Values:
  Abstracts: 530,475 (17.2%)
  Venues: 0 (0.0%)
  Years: 0 (0.0%)

Data Ranges:
  Year range: 1936 - 2018
  Citations per paper (mean): 35.22
  Authors per paper (mean): 3.08
  References per paper (mean): 8.17
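Two gaps remain in the checks above. Check 2 validates only `src_id`, so reference targets (`dst_id`) that point at papers outside the dump go unmeasured, and `isna()` does not count empty strings, so the reported 0 missing venues may understate missing venues if the source encodes them as `""`. A stdlib sketch of both checks plus a duplicate-ID scan, written against plain iterables so it accepts pandas Series or lists (all helper names are hypothetical):

```python
from typing import Hashable, Iterable, Set


def dangling_target_share(dst_ids: Iterable[Hashable], paper_ids: Iterable[Hashable]) -> float:
    """Fraction of citation edges whose target id is absent from the papers table."""
    known = set(paper_ids)
    total = dangling = 0
    for dst in dst_ids:
        total += 1
        if dst not in known:
            dangling += 1
    return dangling / total if total else 0.0


def blank_string_count(values: Iterable[object]) -> int:
    """Count strings that are empty after stripping; isna() does not flag these."""
    return sum(1 for v in values if isinstance(v, str) and not v.strip())


def duplicate_ids(ids: Iterable[Hashable]) -> Set[Hashable]:
    """Return every id that occurs more than once."""
    seen: Set[Hashable] = set()
    dups: Set[Hashable] = set()
    for i in ids:
        if i in seen:
            dups.add(i)
        seen.add(i)
    return dups


# Toy inputs; in the notebook these would be citations['dst_id'], papers['id'], papers['venue']
share = dangling_target_share(["p2", "p3", "x9", "p2"], ["p1", "p2", "p3"])
blanks = blank_string_count(["ICML", "", "  ", None])
dups = duplicate_ids(["p1", "p2", "p1"])
print(share, blanks, dups)  # 0.25 2 {'p1'}
```

On the full tables, the vectorized pandas equivalents are `(~citations['dst_id'].isin(papers['id'])).mean()`, `papers['venue'].str.strip().eq('').sum()`, and `papers['id'].duplicated().sum()`.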


## 7. Summary and Next Steps

### Deliverables Completed ✓

1. **Cleaned Datasets** stored in `../data/parquet/`:
   - `papers/` - Core publication metadata
   - `authorships/` - Author-paper relationships with normalized names
   - `citations/` - Directed citation network edges
   - `coauthorships/` - Undirected collaboration network edges

2. **Schema Normalization**:
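   - Integer coercion of `year` and `n_citation` (values that are neither integers nor digit-only strings become null)
   - Author name normalization (lowercasing, whitespace collapsing, accent stripping)
   - Derived metrics (`abstract_len`, `ref_count`, `author_count`)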

3. **Network Edge Construction**:
   - Citation edges: directed (`src_id` → `dst_id`), tagged with the citing paper's year and venue
   - Coauthor edges: undirected, one row per author pair per paper, stored as sorted pairs
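
Because `coauthorships` holds one row per author pair per paper, a weighted collaboration graph can be derived by counting rows per pair. A stdlib sketch (`weighted_coauthor_edges` is a hypothetical helper) that also drops self-pairs, assuming each row's two names are already in sorted order as the pipeline stores them:

```python
from collections import Counter
from typing import Iterable, Tuple


def weighted_coauthor_edges(rows: Iterable[Tuple[str, str]]) -> Counter:
    """Collapse per-paper coauthor rows into (author1, author2) -> joint-paper count."""
    weights: Counter = Counter()
    for a1, a2 in rows:
        if a1 == a2:
            continue  # skip self-pairs from a name repeated on one paper
        weights[(a1, a2)] += 1
    return weights


rows = [("alice", "bob"), ("alice", "bob"), ("bob", "carol"), ("dave", "dave")]
edges = weighted_coauthor_edges(rows)
print(edges[("alice", "bob")], edges[("bob", "carol")], len(edges))  # 2 1 2
```

With pandas, `coauthorships.groupby(['author1_norm', 'author2_norm']).size()` gives the same counts, without the self-pair filter.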

### For Teammates

**Next notebooks to complete:**
- `02_data_profiling_analysis.ipynb` - Comprehensive profiling and visualization
- `03_network_analysis_template.ipynb` - Network metrics and community detection
- `04_topic_modeling.ipynb` - TF-IDF and topic extraction from abstracts
- `05_visualization_report.ipynb` - Generate figures and tables for final report

**Key files:**
- Data dictionary: `../docs/data_dictionary.md`
- Query utilities: `../scripts/query_utils.py`
- See README.md for usage instructions