# Data Acquisition
Modeling the state-of-the-art from the 1970s, we'll obtain the data using the standard extract, transform, and load (ETL) design pattern. This will require a Criteo data source configuration, a series of tasks to perform the ETL operations, a pipeline to orchestrate the process, and a workspace into which the output Dataset object will be loaded. We'll import them now and introduce the modules en route.

In [1]:
# IMPORTS
from myst_nb import glue
import pandas as pd
pd.set_option('display.width', 1000)
pd.options.display.float_format = '{:,.2f}'.format

from cvr.core.dataset import DatasetRequest
from cvr.core.workspace import Workspace, WorkspaceManager
from cvr.utils.config import CriteoConfig 
from cvr.data.etl import Extract, TransformETL, LoadDataset
from cvr.core.pipeline import DataPipeline, DataPipelineBuilder, DataPipelineRequest


## Workspace
Before we set up the ETL pipeline, we need to establish a workspace for it. A Workspace object is essentially a container for Datasets, Models, and Experiments. The singleton class WorkspaceManager lets us create and manage Workspace objects.

In [2]:
# REMOVE
# wsm = WorkspaceManager()
# wsm.delete_workspace('Trieste')

In [3]:
wsm = WorkspaceManager()
vesuvio = wsm.create_workspace(name="Vesuvio", description="Vesuvio's: Best Irish Coffee in the Neighborhood", current=True)

Workspace Vesuvio has been created and is now the 'current' workspace. Datasets, Models, and Experiments will be contained within this workspace until another workspace is created and/or set current.
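
To make the container-plus-singleton idea concrete, here is a minimal sketch. It is not the `cvr` implementation: `SketchWorkspace` and `SketchWorkspaceManager` are hypothetical names, and the in-memory dictionary stands in for whatever persistence the real classes use.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class SketchWorkspace:
    """Hypothetical stand-in for a workspace: a named container of datasets."""

    name: str
    description: str = ""
    # Datasets are keyed by (name, stage), the same pair used for lookups.
    _datasets: Dict[Tuple[str, str], Any] = field(default_factory=dict)

    def add_dataset(self, name: str, stage: str, data: Any) -> None:
        self._datasets[(name, stage)] = data

    def get_dataset(self, name: str, stage: str) -> Any:
        return self._datasets[(name, stage)]


class SketchWorkspaceManager:
    """Hypothetical singleton: every instantiation returns the same manager."""

    _instance: Optional["SketchWorkspaceManager"] = None

    def __new__(cls):
        if cls._instance is None:
            # State is initialized once, on first construction only.
            cls._instance = super().__new__(cls)
            cls._instance.workspaces = {}
            cls._instance.current = None
        return cls._instance

    def create_workspace(self, name, description="", current=True):
        if name in self.workspaces:
            raise ValueError(f"Workspace {name!r} already exists.")
        workspace = SketchWorkspace(name=name, description=description)
        self.workspaces[name] = workspace
        if current:
            self.current = workspace
        return workspace


manager_a = SketchWorkspaceManager()
manager_b = SketchWorkspaceManager()
demo = manager_a.create_workspace(name="Demo", description="Sketch only")
demo.add_dataset("criteo", "preprocessed", [1, 2, 3])
```

Because both names refer to the same manager, a workspace created through `manager_a` is also the current workspace as seen from `manager_b`.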

## Data Source
The CriteoConfig object packages the URL, file structure, and local destination file path information for the Criteo data source. For illustrative purposes, the CriteoConfig on this machine is shown below.

In [4]:
config = CriteoConfig()
config.print()



                        Criteo Data Source Configuration                        
                        ________________________________                        
                         name : Criteo Sponsored Search Conversion Log Dataset
                          url : http://go.criteo.net/criteo-research-search-conversion.tar.gz
                  destination : data\external\criteo.tar.gz
             filepath_extract : Criteo_Conversion_Search/CriteoSearchData
                 filepath_raw : raw\criteo.csv
                          sep : \t
                      missing : -1


## Data Pipeline Steps
Our pipeline consists of three steps: Extract, TransformETL, and LoadDataset, described below.

| Step | Module       | Description                                                                     |
|------|--------------|---------------------------------------------------------------------------------|
| 1    | Extract      | Downloads the source data into a local raw data directory                       |
| 2    | TransformETL | Transforms the raw data into a Dataset object and performs basic preprocessing. |
| 3    | LoadDataset  | Loads the Dataset object into our current workspace.                            |

The Extract step takes the CriteoConfig data source configuration object as input and produces our raw data. TransformETL performs two basic preprocessing steps a priori, based upon the description of the data provided by Criteo Labs. First, it converts the missing value indicator (-1) to NaN. Second, it converts the categorical variables to the pandas category data type for computational and space efficiency. Finally, LoadDataset loads the preprocessed Dataset object into our current workspace. The pipeline tasks are instantiated below.

In [5]:
extract = Extract(datasource_config=config, chunk_size=20)
transform = TransformETL(value=[-1, "-1"])
load = LoadDataset()
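
The two preprocessing steps can be illustrated with plain pandas on a toy frame. This is only a sketch of the technique, not the internals of TransformETL: the values are made up, and only the column names are borrowed from the Criteo data.

```python
import numpy as np
import pandas as pd

# Toy frame with made-up values; the missing indicator appears both as a
# number (-1) and as a string ("-1"), matching value=[-1, "-1"] above.
df = pd.DataFrame(
    {
        "sales_amount": [12.5, -1.0, 30.0, -1.0],
        "device_type": ["a1", "-1", "b2", "a1"],
        "product_brand": ["-1", "x9", "x9", "-1"],
    }
)

missing_before = df.isna().sum()

# Step 1: replace both forms of the missing value indicator with NaN.
df = df.replace([-1, "-1"], np.nan)

# Step 2: store the string-valued columns as pandas categoricals.
for column in ["device_type", "product_brand"]:
    df[column] = df[column].astype("category")

missing_after = df.isna().sum()

# Before/after missing counts per column, in the spirit of a task summary.
summary = pd.DataFrame({"Before": missing_before, "After": missing_after})
print(summary)
print(df.dtypes)
```

The category dtype stores each distinct label once and keeps integer codes per row, which is where the space savings on a multi-million-row frame come from.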

## Data Pipeline Request
We specify the parameters for the pipeline and its resultant Dataset via request objects.

In [6]:
dataset_request = DatasetRequest(name="criteo", 
                                 description="Criteo Preprocessed", 
                                 stage="preprocessed", 
                                 sample_size=None,
                                 )
pipeline_request = DataPipelineRequest(name="etl", 
                                       stage="preprocessed", 
                                       workspace=vesuvio,
                                       random_state=602, 
                                       force=True, 
                                       logging_level='info',
                                       dataset_request=dataset_request
                                       )

## Data Pipeline Builder
Now, we pass our request to the DataPipelineBuilder object, add the tasks, and call the build method. The pipeline is provided via a property on the builder.

In [7]:
builder = DataPipelineBuilder()
builder.make_request(pipeline_request) 
builder.add_task(extract)
builder.add_task(transform)
builder.add_task(load)
builder.build()
pipeline = builder.pipeline
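
For readers unfamiliar with the builder pattern, the sketch below mirrors the call sequence above (request, add tasks, build, read the pipeline property) using hypothetical classes. `SketchPipeline`, `SketchPipelineBuilder`, and the three toy tasks are illustrative assumptions and say nothing about how `cvr` implements its builder.

```python
from typing import Any, Callable, Dict, List, Optional

Task = Callable[[Any], Any]


class SketchPipeline:
    """Hypothetical pipeline: runs tasks in order, passing each the prior output."""

    def __init__(self, name: str, tasks: List[Task]) -> None:
        self.name = name
        self._tasks = list(tasks)

    def run(self, data: Any = None) -> Any:
        for task in self._tasks:
            data = task(data)
        return data


class SketchPipelineBuilder:
    """Hypothetical builder: collects a name and tasks, then assembles a pipeline."""

    def __init__(self) -> None:
        self._name: Optional[str] = None
        self._tasks: List[Task] = []
        self._pipeline: Optional[SketchPipeline] = None

    def make_request(self, name: str) -> None:
        self._name = name

    def add_task(self, task: Task) -> None:
        self._tasks.append(task)

    def build(self) -> None:
        if self._name is None:
            raise ValueError("make_request must be called before build.")
        self._pipeline = SketchPipeline(self._name, self._tasks)

    @property
    def pipeline(self) -> SketchPipeline:
        if self._pipeline is None:
            raise RuntimeError("Call build() before accessing the pipeline.")
        return self._pipeline


store: Dict[str, Any] = {}


def toy_extract(_: Any) -> List[str]:
    return ["3", "-1", "7"]  # made-up raw values


def toy_transform(rows: List[str]) -> List[Optional[int]]:
    return [None if row == "-1" else int(row) for row in rows]


def toy_load(rows: List[Optional[int]]) -> List[Optional[int]]:
    store["toy"] = rows
    return rows


sketch_builder = SketchPipelineBuilder()
sketch_builder.make_request("etl")
for task in (toy_extract, toy_transform, toy_load):
    sketch_builder.add_task(task)
sketch_builder.build()
result = sketch_builder.pipeline.run()
```

Separating construction from execution lets the same task objects be inspected after the run, which is how the task summaries are read later in this section.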


## ETL Pipeline Execution
The dataset is approximately 6.5 GB, making this ETL a network- and IO-intensive process. Estimated processing time: 12 minutes.
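
Most of that time goes to moving bytes. As a rough illustration of chunked transfer and throughput measurement, the sketch below copies a stream in fixed-size chunks and reports basic statistics. `copy_in_chunks` is a hypothetical helper, not part of `cvr`; it uses an in-memory buffer in place of a network response, and it assumes "Mb" means megabytes.

```python
import io
import time
from typing import BinaryIO

MB = 1024 * 1024  # assumption: sizes are counted in megabytes


def copy_in_chunks(source: BinaryIO, destination: BinaryIO, chunk_size_mb: float = 20) -> dict:
    """Copy source to destination chunk by chunk and return transfer statistics."""
    chunk_bytes = int(chunk_size_mb * MB)
    chunks = 0
    total_bytes = 0
    start = time.perf_counter()
    while True:
        chunk = source.read(chunk_bytes)
        if not chunk:  # an empty read signals the end of the stream
            break
        destination.write(chunk)
        chunks += 1
        total_bytes += len(chunk)
    seconds = time.perf_counter() - start
    size_mb = total_bytes / MB
    return {
        "chunks": chunks,
        "size_mb": size_mb,
        "seconds": seconds,
        "mb_per_second": size_mb / seconds if seconds > 0 else float("nan"),
    }


# In-memory stand-in for a download: 5 MB plus one byte, read in 2 MB chunks.
source = io.BytesIO(b"x" * (5 * MB + 1))
destination = io.BytesIO()
stats = copy_in_chunks(source, destination, chunk_size_mb=2)
print(stats)
```

Reading in chunks keeps memory use bounded by the chunk size rather than by the size of the archive, which matters for a multi-gigabyte file.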

In [8]:
dataset = pipeline.run()
pipeline.summary

Started etl
	Decompression initiated.
	Decompression Complete! 6129.08 Mb Extracted.
Completed etl




                              Pipeline etl Summary                              
                              ____________________                              
           Task                      Start                        End  Minutes   Status
0       Extract 2022-01-27 08:34:14.678481 2022-01-27 08:41:03.244730     6.81  200: OK
1  TransformETL 2022-01-27 08:41:03.247735 2022-01-27 08:41:18.829615     0.26  200: OK
2   LoadDataset 2022-01-27 08:41:18.833617 2022-01-27 08:41:41.775193     0.38  200: OK


Our pipeline appears to have run successfully. Let's check the task summaries.

In [9]:
xsum = extract.summary



                              Extract Task Summary                              
                       Dataset: Etl / Preprocessed Stage                        
                       _________________________________                        
                    Size Extracted (Mb) : 6,129.08
                                  Start : 2022-01-27 08:34:14.678481
                                    End : 2022-01-27 08:41:03.244730
                               Duration : 0:06:48.566249
                                 Status : 200: OK
                            Status Date : 2022-01-27
                            Status Time : 08:41:03


In [10]:
# GLUE
_ = glue("etl_downloaded", xsum.get("Content Length (Mb)", 1910.081), display=False)
_ = glue("etl_chunk_size", xsum.get("Chunk Size (Mb)", 20), display=False)
_ = glue("etl_chunks_downloaded", xsum.get("Chunks Downloaded", 97), display=False)
_ = glue("etl_speed", xsum.get("Mbps", 5.102), display=False)
_ = glue("etl_size", xsum.get("Size Extracted (Mb)", 6129.08), display=False)
if 'Duration' in xsum.keys():
    _ = glue("etl_duration", round(xsum["Duration"].total_seconds() / 60, 2), display=False)
else:
    _ = glue("etl_duration", 17.38, display=False)

From this we see that we've downloaded {glue:}`etl_downloaded` Mb in about {glue:}`etl_duration` minutes, in {glue:}`etl_chunks_downloaded` chunks of {glue:}`etl_chunk_size` Mb, with an average throughput of {glue:}`etl_speed` Mbps. Next, we have the transform step.

In [11]:
_ = transform.summary



                           TransformETL Task Summary                            
          Missing Values Replacement: Dataset Etl / Preprocessed Stage          
          ____________________________________________________________          
                       Before     After
sale                        0         0
sales_amount                0  14262913
conversion_time_delay       0  14268293
click_ts                    0         0
n_clicks_1week              0   6744427
product_price               0         0
product_age_group           0  11760058
device_type                 0      3032
audience_id                 0  11502018
product_gender              0  11654195
product_brand               0   7241560
product_category_1          0   6142756
product_category_2          0   6151249
product_category_3          0   7342676
product_category_4          0  10494308
product_category_5          0  14595054
product_category_6          0  15717150
product_category_7          0  1599

Replacing the missing value indicators with NaNs will simplify data processing and analysis. It does, however, reveal significant data sparsity. Notably, diversity and sparsity in observations are common challenges in marketing and customer analytics.
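
To put the sparsity in proportion, the counts can be expressed as shares of the total row count. The sketch below uses a handful of the "After" counts from the summary above and the 15,995,634 rows reported by `dataset.info()` later in this section; the selection of columns is arbitrary.

```python
TOTAL_ROWS = 15_995_634  # RangeIndex length reported by dataset.info()

# "After" counts copied from the TransformETL summary for a few columns.
missing_after = {
    "sales_amount": 14_262_913,
    "conversion_time_delay": 14_268_293,
    "n_clicks_1week": 6_744_427,
    "device_type": 3_032,
}

# Share of rows with a missing value, per column.
missing_share = {column: count / TOTAL_ROWS for column, count in missing_after.items()}

# Print the columns from most to least sparse.
for column, share in sorted(missing_share.items(), key=lambda item: item[1], reverse=True):
    print(f"{column:<24}{share:>8.1%}")
```

Roughly nine in ten rows lack a sales amount, whereas device type is almost always present.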

Lastly, we have the load step.

In [12]:
_ = load.summary



                            LoadDataset Task Summary                            
                       Dataset: Etl / Preprocessed Stage                        
                       _________________________________                        
                             AID : preprocessed_criteo_012722
                       Workspace : Vesuvio
                    Dataset Name : criteo
                           Stage : preprocessed
                        filepath : workspaces\Vesuvio\datasets\preprocessed\dataset_preprocessed_preprocessed_criteo_012722.pkl
                           Start : 2022-01-27 08:41:18.833617
                             End : 2022-01-27 08:41:41.775193
                        Duration : 0:00:22.941576
                          Status : 200: OK
                     Status Date : 2022-01-27
                     Status Time : 08:41:41


We'll note the name and stage for this dataset, which we will use to obtain the Dataset object from the workspace. Before closing this section, we'll demonstrate how an object stored by the pipeline can be retrieved from our current workspace, 'Vesuvio'.

In [13]:
dataset = vesuvio.get_dataset(name='criteo', stage='preprocessed')
dataset.info()



                                 Dataset criteo                                 
                                 ______________                                 
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 15995634 entries, 0 to 15995633
Data columns (total 23 columns):
 #   Column                 Non-Null Count     Dtype   
---  ------                 --------------     -----   
 0   sale                   15995634 non-null  category
 1   sales_amount           1732721 non-null   float64 
 2   conversion_time_delay  1727341 non-null   float64 
 3   click_ts               15995634 non-null  float64 
 4   n_clicks_1week         9251207 non-null   float64 
 5   product_price          15995634 non-null  float64 
 6   product_age_group      4235576 non-null   category
 7   device_type            15992602 non-null  category
 8   audience_id            4493616 non-null   category
 9   product_gender         4341439 non-null   category
 10  product_brand          8754074 non-null   ca

Voilà! This closes the data acquisition portion of this series. In the next section, we will get our first glimpses of the data from a profiling and data quality perspective.