- Row-oriented databases and OLTP go hand-in-hand.
- Column-oriented databases and OLAP go hand-in-hand.
- OLTP means the system is optimized for transactions.
- APIs mostly use a more structured form of data, for example JSON.
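This toy sketch in plain Python makes the row/column distinction concrete. It is not how any real database lays data out on disk. The same made-up rental records are stored row-wise and column-wise. An OLTP-style insert adds one whole record. An OLAP-style aggregate over one column only needs that column's list in the column layout.

```python
# Toy illustration only: made-up rental records, not a real storage engine.

# Row-oriented: each record is kept together (suits reading/writing whole rows).
rows = [
    {"rental_id": 1, "customer_id": 10, "amount": 2.99},
    {"rental_id": 2, "customer_id": 11, "amount": 0.99},
    {"rental_id": 3, "customer_id": 10, "amount": 4.99},
]

# Column-oriented: each column is kept together (suits scanning a few columns).
columns = {
    "rental_id": [1, 2, 3],
    "customer_id": [10, 11, 10],
    "amount": [2.99, 0.99, 4.99],
}

# OLTP-style operation: insert one new transaction.
new_rental = {"rental_id": 4, "customer_id": 12, "amount": 0.99}
rows.append(new_rental)                  # one append in the row layout
for name, value in new_rental.items():   # one append per column in the column layout
    columns[name].append(value)

# OLAP-style operation: total amount over all rentals.
total_from_rows = sum(row["amount"] for row in rows)  # touches every record
total_from_columns = sum(columns["amount"])           # reads a single column

print(round(total_from_rows, 2), round(total_from_columns, 2))
```

Both layouts give the same answer. The difference is how much data each operation has to touch, which is why OLTP systems tend to be row-oriented and OLAP systems column-oriented.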

- Use the requests module to get the Hacker News post's JSON object.
- Print out the response, parsed as JSON.
- Parsing as JSON again, assign the "score" key of the post to post_score.

```python
import requests

# Fetch the Hacker News post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")

# Print the response parsed as JSON
print(resp.json())

# Assign the score of the post to post_score
post_score = resp.json()["score"]
print(post_score)
```

Output:

```
{'by': 'neis', 'descendants': 0, 'id': 16222426, 'score': 17, 'time': 1516800333, 'title': 'Duolingo-Style Learning for Data Science: DataCamp for Mobile', 'type': 'story', 'url': 'https://medium.com/datacamp/duolingo-style-learning-for-data-science-datacamp-for-mobile-3861d1bc02df'}
17
```
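`resp.json()` decodes the response body from JSON text into Python dicts and lists. The sketch below does the same step without a network call, using the standard `json` module. The input is an abbreviated, hand-shortened copy of the payload printed above, not a live response.

```python
import json

# Abbreviated copy of the Hacker News item payload (raw JSON text, as sent over HTTP).
raw_body = '{"by": "neis", "id": 16222426, "score": 17, "type": "story"}'

# json.loads turns the JSON text into a Python dict, which is what resp.json() returns here.
post = json.loads(raw_body)

# JSON numbers become int and JSON strings become str.
post_score = post["score"]
print(type(post).__name__, post_score)
```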


- Complete the extract_table_to_pandas() function definition to include the tablename argument within the query.
- Fill in the connection URI. The username and password are repl and password, respectively. The host is localhost and port is 5432. The database is pagila.
- Complete the function calls of extract_table_to_pandas() to extract the film and customer tables.

```python
import pandas as pd
import sqlalchemy

# Function to extract a table into a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)

# Connect to the database using the connection URI
# (the postgresql:// scheme needs a PostgreSQL driver such as psycopg2)
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)

# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)
```
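`pd.read_sql` also accepts a plain `sqlite3` connection. That means the same extraction function can be tried without the pagila server. This sketch uses an in-memory SQLite database as a stand-in. The `film` table and its rows are made up for illustration.

```python
import sqlite3

import pandas as pd


def extract_table_to_pandas(tablename, db_engine):
    """Return every row of `tablename` as a pandas DataFrame."""
    query = "SELECT * FROM {}".format(tablename)
    return pd.read_sql(query, db_engine)


# Stand-in for pagila: an in-memory SQLite database with a tiny, made-up film table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE film (film_id INTEGER, title TEXT, rental_rate REAL)")
conn.executemany(
    "INSERT INTO film VALUES (?, ?, ?)",
    [(1, "SAMPLE FILM 1", 0.99), (2, "SAMPLE FILM 2", 4.99)],
)
conn.commit()

film_df = extract_table_to_pandas("film", conn)
print(film_df)
```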

- Use the .astype() method to convert the rental_rate column into a column of string objects, and assign the results to rental_rate_str.
- Split rental_rate_str on '.' and expand the results into columns. Assign the results to rental_rate_expanded.
- Assign the new columns to film_df under the names rental_rate_dollar and rental_rate_cents, taking columns 0 and 1 of rental_rate_expanded respectively.

```python
# Get the rental rate column as a string
rental_rate_str = film_df.rental_rate.astype("str")

# Split on "." and expand the parts into separate columns
rental_rate_expanded = rental_rate_str.str.split(".", expand=True)

# Assign the new columns to film_df
film_df = film_df.assign(
    rental_rate_dollar=rental_rate_expanded[0],
    rental_rate_cents=rental_rate_expanded[1],
)
```
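Splitting the string form works for rates such as 0.99 or 4.99, but it depends on how each float prints. For example, `str(2.9)` is `'2.9'`, so its cents part comes out as `'9'` rather than `'90'`. The sketch below is an arithmetic alternative. It assumes non-negative amounts with at most two decimal places and uses `Decimal` to derive integer dollars and cents.

```python
from decimal import Decimal, ROUND_HALF_UP


def split_rate(rate):
    """Split a non-negative money amount like 4.99 into (dollars, cents) as integers."""
    # str() first so Decimal sees the short form (e.g. '2.9'), not the float's binary expansion.
    amount = Decimal(str(rate)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    cents_total = int(amount * 100)
    return cents_total // 100, cents_total % 100


# The string-split approach, for comparison.
naive = [str(rate).split(".") for rate in [0.99, 4.99, 2.9]]
print(naive)  # the last entry's cents part is '9', not '90'

print([split_rate(rate) for rate in [0.99, 4.99, 2.9]])
```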

Before you can transform data with PySpark, you need to load it into Spark. For example, this reads the customer table of the pagila database over JDBC:

```python
# Requires the PostgreSQL JDBC driver on Spark's classpath
spark.read.jdbc("jdbc:postgresql://localhost:5432/pagila",
                "customer",
                properties={"user": "repl", "password": "password"})
```

- Take the mean rating per film_id, and assign the result to ratings_per_film_df.
- Complete the .join() statement to join on the film_id column.
- Show the first 5 results of the resulting DataFrame.

```python
# Use groupBy and mean to aggregate the rating column per film
ratings_per_film_df = rating_df.groupBy("film_id").mean("rating")

# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
    ratings_per_film_df,
    film_df.film_id == ratings_per_film_df.film_id
)

# Show the first 5 results (.show() prints the rows itself and returns None)
film_df_with_ratings.show(5)
```
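The plain-Python sketch below shows what the group-by-and-join step produces. It runs on a few made-up rows, not Spark or the exercise data. Ratings are averaged per `film_id`, and each film is then matched to its average on `film_id`. This mirrors an inner join, which is the default for Spark's `.join()`.

```python
from collections import defaultdict

# Made-up sample data for illustration.
films = [
    {"film_id": 1, "title": "FILM A"},
    {"film_id": 2, "title": "FILM B"},
    {"film_id": 3, "title": "FILM C"},  # no ratings, so the inner join drops it
]
ratings = [
    {"film_id": 1, "rating": 4},
    {"film_id": 1, "rating": 2},
    {"film_id": 2, "rating": 5},
]

# Group by film_id and take the mean rating (like groupBy("film_id").mean("rating")).
sums = defaultdict(float)
counts = defaultdict(int)
for row in ratings:
    sums[row["film_id"]] += row["rating"]
    counts[row["film_id"]] += 1
ratings_per_film = {fid: sums[fid] / counts[fid] for fid in sums}

# Inner join on film_id: keep only films that have an average rating.
films_with_ratings = [
    {**film, "avg_rating": ratings_per_film[film["film_id"]]}
    for film in films
    if film["film_id"] in ratings_per_film
]

for row in films_with_ratings[:5]:  # first 5 rows, like .show(5)
    print(row)
```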

You saw that there's a difference between OLAP and OLTP operations. A small recap:

- OLAP: Online analytical processing
- OLTP: Online transaction processing
- It's essential to use the right database for the right job. 
- Typically, analytical databases are column-oriented.
- Massively parallel processing (MPP) databases are usually column-oriented.
- Databases optimized for OLAP are usually not great at OLTP operations.
- Analytical and application databases have different use cases and should be separated if possible.


- Write the pandas DataFrame film_pdf to a parquet file called "films_pdf.parquet".
- Write the PySpark DataFrame film_sdf to a parquet file called "films_sdf.parquet".

```python
# Write the pandas DataFrame to parquet (needs pyarrow or fastparquet installed)
film_pdf.to_parquet("films_pdf.parquet")

# Write the PySpark DataFrame to parquet
film_sdf.write.parquet("films_sdf.parquet")
```

- Complete the connection URI to create the database engine. The user and password are repl and password, respectively. The host is localhost, and the port is 5432. This time, the database is dwh.
- Finish the call so we use the "store" schema in the database. If the table exists, replace it completely.

```python
import pandas as pd
import sqlalchemy

# Finish the connection URI (this time the database is dwh)
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Transformation step: join with the recommendations data
# (recommendations comes from the exercise environment)
film_pdf_joined = film_pdf.join(recommendations)

# Write to store.film, replacing the table if it already exists
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")

# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)
```
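The `if_exists` argument of `DataFrame.to_sql` decides what happens when the target table already exists. `"fail"` (the default) raises an error, `"replace"` drops and recreates the table, and `"append"` inserts the new rows. The small sketch below uses made-up rows in an in-memory SQLite database to show the difference in row counts. SQLite has no `store` schema here, so `schema` is left out.

```python
import sqlite3

import pandas as pd

# Made-up rows standing in for the joined film data.
df = pd.DataFrame({"film_id": [1, 2], "recommended_film_ids": ["2,3", "1"]})

# In-memory SQLite database (no schemas, so schema= is not used here).
conn = sqlite3.connect(":memory:")


def row_count():
    return conn.execute("SELECT COUNT(*) FROM film").fetchone()[0]


df.to_sql("film", conn, index=False)                      # table does not exist yet: create it
df.to_sql("film", conn, index=False, if_exists="append")  # keep old rows, add 2 more
after_append = row_count()

df.to_sql("film", conn, index=False, if_exists="replace")  # drop the table and rewrite it
after_replace = row_count()

print(after_append, after_replace)
```

Without `index=False`, `to_sql` also writes the DataFrame index as an extra column.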

- Complete the etl() function by making use of the functions defined in the exercise description.
- Make sure etl_task uses the etl callable.
- Set up the correct upstream dependency. Note that etl_task should wait for wait_for_table to be finished.
- The sample code contains a sample run. This means the ETL pipeline runs when you run the code.

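```python
# Airflow 2 import path (Airflow 1.10 used airflow.operators.python_operator)
from airflow.operators.python import PythonOperator

# Define the ETL function
def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)

# Define the ETL task using PythonOperator
# (dag, wait_for_table and the three helpers are defined in the exercise)
etl_task = PythonOperator(task_id='etl_film',
                          python_callable=etl,
                          dag=dag)

# Make etl_task wait for wait_for_table, then do a sample run of etl()
etl_task.set_upstream(wait_for_table)
etl()
```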
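The three helpers come from the exercise and are not shown in these notes. The sketch below shows how `etl()` chains extract, transform, and load, without Airflow. It defines hypothetical in-memory stand-ins that reuse the exercise's helper names. Their bodies are assumptions, not the exercise's code. The sketch then does the same sample run.

```python
# Hypothetical stand-ins for the exercise's helpers; the real ones use PostgreSQL.
SOURCE_FILMS = [
    {"film_id": 1, "rental_rate": "0.99"},
    {"film_id": 2, "rental_rate": "4.99"},
]
LOADED = []  # plays the role of the target film table


def extract_film_to_pandas():
    # Extract: copy the source rows (a list of dicts instead of a DataFrame).
    return [dict(row) for row in SOURCE_FILMS]


def transform_rental_rate(film_df):
    # Transform: split the rental rate into dollar and cent parts.
    for row in film_df:
        dollar, cents = row["rental_rate"].split(".")
        row["rental_rate_dollar"], row["rental_rate_cents"] = dollar, cents
    return film_df


def load_dataframe_to_film(film_df):
    # Load: write the transformed rows to the target.
    LOADED.extend(film_df)


def etl():
    film_df = extract_film_to_pandas()
    film_df = transform_rental_rate(film_df)
    load_dataframe_to_film(film_df)


etl()  # sample run
print(LOADED)
```

Because `etl` takes no arguments, it can be passed directly as `python_callable`, and Airflow calls it when the task runs.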

You'll need to move the dag.py file containing the DAG you defined in the previous exercise to the DAGs folder. Here are the steps to find it:

- The Airflow home directory is defined in the AIRFLOW_HOME environment variable. Type `echo $AIRFLOW_HOME` to find out.

- In this directory, find the airflow.cfg file. Use `head` to read the file, and find the value of `dags_folder`.

Now you can find the folder and move the dag.py file there: `mv ./dag.py <dags_folder>`.
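The same lookup can be scripted with only the standard library. This sketch assumes `airflow.cfg` keeps `dags_folder` in its `[core]` section, as Airflow's generated config does. Interpolation is turned off so values are read verbatim.

```python
import configparser
import os
from pathlib import Path


def find_dags_folder():
    """Return the dags_folder setting from $AIRFLOW_HOME/airflow.cfg."""
    airflow_home = os.environ["AIRFLOW_HOME"]                # same value as `echo $AIRFLOW_HOME`
    config = configparser.ConfigParser(interpolation=None)  # read values verbatim
    config.read(Path(airflow_home) / "airflow.cfg")
    return config.get("core", "dags_folder")
```

The returned path is the `<dags_folder>` to use in the `mv` command.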

Which files are in the DAGs folder after you move the file?

- It has two DAG files: dag.py and dag_recommendations.py.

`0 0 * * *` means:
- Daily at midnight: it runs at 00:00 every day.
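A cron expression has five fields: minute, hour, day of month, month, and day of week. The minimal sketch below checks a time against `0 0 * * *`. It supports only plain numbers and `*`. Real cron also accepts ranges, lists, and steps, and has a special rule when both day fields are restricted. This sketch omits all of these on purpose.

```python
from datetime import datetime


def matches(cron_expr, when):
    """Return True if `when` matches a 5-field cron expression made of numbers and '*'."""
    minute, hour, day, month, weekday = cron_expr.split()
    cron_weekday = (when.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    actual = [when.minute, when.hour, when.day, when.month, cron_weekday]
    # Simplification: every field must match (real cron ORs the two day fields if both are set).
    return all(
        field == "*" or int(field) == value
        for field, value in zip([minute, hour, day, month, weekday], actual)
    )


print(matches("0 0 * * *", datetime(2024, 5, 1, 0, 0)))   # True: midnight
print(matches("0 0 * * *", datetime(2024, 5, 1, 12, 0)))  # False: noon
```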