# Bronze Layer Processing Tutorial

This notebook demonstrates how to process the raw MIND (Microsoft News Dataset) files into the Bronze layer of our medallion architecture.

## What is the Bronze Layer?

The Bronze layer is the first transformation layer in our medallion architecture. It:
- Converts raw TSV files to Parquet format
- Performs initial data validation and cleansing
- Standardizes data schemas
- Preserves original data while making it more accessible

## Pipeline Steps

1. Load the raw TSV files from MINDLarge
2. Parse and validate data
3. Convert to optimized Parquet format
4. Store with metadata

Let's get started!

In [None]:
# Import libraries
import os
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import json
from pathlib import Path
import time

# Resolve project locations relative to the notebook's working directory
# (two levels up is the repository root).
BASE_DIR = Path("../..").resolve()
MIND_DATASET_PATH = BASE_DIR.joinpath("MINDLarge")
BRONZE_PATH = BASE_DIR.joinpath("ml_pipeline", "data", "bronze")

# Make sure the Bronze output directory (and any missing parents) exists;
# an already-existing directory is not an error.
BRONZE_PATH.mkdir(parents=True, exist_ok=True)

## 1. Load and Process News Data

First, we'll process the news data from TSV to Parquet format.

In [None]:
def process_news_data(source_path, output_path, split_name):
    """Convert one split's MIND ``news.tsv`` into a Bronze-layer Parquet file.

    The two entity columns are decoded from JSON strings into Python lists
    (blank or missing fields become ``[]``). A ``text`` column is added: the
    title followed by a space and the abstract, or just the title when the
    abstract is missing. Constant ``source`` and ``split`` columns are
    appended.

    Args:
        source_path: Directory containing the ``MINDlarge_<split>`` folders.
        output_path: Directory the Parquet file is written to.
        split_name: Split identifier (e.g. ``"train"``). It selects the input
            folder and names the output ``news_<split_name>.parquet``.

    Returns:
        The processed ``pandas.DataFrame`` (the same data that was written).

    Raises:
        FileNotFoundError: If ``news.tsv`` does not exist for the split.
        json.JSONDecodeError: If a non-blank entity field is not valid JSON.
    """
    start_time = time.time()

    news_columns = [
        'news_id', 'category', 'subcategory', 'title', 'abstract', 'url',
        'title_entities', 'abstract_entities',
    ]

    news_file = source_path / f"MINDlarge_{split_name}" / "news.tsv"
    print(f"Processing {news_file}...")

    news_df = pd.read_csv(
        news_file,
        sep='\t',
        names=news_columns,
        # quoting=3 is csv.QUOTE_NONE: titles and abstracts contain literal
        # quote characters that must not be interpreted as field quoting.
        quoting=3,
        # Treat only empty fields as missing. pandas' default NA list would
        # otherwise silently turn legitimate text such as "NA", "None" or
        # "null" into NaN, losing original data in the Bronze layer.
        keep_default_na=False,
        na_values=[''],
        encoding='utf-8',
    )

    def _parse_entities(value):
        # Entity fields hold JSON arrays; blank or missing fields become [].
        if isinstance(value, str) and value.strip():
            return json.loads(value)
        return []

    for col in ('title_entities', 'abstract_entities'):
        news_df[col] = news_df[col].apply(_parse_entities)

    # Vectorised "title abstract" concatenation. Rows without an abstract keep
    # the bare title. A missing title is treated as an empty string rather
    # than being rendered as the text "nan".
    titles = news_df['title'].fillna('').astype(str)
    abstracts = news_df['abstract'].fillna('').astype(str)
    news_df['text'] = (titles + ' ' + abstracts).where(news_df['abstract'].notna(), titles)

    # Provenance columns.
    news_df['source'] = 'mind'
    news_df['split'] = split_name

    output_file = output_path / f"news_{split_name}.parquet"
    news_df.to_parquet(output_file, index=False)

    print(f"Processed {len(news_df)} news articles in {time.time() - start_time:.2f} seconds")
    print(f"Output saved to {output_file}")

    return news_df

## 2. Load and Process Behaviors Data

Next, we'll process the user behavior data.

