# AI is at the forefront of many companies' efforts to drive innovation and growth.
But to get started with any of these, we need data to be in the right place, in the right shape, at the right time.

ETL (extract and transform before loading):
- traditional and common way
- tabular and non-tabular data
- Python `pandas`

ELT (extract and load before transforming):
- more recent
- data warehouse
- tabular data

# ETL pipeline:
Three custom-built Python functions extract, transform, and load the data. The load function below writes a pandas DataFrame to a SQL database; the extract, transform, and load functions are then called in sequence to execute the ETL pipeline.

In [None]:
def load(data_frame, target_table):
    """Write ``data_frame`` into the SQL table named ``target_table``.

    The connection comes from the module-level ``POSTGRES_CONNECTION``
    (defined elsewhere). A short status line is printed once the write call
    has returned.
    """
    data_frame.to_sql(name=target_table, con=POSTGRES_CONNECTION)
    status = "Loading data to the {} table".format(target_table)
    print(status)

# Now, run the data pipeline
# NOTE(review): extract() and transform() are not defined above this cell;
# this assumes they were defined by an earlier run — TODO confirm.
# Extract the raw CSV into memory
extracted_data = extract(file_name="raw_data.csv")
# Apply the transform step to the extracted data
transformed_data = transform(data_frame=extracted_data)
# Write the result to the "cleaned_data" SQL table
load(data_frame=transformed_data, target_table="cleaned_data")

In [None]:
# Extract data from the raw_data.csv file
extracted_data = extract(file_name="raw_data.csv")

# Transform the extracted_data
transformed_data = transform(data_frame=extracted_data)

# Load the transformed_data into the cleaned_data target table
load(data_frame=transformed_data, target_table="cleaned_data")

In [None]:
def load(data_frame, file_name):
    """Write ``data_frame`` to the CSV file at ``file_name`` and report success.

    The DataFrame index is written too (the ``to_csv`` default).
    """
    data_frame.to_csv(file_name)
    print("Successfully loaded data to", file_name)

# Extract the raw data from raw_data.csv
extracted_data = extract(file_name="raw_data.csv")

# Transform extracted_data using transform() function
transformed_data = transform(data_frame=extracted_data)

# Load transformed_data to the file transformed_data.csv
load(data_frame=transformed_data, file_name="transformed_data.csv")


In [None]:
# Complete building the transform() function
def transform(source_table, target_table):
    """Build ``target_table`` in the warehouse from ``source_table`` (ELT "T" step).

    Each output row holds a labelled product id,
    ``CONCAT('Product ID: ', product_id)``, and the line total
    ``quantity * price``. The SQL is executed through
    ``data_warehouse.execute`` (defined elsewhere).

    Table names are interpolated straight into the SQL text, so they must come
    from trusted pipeline code, never from user input.
    """
    # SQL string literals use single quotes: a double-quoted "Product ID: "
    # is parsed as a column identifier by PostgreSQL and other ANSI-style
    # warehouses. The aliases give the created columns stable names.
    data_warehouse.execute(f"""
    CREATE TABLE {target_table} AS
        SELECT
            CONCAT('Product ID: ', product_id) AS product_label,
            quantity * price AS total_sales
        FROM {source_table};
    """)

# Extract the raw sales CSV
extracted_data = extract(file_name="raw_sales_data.csv")
# Load it unchanged into the raw_sales_data table
# NOTE(review): no visible load() accepts table_name; assumes a warehouse-oriented
# load(data_frame, table_name) is defined elsewhere — TODO confirm.
load(data_frame=extracted_data, table_name="raw_sales_data")

# Populate total_sales by transforming raw_sales_data
transform(source_table="raw_sales_data", target_table="total_sales")

In [None]:
def extract(file_name):
    """Read the CSV file located at ``file_name`` into a pandas DataFrame."""
    frame = pd.read_csv(file_name)
    return frame

def transform(data_frame):
    """Filter ``data_frame`` to include only the industry name and firm-count columns."""
    wanted_columns = ["industry_name", "number_of_firms"]
    return data_frame.loc[:, wanted_columns]

def load(data_frame, file_name):
    """Persist ``data_frame`` to ``file_name`` as CSV, index included."""
    data_frame.to_csv(path_or_buf=file_name)

# Extract the raw industry data and keep only the columns of interest
extracted_data = extract(file_name="raw_industry_data.csv")
transformed_data = transform(data_frame=extracted_data)

