# Data pipeline

This data pipeline downloads the original dataset and transforms it into our final dataset. We use DVC (Data Version Control) to version the data at every step.

### 0. Necessary imports and functions

In [9]:
from kaggle.api.kaggle_api_extended import KaggleApi
from pathlib import Path
import os
from dotenv import load_dotenv
import yaml
import subprocess
import random
import shutil

In [2]:
def upload_data_to_dvc(data_dir: str, commit_message: str | None = None) -> None:
    commands = {
        'dvc_add': f"dvc add {data_dir}",
        'git_add': "git add ../data.dvc",
        'git_commit': "git commit -m " + (commit_message if commit_message else "Add data to DVC"),
        'dvc_push': "dvc push"
    }
    
    try:
        # Add the data directory to DVC
        subprocess.run(commands['dvc_add'].split(), check = True)

        # Commit the changes
        subprocess.run(commands['git_add'].split(), check = True)
        subprocess.run(commands['git_commit'].split(), check = True)

        # Push the changes to the remote repository
        subprocess.run(commands['dvc_push'].split(), check = True)
    except Exception as e:
        print("Error is the following:", e)
        print("Something went wrong!")

In [3]:
def copy_files(sample_list, src_images_dir, src_masks_dir, dest_images_dir, dest_masks_dir, mask_ext='.png'):
    """Copy each sample image and its matching mask into the destination directories.

    The mask of an image is expected to share the image's stem and differ only in the
    extension: ``<src_masks_dir>/<stem><mask_ext>``.

    Args:
        sample_list: Image file names (not paths) relative to ``src_images_dir``.
        src_images_dir: Directory containing the source images.
        src_masks_dir: Directory containing the source masks.
        dest_images_dir: Existing directory that receives the image copies.
        dest_masks_dir: Existing directory that receives the mask copies.
        mask_ext: Extension of the mask files, including the leading dot.

    Raises:
        FileNotFoundError: propagated from ``shutil.copyfile`` when an image, its mask
            or a destination directory does not exist.
    """
    for sample in sample_list:
        # Copy image file
        shutil.copyfile(
            os.path.join(src_images_dir, sample),
            os.path.join(dest_images_dir, sample),
        )

        # Swap only the extension: a plain str.replace('jpg', 'png') would also rewrite
        # any "jpg" that appears inside the stem (e.g. "jpg_01.jpg" -> "png_01.png").
        sample_mask = os.path.splitext(sample)[0] + mask_ext
        shutil.copyfile(
            os.path.join(src_masks_dir, sample_mask),
            os.path.join(dest_masks_dir, sample_mask),
        )

### 1. Variable definition

All variables are defined from a configuration file (`config.yaml`), so they can be changed without editing the notebook code.

In [4]:
# Read the YAML configuration file
with open('config.yaml', 'r', encoding = 'utf-8') as file:
    config = yaml.safe_load(file)

# Declare some configuration variables
UPLOAD_ORIGINAL = config['dataPipelines']['dataDownloading']['uploadToDVC']
DATASET_LINK = config['dataPipelines']['dataDownloading']['datasetLink']
DATA_DIR = Path(config['dataPipelines']['dataDownloading']['dataDirectory'])
COMMIT_MESSAGE_ORIGINAL = config['dataPipelines']['dataDownloading']['commitMessage']
UPLOAD_SPLIT = config['dataPipelines']['splitData']['uploadToDVC']
TRAIN_SIZE = config['dataPipelines']['splitData']['trainSize']
VAL_SIZE = config['dataPipelines']['splitData']['valSize']
TEST_SIZE = config['dataPipelines']['splitData']['testSize']
SPLIT_DATA_DIR = Path(config['dataPipelines']['splitData']['dataDirectory'])
COMMIT_MESSAGE_SPLIT = config['dataPipelines']['splitData']['commitMessage']

# Create data directory if it does not exist
DATA_DIR.mkdir(parents = True, exist_ok = True)

