# FLSim Tutorial: Sentiment Classification with LEAF's Sent140



## Introduction

In this tutorial, we will train a binary sentiment classifier on LEAF's Sent140 dataset with federated learning using FLSim. 


### Prerequisites

To get the most out of this tutorial, you should be comfortable with training machine learning models with **PyTorch** and familiar with the concept of **federated learning (FL)**. If you are unfamiliar with either of them or could use a refresher, please take a look at the following resources before proceeding with the tutorial:

- McMahan & Ramage (2017): [Federated Learning: Collaborative Machine Learning without Centralized Training Data](https://ai.googleblog.com/2017/04/federated-learning-collaborative.html). A short blog post from Google AI introducing the main idea of FL in a beginner-friendly way.
- McMahan et al. (2017): [Communication-Efficient Learning of Deep Networks from Decentralized Data](https://arxiv.org/pdf/1602.05629.pdf). This paper first proposes the approach of federated learning. The described algorithm is now known as federated averaging (or FedAvg for short).
- PyTorch has [extensive tutorials](https://pytorch.org/tutorials/) on their website.
- If you're new to **sentiment classification**, you can find Pang and Lee's survey on the topic [here](https://www.cs.cornell.edu/home/llee/omsa/omsa-published.pdf). 

Now that you're familiar with PyTorch and FL and have a sense of sentiment classification, let's move on!

### Objectives 

By the end of this tutorial, we will have learnt how to

1. Build a data pipeline for federated learning with FLSim,
2. Create a sentiment classification model compatible with FL training,
3. Create a metrics reporter to collect and report metrics,
4. Set hyperparameters for FL training, and
5. Launch an FL training flow using FLSim.

## Training a sentiment classifier with FLSim

### Prerequisites
First, let us install flsim via pip with the command below:

In [1]:
!pip install --quiet flsim



Next, we check the installed CUDA toolkit version and set some useful parameters for later; there is no need to change these.

In [2]:
!nvcc --version


nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2021 NVIDIA Corporation
Built on Mon_Sep_13_19:13:29_PDT_2021
Cuda compilation tools, release 11.5, V11.5.50
Build cuda_11.5.r11.5/compiler.30411180_0


In [3]:
USE_CUDA = True
LOCAL_BATCH_SIZE = 32
MAX_SEQ_LEN = 25

# suppress large outputs
VERBOSE = False

### 0. About the dataset

For this tutorial, we will use [LEAF's](https://leaf.cmu.edu/) [Sentiment140 (Sent140) dataset](https://leaf.cmu.edu/build/html/tutorials/sent140-md.html), which consists of 1.6 million tweets by 660k users. Note that the mean number of tweets per user is 2.42 and the standard deviation is 4.71.

![Sent140 distribution of samples across users](https://leaf.cmu.edu/webpage/images/twitter_hist.png)

Before the next step in this tutorial, we need to download the dataset and partition the data by users. 
In particular, we sample 1% of the entire dataset (`--sf 0.01`) in a non-IID manner (`-s niid`), 
keep only users with at least two samples (`-k 2`), 
and split each user's samples into train and test with a train fraction of 0.90 (`-t 'sample'`, `--tf 0.90`). 
The alternative, shown commented out in the cell below, assigns 90% of the sampled users to train and the remaining 10% to test (`-t 'user'`) 
and requires only one sample per user (`-k 1`).

For more information on the various preprocessing options, see [here](https://github.com/TalwalkarLab/leaf/tree/master/data/sent140). You can find the LEAF paper [here](https://arxiv.org/pdf/1812.01097.pdf).
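
To make the difference between the two `-t` modes concrete, here is a minimal sketch in plain Python. The helper names `split_by_user` and `split_by_sample` are hypothetical, and LEAF's actual preprocessing scripts may differ in details such as shuffling and rounding; the sketch only illustrates the idea.

```python
import random


def split_by_user(user_data, train_frac, seed=0):
    """Assign whole users to either train or test (the idea behind -t 'user')."""
    users = sorted(user_data)
    random.Random(seed).shuffle(users)
    n_train = int(train_frac * len(users))
    train = {u: user_data[u] for u in users[:n_train]}
    test = {u: user_data[u] for u in users[n_train:]}
    return train, test


def split_by_sample(user_data, train_frac, seed=0):
    """Split each user's own samples into train and test (the idea behind -t 'sample')."""
    rng = random.Random(seed)
    train, test = {}, {}
    for user in sorted(user_data):
        samples = list(user_data[user])
        rng.shuffle(samples)
        # Keep at least one training sample per user.
        n_train = max(1, int(train_frac * len(samples)))
        train[user] = samples[:n_train]
        test[user] = samples[n_train:]
    return train, test


# Toy data: three hypothetical users with 2, 10, and 3 samples.
toy = {
    "alice": ["a1", "a2"],
    "bob": [f"b{i}" for i in range(10)],
    "carol": ["c1", "c2", "c3"],
}
train_u, test_u = split_by_user(toy, train_frac=0.9)
train_s, test_s = split_by_sample(toy, train_frac=0.9)
```

With a user-level split, the train and test sets contain disjoint users. With a sample-level split, every user with at least two samples appears in both sets, which is why the number of clients is the same for train and test in that mode.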


In [4]:
# Don't need to run this after the directory has been created once

'''

%%capture preprocess_output

# Download and preprocess the data
!git clone https://github.com/TalwalkarLab/leaf.git
%cd leaf/data/sent140
# !./preprocess.sh --sf 0.01 -s niid -t 'user' --tf 0.90 -k 1 --spltseed 1

# change preprocess option (-t) so each user's data gets split into train/test
!./preprocess.sh --sf 0.01 -s niid -t 'sample' --tf 0.90 -k 2 --spltseed 1

if VERBOSE: print(preprocess_output)

'''

%cd leaf/data/sent140

/workspace/leaf/data/sent140


Let us now find our preprocessed data. Note that if you use different preprocessing options, you might need to change these!



In [5]:
TRAIN_DATA = !ls data/train
TRAIN_DATA = "data/train/" + TRAIN_DATA[0]

TEST_DATA = !ls data/test
TEST_DATA = "data/test/" + TEST_DATA[0]

TRAIN_DATA, TEST_DATA

('data/train/all_data_niid_01_keep_2_train_9.json',
 'data/test/all_data_niid_01_keep_2_test_9.json')

We can now load the training data and get a sense of how samples are distributed across users in our subset of the Sent140 dataset.


In [6]:
import json
import numpy as np

# load the training data
with open(TRAIN_DATA, "r") as f:
    training_data = json.load(f)

# how samples are distributed across users
n_samples = training_data['num_samples']
print(f"""\nNumber of samples per user:
  min={np.min(n_samples)}, 
  max={np.max(n_samples)}, 
  median={np.median(n_samples)}, 
  mean={np.mean(n_samples):.2f}, 
  std={np.std(n_samples):.2f}
  """)


Number of samples per user:
  min=1, 
  max=123, 
  median=2.0, 
  mean=3.64, 
  std=6.37
  


In [7]:
# print(training_data.keys())
# print(training_data["users"])


Let us also look at the data for an example user. Notice that there are multiple metadata fields in addition to the tweet itself and its sentiment label.

In [8]:
EXAMPLE_USER = training_data["users"][0]
training_data["user_data"][EXAMPLE_USER]

{'x': [['2003370575',
   'Tue Jun 02 06:24:38 PDT 2009',
   'NO_QUERY',
   'bricaligirl',
   'Doin my hair for school... Sooo tired ',
   'training']],
 'y': [0]}
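
The layout of the parsed JSON can be summarized with a small stand-in. The sketch below rebuilds the structure by hand from the example record above, assuming that the key under `user_data` is the username, and shows how to pull out just the tweet text and label. `tweets_and_labels` and `TWEET_FIELD` are hypothetical names used for illustration only.

```python
# Toy stand-in for the parsed LEAF JSON; only the layout matters here.
toy_data = {
    "users": ["bricaligirl"],
    "num_samples": [1],
    "user_data": {
        "bricaligirl": {
            "x": [[
                "2003370575",
                "Tue Jun 02 06:24:38 PDT 2009",
                "NO_QUERY",
                "bricaligirl",
                "Doin my hair for school... Sooo tired ",
                "training",
            ]],
            "y": [0],
        }
    },
}

TWEET_FIELD = 4  # position of the tweet text within each record


def tweets_and_labels(data, user):
    """Return (tweet text, label) pairs for one user, dropping the metadata."""
    records = data["user_data"][user]
    return [(x[TWEET_FIELD], int(y)) for x, y in zip(records["x"], records["y"])]


pairs = tweets_and_labels(toy_data, toy_data["users"][0])
```

Here `pairs` holds a single `(tweet, label)` tuple, since this user has one training sample.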

### 1. Data pipeline

Now, let us define how to build the data pipeline for federated learning:

1. To load the training and test data, we define a new dataset class, `Sent140Dataset`, which converts each user's tweets (features) into a `torch.Tensor`, discarding tweet metadata such as the date and time of the tweet. It also stores each tweet's sentiment (label).



In [9]:
import itertools
import re
import string
import unicodedata

import torch
from torch.utils.data import Dataset


# 1. The Sent140Dataset will store the tweets and corresponding sentiment for each user.

class Sent140Dataset(Dataset):
    def __init__(self, data_root, max_seq_len):
        self.data_root = data_root
        self.max_seq_len = max_seq_len
        self.all_letters = {c: i for i, c in enumerate(string.printable)}
        self.num_letters = len(self.all_letters)
        self.UNK = self.num_letters

        with open(data_root, "r") as f:
            self.dataset = json.load(f)

        self.data = {}
        self.targets = {}
        self.num_classes = 2  # binary sentiment classification

        # Populate self.data and self.targets
        for user_id, user_data in self.dataset["user_data"].items():
            self.data[user_id] = self.process_x(list(user_data["x"]))
            self.targets[user_id] = self.process_y(list(user_data["y"]))

    def __len__(self):
        return len(self.data)

    def __iter__(self):
        for user_id in self.data.keys():
            yield self.__getitem__(user_id)

    def __getitem__(self, user_id: str):
        if user_id not in self.data or user_id not in self.targets:
            raise IndexError(f"User {user_id} is not in dataset")
        return self.data[user_id], self.targets[user_id]

    def unicodeToAscii(self, s):
        return "".join(
            c for c in unicodedata.normalize("NFD", s)
            if unicodedata.category(c) != "Mn" and c in self.all_letters
        )

    def line_to_indices(self, line: str, max_seq_len: int):
        line_list = self.split_line(line)  # split phrase into words
        chars = self.flatten_list([list(word) for word in line_list])
        indices = [
            self.all_letters.get(letter, self.UNK)
            for i, letter in enumerate(chars)
            if i < max_seq_len
        ]
        # Add padding
        indices = indices + [self.UNK] * (max_seq_len - len(indices))
        return indices

    def process_x(self, raw_x_batch):
        x_batch = [e[4] for e in raw_x_batch]  # e[4] contains the actual tweet
        x_batch = [self.line_to_indices(e, self.max_seq_len) for e in x_batch]
        x_batch = torch.LongTensor(x_batch)
        return x_batch

    def process_y(self, raw_y_batch):
        y_batch = [int(e) for e in raw_y_batch]
        return y_batch

    def split_line(self, line):
        """
        Split given line/phrase (str) into list of words (List[str])
        """
        return re.findall(r"[\w']+|[.,!?;]", line)

    def flatten_list(self, nested_list):
        return list(itertools.chain.from_iterable(nested_list))

2. We can now load the train and test datasets using `Sent140Dataset`.


In [10]:
# 2. Load the train and test datasets.
train_dataset = Sent140Dataset(
    data_root=TRAIN_DATA,
    max_seq_len=MAX_SEQ_LEN,
)
test_dataset = Sent140Dataset(
    data_root=TEST_DATA,
    max_seq_len=MAX_SEQ_LEN,
)

Recall our `EXAMPLE_USER` from earlier? Their data now looks like this:

In [11]:
train_dataset[EXAMPLE_USER]

(tensor([[39, 24, 18, 23, 22, 34, 17, 10, 18, 27, 15, 24, 27, 28, 12, 17, 24, 24,
          21, 75, 75, 75, 54, 24, 24]]),
 [0])
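
To see where these numbers come from, the character-level encoding can be reproduced without PyTorch. This is a minimal sketch of the same steps as `line_to_indices` (tokenize, flatten into characters, look up indices, truncate, pad); `encode`, `VOCAB`, and `PAD` are names chosen here for illustration.

```python
import re
import string

# Same vocabulary as in Sent140Dataset: one index per printable character.
VOCAB = {c: i for i, c in enumerate(string.printable)}
PAD = len(VOCAB)  # index used both for unknown characters and for padding


def encode(tweet, max_seq_len):
    """Map a tweet to a fixed-length list of character indices."""
    # Words and selected punctuation marks; whitespace is dropped.
    tokens = re.findall(r"[\w']+|[.,!?;]", tweet)
    chars = [c for token in tokens for c in token]
    indices = [VOCAB.get(c, PAD) for c in chars[:max_seq_len]]
    return indices + [PAD] * (max_seq_len - len(indices))


example = encode("Doin my hair for school... Sooo tired ", 25)
short = encode("Hi!", 5)
```

`example` matches the tensor shown above: the tweet is cut off after 25 characters, in the middle of "Sooo". `short` ends in two padding entries with index 100, the same index that unknown characters receive.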

To complete our data pipeline, we only need to

3. Create a data loader, which will batchify training, eval, and test data. In this tutorial, we reuse the test set as the eval set. There is no need to create a sharder since each data sample is already associated with a user. For each dataset, the data loader splits each client's data into batches of size `batch_size`. We choose not to drop the last batch.

4. Lastly, wrap the data loader with a data provider and return it. 
The data provider creates clients from the groupings in the data loader and adds metadata (e.g. number of examples, number of batches per client). 
Our data is now formatted such that the trainer will accept it.

In [12]:
from flsim.utils.example_utils import LEAFDataLoader, LEAFDataProvider

# 3. Batchify training, eval, and test data. Note that train_dataset is already sharded.
dataloader = LEAFDataLoader(
    train_dataset,
    test_dataset,
    test_dataset,
    batch_size=LOCAL_BATCH_SIZE,
    drop_last=False,
)

# 4. Wrap the data loader with a data provider.
data_provider = LEAFDataProvider(dataloader)
print(f"\nClients in total: {data_provider.num_users()}")

Creating FL User: 2493user [00:00, 25579.78user/s]
Creating FL User: 2493user [00:00, 32745.12user/s]
Creating FL User: 2493user [00:00, 32249.25user/s]


Clients in total: 2493
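
The per-client batching described in step 3 can be sketched as follows. This is a simplified illustration, not the `LEAFDataLoader` implementation; `batchify` is a hypothetical helper.

```python
def batchify(samples, batch_size, drop_last=False):
    """Split one client's samples into consecutive batches."""
    batches = [samples[i:i + batch_size] for i in range(0, len(samples), batch_size)]
    # Optionally discard a final batch that is smaller than batch_size.
    if drop_last and batches and len(batches[-1]) < batch_size:
        batches.pop()
    return batches


client_samples = list(range(70))  # a hypothetical client with 70 tweets
kept = batchify(client_samples, batch_size=32)
dropped = batchify(client_samples, batch_size=32, drop_last=True)
small_client = batchify([0, 1], batch_size=32, drop_last=True)
```

With `drop_last=False`, the 70 samples become batches of 32, 32, and 6. Since the median user in our subset has only 2 samples, dropping incomplete batches would leave most clients with no data at all, as `small_client` shows.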





### 2. Create the model

Now, let's see how we can create a model that is compatible with FL training.

1. First, we define a standard, non-FL sentiment classification PyTorch `nn.Module`. In this tutorial we use a simple char-LSTM with an embedding, LSTM, and linear layer.

2. Create a `torch.device` and choose where the model will be allocated (CUDA or CPU).

As with the data pipeline, these steps are identical to creating a model for non-federated training. Note that, unlike in typical non-FL training, we haven't moved the model to the device yet.

In [13]:
from torch import nn

class CharLSTM(nn.Module):
    def __init__(
        self,
        num_classes,
        n_hidden,
        num_embeddings,
        embedding_dim,
        max_seq_len,
        dropout_rate,
    ):
        super().__init__()
        self.dropout_rate = dropout_rate
        self.n_hidden = n_hidden
        self.num_classes = num_classes
        self.max_seq_len = max_seq_len
        self.num_embeddings = num_embeddings

        self.embedding = nn.Embedding(
            num_embeddings=self.num_embeddings, embedding_dim=embedding_dim
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=self.n_hidden,
            num_layers=2,
            batch_first=True,
            dropout=self.dropout_rate,
        )
        self.fc = nn.Linear(self.n_hidden, self.num_classes)
        self.dropout = nn.Dropout(p=self.dropout_rate)

    def forward(self, x):
        seq_lens = torch.sum(x != (self.num_embeddings - 1), 1) - 1
        x = self.embedding(x)  # [B, S] -> [B, S, E]
        out, _ = self.lstm(x)  # [B, S, E] -> [B, S, H]
        out = out[torch.arange(out.size(0)), seq_lens]  # [B, S, H] -> [B, H] (last non-padding step)
        out = self.fc(self.dropout(out))  # [B, H] -> [B, C]
        return out
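
The `seq_lens` computation in `forward` selects, for each sequence, the LSTM output at the last non-padding position. The same arithmetic on plain Python lists looks roughly like this; `last_step_index` is a hypothetical helper, and the padding index 100 assumes the vocabulary used in this tutorial.

```python
PAD = 100  # padding index, i.e. num_embeddings - 1 in this tutorial


def last_step_index(row, pad_id=PAD):
    """Index of the last non-padding position, mirroring seq_lens in forward()."""
    return sum(1 for token in row if token != pad_id) - 1


full_row = [39, 24, 18, 23, 22]        # no padding
padded_row = [43, 18, 62, PAD, PAD]    # three real characters
empty_row = [PAD, PAD, PAD, PAD, PAD]  # only padding

indices = [last_step_index(r) for r in (full_row, padded_row, empty_row)]
```

Two caveats follow from this sketch. A row consisting only of padding yields index -1, which selects the final time step under negative indexing. Also, because unknown characters share the padding index, a tweet containing such characters is counted as shorter than it really is.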

We initialize our model with such parameters that it is compatible with our dataset.

In [14]:
import torch

# 1. Define our model, a simple char-LSTM.
model = CharLSTM(
    num_classes=train_dataset.num_classes,
    n_hidden=100,
    num_embeddings=train_dataset.num_letters + 1,
    embedding_dim=100,
    max_seq_len=MAX_SEQ_LEN,
    dropout_rate=0.1,
)

# 2. Choose where the model will be allocated.
cuda_enabled = torch.cuda.is_available() and USE_CUDA
device = torch.device(f"cuda:{0}" if cuda_enabled else "cpu")

model, device

(CharLSTM(
   (embedding): Embedding(101, 100)
   (lstm): LSTM(100, 100, num_layers=2, batch_first=True, dropout=0.1)
   (fc): Linear(in_features=100, out_features=2, bias=True)
   (dropout): Dropout(p=0.1, inplace=False)
 ),
 device(type='cuda', index=0))

As with the data pipeline, there are a few extra steps that we need to take to make sure that our model is compatible with FL. In particular, we need to

3. Wrap the PyTorch module with the FLSim `FLModel`, an abstraction of an FL-friendly model class that is accepted by the trainer and handles metric collection as well as the forward pass for both training and evaluation. We can recover our `nn.Module` by calling `FLModel.fl_get_module()`.

4. Move the model to GPU and enable CUDA if desired. `FLModel.fl_cuda()` internally calls `model.to(device)` to move the model to GPU.

In [15]:
from flsim.utils.example_utils import FLModel

# 3. Wrap the model with FLModel.
global_model = FLModel(model, device)
assert global_model.fl_get_module() is model

# 4. Move the model to GPU and enable CUDA if desired.
if cuda_enabled:
    global_model.fl_cuda()

### 3. Metrics Reporting

After having created our data pipeline and FL model, we will now create our metrics reporter. 
The metrics reporter allows us to collect, evaluate, and report relevant training, aggregation, and evaluation/test metrics as well as log them onto TensorBoard.



In [16]:
from flsim.interfaces.metrics_reporter import Channel
from flsim.utils.example_utils import MetricsReporter

# Create a metric reporter.
metrics_reporter = MetricsReporter([Channel.TENSORBOARD, Channel.STDOUT])

There are three functions that are of particular interest:

1. `compute_scores` computes the metrics of interest for both training and aggregation (if desired) as well as evaluation/test.

2. `create_eval_metrics` creates a dictionary that stores the value for each eval metric. 

3. `compare_metrics` compares the current eval metrics that are returned by `create_eval_metrics` to the best eval metrics so far.

For this tutorial, our only metric of interest is top-1 accuracy. In general, as with the data loading and model, you should write your own metrics reporter depending on the task.

In [17]:
import inspect

if VERBOSE:
    print(inspect.getsource(MetricsReporter.compute_scores))
    print(inspect.getsource(MetricsReporter.create_eval_metrics))
    print(inspect.getsource(MetricsReporter.compare_metrics))
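
As a rough illustration of what such a reporter has to do for top-1 accuracy, consider the sketch below. It is not the FLSim implementation (set `VERBOSE = True` above to print that); `top1_accuracy` and `is_better` are hypothetical helpers.

```python
def top1_accuracy(predictions, targets):
    """Percentage of predicted labels that match their targets."""
    correct = sum(int(p == t) for p, t in zip(predictions, targets))
    return 100.0 * correct / len(targets)


def is_better(current, best):
    """True if the current eval metrics beat the best ones seen so far."""
    return best is None or current["Accuracy"] > best["Accuracy"]


# Three of four predictions are correct.
current = {"Accuracy": top1_accuracy([0, 1, 1, 0], [0, 1, 0, 0])}
best = {"Accuracy": 50.0}
improved = is_better(current, best)
```

Storing metrics in a dictionary keyed by name, as above, mirrors the `{'Accuracy': ...}` dictionaries that appear in the training logs later in this tutorial.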

### 4. Hyperparameters

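We represent the hyperparameters for FL training as a JSON config for readability and convert it to an OmegaConf config before passing it to the FL trainer.

In particular, we specify a synchronous trainer whose server uses the FedAdam optimizer (`base_fed_adam`) with a learning rate of 0.01 and 1000 users per round; each client trains for 10 local epochs with SGD at a learning rate of 0.1.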

In [39]:
import flsim.configs
from flsim.utils.config_utils import fl_config_from_json
from omegaconf import OmegaConf

json_config = {
    "trainer": {
        "_base_": "base_sync_trainer",
        "server": {
            "_base_": "base_sync_server",
            "server_optimizer": {
                # There are different types of server optimizers.
                # FedAvg with LR requires a learning rate, whereas e.g. FedAvg doesn't:
                #   "_base_": "base_fed_avg_with_lr",
                #   "lr": 0.7,        # server's learning rate
                #   "momentum": 0.9,  # server's global momentum

                # Federated Adam (with weight decay). Server defaults:
                #   lr: float = 0.001
                #   weight_decay: float = 0.00001
                #   beta1: float = 0.9
                #   beta2: float = 0.999
                #   eps: float = 1e-8
                "_base_": "base_fed_adam",
                "lr": 0.01
            },
            # aggregate client models into a single model by taking their weighted average
            "aggregation_type": "WEIGHTED_AVERAGE",
            # type of user selection sampling
            "active_user_selector": {
                "_base_": "base_uniformly_random_active_user_selector"
            }
        },
        "client": {
            # number of client's local epochs
            # "epochs": 1,  <--- old value in example
            "epochs": 10,
            "optimizer": {
                # client's optimizer
                "_base_": "base_optimizer_sgd",
                # client's local learning rate
                # "lr": 1,
                "lr": 0.1,
                # client's local momentum
                "momentum": 0
            }
        },
        # number of users per round for aggregation
        "users_per_round": 1000,
        # total number of global epochs
        # total #rounds = ceil(total_users / users_per_round) * epochs
        #   (this is the main communication cost metric)
        #   total_users = 2493 for the data loaded above, i.e. 3 rounds per epoch
        "epochs": 1,
        # frequency of reporting train metrics
        "train_metrics_reported_per_epoch": 10,
        # keep the trained model always (as opposed to only when it
        # performs better than the previous model on eval)
        "always_keep_trained_model": False,
        # frequency of evaluation per epoch
        "eval_epoch_frequency": 1,
        "do_eval": True,
        # should we report train metrics after global aggregation
        "report_train_metrics_after_aggregation": True
    }
}

cfg = fl_config_from_json(json_config)
if VERBOSE: print(OmegaConf.to_yaml(cfg))
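
The `WEIGHTED_AVERAGE` aggregation type combines the client updates of a round into a single update. The sketch below assumes that each client is weighted by its number of training examples, as in FedAvg (McMahan et al., 2017). It works on plain lists rather than model tensors and is not FLSim's implementation; `weighted_average` is a hypothetical helper.

```python
def weighted_average(client_updates, num_examples):
    """Average client update vectors, weighting each by its number of examples."""
    total = sum(num_examples)
    dim = len(client_updates[0])
    return [
        sum(update[i] * n for update, n in zip(client_updates, num_examples)) / total
        for i in range(dim)
    ]


# Three hypothetical clients with two-dimensional updates.
updates = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
examples = [1, 1, 2]  # the third client has twice as much data
aggregate = weighted_average(updates, examples)
```

The server optimizer (FedAdam in the config above) then uses the aggregated update to change the global model, so the learning rate of 0.01 applies to this aggregate rather than to any single client's update.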

### 5. Training
Recall that we already built the data provider and created a model compatible with FL training. 
We also initialized a metrics reporter and set our desired hyperparameters.

Now, we only need to instantiate the trainer with the model and hyperparameter config we defined earlier to launch the FL training flow. We run FL training with the above JSON config and use `eval_score` to store the final evaluation metrics.



In [40]:
from hydra.utils import instantiate
import time

# Instantiate the trainer.
trainer = instantiate(cfg.trainer, model=global_model, cuda_enabled=cuda_enabled)
# Note (untested): the model argument could presumably also be a torch model that was
# loaded from a checkpoint and wrapped in FLModel(), e.g. to resume intermediate
# models in SHA/Hyperband.

start = time.time()

# Launch FL training.
final_model, eval_score = trainer.train(
    data_provider=data_provider,
    metric_reporter=metrics_reporter,
    num_total_users=data_provider.num_users(),
    distributed_world_size=1,
)

total_train_time = time.time()-start

Round:   0%|                                                                                   | 0/3 [00:00<?, ?round/s]

Train finished Global Round: 1
(epoch = 1, round = 1, global round = 1), Loss/Training: 0.12807748222412627
(epoch = 1, round = 1, global round = 1), Accuracy/Training: 94.36513899323816
reporting (epoch = 1, round = 1, global round = 1) for aggregation


Round:  33%|████████████████████████▋                                                 | 1/3 [02:24<04:48, 144.38s/round]

(epoch = 1, round = 1, global round = 1), Loss/Aggregation: 2.2671658528587018
(epoch = 1, round = 1, global round = 1), Accuracy/Aggregation: 53.14300025043827
Train finished Global Round: 2
(epoch = 1, round = 2, global round = 2), Loss/Training: 0.2766862300449927
(epoch = 1, round = 2, global round = 2), Accuracy/Training: 92.40078806642273
reporting (epoch = 1, round = 2, global round = 2) for aggregation


Round:  67%|█████████████████████████████████████████████████▎                        | 2/3 [04:54<02:27, 147.91s/round]

(epoch = 1, round = 2, global round = 2), Loss/Aggregation: 1.1413128868049711
(epoch = 1, round = 2, global round = 2), Accuracy/Aggregation: 57.585139318885446
Train finished Global Round: 3
(epoch = 1, round = 3, global round = 3), Loss/Training: 0.1404463000027323
(epoch = 1, round = 3, global round = 3), Accuracy/Training: 94.20993858179787
reporting (epoch = 1, round = 3, global round = 3) for aggregation
(epoch = 1, round = 3, global round = 3), Loss/Aggregation: 1.1006472662621591
(epoch = 1, round = 3, global round = 3), Accuracy/Aggregation: 57.11892797319933
Running (epoch = 1, round = 3, global round = 3) for Eval
(epoch = 1, round = 3, global round = 3), Loss/Eval: 1.129719145570064
(epoch = 1, round = 3, global round = 3), Accuracy/Eval: 53.20422535211268
Current eval accuracy: {'Accuracy': 53.20422535211268}%, Best so far: {'Accuracy': 55.563380281690144}%


Round:  67%|█████████████████████████████████████████████████▎                        | 2/3 [07:30<03:45, 225.48s/round]
Epoch:   0%|                                                                                   | 0/1 [07:30<?, ?epoch/s]
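
The progress bars above count 3 rounds in the single epoch. This matches the formula noted in the config comments, total rounds = ceil(total_users / users_per_round) * epochs, which can be checked with a few lines of Python (`total_rounds` is a helper defined here for illustration):

```python
import math


def total_rounds(total_users, users_per_round, epochs):
    """Number of communication rounds for a full training run."""
    return math.ceil(total_users / users_per_round) * epochs


# Values from this tutorial: 2493 clients, 1000 users per round, 1 global epoch.
rounds = total_rounds(total_users=2493, users_per_round=1000, epochs=1)
```

With these values `rounds` is 3; the last round of the epoch only has the remaining 493 users left to draw from.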


In [41]:
print(eval_score)

None


In [20]:
print("Total training time: {}".format( total_train_time ))

Total training time: 1875.268052816391


In [34]:
'''
Test cell for loading model via state dict
'''
import copy

model2 = CharLSTM(
        num_classes=train_dataset.num_classes,
        n_hidden=100,
        num_embeddings=train_dataset.num_letters + 1,
        embedding_dim=100,
        max_seq_len=MAX_SEQ_LEN,
        dropout_rate=0.1,
    )
model2.load_state_dict( copy.deepcopy( model.state_dict()) )
trainer2 = instantiate(cfg.trainer, model=FLModel(model2,device), cuda_enabled=cuda_enabled)   

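# Note: this call evaluates `trainer`, not the reloaded copy; call
# trainer2.test(...) instead to evaluate the model restored from the state dict.
trainer.test(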
    data_iter=data_provider.test_data(),
    metric_reporter=MetricsReporter([Channel.STDOUT]),
)

Running (epoch = 1, round = 1, global round = 1) for Test
(epoch = 1, round = 1, global round = 1), Loss/Test: 1.1561948809489
(epoch = 1, round = 1, global round = 1), Accuracy/Test: 52.07746478873239


{'Accuracy': 52.07746478873239}

After training finishes, we evaluate the model and report the accuracy on the test set before finishing this tutorial.


In [46]:
# We can now test our trained model.
trainer.test(
    data_iter=data_provider.test_data(),
    metric_reporter=MetricsReporter([Channel.STDOUT]),
)

Running (epoch = 1, round = 1, global round = 1) for Test
(epoch = 1, round = 1, global round = 1), Loss/Test: 1.1297191458749012
(epoch = 1, round = 1, global round = 1), Accuracy/Test: 53.20422535211268


{'Accuracy': 53.20422535211268}

## Summary

In this tutorial, we first showed how to get and preprocess LEAF's Sent140 dataset. 
We then built a data provider by splitting each client's data into batches. 
We defined a simple char-LSTM as our model, wrapped it with a model compatible with FL training, and moved it to GPU. 
Lastly, we set the hyperparameters for FL training, launched the training flow, and evaluated our model.

### Additional resources

- For a more in-depth understanding of this tutorial, check out [example_utils.py](https://github.com/facebookresearch/FLSim/blob/main/flsim/utils/example_utils.py) where we define the data loader, data provider, `FLModel`, and metrics reporter that we use in this tutorial.

- [FLSim tutorials](https://github.com/facebookresearch/FLSim/tree/main/tutorials) - check out our other tutorial on image classification.

- Kairouz et al. (2021): [Advances and Open Problems in Federated Learning](https://arxiv.org/pdf/1912.04977.pdf). As the title suggests, an in-depth overview of advances and open problems in FL.

- If you're interested in federated learning with differential privacy, take a look at [Opacus](https://opacus.ai/), a library that enables training PyTorch models with differential privacy. 
You can find a blog post introducing Opacus [here](https://ai.facebook.com/blog/introducing-opacus-a-high-speed-library-for-training-pytorch-models-with-differential-privacy/).

