# ETL


## Overview

- What is it?
- Types of ETL
  -  Batch
  -  Realtime


## What is it?

The bright future of decision making has been, and still is making decisions using data; not solely trusting human intuition.
Analysts, scientists and statisticians have a problem, though. They want to understand data: but data is almost always inconsistent,
corrupted, missing, or just plain invalid.

That's because people are involved in data collection most of the time.

However, as we hear over and over again: 

- "You can have data without information, but you cannot have information without data." - Daniel Keys Moran

The job of an Extraction Transformation and Loading (ETL) system is to try and homogenize those data into a consistent
format so the data can be compared.

It's much like a body's digestive system. It digests information into its constituent parts, orders what it can for use and 
discards the rest. As data engineers, you're the plumbers for your organization's GI tracts.

### You're already practiced

Already you know something about ETL. Even in your first classes you were loading data into the database using the `COPY FROM CSV` command.

You were doing ETL there! Admittedly it was a very simple workflow -- most of the work was being doing in the database, but ETL is a continuum.


### Extraction

This is where we take information in one format and pull out the bits that are useful to our purpose.

e.g. Pulling certain attributes out of a JSON object result from an API call.

### Transformation

Taking those extracted data, and putting them into whatever format we desire, correcting incorrect values where possible, possibly annotating related
information into the same destination format.

e.g. Putting the selected JSON attributes into a Protobuffer, adding identifier annotations to data in other systems.


#### Loading

Putting your data into a database for later analysis.

e.g. psql -c \COPY your_table FROM 'your_file.csv' CSV


## Types of ETL

### Batch

This is in many ways the simplest way to construct a system, and how many of the highest performance ETL systems organize their work.

One downside is that up-to-date information is only available after each batch is run.

### Realtime

This system means that you continuously update your database(s) as new information comes into your system. It's a good choice
when the requirement is that your system's information must be close to real-time.

One downside is that this is a more difficult system to scale as your data size and frequency increase.


In [1]:
### Imports

import collections
import random

import numpy as np
import pandas

from functools import wraps

In [2]:
### Data Vars

columns_headers = []
num_rows = 10


In [3]:
### Decorators

def destroy_percent(percent, value):
    """Will corrupt, destoy or mangle a percentage of whatever data your wrapped function returns."""
    def decorator(func):
        @wraps(func)
        def _wrapped(*args, **kwargs):
            ret_val = func(*args, **kwargs)
            if isinstance(ret_val, collections.Iterable):
                changed_values = {}
                for idx, item in enumerate(ret_val):
                    if random.randint(0, 100) < percent:
                        changed_values[idx] = item
                        
                for change_idx, item in changed_values.items():
                    if callable (value):
                        ret_val[change_idx] = value(item)
                    else:
                        ret_val[change_idx] = value
                    
                return ret_val
                        
            # if we're a regular scalar, just replace our return value a random percent of the time.
            if (random.randint(0, 100) < percent):
                if callable(value):
                    return value(ret_val)
                return value
            else:
                return ret_val
            
        return _wrapped 

    return decorator

In [4]:
### Finite Data
states = ['OR', 'WA', 'CA', 'ID']
state_initial_pops = {state : random.randint(10, 400) for state in states}
BAD_CONTINUOUS_DATA_VALUES = [-1, None, 0, 45.3]

def bad_data(*args, **kwargs):
    return random.choice(BAD_CONTINUOUS_DATA_VALUES)

def capitalize(input):
    """returns list of single letter that is captialized."""
    return [input.capitalize()]

def insert_space(input):
    """returns list of a letter and a space character"""
    return [input, ' ']

string_transforms = [capitalize, insert_space]

def randomize_string(input, percent=5):
    output = []
    letters = input.split()
    for letter in input:
        out_letter = [letter]
        if random.randint(0, 100) < percent:
            out_letter = random.choice(string_transforms)(letter)
        output.extend(out_letter)
        
    return ''.join(output)
    

In [5]:
### Continuous Data

