# What is Data Engineering?

A data engineer is someone who develops, constructs, tests, and maintains architectures such as databases and large-scale processing systems.

This usually involves:
- processing large amounts of data
- (consequently) using clusters of machines

Some typical data engineer tasks:
- develop scalable data architecture
- streamline data acquisition
- set up processes to bring data together
- clean corrupt data
- use cloud technology

## Tools of the data engineer

A data engineer's tasks begin and end with databases. Within a business we typically have (1) databases that support the application and (2) databases that are used for analysis.

Some typical tasks are:
1. Processing: cleaning, aggregating, and joining data. Since this usually involves large amounts of data, it is closely tied to parallel processing frameworks.
2. Scheduling: planning jobs to run at specific intervals and resolving dependencies between jobs.

## Examples of tools

- Databases: MySQL, PostgreSQL
- Processing: Apache Spark, Hive
- Scheduling: Airflow, Oozie, Luigi

## Cloud Providers

For many companies, running their own servers is impractical: it comes with many associated costs, such as maintenance, insurance, and idle capacity that is paid for but unused.

It is usually more practical to outsource all of this to a company (a cloud provider) that takes on these costs.

Cloud providers typically offer storage, computation, and databases.

## Databases

A database is an organized collection of data, usually a large one, designed for rapid search and retrieval. It holds, organizes, and searches/retrieves data through a database management system (DBMS).

It differs from a file system in that it is highly organized and optimized for functionality such as search and replication.

## Data Structure

Data can generally be structured, unstructured, or somewhere in between. Structured data follows a schema (e.g. a database table), whereas unstructured data has no predefined structure (e.g. a video or a photo). In between we have *semi-structured data* such as JSON.

## SQL and NoSQL

Databases are broadly divided into SQL and NoSQL databases.

They differ in a few key points. SQL databases are relational: data is represented as **tables** defined by a database schema.

NoSQL databases are non-relational and can hold structured or unstructured data. Two popular types of NoSQL databases are key-value stores (Redis being a popular example) and document stores (MongoDB being a popular example).

In key-value stores, values are usually simple; typical use cases are caching and distributed configuration.

In document stores, values are usually semi-structured objects such as JSON documents.
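To make the difference concrete, here is a toy sketch that uses plain Python dictionaries as stand-ins for the two kinds of store. It is not the Redis or MongoDB API, and the keys and documents are made-up examples.

```python
import json

# Key-value store: each key maps to a simple, opaque value (e.g. a cache entry).
# A plain dict stands in for a system such as Redis here.
cache = {}
cache["session:42"] = "user=alice"
cache["config:max_connections"] = "100"

# Document store: each key maps to a semi-structured document.
# A dict of dicts stands in for a system such as MongoDB here.
customers = {
    "c1": {"name": "Alice", "tags": ["vip"], "address": {"city": "Lisbon"}},
    "c2": {"name": "Bob", "tags": []},  # documents need not share a schema
}

# Key-value lookups go by key only...
print(cache["session:42"])  # user=alice

# ...while a document store can also filter on fields inside the documents
in_lisbon = [doc["name"] for doc in customers.values()
             if doc.get("address", {}).get("city") == "Lisbon"]
print(in_lisbon)  # ['Alice']

# Documents serialize naturally to JSON
print(json.dumps(customers["c2"]))  # {"name": "Bob", "tags": []}
```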

## Database Schema in SQL

A popular schema is the star schema. It consists of one or more fact tables that reference any number of dimension tables. In this schema, facts are things that happened (e.g. product orders) and dimensions hold information about the world (e.g. customer information).

In [None]:
-- creating two tables in SQL (PostgreSQL syntax)
CREATE TABLE "Customer" (
    "id" SERIAL NOT NULL,
    "first_name" varchar,
    "last_name" varchar,
    PRIMARY KEY ("id")
);

CREATE TABLE "Order" (
    "id" SERIAL NOT NULL,
    "customer_id" integer REFERENCES "Customer",
    "product_name" varchar,
    "product_price" integer,
    PRIMARY KEY ("id")
);

## Star Schema

A common database schema consisting of one or more **fact tables** that reference any number of **dimension tables**:

- **Facts**: things that happened, such as product orders
- **Dimensions**: information on the world (e.g. customer information)
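As a sketch of how a star schema is queried, the snippet below uses Python's built-in `sqlite3` module with an in-memory database. It assumes SQLite syntax (`INTEGER PRIMARY KEY` in place of PostgreSQL's `SERIAL`), renames the fact table to `orders` to avoid the reserved word `ORDER`, and fills both tables with made-up rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customer (               -- dimension table
    id INTEGER PRIMARY KEY,
    first_name TEXT,
    last_name TEXT
);
CREATE TABLE orders (                 -- fact table
    id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customer(id),
    product_name TEXT,
    product_price INTEGER
);
""")
conn.executemany("INSERT INTO customer VALUES (?, ?, ?)",
                 [(1, "Ada", "Lovelace"), (2, "Alan", "Turing")])
conn.executemany(
    "INSERT INTO orders (customer_id, product_name, product_price) VALUES (?, ?, ?)",
    [(1, "book", 20), (1, "pen", 5), (2, "laptop", 900)],
)

# Typical star-schema query: aggregate the facts, then label them with a dimension
rows = conn.execute("""
    SELECT c.first_name, SUM(o.product_price) AS total_spent
    FROM orders AS o
    JOIN customer AS c ON o.customer_id = c.id
    GROUP BY c.first_name
    ORDER BY c.first_name
""").fetchall()
print(rows)  # [('Ada', 25), ('Alan', 900)]
```

The fact table stays narrow: it holds keys and measures. Descriptive attributes live once in the dimension table and are joined in only when a query needs them.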

## Parallel Computing

Parallel computing is fundamental for two reasons: **memory** and **processing power**. The amount of data involved often exceeds what a single computer can hold in memory. With parallel computing, we split the operation we are working on (e.g. cleaning or transforming data) across a cluster of machines, each of which executes a portion of the task. The partial results are then combined.

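Pros:
- more processing power;
- more memory, since the dataset can be partitioned across machines;

Cons:
- the task needs to be large enough to justify the overhead;
- communication overhead between machines;
- parallel slowdown: speed does not increase linearly with the number of machines, and past some point adding more machines can even slow the job down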

### Implementation

#### Local machine

In [1]:
import pandas as pd
from multiprocessing import Pool

In [3]:
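# athlete_events is assumed to be a pandas DataFrame with "Year" and "Age" columns
def take_mean_age(year_and_group):
    # each item produced by groupby is a (key, sub-DataFrame) pair
    year, group = year_and_group
    return pd.DataFrame({"Age": group["Age"].mean()}, index=[year])

with Pool(4) as p:
    # apply our function across the groups of a dataframe, using 4 processes
    results = p.map(take_mean_age, athlete_events.groupby("Year"))

# combine the per-year results into a single DataFrame
result_df = pd.concat(results)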

In [None]:
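import time
from functools import wraps

# print_timing is assumed to be a small timing decorator; a minimal version:
def print_timing(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f"Processing took {time.time() - start:.2f} seconds")
        return result
    return wrapper

# Function to apply a function over multiple cores
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)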

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)
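Because `athlete_events` is not defined in these notes, here is a self-contained, stdlib-only sketch of the same split-apply-combine pattern with `multiprocessing.Pool`. The `(year, age)` records are hypothetical toy data, and plain lists replace the pandas `groupby`.

```python
from multiprocessing import Pool

# Hypothetical toy data standing in for athlete_events: (year, age) pairs
RECORDS = [(1996, 24), (1996, 28), (2000, 22), (2000, 30), (2000, 26), (2004, 27)]

def group_by_year(records):
    """Split step: bucket the ages by year, like DataFrame.groupby('Year')."""
    groups = {}
    for year, age in records:
        groups.setdefault(year, []).append(age)
    return sorted(groups.items())

def mean_age(year_and_ages):
    """Apply step: runs in a worker process, one group at a time."""
    year, ages = year_and_ages
    return year, sum(ages) / len(ages)

def parallel_mean_age(records, nb_cores):
    """Combine step: gather the per-group results into one dict."""
    with Pool(nb_cores) as p:
        results = p.map(mean_age, group_by_year(records))
    return dict(results)

if __name__ == "__main__":
    # The guard is required on platforms that spawn (rather than fork) workers
    print(parallel_mean_age(RECORDS, 2))  # {1996: 26.0, 2000: 26.0, 2004: 27.0}
```

The worker function must be defined at module top level so that `Pool` can pickle it and send it to the worker processes.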

#### Dask

In [8]:
import dask.dataframe as dd

# partition dataframe into 4
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# run parallel computation on each partition
result_df = athlete_events_dask.groupby("Year").Age.mean().compute()

## Parallel Computing Frameworks

### Background 

Apache Hadoop is a collection of open-source projects maintained by the Apache Software Foundation. The two important ones to discuss initially are **MapReduce** and **HDFS**. 

- **HDFS** is a distributed file system (similar to a normal file system, except that files are distributed across multiple machines). Today it is often replaced by cloud object storage such as Amazon S3 or Google Cloud Storage.
- **MapReduce** is a Big Data processing paradigm that splits computation across a cluster of machines. MapReduce jobs were complex to write, which is what led to Hive.
- **Hive** is a layer on top of the Hadoop ecosystem that makes data from several sources queryable using Hive SQL (HiveQL), a SQL dialect. Its queries were originally executed as MapReduce jobs, but it now works with other execution engines as well. It was initially developed by Facebook.

In [None]:
-- Hive SQL sample
SELECT year, AVG(age)
FROM views.athlete_events
GROUP BY year;

-- looks like regular SQL, but under the hood the query is transformed
-- into a job that can run across a cluster of computers
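Hive compiles queries like the one above into distributed jobs. The MapReduce paradigm behind its original engine can be sketched on a single machine in plain Python: a map phase emits key-value pairs, a shuffle groups them by key, and a reduce phase aggregates each group. The rows are hypothetical, and on a real cluster each phase would run across many machines.

```python
from collections import defaultdict

# Hypothetical toy rows standing in for views.athlete_events: (year, age)
rows = [(1996, 24), (2000, 22), (1996, 28), (2000, 30), (2004, 27), (2000, 26)]

def map_phase(row):
    # Emit key -> (partial sum, partial count) for one input row
    year, age = row
    return year, (age, 1)

def shuffle(mapped):
    # Group all emitted values by key (a network exchange on a real cluster)
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)
    return grouped

def reduce_phase(key, values):
    # Combine the partial sums and counts into the average for one key
    total = sum(v[0] for v in values)
    count = sum(v[1] for v in values)
    return key, total / count

mapped = [map_phase(r) for r in rows]
result = dict(reduce_phase(k, vs) for k, vs in shuffle(mapped).items())
print(sorted(result.items()))  # [(1996, 26.0), (2000, 26.0), (2004, 27.0)]
```

The mappers emit `(sum, count)` pairs rather than averages because partial sums and counts can be combined correctly in any order. Averaging per-machine averages would generally give the wrong answer.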

- **Spark** distributes data and computation over a cluster of computers. Whereas MapReduce-based systems tend to need expensive disk writes between jobs, Spark tries to keep as much processing as possible in memory.
    - It relies on Resilient Distributed Datasets, or RDDs.
    - RDDs are conceptually like a list of tuples.
    - We can apply two types of operations on RDDs: **transformations** (`.map()` or `.filter()`) and **actions** (`.count()` or `.first()`).
    - **Transformations** result in new, transformed RDDs and are evaluated lazily, while **actions** trigger the computation and return a result, such as a single value.
    - When working with RDDs we typically use a programming language interface like PySpark.
        - PySpark also offers a DataFrame abstraction
        - It looks similar to pandas
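In Spark, transformations are lazy: they only record what to do. Actions trigger the actual computation. That split can be illustrated with a toy, single-machine class. `ToyRDD` is hypothetical and not part of PySpark; in PySpark, the same chain would start from `spark.sparkContext.parallelize(data)`.

```python
class ToyRDD:
    """A toy, single-machine model of RDD laziness (NOT the PySpark API)."""

    def __init__(self, data, ops=()):
        self._data = list(data)
        self._ops = tuple(ops)  # recorded transformations, not yet executed

    # Transformations: record the step and return a new ToyRDD; no work is done
    def map(self, func):
        return ToyRDD(self._data, self._ops + (("map", func),))

    def filter(self, pred):
        return ToyRDD(self._data, self._ops + (("filter", pred),))

    def _run(self):
        # Chain the recorded steps over the data (builtin map/filter are used here)
        items = iter(self._data)
        for kind, func in self._ops:
            items = map(func, items) if kind == "map" else filter(func, items)
        return items

    # Actions: execute the recorded steps and return a plain Python value
    def count(self):
        return sum(1 for _ in self._run())

    def first(self):
        return next(self._run())


people = ToyRDD([("Alice", 24), ("Bob", 31), ("Carol", 19)])
names_over_20 = people.filter(lambda p: p[1] > 20).map(lambda p: p[0])  # nothing runs yet
print(names_over_20.count())  # 2
print(names_over_20.first())  # Alice
```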

## PySpark Example

In [None]:
# assuming existence of athlete_events_spark, a Spark DataFrame

# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark (printSchema prints it directly and returns None)
athlete_events_spark.printSchema()

# Group by the Year, and find the mean Age: this is lazy, so printing it
# only shows the resulting DataFrame's columns, not the computed values
print(athlete_events_spark.groupBy('Year').mean("Age"))

# .show() triggers the computation and prints the resulting rows
athlete_events_spark.groupBy('Year').mean("Age").show()

## Running PySpark locally

In [10]:
## /home/repl/spark-script.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    athlete_events_spark = (spark
        .read
        .csv("/home/repl/datasets/athlete_events.csv",
             header=True,
             inferSchema=True,
             escape='"'))

    athlete_events_spark = (athlete_events_spark
        .withColumn("Height",
                    athlete_events_spark.Height.cast("integer")))

    # .show() prints the result itself (and returns None), so no print() is needed
    (athlete_events_spark
        .groupBy('Year')
        .mean('Height')
        .orderBy('Year')
        .show())

In [None]:
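# run the script locally with 4 worker threads (quoted so the shell does not treat [4] as a glob)
spark-submit --master 'local[4]' /home/repl/spark-script.py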

## Workflow Scheduling Frameworks

Suppose we have a simple Spark Job which:
- pulls data from a CSV file;
- filters out some corrupt records;
- loads data into a SQL database, ready for analysis

We need a scheduler to make sure these steps run at the right time and in the right order. cron alone is insufficient because it only triggers jobs on a schedule; it cannot express dependencies between tasks.

This is usually achieved with a Directed Acyclic Graph (DAG), where each node is a task and each edge represents a dependency, i.e. the order in which the tasks should run.
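Python's standard-library `graphlib` module (Python 3.9+) can illustrate what a scheduler derives from such a DAG: a run order that respects every dependency, and an error if the graph contains a cycle. The task names are hypothetical labels for the three steps of the Spark job above. A real scheduler also handles timing, retries, and monitoring.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks)
pipeline = {
    "filter_corrupt_records": {"pull_csv"},
    "load_into_sql": {"filter_corrupt_records"},
}
order = list(TopologicalSorter(pipeline).static_order())
print(order)  # ['pull_csv', 'filter_corrupt_records', 'load_into_sql']

# The graph must be acyclic: a circular dependency has no valid run order
cyclic = dict(pipeline, pull_csv={"load_into_sql"})
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("cycle detected:", err.args[1])
```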

### Example frameworks
 
- Luigi (Spotify)
- Airflow (Airbnb)

### Airflow Sample

In [None]:
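import airflow
from airflow import DAG
# Airflow 1.x import path; in Airflow 2.x use airflow.operators.bash instead
from airflow.operators.bash_operator import BashOperator

# Create the DAG object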
dag = DAG(
    dag_id="car_factory_simulation",
    default_args={
        "owner": "airflow",
        "start_date": airflow.utils.dates.days_ago(2)
    },
    schedule_interval="0 * * * *"  # cron expression: run at minute 0 of every hour
)

# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)

# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)

# ETL
## Extract

### Files

Data processing tasks start with extracting data from a source. Sources can be split into:
- unstructured data (plain text)
- flat files (rows are records and columns are attributes, like a `.csv`)
- semi-structured (e.g. JSON)

JSON has four atomic data types:
1. number
2. string
3. boolean
4. null

and two composite data types:
1. array
2. object
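A quick way to see these types is to parse a small JSON document with Python's built-in `json` module. It maps each JSON type to a Python type: object to `dict`, array to `list`, string to `str`, number to `int` or `float`, boolean to `bool`, and null to `None`. The document below is a made-up example.

```python
import json

# One made-up document exercising all four atomic and both composite JSON types
doc = json.loads("""
{
  "athlete": "A. Example",
  "age": 24,
  "height_m": 1.78,
  "retired": false,
  "coach": null,
  "events": ["100m", "200m"]
}
""")

# Show which Python type each JSON value was converted to
print(type(doc).__name__)  # dict
for key, value in doc.items():
    print(f"{key}: {type(value).__name__}")
# athlete: str, age: int, height_m: float, retired: bool, coach: NoneType, events: list
```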

### APIs

Data moves on the Web through **requests**. When we visit a website, we request data, which a server returns and our browser renders. Some servers return data meant to be read by humans, while others return data meant to be used by programs. The latter are APIs, and they typically return data in JSON format.

#### Example code

In [12]:
import requests

response = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
print(response.json())

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}


### Databases

Databases are the most common source to extract data from. They can be either:

- Application databases:
    - Handle transactions
    - Optimized for frequent inserts and changes
    - OLTP (online transaction processing)
    - Row-oriented
- Analytical databases:
    - OLAP (online analytical processing)
    - Column-oriented

To connect to a database we **always need a connection string (or URI)**. This looks like:

In [None]:
postgresql://[user[:password]@][host][:port][/database]
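The parts of such a URI can be inspected with the standard library's `urllib.parse.urlsplit`. This only illustrates the anatomy of the string; SQLAlchemy parses the URI itself when `create_engine` is called. The credentials are the same example values used in the Python snippet below.

```python
from urllib.parse import urlsplit

uri = urlsplit("postgresql://repl:password@localhost:5432/pagila")

print(uri.scheme)            # postgresql  (the database dialect)
print(uri.username)          # repl
print(uri.password)          # password
print(uri.hostname)          # localhost
print(uri.port)              # 5432  (returned as an int)
print(uri.path.lstrip("/"))  # pagila  (the database name)
```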

#### Python 

In [None]:
import sqlalchemy

connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

import pandas as pd

pd.read_sql("SELECT * FROM customer", db_engine)

## Transform

### Python

A typical transformation in Python is splitting a string column into several columns, for example a customer's full name into first and last name. The example below splits a film's rental rate into its dollar and cents parts.

In [None]:
# Get the rental rate column as a string
rental_rate_str = film_df.rental_rate.astype("str")

# Split up and expand the column
rental_rate_expanded = rental_rate_str.str.split(".", expand=True)

# Assign the columns to film_df
film_df = film_df.assign(
    rental_rate_dollar=rental_rate_expanded[0],
    rental_rate_cents=rental_rate_expanded[1],
)

### PySpark

In [None]:
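# first read a table from a database over JDBC; the credentials are passed
# with the `properties` keyword (the third positional parameter is `column`)
customer_df = spark.read.jdbc(
    "jdbc:postgresql://localhost:5432/pagila",
    "customer",
    properties={
        "user": "repl",
        "password": "password",
    },
)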

In [None]:
# Use groupBy and mean to aggregate the column
ratings_per_film_df = rating_df.groupBy('film_id').mean('rating')

# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
    ratings_per_film_df,
    film_df.film_id==ratings_per_film_df.film_id
)

# Show the first 5 results (.show() prints the rows itself and returns None)
film_df_with_ratings.show(5)

## Loading

We've seen that two types of databases are typically used: analytics and application databases.

### Analytics

- Column-oriented: data is stored per column
- Well suited to analytical queries, which generally touch only a subset of columns
- Lend themselves better to parallelization

### Applications

- Row-oriented: data is stored record by record
- Makes it easy to add new rows in small transactions and to fetch a whole record quickly (e.g. getting one customer's record)
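The two layouts can be sketched with plain Python containers and made-up customer records: a list of tuples for the row-oriented layout and a dict of lists for the column-oriented one. Real databases store pages on disk rather than Python objects, so this only illustrates the access patterns.

```python
rows = [  # row-oriented: one tuple per record
    (1, "Ada", "Lisbon", 120),
    (2, "Alan", "London", 80),
    (3, "Grace", "Lisbon", 200),
]
columns = {  # column-oriented: one list per attribute
    "id": [1, 2, 3],
    "name": ["Ada", "Alan", "Grace"],
    "city": ["Lisbon", "London", "Lisbon"],
    "spend": [120, 80, 200],
}

# Fetching a whole record: one lookup in the row layout...
print(rows[1])  # (2, 'Alan', 'London', 80)
# ...but one lookup per attribute in the column layout
print(tuple(values[1] for values in columns.values()))  # (2, 'Alan', 'London', 80)

# Aggregating one attribute: the column layout reads a single list...
print(sum(columns["spend"]))  # 400
# ...while the row layout has to visit every full record
print(sum(record[3] for record in rows))  # 400

# Inserting a record: one append vs. one append per column
new_record = (4, "Linus", "Helsinki", 50)
rows.append(new_record)
for name, value in zip(columns, new_record):
    columns[name].append(value)
```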

### MPP

- Massively Parallel Processing Databases
- Often the target at the end of an ETL process
- Column-oriented databases optimised for analytics that run in a distributed fashion
- Queries are split into subtasks and executed across a cluster of nodes

#### Redshift

- Example of an MPP
- A good way to load data into Redshift is to write files to S3 and then send a `COPY` query to Redshift
- `.parquet` is a typical file type to use here

In [None]:
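# simple code snippet to write data to S3 as Parquet (the bucket path is a placeholder)

# pandas (needs a Parquet engine such as pyarrow, and s3fs for s3:// paths)
df.to_parquet("s3://path/to/bucket/customer.parquet")
# pyspark
df.write.parquet("s3://path/to/bucket/customer.parquet")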

In [None]:
COPY customer
FROM 's3://path/to/bucket/customer.parquet'
IAM_ROLE '<your-iam-role-arn>'  -- placeholder: Redshift needs authorization to read from S3
FORMAT AS PARQUET;

#### PostgreSQL

In [None]:
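# transformation on data (transform_find_recommendations is assumed to be defined elsewhere)
recommendations = transform_find_recommendations(ratings_df)

# load into the "store" schema of the PostgreSQL database, replacing any existing table
recommendations.to_sql(
    "recommendations",
    db_engine,
    schema="store",
    if_exists="replace",
)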

## ETL - Putting it all together

The ETL steps should be encapsulated in a clean ETL function.

In [None]:
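import pandas as pd

def extract_table_to_df(tablename, db_engine):
    return pd.read_sql(f"SELECT * FROM {tablename}", db_engine)

def split_columns_transform(df, column, pattern, suffixes):
    # Convert the column to str and split it on the pattern
    # (a single-character pattern such as "." is matched literally)
    parts = df[column].astype("str").str.split(pattern, expand=True)
    # Add one new column per suffix, e.g. rental_rate_dollar and rental_rate_cents
    return df.assign(**{column + suffix: parts[i] for i, suffix in enumerate(suffixes)})

def load_df_into_dwh(film_df, tablename, schema, db_engine):
    return film_df.to_sql(tablename, db_engine, schema=schema, if_exists="replace")

db_engines = { ... } # needs to be configured

def etl():
    # Extract
    film_df = extract_table_to_df("film", db_engines["store"])
    # Transform
    film_df = split_columns_transform(film_df, "rental_rate", ".", ["_dollar", "_cents"])
    # Load
    load_df_into_dwh(film_df, "film", "store", db_engines["dwh"])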
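Because the snippet above depends on pandas and configured engines, here is a stdlib-only sketch of the same extract/transform/load shape that runs as-is. Two in-memory SQLite databases act as hypothetical stand-ins for the `store` and `dwh` databases, and the `film` rows are made up.

```python
import sqlite3

def extract(conn):
    # Extract: pull the raw rows from the source ("store") database
    return conn.execute("SELECT film_id, rental_rate FROM film").fetchall()

def transform(rows):
    # Transform: split rental_rate (e.g. 4.99) into dollar and cents parts
    out = []
    for film_id, rate in rows:
        dollar, cents = f"{rate:.2f}".split(".")
        out.append((film_id, int(dollar), int(cents)))
    return out

def load(conn, rows):
    # Load: replace the target table in the warehouse, like if_exists="replace"
    conn.execute("DROP TABLE IF EXISTS film")
    conn.execute("CREATE TABLE film (film_id INTEGER, "
                 "rental_rate_dollar INTEGER, rental_rate_cents INTEGER)")
    conn.executemany("INSERT INTO film VALUES (?, ?, ?)", rows)
    conn.commit()

def etl(store, dwh):
    load(dwh, transform(extract(store)))

store, dwh = sqlite3.connect(":memory:"), sqlite3.connect(":memory:")
store.execute("CREATE TABLE film (film_id INTEGER, rental_rate REAL)")
store.executemany("INSERT INTO film VALUES (?, ?)", [(1, 0.99), (2, 4.99), (3, 2.5)])
etl(store, dwh)
print(dwh.execute("SELECT * FROM film ORDER BY film_id").fetchall())
# [(1, 0, 99), (2, 4, 99), (3, 2, 50)]
```

The rate is formatted with two decimals (`f"{rate:.2f}"`) before splitting. A plain string conversion would be subtly wrong: `str(2.5)` is `'2.5'`, which would yield a cents value of 5 instead of 50.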

### Airflow

Recall that Airflow:

- is a workflow scheduler
- uses Python to define workflows
- represents workflows as DAGs
- defines tasks using operators