(section_22_data_acquisition)=
# Data Acquisition
One of the most challenging problems to solve in deep learning has nothing to do with advanced neural network architectures, algorithm design, hardware configuration or AI framework selection. Solving problems by teaching machines to learn without being explicitly programmed rests on our ability to acquire, prepare, and serve the right data, of the right quality, quantity and format. From data ingestion, through analytics, machine learning and deep learning model training, data pipelines are the fundamental building blocks of artificial intelligence. In this section, we design, build and execute a simple, automated, and reproducible data acquisition and ingestion pipeline.


 Recall from the prior section, several aspects of our data 

## Data Acquisition ETL Design
### Source to Target Model (STTM)
Recall, our source data are comprised of CoreData and CommonFeaturesData datasets. The goal of the data acquisition pipeline is to:
1. **Extract** the CoreData and CommonFeaturesData datasets depicted on the left of {ref}`sttm` to a local staging area, 
2. **Transform** the CoreData and CommonFeaturesData datasets into the four target environment datasets described on right of {ref}`sttm`, then
3. **Load** the four datasets into the target relational database management system.

The following characterizes the mapping from the source data to the target data environment.

```{figure} ../images/STTM.png
---
height: 500px
width: 900px
name: sttm
---
Source to Target Model
```
Let's review the mapping of the CoreData and CoreFeatures.
#### CoreData Dataset Mapping
The CoreData datasets map to an Impressions table, and a CoreFeatures table. The Impressions table has all the fields contained in the CoreData datasets with one exception: the feature list. Feature lists found in the CoreData and CommonFeaturesData datasets are variable length lists of feature structures, each containing a feature name, feature id, and feature value. The lists of feature structures will be parsed, normalized, and stored in a separate CoreFeatures table where each observation corresponds to a single feature for an impression. 

#### CommonFeaturesData Dataset Mapping
Similarly, the CommonFeaturesData dataset is comprised of rows of common feature lists observed across many impressions. This dataset will map to a CommonFeaturesSummary and a CommonFeatures table.  The CommonFeaturesSummary simply stores the common_feature_index and the number of feature structures in feature lists stored in the CommonFeatures table.  

### Directed Acyclic Graph 
We've described the ETL process as a pipeline through which data flows sequentially from one end to the other. In practice, the metaphor is a bit misleading. Data isn't literally flowing from one end of a single tube to the other. Rather, ETL processes may be complex, non-linear, networks, of objects, tasks performed on those objects, and dependencies between tasks. A more apt theoretical framework for reasoning about ETL workflows can be borrowed from graph theory.

A graph is a pair $G=(V,E)$, where: 
- $V$ is a set of vertices, and 
- $E$ is a set of paired vertices or edges.

In a *directed* graph or *digraph*, each edge $E \subseteq \{(x,y)|(x,y)\in V^2$ and $x\ne y\}$ between a pair of vertices has a polarity or orientation from one vertex to another. For instance, the pair of vertices may be tasks to perform within a data pipeline, and the edge between them may represent the constraint that the end task must initiate after the start task has completed.  

A *path* graph is a sequence of edges in a graph in which the ending vertex or task of each edge in a sequence is the same as the starting vertex or task of the next edge in the sequence. More formally, a graph of order $n\ge2$ is a graph in which the vertices can be listed in an order $\{v_1,v_2,\dots,v_n\}$ such that the edges are $\{v_i,v_{i+1}\}$ for  $i=1,2,\dots,n-1$. When the starting vertex of the path is the same as the ending vertex of the path, a cycle has been formed. 

Finally, a directed *acyclic* graph has at least one topological ordering of its vertices into a sequence, such that the start vertex of every directed edge occurs earlier in the sequence than the ending vertex of that edge. Further, any graph that has topological ordering cannot have any cycles because the edge into the earliest vertex of the cycle would have to be oriented in the wrong direction. 

Given their mathematical properties, DAGs have been used in a wide range of scientific, computational, biological, and sociological applications. 

Now, we can design our ETL process as a directed acyclic graph $G=(V,E)$ where $V$ is a set of objects or vertices, and $E$ is the set edges or tasks directionally connecting objects. The high-level ETL DAG is summarized in {ref}`etl_dag`.

```{figure} ../images/ETLDAG.png
---
height: 500px
width: 900px
name: etl_dag
---
Extract Transform Load DAG
```

In [3]:
# Imports
# External Modules
import os
import boto3
from botocore.exceptions import NoCredentialsError
import logging
import progressbar
import tarfile
import tempfile
import numpy as np
import numexpr as ne
os.environ['NUMEXPR_MAX_THREADS'] = '24'
os.environ['NUMEXPR_NUM_THREADS'] = '16'
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', 100)
pd.set_option('display.width', 1000)
# Logging Configuration
# ------------------------------------------------------------------------------------------------ #
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------------------------------------ #