def get_population(mean, sigma, num_years):
    return np.random.normal(mean, sigma, num_years)

@destroy_percent(30, None)
def get_pop_30_nan(current, sigma, num_years):
    return get_population(current, sigma, num_years)

@destroy_percent(50, bad_data)
def get_pop_50_bad(current, sigma, num_years):
    return get_population(current, sigma, num_years)

def get_average_annual_income(current, sigma, num_years):
    return np.random.normal(current, sigma, num_years)

@destroy_percent(2, bad_data)
def get_monthly_income(current, sigma, num_years):
    return get_average_annual_income(current, sigma, num_years * 12)
    

In [6]:
num_years = 4
simple_data = [
    {
        'state': state,
        'population': get_pop_50_bad(
            state_initial_pops[state], random.randint(0, 40), num_years
        ),
        'income': get_average_annual_income(40, 7, num_years)
    }
    for state in states
]

In [7]:
simple_data

[{'income': array([ 35.25827653,  27.50968443,  38.1298767 ,  40.84193408]),
  'population': array([ 402.90631553,  305.19917819,  351.37905321,   45.3       ]),
  'state': 'OR'},
 {'income': array([ 39.31116694,  25.97865363,  42.06570704,  40.68232409]),
  'population': array([ 125.19677103,  124.86388795,  125.33855318,           nan]),
  'state': 'WA'},
 {'income': array([ 33.99095615,  44.83983402,  44.89549257,  29.57446891]),
  'population': array([ 45.3       ,  45.3       ,  35.39742749,  35.61326458]),
  'state': 'CA'},
 {'income': array([ 42.18199548,  46.1540899 ,  37.5492348 ,  31.99521674]),
  'population': array([ 45.3       ,   4.3755346 ,          nan,  44.78070737]),
  'state': 'ID'}]

In [8]:
# Interpolation of missing data
# Sometimes this is pretty straight forward, esp. for missing data

pandas.Series(simple_data[0]['population'])

0    402.906316
1    305.199178
2    351.379053
3     45.300000
dtype: float64

In [9]:
pandas.Series(simple_data[0]['population']).interpolate()

0    402.906316
1    305.199178
2    351.379053
3     45.300000
dtype: float64

In [10]:
### Data Exploration

### Cleaning Topics
# 


In [11]:
def is_int(value):
    return isinstance(value, int)

def is_float(value):
    return isinstance(value, float)

def is_array(value):
    return isinstance(value, list)

In [12]:
is_int(5.0)

False

In [13]:
is_float(5)

False

In [14]:
is_array(['one','two','three'])

True

In [47]:
### Exercise
## Build an ETL pipeline for our Simple Data

def extract(uncleaned_data):
    pass

def transform(untransformed_data):
    #List of dicts, some of values are lists
    # [ {'key': [value1, value2]}]
    return_data = []
    for data in untransformed_data:
        return_dict = {}
        for key, value in data.items():
            if key == 'population':
                ret_value = []
                for item in value:
                    if is_int(item):
                        ret_value.append(item)
                return_dict['population'] = ret_value
            elif key == 'income':
                ret_value = []
                for item in value:
                    if is_float(item):
                        ret_value.append(item)
                return_dict['income'] = ret_value
            elif key == 'state':
                ret_value = []
                for item in value:
                    if 
                
            return_data.append(return_dict)
    return return_data

def load(unloaded_data):
    """Here let's just return a format that can be converted into a CSV with headers easily,
    a list of dictionaries would do nicely.
    """
    pass

In [48]:
pandas_data = pandas.DataFrame(simple_data)

In [49]:
pandas_data

Unnamed: 0,income,population,state
0,"[35.2582765341, 27.509684428, 38.1298766986, 4...","[402.906315526, 305.199178187, 351.37905321, 4...",OR
1,"[39.3111669356, 25.978653626, 42.0657070399, 4...","[125.196771028, 124.863887946, 125.338553182, ...",WA
2,"[33.9909561523, 44.8398340178, 44.8954925665, ...","[45.3, 45.3, 35.3974274944, 35.6132645771]",CA
3,"[42.1819954778, 46.1540898977, 37.5492347996, ...","[45.3, 4.3755345957, nan, 44.7807073695]",ID


