# Data Engineering Intro
- high level overview

# Tools of a data engineer

Databases - MySQL, PostgreSQL

Data processing - clean, aggregate, and join data; scale up processing
- e.g. Apache Spark, Hive

Scheduling - plan jobs at specific intervals, resolve dependency requirements between jobs
- Apache Airflow, Oozie, or a simple Unix tool like cron

Cloud computing

Cloud service providers - market share in 2018: AWS 32%, Azure 17%, Google 10%

Services - storage, computation, databases

1. Storage - AWS S3, Azure Blob Storage, Google Cloud Storage
1. Computation - perform calculations, e.g. hosting a web server
- AWS EC2, Azure Virtual Machines, Google Compute Engine
1. Databases (db) - AWS RDS, Azure SQL Database, Google Cloud SQL

SQL - Star schema
definition - consists of one or more fact tables referencing any number of dimension tables
- analytical databases like Redshift are optimized for this kind of schema
- facts = things that happened (e.g. product orders)
- dimensions = info on the world (e.g. customer information)
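
A minimal sketch of the fact/dimension split, using Python's built-in `sqlite3` module so it runs without a database server. The table names, column names, and rows are made up for illustration: `product_order` plays the fact table and `customer` the dimension table.

```python
import sqlite3

# In-memory database; all names and rows here are illustrative only.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customer (          -- dimension table: info on the world
    id INTEGER PRIMARY KEY,
    first_name TEXT
);
CREATE TABLE product_order (     -- fact table: things that happened
    id INTEGER PRIMARY KEY,
    customer_id INTEGER REFERENCES customer(id),
    amount REAL
);
INSERT INTO customer VALUES (1, 'Ada'), (2, 'Grace');
INSERT INTO product_order VALUES (10, 1, 20.0), (11, 1, 5.0), (12, 2, 7.5);
""")

# Aggregate the facts, then describe them with a dimension attribute.
rows = conn.execute("""
SELECT c.first_name, COUNT(*) AS n_orders, SUM(o.amount) AS total
FROM product_order AS o
INNER JOIN customer AS c ON o.customer_id = c.id
GROUP BY c.id, c.first_name
ORDER BY c.id
""").fetchall()

print(rows)
conn.close()
```

The fact table only stores a reference (`customer_id`) to the dimension row, so descriptive attributes live in one place and are pulled in by a join when needed.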



# Examples: join

In [None]:
-- sample join statement
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id" = "Customer"."id";




In [None]:
# PostgreSQL - use pandas to query the db with read_sql()
# db_engine is assumed to be an existing SQLAlchemy engine connected to the database
import pandas as pd

# Complete the SELECT statement
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)

# Show the first 3 rows of the DataFrame
print(data.head(3))

# Show the info of the DataFrame (info() prints directly and returns None)
data.info()

In [None]:
# Complete the SELECT statement. read_sql() returns a DataFrame
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id"="Customer"."id"
""", db_engine)

# Show the id column of data
print(data.id)

# Parallel computing
- gains - more memory and processing power (but mostly memory)
- risks - overhead due to communication, so parallelizing only pays off when
    - the task is large enough
    - several processing units are available
- parallel slowdown = speed does not increase linearly with the number of processing units
    

## multiprocessing.Pool API

In [None]:
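# example: low level code
# athlete_events is assumed to be a pandas DataFrame with 'Year' and 'Age' columns
import pandas as pd
from multiprocessing import Pool

# groupby yields (year, group) tuples, so the function accepts one tuple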
def take_mean_age(year_and_group):
    year, group = year_and_group
    return pd.DataFrame({"Age": group['Age'].mean()}, index=[year])

with Pool(4) as p:
    results = p.map(take_mean_age, athlete_events.groupby('Year'))
    
results_df = pd.concat(results)

In [None]:
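# Function to apply a function over multiple cores
# print_timing is assumed to be a helper decorator (defined elsewhere) that prints the elapsed time
@print_timing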
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)

'''
Compare processing times based on 1, 2, or 4 cores
Processing time: 922.2915172576904
Processing time: 525.2659320831299
Processing time: 317.6119327545166
'''
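
The three timings above can be turned into speedup and efficiency figures to make the parallel slowdown visible. This is a small sketch that only does arithmetic on the reported numbers; the variable names are my own, and the time unit is whatever `print_timing` printed.

```python
# Processing times reported above for 1, 2 and 4 cores.
timings = {1: 922.2915172576904, 2: 525.2659320831299, 4: 317.6119327545166}

baseline = timings[1]
report = {}
for cores, elapsed in timings.items():
    speedup = baseline / elapsed   # how many times faster than 1 core
    efficiency = speedup / cores   # 1.0 would be perfect linear scaling
    report[cores] = (speedup, efficiency)
    print(f"{cores} core(s): speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

Doubling the cores gives roughly a 1.76x speedup and quadrupling gives roughly 2.90x, so efficiency drops as cores are added.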

## dask = high level API for dataframe abstraction

In [None]:
# example: high level code using dask
import dask.dataframe as dd

# partition dataframe into 4
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# run parallel computations on each partition
results_df = athlete_events_dask.groupby('Year').Age.mean().compute()
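
To get a feel for what "run parallel computations on each partition" means for a grouped mean, here is a plain-Python sketch of the idea: each partition produces partial sums and counts, and a final step combines them. This is not dask's actual implementation, only a conceptual illustration; the records and the helper names (`split`, `partial_stats`, `combine`) are hypothetical.

```python
from collections import defaultdict

# Hypothetical (year, age) records standing in for athlete_events.
records = [(1896, 20), (1896, 30), (1900, 25), (1900, 27), (1900, 29), (1896, 25)]

def split(rows, npartitions):
    """Cut rows into at most npartitions contiguous chunks."""
    size = -(-len(rows) // npartitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def partial_stats(partition):
    """Per-partition step: sum and count of ages for each year."""
    stats = defaultdict(lambda: [0, 0])
    for year, age in partition:
        stats[year][0] += age
        stats[year][1] += 1
    return stats

def combine(partials):
    """Merge the partial sums and counts, then divide to get the means."""
    totals = defaultdict(lambda: [0, 0])
    for stats in partials:
        for year, (age_sum, count) in stats.items():
            totals[year][0] += age_sum
            totals[year][1] += count
    return {year: age_sum / count for year, (age_sum, count) in totals.items()}

partitions = split(records, npartitions=2)
mean_age = combine(partial_stats(p) for p in partitions)
print(mean_age)
```

Because `partial_stats` only needs its own partition, the per-partition calls are independent and could run on separate cores; only the small partial results have to be brought together.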


## Parallel computing frameworks
- Apache Hadoop
    1. HDFS - distributed file system
        - cloud-managed storage systems like Amazon S3 often replace HDFS
    1. Hadoop MapReduce
        - flaw - MapReduce jobs are hard to write
        - Hive addresses this with Hive SQL
        - looks like SQL, but is converted to MapReduce jobs
- Spark
    - more popular choice for data processing
    - advantage - keeps as much processing as possible in memory
    - distributes data processing between clusters of computers
    - architecture relies on RDDs (resilient distributed datasets): data distributed between multiple nodes, no named columns, like a list of tuples
    - transformations: .map(), .filter()
    - actions: .count(), .first()
    - PySpark is the Python interface to Spark
        - interfaces also exist for R and Scala
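
The difference between transformations and actions is that Spark evaluates transformations lazily and only does the work once an action asks for a result. The sketch below imitates that behaviour with Python's built-in lazy `map()` and `filter()` on a list of tuples. It is an analogy only, not the Spark API, and the records and names are hypothetical.

```python
# Hypothetical records: (name, year, age) tuples, no named columns.
records = [("A", 1896, 24), ("B", 1900, 31), ("C", 1900, 19), ("D", 1904, 28)]

evaluated = []  # names of the rows that have actually been processed

def to_year_age(row):
    evaluated.append(row[0])
    return (row[1], row[2])

# "Transformations": building the pipeline does no work yet.
pipeline = filter(lambda pair: pair[1] >= 21, map(to_year_age, records))
touched_before_action = list(evaluated)
print(touched_before_action)  # nothing has been processed so far

# "Action": asking for a concrete result forces evaluation.
adults = list(pipeline)
print(len(adults), adults[0])
```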
    
        

# PySpark
methods
- .printSchema(): helps print the schema of a Spark DataFrame.
- .groupBy(): grouping statement for an aggregation.
- .mean(): take the mean over each group.
- .show(): show the results.

In [None]:
# using athlete_events_spark df
# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark (printSchema() prints directly and returns None)
athlete_events_spark.printSchema()

# Group by the Year, and find the mean Age (prints only the DataFrame description, nothing is computed yet)
print(athlete_events_spark.groupBy('Year').mean('Age'))

# Group by the Year, and find the mean Age, then show result (show() prints directly and returns None)
athlete_events_spark.groupBy('Year').mean('Age').show()

## Running PySpark files


Run a PySpark file using spark-submit, a tool for submitting your application to a Spark cluster.

For the sake of this exercise, you're going to work with a local Spark instance running on 4 threads. The file you need to submit is in /home/repl/spark-script.py. 

Feel free to read the file:
cat /home/repl/spark-script.py

You can use spark-submit as follows:

spark-submit \
  --master local[4] \
  /home/repl/spark-script.py

What does this output? Note that it may take a few seconds to get your results.

In [None]:
# check out file
repl:~$ cat /home/repl/spark-script.py
from pyspark.sql import SparkSession


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    athlete_events_spark = (spark
        .read
        .csv("/home/repl/datasets/athlete_events.csv",
             header=True,
             inferSchema=True,
             escape='"'))

    athlete_events_spark = (athlete_events_spark
        .withColumn("Height",
                    athlete_events_spark.Height.cast("integer")))

    print(athlete_events_spark
        .groupBy('Year')
        .mean('Height')
        .orderBy('Year')
        .show())

In [None]:
# use spark-submit to run a PySpark file

repl:~$ spark-submit \
>   --master local[4] \
>   /home/repl/spark-script.py
Picked up _JAVA_OPTIONS: -Xmx512m
Picked up _JAVA_OPTIONS: -Xmx512m
21/10/27 04:20:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----+------------------+
|Year|       avg(Height)|
+----+------------------+
|1896| 172.7391304347826|
|1900|176.63793103448276|
|1904| 175.7887323943662|
|1906|178.20622568093384|
|1908|177.54315789473685|
|1912| 177.4479889042996|
|1920| 175.7522816166884|
|1924|174.96303901437372|
|1928| 175.1620512820513|
|1932|174.22011541632315|
|1936| 175.7239932885906|
|1948|176.17279726261762|
|1952|174.13893967093236|
|1956|173.90096798212957|
|1960|173.14128595600675|
|1964|  173.448573701557|
|1968| 173.9458648072826|
|1972|174.56536284096757|
|1976|174.92052773737794|
|1980|175.52748832195473|
+----+------------------+
only showing top 20 rows

None

# Workflow scheduling frameworks

How to schedule a pipeline
- Manually
- cron - Linux scheduling tool
- Spotify's Luigi - allows definition of DAGs for complex pipelines
- Apache Airflow - developed by Airbnb
    - de-facto workflow scheduling framework
    

Example - 1st job processes a csv file, 2nd job pulls in and cleans data from an API, 3rd job joins the csv and API data together

DAG = Directed Acyclic Graph
- Visualize dependencies through DAGs
- set of nodes - each node is a task to execute
- directed edges = dependencies between tasks
- no cycles
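
The three-job example can be modelled as a DAG with the standard library's `graphlib` module (Python 3.9+). This is a sketch for intuition, not how Airflow represents DAGs, and the task names are made up.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (names are illustrative).
dependencies = {
    "load_csv": set(),
    "clean_api_data": set(),
    "join_csv_and_api": {"load_csv", "clean_api_data"},
}

# A valid execution order runs every task after its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # the join task always comes last

# Making load_csv depend on the join creates a cycle,
# so the graph is no longer a valid DAG and cannot be ordered.
cyclic = dict(dependencies, load_csv={"join_csv_and_api"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The "no cycles" rule is what guarantees that such an execution order exists at all.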

Airflow example
- First create a DAG object using the `DAG` class
- Define operations
    - Use an Operator to define each of the jobs. Several kinds of operators exist in Airflow:
        - simple ones like BashOperator and PythonOperator, which execute bash or Python code, respectively
        - operators you write yourself, like the SparkJobOperator or StartClusterOperator in the example
- Set up the dependency flow
    - Finally, define the connections between these operators using `.set_downstream()`

## Airflow DAG


In [None]:
# example
# Create the DAG object
dag = DAG(dag_id="example_dag", ...,
          schedule_interval="0 * * * *")

# Task definitions
start_cluster = StartClusterOperator(task_id="start_cluster", dag=dag)
ingest_customer_data = SparkJobOperator(
    task_id="ingest_customer_data", dag=dag)
ingest_product_data = SparkJobOperator(task_id="ingest_product_data", dag=dag)
enrich_customer_data = PythonOperator(task_id="enrich_customer_data", ..., dag=dag)

# Complete the downstream flow
start_cluster.set_downstream(ingest_customer_data)
ingest_customer_data.set_downstream(enrich_customer_data)
ingest_product_data.set_downstream(enrich_customer_data)

First, the DAG needs to run every hour at minute 0. Fill in the schedule_interval keyword argument using crontab notation. For example, every hour at minute N would be N * * * *. Remember, you need to run at minute 0.
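
To make the crontab notation concrete, here is a deliberately simplified matcher that checks whether a timestamp satisfies a five-field expression (minute, hour, day of month, month, day of week). The function `matches` is a hypothetical helper, not part of Airflow or cron. It only understands `*` and single integers, and unlike real cron it requires both day fields to match when both are restricted.

```python
from datetime import datetime

FIELDS = ("minute", "hour", "day", "month", "weekday")

def matches(cron_expr, moment):
    """Return True if moment satisfies a five-field cron expression.

    Simplified: each field must be "*" or a single integer; ranges, lists
    and steps (e.g. "1-5", "*/15") are not handled.
    """
    parts = cron_expr.split()
    if len(parts) != len(FIELDS):
        raise ValueError("expected five space-separated fields")
    actual = (
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,  # cron counts Sunday as 0
    )
    return all(p == "*" or int(p) == a for p, a in zip(parts, actual))

print(matches("0 * * * *", datetime(2021, 10, 27, 4, 0)))    # minute 0 of any hour
print(matches("0 * * * *", datetime(2021, 10, 27, 4, 20)))   # minute 20 is not minute 0
print(matches("30 6 * * *", datetime(2021, 10, 27, 6, 30)))  # 06:30 on any day
```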

In [None]:
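# Imports assume Airflow 1.10-style module paths; newer releases use airflow.operators.bash
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Create the DAG object
dag = DAG(dag_id="car_factory_simulation",
          default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
          schedule_interval="0 * * * *")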

# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)

# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)

# Extract