In [1]:
# REMOVE-CELL
# Must reset current directory to the project home before importing internal modules
home = "/home/john/projects/DeepCVR/"
os.chdir(home)

In [None]:
# Imports
# Internal Modules
from deepcvr.utils.config import S3Config

In [2]:
# Constants 
S3_BUCKET = 'deepcvr-data'
DIRECTORY_EXTERNAL = "data/external"
DIRECTORY_RAW = 'data/raw'
DIRECTORY_STAGED = 'data/staged'
DIRECTORY_SAMPLE = 'data/sample'
FILEPATH_EXTERNAL_TRAIN = os.path.join(DIRECTORY_EXTERNAL, 'taobao_train.tar.gz')
FILEPATH_EXTERNAL_TEST = os.path.join(DIRECTORY_EXTERNAL, 'taobao_test.tar.gz')
FILEPATH_RAW_TRAIN_CORE = os.path.join(DIRECTORY_RAW,"sample_skeleton_train.csv")
FILEPATH_RAW_TRAIN_COMMON = os.path.join(DIRECTORY_RAW,"common_features_train.csv")
FILEPATH_RAW_TEST_CORE = os.path.join(DIRECTORY_RAW,"sample_skeleton_test.csv")
FILEPATH_RAW_TEST_COMMON = os.path.join(DIRECTORY_RAW,"common_features_test.csv")


## Download Data
Downloading the data from our S3 instance will take approximately 15 minutes on a standard 40 Mbps internet line.

In [4]:
# %load -s S3Downloader deepcvr/data/download.py
class S3Downloader:
    """Download operator for Amazon S3 Resources

    Args:
        bucket (str): The name of the S3 bucket
        destination (str): Director to which all resources are to be downloaded
    """

    def __init__(self, bucket: str, destination: str, force: bool = False) -> None:
        self._bucket = bucket
        self._destination = destination
        self._force = force
        config = S3Config()
        self._s3 = boto3.client(
            "s3", aws_access_key_id=config.key, aws_secret_access_key=config.secret
        )
        self._progressbar = None

    def execute(self) -> None:

        object_keys = self._list_bucket_contents()

        for object_key in object_keys:
            destination = os.path.join(self._destination, object_key)
            if not os.path.exists(destination) or self._force:
                self._download(object_key, destination)
            else:
                logger.info(
                    "Bucket resource {} already exists and was not downloaded.".format(destination)
                )

    def _list_bucket_contents(self) -> list:
        """Returns a list of objects in the designated bucket"""
        objects = []
        s3 = boto3.resource("s3")
        bucket = s3.Bucket(self._bucket)
        for object in bucket.objects.all():
            objects.append(object.key)
        return objects

    def _download(self, object_key: str, destination: str) -> None:
        """Downloads object designated by the object ke if not exists or force is True"""

        response = self._s3.head_object(Bucket=self._bucket, Key=object_key)
        size = response["ContentLength"]

        self._progressbar = progressbar.progressbar.ProgressBar(maxval=size)
        self._progressbar.start()

        os.makedirs(os.path.dirname(destination), exist_ok=True)
        try:
            self._s3.download_file(
                self._bucket, object_key, destination, Callback=self._download_callback
            )
            logger.info("Download of {} Complete!".format(object_key))
        except NoCredentialsError:
            msg = "Credentials not available for {} bucket".format(self._bucket)
            raise NoCredentialsError(msg)

    def _download_callback(self, size):
        self._progressbar.update(self._progressbar.currval + size)


In [5]:
downloader = S3Downloader(bucket=S3_BUCKET, destination=DIRECTORY_EXTERNAL)
downloader.execute()

INFO:botocore.credentials:Credentials found in config file: ~/.aws/config
INFO:__main__:Bucket resource data/external/taobao_test.tar.gz already exists and was not downloaded.
INFO:__main__:Bucket resource data/external/taobao_train.tar.gz already exists and was not downloaded.


## Extract Raw Data
Here, we extract the compressed files into a raw data directory