In [50]:
transformed_data = pandas.DataFrame(transform(simple_data))

In [None]:
# pandas_data.applymap(function_call_here)

In [51]:
transformed_data

Unnamed: 0,income,population
0,"[35.2582765341, 27.509684428, 38.1298766986, 4...",[]
1,"[35.2582765341, 27.509684428, 38.1298766986, 4...",[]
2,"[35.2582765341, 27.509684428, 38.1298766986, 4...",[]
3,"[39.3111669356, 25.978653626, 42.0657070399, 4...",[]
4,"[39.3111669356, 25.978653626, 42.0657070399, 4...",[]
5,"[39.3111669356, 25.978653626, 42.0657070399, 4...",[]
6,"[33.9909561523, 44.8398340178, 44.8954925665, ...",[]
7,"[33.9909561523, 44.8398340178, 44.8954925665, ...",[]
8,"[33.9909561523, 44.8398340178, 44.8954925665, ...",[]
9,"[42.1819954778, 46.1540898977, 37.5492347996, ...",[]


In [20]:
advanced_data = [
    {
        randomize_string('state', 50): randomize_string(state),
        randomize_string('population', 25): get_pop_50_bad(
            state_initial_pops[state], random.randint(0, 40), num_years
        ),
        randomize_string('income', 40): get_average_annual_income(40, 7, num_years)
    }
    for state in states
]

In [21]:
advanced_data

[{'Inco me': array([ 38.66553355,  51.80708372,  34.41147694,  35.06690257]),
  'PopulatioN': array([ 326.14402675,  323.59766333,  319.49612417,  328.65553744]),
  'statE': 'OR'},
 {'i ncOMe': array([ 44.38513231,  37.0406365 ,  27.91603891,  51.5343365 ]),
  'population': array([ 151.05127282,   -1.        ,  105.17106141,           nan]),
  'state': 'WA'},
 {'St ATe': 'CA',
  'iNc omE': array([ 24.35489797,  50.22909709,  37.02543426,  55.86710196]),
  'pOPulAtion': array([  0.        ,  16.00621388,  15.72410031,  10.82873164])},
 {'income ': array([ 37.18917482,  29.5574099 ,  42.59073476,  29.75724132]),
  'populAtion': array([ 41.8851978 ,  45.3       ,          nan,  50.44604077]),
  's tat e': 'ID'}]

In [22]:
# Normalizing Strings
from difflib import SequenceMatcher

all_column_names = [sorted(item.keys()) for item in advanced_data]
first_column_names = [item[0] for item in all_column_names]

def get_column_similarities(list_of_columns):
    ratios = []
    for name in list_of_columns:
        # Going through our list of column_names and comparing it with the next one in the list.
        if list_of_columns.index(name) + 1 < len(list_of_columns):
            ratios.append((name, SequenceMatcher(
                        None, name, list_of_columns[list_of_columns.index(name) + 1]
                    ).ratio()))
        else:
            ratios.append((name, 0.0))
    return ratios

my_ratios = get_column_similarities(first_column_names)
un_sorted = get_column_similarities([item.keys()[0] for item in advanced_data])

In [23]:
my_ratios

[('Inco me', 0.42857142857142855),
 ('i ncOMe', 0.3076923076923077),
 ('St ATe', 0.15384615384615385),
 ('income ', 0.0)]

In [24]:
un_sorted

[('Inco me', 0.16666666666666666),
 ('state', 0.36363636363636365),
 ('St ATe', 0.46153846153846156),
 ('s tat e', 0.0)]

In [25]:
###Exercise
## Build and ETL pipleine for our Advanced Data

def extract(uncleaned_data):
    pass

def transform(untransformed_data):
    pass

def load(unloaded_data):
    """Here let's just return a format that can be converted into a CSV with headers easily,
    a list of dictionaries would do nicely.
    """
    pass