In [None]:
def process_behaviors_data(source_path, output_path, split_name):
    """Convert one split's MIND ``behaviors.tsv`` into a Bronze Parquet file.

    The ``time`` column is parsed with ``pd.to_datetime``. The
    whitespace-separated ``history`` column becomes a list of news IDs, with
    blank or missing values giving ``[]``. The ``impressions`` column is left
    as the raw string, and a constant ``split`` column is appended.

    Returns the processed DataFrame, which is also written to
    ``output_path / f"behaviors_{split_name}.parquet"``.
    """
    t0 = time.time()

    split_dir = source_path / f"MINDlarge_{split_name}"
    behaviors_file = split_dir / "behaviors.tsv"
    print(f"Processing {behaviors_file}...")

    frame = pd.read_csv(
        behaviors_file,
        sep='\t',
        names=['impression_id', 'user_id', 'time', 'history', 'impressions'],
    )

    def _split_history(value):
        # Missing or whitespace-only history -> no clicks recorded.
        if pd.isna(value) or not value.strip():
            return []
        return value.split()

    frame['time'] = pd.to_datetime(frame['time'])
    frame['history'] = frame['history'].apply(_split_history)
    frame['split'] = split_name

    destination = output_path / f"behaviors_{split_name}.parquet"
    frame.to_parquet(destination, index=False)

    elapsed = time.time() - t0
    print(f"Processed {len(frame)} behavior records in {elapsed:.2f} seconds")
    print(f"Output saved to {destination}")

    return frame

## 3. Process Entity Embeddings

Next, we'll convert the entity and relation embeddings to the Arrow IPC file format for efficient storage and retrieval.