# Pass the transformed_data DataFrame to the load() function
# (the stray trailing "s" after this call was a SyntaxError)
load(data_frame=transformed_data, file_name="number_of_firms.csv")

# ELT pipeline:


In [None]:
def transform(source_table, target_table, columns=None):
    """Create ``target_table`` in the warehouse by selecting from ``source_table``.

    This is the "T" of an ELT pipeline: the raw data has already been loaded,
    so the transformation runs as SQL through ``data_warehouse.run_sql``
    (defined elsewhere).

    Args:
        source_table: Name of the table holding the loaded raw data.
        target_table: Name of the table to create.
        columns: Optional iterable of column names/expressions to select.
            When omitted or empty, every column is copied (``SELECT *``).

    Table and column names are interpolated directly into the SQL text, so
    they must come from trusted pipeline code, never from user input.
    """
    select_list = ", ".join(columns) if columns else "*"
    # f-string so the placeholders are actually substituted; no trailing comma
    # after the last selected column.
    data_warehouse.run_sql(f"""
    CREATE TABLE {target_table} AS
        SELECT {select_list}
        FROM {source_table};
    """)

# Similar to ETL pipelines, call the extract, load, and transform functions --
# but in ELT order: raw data is loaded as-is, then transformed in the warehouse
extracted_data = extract(file_name="raw_data.csv")
# NOTE(review): no visible load() accepts table_name; assumes a warehouse-oriented
# load(data_frame, table_name) is defined elsewhere — TODO confirm.
load(data_frame=extracted_data, table_name="raw_data")
transform(source_table="raw_data", target_table="cleaned_data")

# Extracting data from structured sources
Almost all data pipelines are initiated by extracting data from a source system. Most common source systems are:
- CSV, parquet*, and JSON files (*Apache Parquet is an open-source, column-oriented file format designed for efficient data storage and retrieval)
- dynamic stores such as SQL databases
- APIs, which are commonly used to ingest data from a third party
- within organizations, data lakes and warehouses are popular source systems for data pipelines
- in more advanced pipelines, it's even common to web scrape the data.

In [None]:
import pandas as pd

# Read the sales data (parquet file) into a DataFrame
# NOTE(review): engine="fastparquet" requires the fastparquet package to be
# installed; engine="auto" would try pyarrow first, then fastparquet.
sales_data = pd.read_parquet("sales_data.parquet", engine="fastparquet")

# Check the data type of the columns of the DataFrames
print(sales_data.dtypes)

# Print the shape (rows, columns) of the DataFrame, as well as its first rows
print(sales_data.shape)
print(sales_data.head())

In [None]:
# Pulling data from SQL databases
import sqlalchemy

# Create a connection to the sales database
# NOTE(review): credentials are hard-coded in the URI; a real pipeline should
# read them from configuration or environment variables.
db_engine = sqlalchemy.create_engine("postgresql+psycopg2://repl:password@localhost:5432/sales")

# Query the sales table: given a bare table name (and a SQLAlchemy engine),
# pd.read_sql reads the whole table
raw_sales_data = pd.read_sql("sales", db_engine)
print(raw_sales_data)

In [None]:
# Code modularization to help make pipelines more readable and reusable, and can help to expedite troubleshooting efforts.
def extract():
    """Create a SQLAlchemy engine for the local ``sales`` PostgreSQL database.

    Returns:
        The engine, so callers can run queries with it (for example via
        ``pd.read_sql``) rather than it being thrown away on return.
    """
    # Create a connection URI and connection engine
    # NOTE(review): credentials are hard-coded; prefer configuration/env vars.
    connection_uri = "postgresql+psycopg2://repl:password@localhost:5432/sales"
    db_engine = sqlalchemy.create_engine(connection_uri)
    return db_engine

In [None]:
def extract():
    """Fetch single-item orders from the ``sales`` table.

    Runs ``SELECT * FROM sales WHERE quantity_ordered = 1`` against the local
    sales database, prints the first rows as a quick check, and returns the
    full result as a DataFrame.
    """
    engine = sqlalchemy.create_engine(
        "postgresql+psycopg2://repl:password@localhost:5432/sales"
    )
    query = "SELECT * FROM sales WHERE quantity_ordered = 1"
    single_item_orders = pd.read_sql(query, engine)

    print(single_item_orders.head())
    return single_item_orders

