# Spark 에서 Pytorch로 간단하게 데이터 변환하기
1. 스파크를 사용해서 데이터 로드
2. 스파크 데이터프레임을 petastorm spark_dataset_converter를 사용해서 Pytorch 데이터 로더로 변환
3. 학습을 위해 싱글 노드 파이토치 모델에 데이터를 피딩
4. 분산 하이퍼 파라미터 튜닝 함수에 데이터를 피딩
5. 파이토치 분산 모델에 데이터 피딩

In [0]:
import petastorm 
import torch
import pyarrow

print("petastorm 버전 : ", petastorm.__version__)
print("pyarrow 버전 : ", pyarrow.__version__)
print("pytorch 버전 : ", torch.__version__)

In [0]:
from pyspark.sql.functions import col

from petastorm.spark import SparkDatasetConverter, make_spark_converter

import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial 
from petastorm import TransformSpec
from torchvision import transforms

from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK

import horovod.torch as hvd
from sparkdl import HorovodRunner

In [0]:
BATCH_SIZE = 32
NUM_EPOCHS = 10

In [0]:
# flowers 데이터 셋을 사용한다. ( 이미지 분류 )
# 빠른 예제 테스트를 위해서 데이터를 총 100개만 사용함.
df = spark.read.format("delta").load("/databricks-datasets/flowers/delta") \
  .select(col("content"), col("label_index")) \
  .limit(1000)
  
num_classes = df.select("label_index").distinct().count()
print("분류 클래스 개수 - ", num_classes)
# 학습 데이터로 이미지 90개 검증 데이터로 이미지 10개를 사용함.
df_train, df_val = df.randomSplit([0.9, 0.1], seed=12345)

# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)


print(f"학습 데이터 - {df_train.count()},\t 검증 데이터 - {df_val.count()}")

In [0]:
# petastorm의 make_spark_converter를 사용해서 데이터프레임을 캐시한다. 
# import petatorm.spark.make_spark_converter
# 데이터브릭스에서 제공하는 파일 시스템에 스파크 데이터프레임을 캐시한다.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache")

converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)

print(f"학습 데이터 : {len(converter_train)}, 검증 데이터 : {len(converter_val)}")


In [0]:
'''
def get_model(lr=1e-2):
  # torchvision 라이브러리에서 MobileNetV2 모델을 가져온다.
  model = torchvision.models.resnet18(pretrained=True)
  # 모델 파라미터를 학습하지 않도록 만든다.
  for param in model.parameters():
    param.requires_gra = False
  
  # 트랜스퍼 러닝을 위해서 분류 목적의 레이어를 추가함
  num_ftrs = model.fc.in_features

  # 새롭게 추가되는 레이어는 학습이 가능하다. ( requires_grad=True)
  model.fc = torch.nn.Linear(num_ftrs, num_classes)
  
  return model
'''

In [0]:
import torch.nn as nn
import torch.nn.functional as F 

class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(3, 3, 1)
    self.pool = nn.MaxPool2d(2, 2) # 
    self.fc1 = nn.Linear(3*112*112, 5)
  
  def forward(self, x):
    x = self.pool(F.relu(self.conv1(x))) #224 - 1 + 1 = 224 -> 112
    x = x.view(-1, 3*112*112)
    x = self.fc1(x)
    return x
  
def get_model():
  return Net()

In [0]:
def train_one_epoch(model, criterion, optimizer, scheduler,
                    train_dataloader_iter, steps_per_epoch, epoch,
                    device):
  # 모델을 학습 모드로 만든다.
  model.train()
  
  running_loss = 0.0
  running_corrects = 0
  
  # 1 epoch 마다 데이터에 대해 iteration
  for step in range(steps_per_epoch):
    pd_batch = next(train_dataloader_iter)
    
    inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
    
    with torch.set_grad_enabled(True):
      optimizer.zero_grad()
      
      # Forward -모델에 데이터를 Input한다.
      outputs = model(inputs)
      _, preds = torch.max(outputs, 1)
      loss = criterion(outputs, labels)
      
      # Backward + Optimize - Loss를 역전파 시키고 모델 파라미터를 업데이트 한다.
      loss.backward()
      optimizer.step()
  
    running_loss += loss.item() * inputs.size(0)
    running_corrects += torch.sum(preds == labels.data)
  
  scheduler.step()
  
  epoch_loss = running_loss / (steps_per_epoch * BATCH_SIZE)
  epoch_acc = running_corrects.double() / ( steps_per_epoch * BATCH_SIZE )

  print("학습 Loss : {:.4f} 정확도 : {:.4f}".format(epoch_loss, epoch_acc))
  return epoch_loss, epoch_acc