In [None]:
def _load_embedding_vectors(vec_file):
    """Read a ``.vec`` embedding file into an insertion-ordered ``{id: vector}`` dict.

    Each data line holds an identifier followed by its float components,
    separated by any whitespace (tabs or spaces). If the first non-blank line
    consists of exactly two digit-only tokens, it is treated as a
    word2vec-style ``"<count> <dim>"`` header and skipped. Blank lines are
    ignored, and a repeated identifier overwrites the earlier vector.

    Raises:
        ValueError: If a component is not a float, or if vectors have
            differing lengths.
    """
    vectors = {}
    dim = None
    seen_content = False
    with open(vec_file, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            # split() with no argument handles tab- and space-separated files
            # and drops trailing separators.
            tokens = line.split()
            if not tokens:
                continue
            if not seen_content:
                seen_content = True
                if len(tokens) == 2 and all(t.isdigit() for t in tokens):
                    continue  # optional "<count> <dim>" header, not a vector
            vector = np.array([float(x) for x in tokens[1:]], dtype=np.float64)
            if dim is None:
                dim = vector.size
            elif vector.size != dim:
                raise ValueError(
                    f"{vec_file}, line {line_no}: expected {dim} values, got {vector.size}"
                )
            vectors[tokens[0]] = vector
    return vectors


def _write_embedding_arrow(vectors, id_column, output_file):
    """Write ``{id: vector}`` as an Arrow IPC file with columns ``id_column`` and ``embedding``.

    Column types (string, list<double>) are given explicitly so that an empty
    mapping still produces a valid zero-row file.
    """
    table = pa.Table.from_arrays(
        [
            pa.array(list(vectors.keys()), type=pa.string()),
            pa.array([v.tolist() for v in vectors.values()], type=pa.list_(pa.float64())),
        ],
        names=[id_column, 'embedding'],
    )
    with pa.OSFile(str(output_file), 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, table.schema) as writer:
            writer.write_table(table)


def process_entity_embeddings(source_path, output_path, split_name):
    """Convert a split's entity and relation ``.vec`` embeddings to Arrow IPC files.

    For each of ``entity_embedding.vec`` and ``relation_embedding.vec`` present
    in ``source_path / f"MINDlarge_{split_name}"``, this writes
    ``entity_embeddings_<split>.arrow`` or ``relation_embeddings_<split>.arrow``
    into ``output_path``. Each output has columns ``entity_id`` or
    ``relation_id`` and ``embedding``. Input files that do not exist are
    skipped.

    Args:
        source_path: Directory containing the ``MINDlarge_<split>`` folders.
        output_path: Directory the Arrow files are written to.
        split_name: Split identifier used in input and output paths.

    Raises:
        ValueError: If an embedding file is malformed (see
            ``_load_embedding_vectors``).
    """
    split_dir = source_path / f"MINDlarge_{split_name}"
    # (label used in file names/messages, input file name, id column name)
    jobs = (
        ("entity", "entity_embedding.vec", "entity_id"),
        ("relation", "relation_embedding.vec", "relation_id"),
    )
    for kind, file_name, id_column in jobs:
        vec_file = split_dir / file_name
        if not vec_file.exists():
            print(f"Skipping {vec_file} (not found)")
            continue

        start_time = time.time()
        print(f"Processing {vec_file}...")

        vectors = _load_embedding_vectors(vec_file)
        output_file = output_path / f"{kind}_embeddings_{split_name}.arrow"
        _write_embedding_arrow(vectors, id_column, output_file)

        print(f"Processed {len(vectors)} {kind} embeddings in {time.time() - start_time:.2f} seconds")
        print(f"Output saved to {output_file}")

## 4. Process All Splits

Now let's process all the data splits (train, dev, test).

In [None]:
# Run the three Bronze conversions (news, behaviors, embeddings) for every
# MIND split. news_df / behaviors_df are left holding the last split's frames.
for split in ('train', 'dev', 'test'):
    print(f"\nProcessing {split} split\n{'-' * 50}")

    news_df = process_news_data(MIND_DATASET_PATH, BRONZE_PATH, split)
    behaviors_df = process_behaviors_data(MIND_DATASET_PATH, BRONZE_PATH, split)
    process_entity_embeddings(MIND_DATASET_PATH, BRONZE_PATH, split)

## 5. Create User Profiles

Next, we'll aggregate the behaviors data from all splits into user profiles, giving a unified view of each user's reading history.

In [None]:
def create_user_profiles(bronze_path=None, splits=('train', 'dev', 'test')):
    """Aggregate per-user profiles from the Bronze ``behaviors_<split>.parquet`` files.

    Impressions from all available splits are combined. Each ``user_id``
    (emitted in sorted user order) is summarised as:

    - the chronologically ordered click history with duplicates removed
      (first occurrence kept), and its length;
    - the first and last impression timestamps;
    - ``activity_days``: whole days between the first and last impression,
      plus one;
    - the number of impressions.

    The result is written to ``user_profiles.parquet`` in ``bronze_path``.

    Args:
        bronze_path: Directory holding the behaviors files and receiving the
            output. Defaults to the module-level ``BRONZE_PATH``.
        splits: Split names whose ``behaviors_<split>.parquet`` is read when
            the file exists.

    Returns:
        The profiles ``pandas.DataFrame``, or ``None`` when no behaviors file
        was found.
    """
    start_time = time.time()
    bronze_path = Path(BRONZE_PATH if bronze_path is None else bronze_path)

    # Load only the columns the profiles use; the raw impressions strings are
    # not needed here.
    behaviors_dfs = []
    for split in splits:
        file_path = bronze_path / f"behaviors_{split}.parquet"
        if file_path.exists():
            behaviors_dfs.append(
                pd.read_parquet(file_path, columns=['user_id', 'time', 'history'])
            )

    if not behaviors_dfs:
        print("No behaviors data found")
        return None

    all_behaviors = pd.concat(behaviors_dfs, ignore_index=True)
    # One stable global sort by time. groupby preserves row order within each
    # group, so every user's rows arrive chronologically without a per-user sort.
    all_behaviors = all_behaviors.sort_values('time', kind='mergesort')

    profile_columns = [
        'user_id', 'history', 'history_length', 'first_activity',
        'last_activity', 'activity_days', 'impression_count',
    ]
    user_profiles = []
    for user_id, group in all_behaviors.groupby('user_id'):
        # dict.fromkeys de-duplicates while keeping first-occurrence order.
        unique_history = list(dict.fromkeys(
            news_id for history in group['history'] for news_id in history
        ))
        first_activity = group['time'].min()
        last_activity = group['time'].max()
        user_profiles.append({
            'user_id': user_id,
            'history': unique_history,
            'history_length': len(unique_history),
            'first_activity': first_activity,
            'last_activity': last_activity,
            'activity_days': (last_activity - first_activity).days + 1,
            'impression_count': len(group),
        })

    # Explicit columns keep the schema stable even when there are no rows.
    user_profiles_df = pd.DataFrame(user_profiles, columns=profile_columns)

    output_file = bronze_path / "user_profiles.parquet"
    user_profiles_df.to_parquet(output_file, index=False)

    print(f"Created {len(user_profiles_df)} user profiles in {time.time() - start_time:.2f} seconds")
    print(f"Output saved to {output_file}")

    return user_profiles_df

In [None]:
# Build per-user profiles from every behaviors_<split>.parquet found in
# BRONZE_PATH. create_user_profiles() returns None when none of those files
# exist, so downstream code must check for None before using the result.
user_profiles_df = create_user_profiles()

## 6. Save Metadata

Finally, let's save metadata about the bronze layer processing.

In [None]:
# Describe the Bronze outputs: the expected file names, plus simple
# statistics computed from whichever files actually exist.
bronze_splits = ['train', 'dev', 'test']

metadata = {
    # Timezone-aware UTC timestamp, unambiguous regardless of machine locale.
    "created_at": pd.Timestamp.now(tz="UTC").isoformat(),
    "mind_dataset_version": "MINDLarge",
    "pipeline_version": "1.0",
    "bronze_files": {
        "news": [f"news_{split}.parquet" for split in bronze_splits],
        "behaviors": [f"behaviors_{split}.parquet" for split in bronze_splits],
        "embeddings": (
            [f"entity_embeddings_{split}.arrow" for split in bronze_splits]
            + [f"relation_embeddings_{split}.arrow" for split in bronze_splits]
        ),
        "user_profiles": "user_profiles.parquet",
    },
    "stats": {},
}

# Per-split statistics. Values are cast to built-in int because json.dump
# cannot serialise numpy integer scalars such as numpy.int64.
for split in bronze_splits:
    news_file = BRONZE_PATH / f"news_{split}.parquet"
    behaviors_file = BRONZE_PATH / f"behaviors_{split}.parquet"

    if news_file.exists():
        # Load only the columns the statistics need.
        news_stats = pd.read_parquet(news_file, columns=['category', 'subcategory'])
        metadata["stats"][f"news_{split}"] = {
            "count": int(len(news_stats)),
            "categories": int(news_stats['category'].nunique()),
            "subcategories": int(news_stats['subcategory'].nunique()),
        }

    if behaviors_file.exists():
        behaviors_stats = pd.read_parquet(behaviors_file, columns=['user_id'])
        metadata["stats"][f"behaviors_{split}"] = {
            "count": int(len(behaviors_stats)),
            "users": int(behaviors_stats['user_id'].nunique()),
        }

# User-profile statistics. Series.max() on an integer column returns
# numpy.int64, which made json.dump raise TypeError, hence the casts. An empty
# profile table records only its count, since mean/max of nothing is undefined.
if user_profiles_df is not None:
    profile_stats = {"count": int(len(user_profiles_df))}
    if len(user_profiles_df) > 0:
        history_lengths = user_profiles_df['history_length']
        profile_stats["avg_history_length"] = float(history_lengths.mean())
        profile_stats["max_history_length"] = int(history_lengths.max())
        profile_stats["avg_activity_days"] = float(user_profiles_df['activity_days'].mean())
    metadata["stats"]["user_profiles"] = profile_stats

metadata_file = BRONZE_PATH / "_metadata.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
    json.dump(metadata, f, indent=2)

print("Metadata saved to", metadata_file)

## Conclusion

We've successfully processed the raw MIND dataset into the Bronze layer of our medallion architecture. This layer provides:

- Efficient data storage in Parquet and Arrow formats
- Standardized schemas across all splits
- Initial data cleansing and validation
- Aggregated user profiles
- Detailed metadata about the processing

In the next tutorial, we'll use this Bronze layer data to create the Silver layer with advanced features and embeddings.