# Call the extract() function; it prints the head and returns the
# single-item orders from the sales table
raw_sales_data = extract()

# Transforming data with pandas


In [None]:
# Extract data from the sales_data.parquet path
# NOTE(review): the extract() defined just above takes no arguments; this call
# assumes a file-based extract(file_path) is in scope — TODO confirm.
raw_sales_data = extract("sales_data.parquet")

def transform(raw_data):
    """Keep multi-item orders, restricted to date, quantity, and address columns.

    Rows are kept when "Quantity Ordered" is greater than 1; the returned
    columns are "Order Date", "Quantity Ordered", and "Purchase Address", in
    that order.
    """
    keep_columns = ["Order Date", "Quantity Ordered", "Purchase Address"]
    is_multi_item = raw_data['Quantity Ordered'] > 1
    return raw_data.loc[is_multi_item, keep_columns]

# Apply the transformation; the result is not assigned, so in a notebook it is
# only displayed as this cell's output
transform(raw_sales_data)

In [None]:
# Extract the raw sales data from the CSV file
# NOTE(review): assumes a file-based extract(file_path) is in scope — TODO confirm.
raw_sales_data = extract("sales_data.csv")

def transform(raw_data):
    """Parse order dates and keep only items priced under ten dollars.

    Works on a copy, so the caller's ``raw_data`` keeps its original
    "Order Date" values.

    Args:
        raw_data: Sales DataFrame whose "Order Date" strings match the format
            ``%m/%d/%y %H:%M`` (e.g. ``"04/19/19 08:46"``) and which has a
            numeric "Price Each" column.

    Returns:
        The rows with "Price Each" < 10, with "Order Date" as datetime64.
    """
    # Copy first: assigning into raw_data would silently rewrite the caller's frame.
    converted = raw_data.copy()
    converted["Order Date"] = pd.to_datetime(converted["Order Date"], format="%m/%d/%y %H:%M")

    # Only keep items under ten dollars
    clean_data = converted.loc[converted["Price Each"] < 10, :]
    return clean_data

# Run the transformation on the extracted sales data
clean_sales_data = transform(raw_sales_data)

# Check the data types of each column ("Order Date" should now be datetime64)
print(clean_sales_data.dtypes)

# Validating data transformations

In [None]:
# Re-extract the raw sales data from the CSV file
# NOTE(review): assumes a file-based extract(file_path) is in scope — TODO confirm.
raw_sales_data = extract("sales_data.csv")

def transform(raw_data):
    """Parse order dates and keep only items priced under ten dollars.

    The input is copied before the date column is converted, so the
    caller's ``raw_data`` is left untouched.

    Args:
        raw_data: Sales DataFrame whose "Order Date" strings match the format
            ``%m/%d/%y %H:%M`` and which has a numeric "Price Each" column.

    Returns:
        The rows with "Price Each" < 10, with "Order Date" as datetime64.
    """
    # Copy first: assigning into raw_data would silently rewrite the caller's frame.
    converted = raw_data.copy()
    converted["Order Date"] = pd.to_datetime(converted["Order Date"], format="%m/%d/%y %H:%M")

    # Only keep items under ten dollars
    clean_data = converted.loc[converted["Price Each"] < 10, :]
    return clean_data

# Run the transformation on the extracted sales data
clean_sales_data = transform(raw_sales_data)

# Check the data types of each column
print(clean_sales_data.dtypes)


# Which "Price Each" values belong to the two most expensive items in the
# transformed DataFrame? Sorting in descending order lists them first.
print(clean_sales_data["Price Each"].sort_values(ascending=False))

# Loading sales data to a CSV file
Loading data is an essential component of any data pipeline. It ensures that any data consumers and processes have reliable access to data that I've extracted and transformed earlier in a pipeline.

In [None]:
def transform(raw_data):
    """Return order id, product, price, and date for items priced under $25."""
    columns = ["Order ID", "Product", "Price Each", "Order Date"]
    under_25 = raw_data["Price Each"] < 25
    return raw_data.loc[under_25, columns]

def load(clean_data):
    """Save ``clean_data`` to transformed_sales_data.csv (in the working directory), omitting the index."""
    output_path = "transformed_sales_data.csv"
    clean_data.to_csv(output_path, index=False)