In [0]:
def evaluate(model, criterion, 
             val_dataloader_iter, validation_steps,
             device, metric_agg_fn=None):
  # 모델을 evaluate 모드로 만듬
  model.eval()
  
  running_loss = 0.0
  running_corrects = 0
  
  # 모든 validation set에 대해 Iterate 한다.
  for step in range(validation_steps):
    pd_batch = next(val_dataloader_iter)
    inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)
    
    with torch.set_grad_enabled(False):
      # Forward
      outputs = model(inputs)
      _, preds = torch.max(outputs, 1)
      loss = criterion(outputs, labels)
      
    running_loss += loss.item()
    running_corrects += torch.sum(preds == labels.data)
    
  epoch_loss = running_loss / validation_steps
  epoch_acc = running_corrects.double() / (validation_steps * BATCH_SIZE)
  
  if metric_agg_fn != None:
    epoch_loss = metric_agg_fn(epoch_loss, 'avg_loss')
    epoch_acc = metric_agg_fn(epoch_acc, 'avg_acc')
    
  print("검증 Loss : {:.4f} 정확도 : {:.4f}".format(epoch_loss, epoch_acc))
  return epoch_loss, epoch_acc

## 이미지 처리
모델에 데이터셋을 피딩하기 전, 바이너리 형태로 되어있는 이미지 데이터를 변환한다.     
데이터브릭스에서는 스파크 데이터프레임을 사용해서 이미지로 변환하는 것은 권장하지 않음.     
이미지로 변환하면서 사이즈가 엄청 커지기 때문에 성능 감소로 이어질 수 있다고 한다.    
대신에 petastorm에서 변환을 진행을 하는 것을 권장함

In [0]:
def transform_row(is_train, pd_batch):
  # 입력과 아웃풋이 반드시 pandas.DataFrame 이어야 한다.
  transformers = [transforms.Lambda(lambda x: Image.open(io.BytesIO(x)))]
  
  if is_train:
    # 학습 데이터에 Data Augmentation을 한다.
    transformers.extend([
      transforms.RandomResizedCrop(224),
      transforms.RandomHorizontalFlip(),
    ])
  else:
    transformers.extend([
      transforms.Resize(256),
      transforms.CenterCrop(224),
    ])
  
  transformers.extend([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
  ])
  
  trans = transforms.Compose(transformers)
  
  pd_batch['features'] = pd_batch['content'].map(lambda x: trans(x).numpy())
  pd_batch = pd_batch.drop(labels=['content'], axis=1)

  return pd_batch

In [0]:
def get_transform_spec(is_train=True):
  # TransformSpec 의 output shape는 petastorm에 자동으로 전달되는 것이 아니다.
  # edit_fields 에서 새로운 column의 shape를 명시해줘야하고 
  # selected_fields 에 output column의 순서를 명시해줘야한다.
  return TransformSpec(partial(transform_row, is_train),
                       edit_fields=[('features', np.float32, (3, 224, 224), False)],
                       selected_fields=['features', 'label_index'])

In [0]:
def train_and_evaluate(lr=1e-3):
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
  model = get_model()
  model = model.to(device)
  
  criterion = torch.nn.CrossEntropyLoss()
  
  # 마지막 분류 레이어만 학습한다.
  optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)
  
  # Learning Rate를 7 epoch 마다 0.1 씩 감소시킨다.
  exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
  
  # converter.make_torch_dataloader 함수를 사용해서 파이토치 데이터로드를 생성한다.
  # converter_train = make_spark_converter(df_train)
  with converter_train.make_torch_dataloader(transform_spec = get_transform_spec(is_train=True),
                                             batch_size = BATCH_SIZE) as train_dataloader, \
       converter_val.make_torch_dataloader(transform_spec = get_transform_spec(is_train=False),
                                             batch_size = BATCH_SIZE) as val_dataloader:
    
    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // BATCH_SIZE
    
    val_dataloader_iter = iter(val_dataloader)
    validation_steps = max(1, len(converter_val)) // BATCH_SIZE
                           
    for epoch in range(NUM_EPOCHS):
      print('Epoch {}/{}'.format(epoch+1, NUM_EPOCHS))
      print('_' * 10)
                           
      train_loss, train_acc = train_one_epoch(model, criterion, optimizer, exp_lr_scheduler,
                                              train_dataloader_iter, steps_per_epoch, epoch,
                                              device)
      val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps, device)

  return val_loss  

