# DATA INGESTION PIPELINE

The main objective of this project is the development of a Data Ingestion Pipeline.

The project consists of two files: "config.yaml", which holds the basic validation settings for the incoming data (expected column names, data types, delimiters and file paths), and "functions.py", which defines the functions that perform those validations.

## Dataset

FILE NAME: "survival_data.csv"

SIZE: "6.53 GB"

COLUMNS: "8"

ROWS: "88809774"  

TOPIC: "Life Insurance Company Clients"

SOURCE: "https://www.kaggle.com/louise2001/survival-analysis-synthetic-data?select=survival_data.csv"

### Required libraries

In [18]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import dask.dataframe 


### Store the configuration file path in a variable

In [19]:
config_path = r'/Users/vicentesolorzano/Desktop/DATA_GLACIER/WEEK_6_DATA_INGESTION_PIPELINE/DATA_INGESTION_PIPELINE_W6'

### Import the functions file ("functions.py")

In [20]:
import functions as fxx

### Configuration file

In [21]:
config_file = fxx.load_config_file('config.yaml',config_path)
config_file

{'data_directory': '/Users/vicentesolorzano/Desktop/DATA_GLACIER/WEEK_6_DATA_INGESTION_PIPELINE/DATA_INGESTION_PIPELINE_W6/data',
 'data_name': 'survival_data.csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'columns_names': ['client_id',
  'age_start_observed',
  'age_end',
  'is_truncated',
  'is_censored',
  'is_dead',
  'date_start_observed',
  'date_end_observed'],
 'drop_columns': ['date_start_observed', 'date_end_observed'],
 'data_type': {'client_id': 'int32',
  'age_start_observed': 'int8',
  'age_end': 'int8',
  'is_truncated': 'bool',
  'is_censored': 'bool',
  'is_dead': 'bool',
  'date_start_observed': None,
  'date_end_observed': None},
 'data_clean_folder': '/Users/vicentesolorzano/Desktop/DATA_GLACIER/WEEK_6_DATA_INGESTION_PIPELINE/DATA_INGESTION_PIPELINE_W6/data_clean',
 'clean_data': 'data_clean.gzip'}
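Several keys in this config refer to each other: every entry in `drop_columns` should appear in `columns_names`, and `data_type` should describe exactly the declared columns. A consistency check run right after loading would catch a typo in config.yaml before the multi-gigabyte file is read. The following is a minimal sketch; `check_config` and `REQUIRED_KEYS` are hypothetical names, not part of functions.py.

```python
from typing import Any, Dict, List

# Keys that appear in the config dict printed above.
REQUIRED_KEYS = {
    "data_directory", "data_name", "inbound_delimiter", "outbound_delimiter",
    "columns_names", "drop_columns", "data_type", "data_clean_folder", "clean_data",
}


def check_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of inconsistencies in a loaded config dict (empty list = OK).

    Hypothetical helper for illustration; it is not part of functions.py.
    """
    missing = REQUIRED_KEYS - set(cfg)
    if missing:
        # Without the required keys, the remaining checks cannot run.
        return [f"missing keys: {sorted(missing)}"]
    problems = []
    names = cfg["columns_names"]
    if len(set(names)) != len(names):
        problems.append("duplicate names in columns_names")
    # Every column scheduled for dropping must be a declared column.
    unknown = [c for c in cfg["drop_columns"] if c not in names]
    if unknown:
        problems.append(f"drop_columns not in columns_names: {unknown}")
    # data_type should describe exactly the declared columns (None = leave as-is).
    if set(cfg["data_type"]) != set(names):
        problems.append("data_type keys do not match columns_names")
    return problems
```

Applied to the dict above, this should return an empty list: both date columns appear in `columns_names`, and `data_type` lists all eight columns.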

### Load the dataset

In [22]:
data = fxx.load_data(config_file)

Data Uploaded from /Users/vicentesolorzano/Desktop/DATA_GLACIER/WEEK_6_DATA_INGESTION_PIPELINE/DATA_INGESTION_PIPELINE_W6/data  name =  survival_data.csv
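The body of `load_data` lives in functions.py, which is not shown here. Judging by the config keys and the `npartitions=102` in later outputs, it joins `data_directory` and `data_name` and reads the file with Dask. The sketch below shows the same idea with pandas so it can run on a tiny sample; `load_data_sketch` is a hypothetical name. `dask.dataframe.read_csv` accepts the same `sep` argument and returns a lazy, partitioned DataFrame instead, which is presumably why the notebook uses it for a 6.53 GB file.

```python
import os

import pandas as pd


def load_data_sketch(cfg: dict) -> pd.DataFrame:
    """Read the raw CSV described by the config (hypothetical stand-in for fxx.load_data)."""
    path = os.path.join(cfg["data_directory"], cfg["data_name"])
    # inbound_delimiter is "," in the config shown above.
    df = pd.read_csv(path, sep=cfg["inbound_delimiter"])
    print("Data loaded from", cfg["data_directory"], " name = ", cfg["data_name"])
    return df
```

If the raw file's first header field is empty, pandas names that column `Unnamed: 0`, which matches the header seen in the first preview of the data.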


In [23]:
data.head(3)


|   | Unnamed: 0 | age_start_observed | age_end | is_truncated | is_censored | is_dead | date_start_observed | date_end_observed |
|---|---|---|---|---|---|---|---|---|
| 0 | 15113102 | 0.0 | 9.097335 | False | True | False | 1908-11-17 | 1917-12-22 |
| 1 | 41505894 | 0.0 | 64.486689 | False | True | False | 1828-09-13 | 1893-03-10 |
| 2 | 24774171 | 0.0 | 33.071552 | False | True | False | 1911-02-07 | 1944-03-04 |


### Rename the columns using the names defined in the config file

In [24]:
fxx.rename_columns(data,config_file)

|   | client_id | age_start_observed | age_end | is_truncated | is_censored | is_dead | date_start_observed | date_end_observed |
|---|---|---|---|---|---|---|---|---|
| 0 | 15113102 | 0.0 | 9.097335 | False | True | False | 1908-11-17 | 1917-12-22 |
| 1 | 41505894 | 0.0 | 64.486689 | False | True | False | 1828-09-13 | 1893-03-10 |
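Note that `data` is not reassigned in the cell above, yet later cells see the new names, so `rename_columns` most likely modifies the frame in place. A minimal sketch under that assumption, replacing the headers positionally with `columns_names` after checking the counts match; `rename_columns_sketch` is a hypothetical name.

```python
import pandas as pd


def rename_columns_sketch(df: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """Replace the headers positionally with cfg["columns_names"] (hypothetical)."""
    new_names = cfg["columns_names"]
    if len(new_names) != len(df.columns):
        raise ValueError(
            f"config lists {len(new_names)} names but data has {len(df.columns)} columns"
        )
    # Assigning to .columns mutates the frame in place (Dask DataFrames allow this too),
    # so callers see the new names even without reassigning the variable.
    df.columns = new_names
    return df
```

Renaming by position is only safe because the column order of the raw file is fixed; the headers validation step that follows is what confirms the result.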


### Headers validation

In [25]:
fxx.headers_validation(data,config_file)

Columns headers validation passed


1
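The cell prints a message and returns `1` on success. A plausible sketch of such a check compares the data headers with `columns_names` after normalizing both (lower-casing, trimming, and turning runs of non-word characters into `_`). The normalization rule and the `0` return value on failure are assumptions, and `normalize` and `headers_validation_sketch` are hypothetical names.

```python
import re

import pandas as pd


def normalize(name: str) -> str:
    """Lower-case, trim, and collapse runs of non-word characters to '_'."""
    return re.sub(r"\W+", "_", name.strip().lower()).strip("_")


def headers_validation_sketch(df: pd.DataFrame, cfg: dict) -> int:
    """Return 1 if the normalized headers match the config in order, else 0."""
    actual = [normalize(c) for c in df.columns]
    expected = [normalize(c) for c in cfg["columns_names"]]
    if actual == expected:
        print("Columns headers validation passed")
        return 1
    print("Columns headers validation failed")
    print("In data but not in config:", sorted(set(actual) - set(expected)))
    print("In config but not in data:", sorted(set(expected) - set(actual)))
    return 0
```

Printing the set differences on failure makes it clear which side (the file or config.yaml) needs fixing.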

### Drop irrelevant features

In [26]:
len(data.columns)

8

In [27]:
data.dtypes

client_id                int64
age_start_observed     float64
age_end                float64
is_truncated              bool
is_censored               bool
is_dead                   bool
date_start_observed     object
date_end_observed       object
dtype: object

In [28]:
data = fxx.drop_irrelevant(data, config_file)

In [29]:
len(data.columns)

6

In [31]:
data.columns

Index(['client_id', 'age_start_observed', 'age_end', 'is_truncated',
       'is_censored', 'is_dead'],
      dtype='object')
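Unlike the rename step, this cell reassigns `data`, which fits a function that returns a new frame without the columns listed under `drop_columns`. A minimal sketch, assuming that is all `drop_irrelevant` does; `drop_irrelevant_sketch` is a hypothetical name.

```python
import pandas as pd


def drop_irrelevant_sketch(df: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """Return a copy of df without the columns listed in cfg["drop_columns"]."""
    # With the default errors="raise", a column named in the config but absent
    # from the data raises a KeyError instead of being silently ignored.
    return df.drop(columns=cfg["drop_columns"])
```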

### Memory optimization

In [32]:
fxx.memory_usage_optimization(data,config_file)

|   | client_id | age_start_observed | age_end | is_truncated | is_censored | is_dead |
|---|---|---|---|---|---|---|
| npartitions=102 |   |   |   |   |   |   |
|   | int32 | int8 | int8 | bool | bool | bool |
|   | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... | ... |
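The dtype row shows the effect of the cast. A back-of-envelope estimate of what it saves, using each NumPy dtype's fixed item size and the 88,809,774 rows from the Dataset section, is sketched below. It ignores the index and per-partition overhead, so treat the figures as approximate. The target types also fit the data: `int32` tops out at 2,147,483,647, well above the client ids seen, and `int8` (maximum 127) is enough for human ages.

```python
import numpy as np

N_ROWS = 88_809_774  # row count from the Dataset section

# dtypes of the six retained columns before (from data.dtypes) and after the cast
before = {"client_id": "int64", "age_start_observed": "float64", "age_end": "float64",
          "is_truncated": "bool", "is_censored": "bool", "is_dead": "bool"}
after = {"client_id": "int32", "age_start_observed": "int8", "age_end": "int8",
         "is_truncated": "bool", "is_censored": "bool", "is_dead": "bool"}


def bytes_per_row(dtypes: dict) -> int:
    """Sum the fixed item size, in bytes, of each column's NumPy dtype."""
    return sum(np.dtype(t).itemsize for t in dtypes.values())


for label, dtypes in (("before", before), ("after", after)):
    per_row = bytes_per_row(dtypes)
    print(f"{label}: {per_row} bytes/row, ~{per_row * N_ROWS / 1e9:.2f} GB")
```

This prints 27 bytes per row before and 9 after, roughly 2.40 GB versus 0.80 GB for the column data, a threefold reduction.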


In [33]:
data.head(2)

|   | client_id | age_start_observed | age_end | is_truncated | is_censored | is_dead |
|---|---|---|---|---|---|---|
| 0 | 15113102 | 0 | 9 | False | True | False |
| 1 | 41505894 | 0 | 64 | False | True | False |
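Comparing this preview with the earlier one shows that the cast to `int8` truncates the fractional part of the ages (`9.097335` became `9`, `64.486689` became `64`). A plausible shape for `memory_usage_optimization`, assuming it casts each column to its `data_type` entry in place and skips entries set to `None` or columns already dropped, is sketched below; `memory_usage_optimization_sketch` is a hypothetical name.

```python
import pandas as pd


def memory_usage_optimization_sketch(df: pd.DataFrame, cfg: dict) -> pd.DataFrame:
    """Cast columns to the dtypes in cfg["data_type"], skipping None entries
    and columns that are no longer present (e.g. the dropped date columns)."""
    for col, dtype in cfg["data_type"].items():
        if dtype is None or col not in df.columns:
            continue
        # Column assignment mutates df in place. Casting float to an integer
        # dtype truncates toward zero (9.097335 -> 9).
        df[col] = df[col].astype(dtype)
    return df
```

One ordering detail to be aware of: in pandas, casting a float column that contains NaN to an integer dtype raises an error, so running the missing-values step before this one would be the safer order. With Dask the cast is lazy, so such an error would only surface when the affected partition is computed, not when `head(2)` reads the first one.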


### Missing values

In [34]:
data = fxx.drop_missing_values(data)
data

|   | client_id | age_start_observed | age_end | is_truncated | is_censored | is_dead |
|---|---|---|---|---|---|---|
| npartitions=102 |   |   |   |   |   |   |
|   | int32 | int8 | int8 | bool | bool | bool |
|   | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... | ... |
|   | ... | ... | ... | ... | ... | ... |
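The printed structure is the lazy Dask representation (the `...` placeholders), so the output does not say how many rows were removed. A minimal sketch, assuming `drop_missing_values` drops any row with a missing value, that also reports the count; `drop_missing_values_sketch` is a hypothetical name. On a Dask DataFrame, `len()` triggers a full pass over the data, so on the 6.53 GB file that count is not free.

```python
import pandas as pd


def drop_missing_values_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows with any missing value and report how many were removed."""
    before = len(df)
    cleaned = df.dropna()
    print(f"Dropped {before - len(cleaned)} of {before} rows with missing values")
    return cleaned
```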


### Saving clean data 

In [35]:
fxx.save_clean_data(data,config_file)

('Data saved as ', 'data_clean.gzip')
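The config supplies `outbound_delimiter: '|'`, `data_clean_folder` and `clean_data: 'data_clean.gzip'`, which suggests, but does not confirm, that the clean data is written as a pipe-delimited, gzip-compressed text file. A minimal pandas sketch under that assumption; `save_clean_data_sketch` is a hypothetical name.

```python
import os

import pandas as pd


def save_clean_data_sketch(df: pd.DataFrame, cfg: dict) -> str:
    """Write df as a pipe-delimited, gzip-compressed CSV (assumed output format)."""
    os.makedirs(cfg["data_clean_folder"], exist_ok=True)
    path = os.path.join(cfg["data_clean_folder"], cfg["clean_data"])
    # pandas infers gzip from a ".gz" suffix but not from ".gzip", so set it explicitly.
    df.to_csv(path, sep=cfg["outbound_delimiter"], index=False, compression="gzip")
    return path
```

With Dask, `DataFrame.to_csv` writes one file per partition by default; `single_file=True` changes that if a single output file like the one named in the config is wanted.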