In [6]:
# %load -s Extractor deepcvr/data/extract.py
class Extractor:
    """Decompresses a gzip archive, stores the raw data

    Args:
        source (str): The filepath to the source file to be decompressed
        destination (str): The destination directory into which data shall be stored.
        filetype (str): The file extension for the uncompressed data
        force (bool): Forces extraction even when files already exist.
    """

    def __init__(self, source: str, destination: str, force: bool = False) -> None:

        self._source = source
        self._destination = destination
        self._force = force

    def execute(self) -> None:
        """Extracts and stores the data, then pushes filepaths to xCom."""
        logger.debug("\tSource: {}\tDestination: {}".format(self._source, self._destination))

        # If all 4 raw files exist, it is assumed that the data have been downloaded
        n_files = len(os.listdir(self._destination))
        if n_files < 4:

            with tempfile.TemporaryDirectory() as tempdir:
                # Recursively extract data and store in destination directory
                self._extract(source=self._source, destination=tempdir)

    def _extract(self, source: str, destination: str) -> None:
        """Extracts the data and returns the extracted filepaths"""

        logger.debug("\t\tOpening {}".format(source))
        data = tarfile.open(source)

        for member in data.getmembers():
            if self._is_csvfile(filename=member.name):
                if self._not_exists_or_force(member_name=member.name):
                    logger.debug("\t\tExtracting {} to {}".format(member.name, self._destination))
                    data.extract(member, self._destination)  # Extract to destination
                else:
                    pass  # Do nothing if the csv file already exists and Force is False

            else:
                logger.debug("\t\tExtracting {} to {}".format(member.name, destination))
                data.extract(member, destination)  # Extract to tempdirectory

    def _not_exists_or_force(self, member_name: str) -> bool:
        """Returns true if the file doesn't exist or force is True."""
        filepath = os.path.join(self._destination, member_name)
        return not os.path.exists(filepath) or self._force

    def _is_csvfile(self, filename: str) -> bool:
        """Returns True if filename is a csv file, returns False otherwise."""
        return ".csv" in filename


In [7]:
extractor = Extractor(source=FILEPATH_EXTERNAL_TRAIN, destination=DIRECTORY_RAW)
filenames = extractor.execute()
os.listdir(DIRECTORY_RAW)

['sample_skeleton_train.csv',
 'sample_skeleton_test.csv',
 'common_features_test.csv',
 'common_features_train.csv']

## Core Dataset Preprocessing
Let's take a preliminary look at the core training dataset.
### Core Raw Training Set

In [18]:
df = pd.read_csv(FILEPATH_RAW_TEST_CORE, header=None, index_col=[0], nrows=10000)
df.head()

Unnamed: 0_level_0,1,2,3,4,5
0,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,0,0,23bd0f75de327c60,14,21691810781.030193516651.020555871431.020683152771.020788010261.070298787552.07...
2,0,0,23bd0f75de327c60,15,20556627321.020683168931.020789873281.0853100205382.53970298966523.5835250893551...
3,0,0,23bd0f75de327c60,12,20683154051.020565395121.030193516651.021692734271.021091004791.021090841271.0...
4,0,0,23bd0f75de327c60,11,50996861712.9957321090689921.020788010261.020683152761.020580106491.02109104804...
5,0,0,543b0cd53c7d5858,11,20683170931.050893553232.6390621090204101.021090452281.021090890731.02109035934...


Here we have: 

| Column | Field                                  |
|--------|----------------------------------------|
| 0      | Sample-id                              |
| 1      | Click Label                            |
| 2      | Conversion Label                       |
| 3      | Common Features Foreign Key            |
| 4      | Number of features in the feature list |
| 5      | Feature List                           |


In [19]:
df = pd.read_csv(FILEPATH_RAW_TRAIN_COMMON, header=None, index_col=0, nrows=100)
df.head()

Unnamed: 0_level_0,1,2
0,Unnamed: 1_level_1,Unnamed: 2_level_1
84dceed2e3a667f8,343,101313191.012534387741.012634387791.012734387821.012838648851.012938648881.015...
0000350f0c2121e7,811,127_1437162241.94591127_1435146270.69315127_1437728710.69315127_1435432831.60944127_...
000091a89d1867ab,7,12534387731.012434387691.012234387611.012134386581.012938648891.012838648851.0...
0001a4114b0ae8bf,231,150_1439166842.3979150_1439407981.07056150_1438923681.6259150_1439146340.55962150_14...
0001def19d7cb335,964,150_1439091500.84715150_1439330134.44265150_1439340833.3322150_1438742584.09988150_1...


Here we have: 

| Column | Field                                  |
|--------|----------------------------------------|
| 0      | Sample-id                              |
| 1      | Click Label                            |
| 2      | Conversion Label                       |
| 3      | Common Features Foreign Key            |
| 4      | Number of features in the feature list |
| 5      | Feature List                           |

# REMOVE-CELL
# References and Notes
Refer to  https://www.netquest.com/blog/en/random-sampling-stratified-sampling for sampling techniques