# Keep only the needed columns for items priced under $25
clean_sales_data = transform(raw_sales_data)

# Call the load function on the cleaned DataFrame; it writes
# transformed_sales_data.csv without the index column
load(clean_sales_data)

# Customizing a CSV file
Sometimes, data needs to be stored in a CSV file in a customized manner. This may include using different header values, including or excluding the index column of a DataFrame, or altering the character used to separate columns.

In [None]:
# Import the os library
import os

def load(clean_data, path_to_write):
    """Write ``clean_data`` to ``path_to_write`` as pipe-delimited text: index kept, header omitted."""
    clean_data.to_csv(path_or_buf=path_to_write, sep="|", header=False)

# Write the cleaned data as a pipe-separated file (index kept, no header)
load(clean_sales_data, "clean_sales_data.csv")

# Check that the file is present.
file_exists = os.path.exists("clean_sales_data.csv")
print(file_exists)

# Persisting data to files
Loading data to a final destination is one of the most important steps of a data pipeline. I'll use the `transform()` function to transform product sales data before loading it to a .csv file. This will give downstream data consumers a better view into total sales across a range of products.

In [None]:
def load(clean_data, file_path):
    """Write ``clean_data`` to ``file_path`` as CSV without header or index, then verify it.

    Raises:
        FileNotFoundError: if no file exists at ``file_path`` after writing.
            This subclasses ``Exception``, so existing ``except Exception``
            handlers still catch it.
    """
    # Write the data to a file
    clean_data.to_csv(file_path, header=False, index=False)

    # Confirm the write produced a file before downstream steps rely on it
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File does NOT exists at path {file_path}")

# Load the transformed data to the provided file path; load() raises if the
# file is missing after writing
load(clean_sales_data, "transformed_sales_data.csv")

# Persisting data in an ETL pipeline
Loading data to a file is a great way to ensure that data consumers, including data scientists and analysts, have stable access to extracted and transformed data.

While typically most visible during the "load" portion of an ETL process, data persistence is a best practice that can, and should, happen at multiple stages in a data pipeline.

Persisting data to a file allows for a "snapshot" to be taken at various points throughout the pipeline. This is especially useful when recovering from a failure, and is essential if data is hard to reacquire from a source system.

# Logging within a data pipeline
Logging is used to alert engineers to data pipeline performance issues.

Logs are messages created and written during the execution of a pipeline. They are configured by the developing party, and document the performance of a pipeline.

Logs provide a starting point when solutions fail by letting Data Engineers revisit the execution of the pipeline.

Logs are the foundation for all monitoring and alerting efforts, and are essential for creating transparent data pipelines.

In [None]:
def transform(raw_data):
    """Parse order dates, keep items under ten dollars, and log the steps.

    Works on a copy, so the caller's ``raw_data`` is not modified. Emits one
    info-level log for the date conversion and debug-level logs with the
    DataFrame shape before and after filtering.

    Returns:
        The rows with "Price Each" < 10, with "Order Date" as datetime64.
    """
    # Imported locally so the function does not depend on a module-level
    # ``import logging`` being present.
    import logging

    converted = raw_data.copy()
    converted["Order Date"] = pd.to_datetime(converted["Order Date"], format="%m/%d/%y %H:%M")
    clean_data = converted.loc[converted["Price Each"] < 10, :]

    # Create an info log regarding transformation
    logging.info("Transformed 'Order Date' column to type 'datetime'.")

    # Debug logs use lazy %-style arguments so the message is only formatted
    # when debug logging is actually enabled
    logging.debug("Shape of the DataFrame before filtering: %s", converted.shape)
    logging.debug("Shape of the DataFrame after filtering: %s", clean_data.shape)

    return clean_data

# Transform the sales data; the info/debug logs only appear if logging has been
# configured at a level that includes them (the root logger defaults to WARNING)
clean_sales_data = transform(raw_sales_data)

# Handling exceptions when loading data
Sometimes data pipelines might throw an exception. These exceptions are a form of alerting, and they let a Data Engineer know when something unexpected happened. It's important to properly handle these exceptions.

In [None]:
def extract(file_path):
    """Load the Parquet file at ``file_path`` into a DataFrame (pandas picks the engine)."""
    sales_frame = pd.read_parquet(file_path)
    return sales_frame

import logging

# Update the pipeline to include a try block around just the extract call
try:
    # Attempt to read in the file
    raw_sales_data = extract("sales_data.parquet")