# Load environment variables from a .env file. load_dotenv() exports them into os.environ,
# which is where the Kaggle client looks for KAGGLE_USERNAME / KAGGLE_KEY, so no copying
# is needed. Verify both credentials up front.
load_dotenv()
missing_credentials = [name for name in ('KAGGLE_USERNAME', 'KAGGLE_KEY') if not os.getenv(name)]
if missing_credentials:
    # Fail with an actionable message instead of the opaque TypeError that
    # `os.environ[name] = None` would raise.
    raise EnvironmentError(
        "Missing Kaggle credentials: " + ", ".join(missing_credentials)
        + ". Define them in the .env file or in the environment."
    )

### 2. Working with the original data

First, we download the original data using the Kaggle API client (`KaggleApi`).

In [5]:
# Create the Kaggle client and authenticate it with the credentials available in the environment
api = KaggleApi()
api.authenticate()

# Fetch the dataset archive into DATA_DIR and extract it there
api.dataset_download_files(
    DATASET_LINK,
    path = DATA_DIR,
    unzip = True,
)

Dataset URL: https://www.kaggle.com/datasets/mariarisques/dataset-person-yolos


Now we upload the downloaded data to DVC so that it is under data version control.

In [6]:
# Version the downloaded dataset only when enabled in config.yaml (dataDownloading.uploadToDVC)
if UPLOAD_ORIGINAL:
    upload_data_to_dvc(DATA_DIR, COMMIT_MESSAGE_ORIGINAL)

### 3. Split data

In this section we are going to split the data into train, validation and test splits.

In [10]:
# Define directories
images_dir = DATA_DIR / 'dataset_person-yolos/data/images'
masks_dir = DATA_DIR / 'dataset_person-yolos/data/masks'
images_dir_train = SPLIT_DATA_DIR / 'train/images'
masks_dir_train = SPLIT_DATA_DIR / 'train/masks'
images_dir_val = SPLIT_DATA_DIR / 'val/images'
masks_dir_val = SPLIT_DATA_DIR / 'val/masks'
images_dir_test = SPLIT_DATA_DIR / 'test/images'
masks_dir_test = SPLIT_DATA_DIR / 'test/masks'

# Seed for the shuffle: optional `seed` key of the splitData config section (default 42).
# A fixed seed makes the split reproducible across runs and machines.
SPLIT_SEED = config['dataPipelines']['splitData'].get('seed', 42)

# The test split takes whatever remains after train/val, so fractions that do not sum
# to 1 would silently change its size.
total_fraction = TRAIN_SIZE + VAL_SIZE + TEST_SIZE
if abs(total_fraction - 1) > 1e-6:
    raise ValueError(f"trainSize + valSize + testSize must sum to 1, got {total_fraction}")

# Get the list of samples and shuffle them. Sort first, because os.listdir order is
# arbitrary and the seeded shuffle is only reproducible from a fixed starting order.
samples = sorted(name for name in os.listdir(images_dir) if (images_dir / name).is_file())
random.Random(SPLIT_SEED).shuffle(samples)

# Calculate split indices
num_samples = len(samples)
train_end = int(TRAIN_SIZE * num_samples)
val_end = train_end + int(VAL_SIZE * num_samples)

# Split samples
train_samples = samples[:train_end]
val_samples = samples[train_end:val_end]
test_samples = samples[val_end:]

split_plan = {
    'train': (train_samples, images_dir_train, masks_dir_train),
    'val': (val_samples, images_dir_val, masks_dir_val),
    'test': (test_samples, images_dir_test, masks_dir_test),
}

for split_name, (split_samples, split_images_dir, split_masks_dir) in split_plan.items():
    # Recreate each split folder from scratch: copying into a folder left by a previous
    # run would keep its files, so a sample could end up in more than one split.
    split_root = SPLIT_DATA_DIR / split_name
    if split_root.exists():
        shutil.rmtree(split_root)
    split_images_dir.mkdir(parents = True, exist_ok = True)
    split_masks_dir.mkdir(parents = True, exist_ok = True)

    # Copy files to respective directories
    copy_files(split_samples, images_dir, masks_dir, split_images_dir, split_masks_dir)

Now we upload the split data to DVC so that it is under data version control.

In [None]:
# Version the train/val/test split only when enabled in config.yaml (splitData.uploadToDVC)
if UPLOAD_SPLIT:
    upload_data_to_dvc(SPLIT_DATA_DIR, COMMIT_MESSAGE_SPLIT)

### 4. Transformations