In [0]:
loss = train_and_evaluate()

## 4. 분산 하이퍼 파라미터 튜닝 함수에 데이터 피딩
Hyperopt 라이브러리를 사용해서 하이퍼파라미터 튜닝 작업을 한다.

In [0]:
import hyperopt 
import mlflow 

def train_fn(lr):
  loss = train_and_evaluate(lr)
  return { 'loss' : loss, 'status' : hyperopt.STATUS_OK }

search_space = hp.loguniform('lr', -10, -4)
with mlflow.start_run():
  argmin = fmin(
    fn = train_fn,
    space = search_space, 
    algo = hyperopt.rand.suggest,
    max_evals = 2,
    trials = SparkTrials(parallelism = 2))

In [0]:
# 최적화된 Learning Rate ( 하이퍼 파라미터 )
argmin

## 5. 분산 파이토치 모델
HorovodRunner를 사용해서 분산 학습을 한다.

In [0]:
def metric_average(val, name):
  tensor = torch.tensor(val)
  avg_tensor = hvd.allreduce(tensor, name=name)
  return avg_tensor.item()

In [0]:
def train_and_evaluate_hvd(lr=1e-2):
  # Horovod를 초기화한다.
  hvd.init() 
  
  if torch.cuda.is_available():
    torch.cuda.set_device(hvd.local_rank())
    device = torch.cuda.current_device()
  else:
    device = torch.device("cpu")

  model = get_model()
  model = model.to(device)
  
  criterion = torch.nn.CrossEntropyLoss()
  # 동기 분산 학습에서는 워커 수에 맞춰서 배치 사이즈가 효율적으로 조정된다.
  # 증가 될 배치 사이즈에 따라서 학습율을 키워준다.
  optimizer = torch.optim.SGD(model.parameters(), lr=lr * hvd.size(), momentum=0.9)
  
  

  # 모든 워커가 동일한 파라미터를 가지고 학습할 수 있도록 모델 metadata와 optimizer를 broadcast 해준다.
  hvd.broadcast_parameters(model.state_dict(), root_rank=0)
  hvd.broadcast_optimizer_state(optimizer, root_rank=0)
  

  # Horovod의 DistributedOptimizer로 optimizer를 래핑해준다.
  optimizer_hvd = hvd.DistributedOptimizer(optimizer, named_parameters = model.named_parameters())
  
  exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer_hvd, step_size = 7, gamma = 0.1)
  
  with converter_train.make_torch_dataloader(transform_spec = get_transform_spec(is_train=True),
                                             cur_shard = hvd.rank(), 
                                             shard_count = hvd.size(),
                                             batch_size = BATCH_SIZE) as train_dataloader, \
       converter_val.make_torch_dataloader(transform_spec = get_transform_spec(is_train=False),
                                           cur_shard = hvd.rank(),
                                           shard_count = hvd.size(),
                                           batch_size = BATCH_SIZE) as val_dataloader:
    
    train_dataloader_iter = iter(train_dataloader)
    steps_per_epoch = len(converter_train) // ( BATCH_SIZE * hvd.size())
    
    val_dataloader_iter = iter(val_dataloader)
    validation_steps = max(1, len(converter_val) // ( BATCH_SIZE * hvd.size()))
    
    for epoch in range(NUM_EPOCHS):
      print("Epoch {}/{}".format(epoch + 1, NUM_EPOCHS))
      print('-' * 10)
      
      train_loss, train_acc = train_one_epoch(model, criterion, optimizer_hvd, exp_lr_scheduler,
                                              train_dataloader_iter, steps_per_epoch, epoch,
                                              device)
      val_loss, val_acc = evaluate(model, criterion, 
                                   val_dataloader_iter, validation_steps,
                                   device, metric_agg_fn=metric_average)

  return val_loss

In [0]:
# 클러스터가 두개의 워커로 구성되어 있다고 가정한다.
hr = HorovodRunner(np=2)
hr.run(train_and_evaluate_hvd)