# Catch the FileNotFoundError
except FileNotFoundError as file_not_found:
    # Write an error-level log. `logging` must be imported (above); otherwise
    # this handler would itself fail with NameError.
    # NOTE(review): on this path raw_sales_data is not assigned here — later
    # steps should not assume it holds fresh data.
    logging.error(file_not_found)

# Ingesting JSON data with pandas

In [None]:
def extract(file_path):
    """Read a records-oriented JSON file (a list of row objects) into a DataFrame."""
    testing_scores = pd.read_json(file_path, orient="records")
    return testing_scores

# Call the extract function with the appropriate path, assign to raw_testing_scores
raw_testing_scores = extract("testing_scores.json")

# Output the first rows of the DataFrame
print(raw_testing_scores.head())

# Reading JSON data into memory

In [None]:
# Import the json library
import json

def extract(file_path):
    """Load and return the JSON document stored at ``file_path``.

    The file is decoded explicitly as UTF-8, the encoding JSON text must use,
    instead of the platform's locale-dependent default.

    Returns:
        The parsed Python object. The following cells iterate it as a dict
        keyed by school id.
    """
    with open(file_path, "r", encoding="utf-8") as json_file:
        # Load the data from the JSON file
        raw_data = json.load(json_file)
    return raw_data

# Parse the nested JSON file into Python objects
raw_testing_scores = extract("nested_scores.json")

# Print the raw_testing_scores
print(raw_testing_scores)


# Iterating over dictionaries (JSON)

In [None]:
raw_testing_scores_keys = []

# Iterating a dict directly yields its keys (the school ids)
for school_id in raw_testing_scores:
    raw_testing_scores_keys.append(school_id)

# Preview the first three school ids
print(raw_testing_scores_keys[:3])


In [None]:
raw_testing_scores_values = []

# Gather each school's info record, in the dictionary's insertion order
for school_info in raw_testing_scores.values():
    raw_testing_scores_values.append(school_info)

# Preview the first three records
print(raw_testing_scores_values[:3])


In [None]:
# Loop through both `the keys and values`

raw_testing_scores_keys = []
raw_testing_scores_values = []

# Walk the (key, value) pairs together: school id first, its info record second
for school_id, school_info in raw_testing_scores.items():
    raw_testing_scores_keys.append(school_id)
    raw_testing_scores_values.append(school_info)

# Preview the first three of each
print(raw_testing_scores_keys[:3])
print(raw_testing_scores_values[:3])

# Parsing data from dictionaries

In [None]:
# NOTE(review): `school` is not defined in the visible cells; presumably one
# school's info dict taken from raw_testing_scores — TODO confirm.
# Top-level field: the school's street address
street_address = school.get("street_address")

# Nested field: the dictionary of subject scores
scores = school.get("scores")

# Pull the math, reading and writing values from scores, in that order
math_score, reading_score, writing_score = (
    scores.get(subject) for subject in ("math", "reading", "writing")
)

print(f"Street Address: {street_address}")
print(f"Math: {math_score}, Reading: {reading_score}, Writing: {writing_score}")


In [None]:
normalized_testing_scores = []

# Loop through each of the dictionary key-value pairs, flattening each school
# into [school_id, street_address, city, math, reading, writing]
for school_id, school_info in raw_testing_scores.items():
    # A missing (or null) "scores" entry would make .get("scores").get(...)
    # raise AttributeError; treat it as empty so the 0 defaults apply instead.
    school_scores = school_info.get("scores") or {}
    normalized_testing_scores.append([
        school_id,
        school_info.get("street_address"),  # Pull the "street_address"
        school_info.get("city"),
        school_scores.get("math", 0),
        school_scores.get("reading", 0),
        school_scores.get("writing", 0),
    ])

print(normalized_testing_scores)

# Transforming and cleaning DataFrames

In [None]:
# Create a DataFrame from the normalized_testing_scores list, naming the
# columns in the constructor. This also works when the list is empty, where
# assigning six names to a zero-column frame would raise ValueError.
normalized_data = pd.DataFrame(
    normalized_testing_scores,
    columns=["school_id", "street_address", "city", "avg_score_math", "avg_score_reading", "avg_score_writing"],
)

# Index the rows by school id
normalized_data = normalized_data.set_index("school_id")
print(normalized_data.head())

