In [3]:
# Complete the SELECT statement
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)

# Show the first 3 rows of the DataFrame
print(data.head(3))

# Show the info of the DataFrame
print(data.info())

# Good work. You now know how to query a SQL database from Python using pandas.

NameError: name 'db_engine' is not defined

In [5]:
# Complete the SELECT statement
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id"="Customer"."id"
""", db_engine)

# Show the id column of data
print(data.id)

# Good work. In the IPython Shell, you can see that data.id outputs 2 columns. This is often a source of error, 
# so a better strategy here would be to select specific columns or use the AS keyword in SQL to rename columns.

NameError: name 'db_engine' is not defined

# Parallel Computing

From task to subtasks
For this exercise, you will be using parallel computing to apply the function take_mean_age() that calculates the average athlete's age in a given year in the Olympics events dataset. The DataFrame athlete_events has been loaded for you and contains amongst others, two columns:

Year: the year the Olympic event took place
Age: the age of the Olympian
You will be using the multiprocessor.Pool API which allows you to distribute your workload over several processes. The function parallel_apply() is defined in the sample code. It takes in as input the function being applied, the grouping used, and the number of cores needed for the analysis. Note that the @print_timing decorator is used to time each operation.

In [6]:
# Function to apply a function over multiple cores
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
    with Pool(nb_cores) as p:
        results = p.map(apply_func, groups)
    return pd.concat(results)

# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)

# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)

# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)

NameError: name 'print_timing' is not defined

Great work! For educational purposes, we've used a little trick here to make sure the parallelized version runs faster. In reality, using parallel computing wouldn't make sense here since the computations are simple and the dataset is relatively small. Communication overhead costs the multiprocessing version its edge!

Using a DataFrame
In the previous exercise, you saw how to split up a task and use the low-level python multiprocessing.Pool API to do calculations on several processing units.

It's essential to understand this on a lower level, but in reality, you'll never use this kind of APIs. A more convenient way to parallelize an apply over several groups is using the dask framework and its abstraction of the pandas DataFrame, for example.

The pandas DataFrame, athlete_events, is available in your workspace.

In [8]:
import dask.dataframe as dd

# Set the number of pratitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions = 4)

# Calculate the mean Age per Year
print(athlete_events_dask.groupby('Year').Age.mean().compute())

ModuleNotFoundError: No module named 'dask'

Nice work! Using the DataFrame abstraction makes parallel computing easier. You'll see in the next video that this abstraction is often used in the big data world, for example in Apache Spark.

Spark, Hadoop and Hive
You've encountered quite a few open source projects in the previous video. There's Hadoop, Hive, and PySpark. It's easy to get confused between these projects.

They have a few things in common: they are all currently maintained by the Apache Software Foundation, and they've all been used for massive parallel processing. Can you spot the differences?

-> Interactive Excercise

A PySpark groupby
You've seen how to use the dask framework and its DataFrame abstraction to do some calculations. However, as you've seen in the video, in the big data world Spark is probably a more popular choice for data processing.

In this exercise, you'll use the PySpark package to handle a Spark DataFrame. The data is the same as in previous exercises: participants of Olympic events between 1896 and 2016.

The Spark Dataframe, athlete_events_spark is available in your workspace.

The methods you're going to use in this exercise are:

.printSchema(): helps print the schema of a Spark DataFrame.
.groupBy(): grouping statement for an aggregation.
.mean(): take the mean over each group.
.show(): show the results.

In [9]:
# Print the type of athlete_events_spark
print(type(athlete_events_spark))

# Print the schema of athlete_events_spark
print(athlete_events_spark.printSchema())

# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age'))

# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age').show())

<class 'type'>


NameError: name 'athlete_events_spark' is not defined

Great work! However, we're not finished just yet. In the next exercise, you'll see more about how to set up PySpark locally.


Running PySpark files
In this exercise, you're going to run a PySpark file using spark-submit. This tool can help you submit your application to a spark cluster.

For the sake of this exercise, you're going to work with a local Spark instance running on 4 threads. The file you need to submit is in /home/repl/spark-script.py. Feel free to read the file:

cat /home/repl/spark-script.py
You can use spark-submit as follows:

spark-submit \
  --master local[4] \
  /home/repl/spark-script.py
What does this output? Note that it may take a few seconds to get your results.

DAGs -> Directly Acyclic Graphs, show the dependencies between jobs that need to be scheduled. The defacto scheduling tool is AirFlow

Airflow DAGs
In Airflow, a pipeline is represented as a Directed Acyclic Graph or DAG. The nodes of the graph represent tasks that are executed. The directed connections between nodes represent dependencies between the tasks.

Representing a data pipeline as a DAG makes much sense, as some tasks need to finish before others can start. You could compare this to an assembly line in a car factory. The tasks build up, and each task can depend on previous tasks being finished. A fictional DAG could look something like this:

Example DAG

Assembling the frame happens first, then the body and tires and finally you paint. Let's reproduce the example above in code.

In [10]:
# Create the DAG object
dag = DAG(dag_id="car_factory_simulation",
          default_args={"owner": "airflow","start_date": airflow.utils.dates.days_ago(2)},
          schedule_interval="0 * * * *")

# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)

# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)

NameError: name 'DAG' is not defined

Good work! Of course, there are many other kinds of operators you can use in real life situations. You're only scratching the surface of Airflow's capabilities here.

# ETL Extract, Transform and Load

Fetch from an API
In the last video, you've seen that you can extract data from an API by sending a request to the API and parsing the response which was in JSON format. In this exercise, you'll be doing the same by using the requests library to send a request to the Hacker News API.

Hacker News is a social news aggregation website, specifically for articles related to computer science or the tech world in general. Each post on the website has a JSON representation, which you'll see in the response of the request in the exercise.



In [11]:
import requests

# Fetch the Hackernews post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the test to post_score
post_score = resp.json()['score']
print(post_score)

{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17


Great work! You might have noticed that the response is not a real response. This is a result of DataCamp's sandboxing environment. However, you can interract with the real API using the exact same method.

## Read from a database
In this exercise, you're going to extract data that resides inside tables of a local PostgreSQL database. The data you'll be using is the Pagila example database. The database backs a fictional DVD store application, and educational resources often use it as an example database.

You'll be creating and using a function that extracts a database table into a pandas DataFrame object. The tables you'll be extracting are:

film: the films that are rented out in the DVD store.
customer: the customers that rented films at the DVD store.
In order to connect to the database, you'll have to use a PostgreSQL connection URI, which looks something like this:

postgresql://[user[:password]@][host][:port][/database]

In [13]:
import sqlalchemy

# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

# Connect to the database using the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/pagila" 
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)

# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)

ModuleNotFoundError: No module named 'psycopg2'

Great! The connection URI can also be a link to an external database. Keep in mind that in this case, pulling in full tables can result in loads of network traffic.

# Transform

## Splitting the rental rate
In the video exercise, you saw how to use pandas to split the email address column of the film table in order to extract the users' domain names. Suppose you would want to have a better understanding of the rates users pay for movies, so you decided to divide the rental_rate column into dollars and cents.

In this exercise, you will use the same techniques used in the video exercises to do just that! The film table has been loaded into the pandas DataFrame film_df. Remember, the goal is to split up the rental_rate column into dollars and cents.

In [14]:
# Get the rental rate column as a string
rental_rate_str = film_df.rental_rate.astype(str)

# Split up and expand the column
rental_rate_expanded = rental_rate_str.str.split('.', expand=True)

# Assign the columns to film_df
film_df = film_df.assign(
    rental_rate_dollar=rental_rate_expanded[0],
    rental_rate_cents=rental_rate_expanded[1],
)

NameError: name 'film_df' is not defined

Good job! Pandas is a reasonable tool to transform your data with for small dataset sizes.

In [15]:
spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila",
                "customer",
                {"user":"repl","password":"password"})

NameError: name 'spark' is not defined

## Joining with ratings
In the video exercise, you saw how to use transformations in PySpark by joining the film and ratings tables to create a new column that stores the average rating per customer. In this exercise, you're going to create more synergies between the film and ratings tables by using the same techniques you learned in the video exercise to calculate the average rating for every film.

The PySpark DataFrame with films, film_df and the PySpark DataFrame with ratings, rating_df, are available in your workspace.

In [16]:
# Use groupBy and mean to aggregate the column
ratings_per_film_df = rating_df.groupby('film_id').mean('rating')

# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
    ratings_per_film_df,
    film_df.film_id==ratings_per_film_df.film_id
)

# Show the 5 first results
print(film_df_with_ratings.show(5))

NameError: name 'rating_df' is not defined

Good work! You successfully joined data from two separate sources.

# Load


## OLAP or OLTP
You saw that there's a difference between OLAP and OLTP operations. A small recap:

OLAP: Online analytical processing
OLTP: Online transaction processing
It's essential to use the right database for the right job. There's a list of statements below. Can you find the most appropriate statement that is true?



## Writing to a file
In the video, you saw that files are often loaded into a MPP database like Redshift in order to make it available for analysis.

The typical workflow is to write the data into columnar data files. These data files are then uploaded to a storage system and from there, they can be copied into the data warehouse. In case of Amazon Redshift, the storage system would be S3, for example.

The first step is to write a file to the right format. For this exercises you'll choose the Apache Parquet file format.

There's a PySpark DataFrame called film_sdf and a pandas DataFrame called film_pdf in your workspace.

In [17]:
# Write the pandas DataFrame to parquet
film_pdf.to_parquet('films_pdf.parquet')

# Write the PySpark DataFrame to parquet
film_sdf.write.parquet('films_sdf.parquet')

NameError: name 'film_pdf' is not defined

Great job! That's really impressive.

## Load into Postgres
In this exercise, you'll write out some data to a PostgreSQL data warehouse. That could be useful when you have a result of some transformations, and you want to use it in an application.

For example, the result of a transformation could have added a column with film recommendations, and you want to use them in your online store.

There's a pandas DataFrame called film_pdf in your workspace.

As a reminder, here's the structure of a connection URI for sqlalchemy:

postgresql://[user[:password]@][host][:port][/database]

In [19]:
# Finish the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Transformation step, join with recommendations data
film_pdf_joined = film_pdf.join(recommendations)

# Finish the .to_sql() call to write to store.film
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")

# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)

ModuleNotFoundError: No module named 'psycopg2'

Awesome work! Loading insights into a PostgreSQL data warehouse isn't that hard! In the next lesson, you'll learn how to put it all together!

# Scheduler with Airflow

crontab.guru fro cron expression

## Defining a DAG
In the previous exercises you applied the three steps in the ETL process:

Extract: Extract the film PostgreSQL table into pandas.
Transform: Split the rental_rate column of the film DataFrame.
Load: Load a the film DataFrame into a PostgreSQL data warehouse.
The functions extract_film_to_pandas(), transform_rental_rate() and load_dataframe_to_film() are defined in your workspace. In this exercise, you'll add an ETL task to an existing DAG. The DAG to extend and the task to wait for are defined in your workspace are defined as dag and wait_for_table respectively.

In [20]:
# Define the ETL function
def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)

# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=etl,
                          dag=dag)

# Set the upstream to wait_for_table and sample run etl()
etl_task.set_upstream(wait_for_table)
etl()

NameError: name 'PythonOperator' is not defined

Nicely done! Be sure to experiment with a few queries once the data pipeline has run. For example:
pd.read_sql('SELECT rating, AVG(rental_duration) FROM film GROUP BY rating ORDER BY AVG', db_engine)

## Setting up Airflow
In this exercise, you'll learn how to add a DAG to Airflow. To the right, you have a terminal at your disposal. The workspace comes with Airflow pre-configured, but it's easy to install on your own. (https://airflow.apache.org/docs/stable/start.html)

You'll need to move the dag.py file containing the DAG you defined in the previous exercise to, the DAGs folder. Here are the steps to find it:

The airflow home directory is defined in the AIRFLOW_HOME environment variable. Type echo $AIRFLOW_HOME to find out.
In this directory, find the airflow.cfg file. Use head to read the file, and find the value of the dags_folder.
Now you can find the folder and move the dag.py file there: mv ./dag.py <dags_folder>.

Which files does the DAGs folder have after you moved the file?

# Case Study


## Querying the table
Now that you have a grasp of what's happening in the datacamp_application database, let's go ahead and write up a query for that database.

The goal is to get a feeling for the data in this exercise. You'll get the rating data for three sample users and then use a predefined helper function, print_user_comparison(), to compare the sets of course ids these users rated.

In [21]:
# Complete the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/datacamp_application"
db_engine = sqlalchemy.create_engine(connection_uri)

# Get user with id 4387
user1 = pd.read_sql("SELECT * FROM rating WHERE user_id = 4387", db_engine)

# Get user with id 18163
user2 = pd.read_sql("SELECT * FROM rating WHERE user_id = 18163", db_engine)

# Get user with id 8770
user3 = pd.read_sql("SELECT * FROM rating WHERE user_id = 8770", db_engine)

# Use the helper function to compare the 3 users
print_user_comparison(user1, user2, user3)

ModuleNotFoundError: No module named 'psycopg2'

## Average rating per course
A great way to recommend courses is to recommend top-rated courses, as DataCamp students often like courses that are highly rated by their peers.

In this exercise, you'll complete a transformation function transform_avg_rating() that aggregates the rating data using the pandas DataFrame's .groupby() method. The goal is to get a DataFrame with two columns, a course id and its average rating:

course_id	avg_rating
123	4.72
111	4.62
...	...
In this exercise, you'll complete this transformation function, and apply it on raw rating data extracted via the helper function extract_rating_data() which extracts course ratings from the rating table.

In [22]:
# Complete the transformation function
def transform_avg_rating(rating_data):
  # Group by course_id and extract average rating per course
  avg_rating = rating_data.groupby('course_id').rating.mean()
  # Return sorted average ratings per course
  sort_rating = avg_rating.sort_values(ascending=False).reset_index()
  return sort_rating

# Extract the rating data into a DataFrame    
rating_data = extract_rating_data(db_engines)

# Use transform_avg_rating on the extracted data and print results
avg_rating_data = transform_avg_rating(rating_data)
print(avg_rating_data) 

NameError: name 'extract_rating_data' is not defined

Good work! The result of this transformation could now be loaded into another table and used by data products to recommend courses. It would be a very non-personal recommendation, though.

## Filter out corrupt data
One recurrent step you can expect in the transformation phase would be to clean up some incomplete data. In this exercise, you're going to look at course data, which has the following format:

course_id	title	description	programming_language
1	Some Course	...	r
You're going to inspect this DataFrame and make sure there are no missing values by using the pandas DataFrame's .isnull().sum() methods. You will find that the programming_language column has some missing values.

As such, you will complete the transform_fill_programming_language() function by using the .fillna() method to fill missing values.

In [23]:
course_data = extract_course_data(db_engines)

# Print out the number of missing values per column
print(course_data.isnull().sum())

# The transformation should fill in the missing values
def transform_fill_programming_language(course_data):
    imputed = course_data.fillna({"programming_language": "r"})
    return imputed

transformed = transform_fill_programming_language(course_data)

# Print out the number of missing values per column of transformed
print(transformed.isnull().sum())

NameError: name 'extract_course_data' is not defined

Great! You've managed to find out which columns had missing values and write up a transformation function to alleviate that problem.

## Using the recommender transformation
In the last few exercises, you calculated the average rating per course and cleaned up some course data. You will use this data to produce viable recommendations for DataCamp students.

As a reminder, here are the decision rules for producing recommendations:

Use technology a student has rated the most.
Exclude courses a student has already rated.
Find the three top-rated courses from eligible courses.
In order to produce the final recommendations, you will use the average course ratings, and the list of eligible recommendations per user, stored in avg_course_ratings and courses_to_recommend respectively. You will do this by completing the transform_recommendations() function which merges both DataFrames and finds the top 3 highest rated courses to recommend per user.

In [24]:
# Complete the transformation function
def transform_recommendations(avg_course_ratings, courses_to_recommend):
    # Merge both DataFrames
    merged = courses_to_recommend.merge(avg_course_ratings)
    # Sort values by rating and group by user_id
    grouped = merged.sort_values("rating", ascending = False).groupby('user_id') 
    # Produce the top 3 values and sort by user_id
    recommendations = grouped.head(3).sort_values("user_id").reset_index()
    final_recommendations = recommendations[["user_id", "course_id","rating"]]
    # Return final recommendations
    return final_recommendations

# Use the function with the predefined DataFrame objects
recommendations = transform_recommendations(avg_course_ratings, courses_to_recommend)

NameError: name 'avg_course_ratings' is not defined

Awesome work! In the next lesson, you're going to put it all together and schedule an ETL pipeline for developing recommendations.

## The target table
In the previous exercises, you've calculated a DataFrame called recommendations. It contains pairs of user_id's' and course_id's, with a rating that represents the average rating of this course. The assumption is the highest rated course, which is eligible for a user would be best to recommend.

It's time to put this table into a database so that it can be used by several products like a recommendation engine or an emailing system.

Since it's a pandas.DataFrame object, you can use the .to_sql() method. Of course, you'll have to connect to the database using the connection URI first. The recommendations table is available in your environment.


In [25]:
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine = sqlalchemy.create_engine(connection_uri)

def load_to_dwh(recommendations):
    recommendations.to_sql("recommendations", db_engine, schema="store", if_exists="replace")

ModuleNotFoundError: No module named 'psycopg2'

Nice work. We now have all the functions to build or whole etl() function!

## Defining the DAG
In the previous exercises, you've completed the extract, transform and load phases separately. Now all of this is put together in one neat etl() function that you can discover in the console.

The etl() function extracts raw course and ratings data from relevant databases, cleans corrupt data and fills in missing value, computes average rating per course and creates recommendations based on the decision rules for producing recommendations, and finally loads the recommendations into a database.

As you might remember from the video, etl() accepts a single argument: db_engines. You can pass this to the task using op_kwargs in the PythonOperator. You can pass it a dictionary that will be filled in as kwargs in the callable.

In [26]:
# Define the DAG so it runs on a daily basis
dag = DAG(dag_id="recommendations",
          schedule_interval="0 0 * * *")

# Make sure `etl()` is called in the operator. Pass the correct kwargs.
task_recommendations = PythonOperator(
    task_id="recommendations_task",
    python_callable=etl,
    op_kwargs={"db_engines":db_engines},
)

NameError: name 'DAG' is not defined

Great, you've defined the DAG! It's time to enable it.

## Querying the recommendations
In the previous exercises, you've learned how to calculate a table with course recommendations on a daily basis. Now that this recommendations table is in the data warehouse, you could also quickly join it with other tables in order to produce important features for DataCamp students such as customized marketing emails, intelligent recommendations for students and other features.

In this exercise, you will get a taste of how the newly created recommendations table could be utilized by creating a function recommendations_for_user() which automatically gets the top recommended courses based per user ID for a particular rating threshold.

In [27]:
def recommendations_for_user(user_id, threshold=4.5):
  # Join with the courses table
  query = """
  SELECT title, rating FROM recommendations
    INNER JOIN courses ON courses.course_id = recommendations.course_id
    WHERE user_id=%(user_id)s AND rating>%(threshold)s
    ORDER BY rating DESC
  """
  # Add the threshold parameter
  predictions_df = pd.read_sql(query, db_engine, params = {"user_id": user_id, 
                                                           "threshold": threshold})
  return predictions_df.title.values

# Try the function you created
print(recommendations_for_user(12, 4.65))

NameError: name 'db_engine' is not defined

Good job! This was the final exercise of this course. By using your skills as a data engineer, you're now able to build intelligent products like recommendation engines.