# Filling missing values with pandas

In [None]:
# Print the head of the `raw_testing_scores` DataFrame
# NOTE(review): in the cells above raw_testing_scores was last bound to the
# parsed JSON dict, which has no .head(); this assumes it has been rebuilt as a
# DataFrame with math_score/reading_score/writing_score columns — TODO confirm.
print(raw_testing_scores.head())

In [None]:
# Fill NaN values in math_score with that column's mean (mean() skips NaN)
raw_testing_scores["math_score"] = raw_testing_scores["math_score"].fillna(raw_testing_scores["math_score"].mean())

# Print the head of the raw_testing_scores DataFrame
print(raw_testing_scores.head())


In [None]:
def transform(raw_data):
    """Fill missing math, reading, and writing scores with each column's mean.

    All three means are computed before any filling. ``raw_data`` is then
    modified in place (``inplace=True``), and the same object is returned.
    """
    score_columns = ["math_score", "reading_score", "writing_score"]
    column_means = {column: raw_data[column].mean() for column in score_columns}
    raw_data.fillna(value=column_means, inplace=True)
    return raw_data

# Fill missing scores; transform() fills raw_testing_scores in place and
# returns that same DataFrame
clean_testing_scores = transform(raw_testing_scores)

# Print the head of the clean_testing_scores DataFrame
print(clean_testing_scores.head())

# Grouping data with pandas

In [None]:
def transform(raw_data):
    """Average the math, reading, and writing scores for each city.

    Returns:
        A DataFrame indexed by city with the mean of each score column.
    """
    # Use .loc[] to only return the needed columns
    subset = raw_data.loc[:, ["city", "math_score", "reading_score", "writing_score"]]

    # Group the data by city. ``axis=0`` was the default anyway; passing it is
    # deprecated in pandas 2.1 and rejected in pandas 3.
    grouped_data = subset.groupby(by=["city"]).mean()
    return grouped_data

# Transform the data (mean scores per city), print the head of the DataFrame
grouped_testing_scores = transform(raw_testing_scores)
print(grouped_testing_scores.head())


# Applying advanced transformations to DataFrames


In [None]:
def transform(raw_data):
    """Add a ``street_name`` column computed row by row with ``find_street_name``.

    ``find_street_name`` is defined elsewhere; presumably it takes one row and
    returns the street parsed from its address — TODO confirm. The column is
    added to ``raw_data`` in place, and the same DataFrame is returned.
    """
    street_names = raw_data.apply(find_street_name, axis=1)
    raw_data["street_name"] = street_names
    return raw_data

# Transform the raw_testing_scores DataFrame (adds a street_name column in place)
cleaned_testing_scores = transform(raw_testing_scores)

# Print the head of the cleaned_testing_scores DataFrame
print(cleaned_testing_scores.head())


# Loading data to a Postgres database

In [None]:
# Update the connection string, create the connection object to the schools database
# NOTE(review): credentials are hard-coded; prefer configuration/env vars.
db_engine = sqlalchemy.create_engine("postgresql+psycopg2://repl:password@localhost:5432/schools")

# Write the DataFrame to the scores table, replacing any existing table and
# leaving out the DataFrame index
cleaned_testing_scores.to_sql(
	name="scores",
	con=db_engine,
	index=False,
	if_exists="replace"
)


# Validating data loaded to a Postgres Database

In [None]:
def load(clean_data, con_engine):
    """Store ``clean_data`` in the ``scores_by_city`` table of the schools database.

    Any existing table is replaced. The DataFrame index is written as a
    column named ``school_id``.
    """
    table_options = {
        "name": "scores_by_city",
        "con": con_engine,
        "if_exists": "replace",
        "index": True,
        "index_label": "school_id",
    }
    clean_data.to_sql(**table_options)


In [None]:
def load(clean_data, con_engine):
    """Replace the ``scores_by_city`` table with ``clean_data``, saving the index as ``school_id``."""
    clean_data.to_sql(
        "scores_by_city",
        con_engine,
        if_exists="replace",
        index=True,
        index_label="school_id",
    )

# Call the load function, passing in the cleaned DataFrame
load(cleaned_testing_scores, db_engine)

# Query the data in the scores_by_city table to validate the load, then check the head of the DataFrame
to_validate = pd.read_sql("SELECT * FROM scores_by_city", con=db_engine)
print(to_validate.head())
