# Movie Revenues and IMDb data pipeline prototype

This notebook presents my idea for a pipeline that ingests the `revenue_per_day` CSV file and enriches it with data from the OMDB API (IMDb).
A real pipeline should be built on tools other than SQLite and Python with pandas, and should not be *orchestrated* by a notebook. Nevertheless, I decided on this solution because, despite some simplifications, it shows my way of thinking, developing and approaching problems in an accessible way.

I also decided to implement the data presentation in a notebook using Python libraries because I currently have no tool in which I could quickly prepare dashboards. I didn't manage to find enough time to explore the dedicated open-source solutions available in this area.

Initially I wanted to do more in pandas and SQLAlchemy, but executing SQL statements directly, even in SQLite, is significantly faster. Some SQL statements can look messy and overwhelming. This is partly due to my choice of SQLite and its limitations; for example, it does not let you define functions (UDFs) in SQL itself. Of course the statements could be written better, but that would cost more effort and time.

## Data model
The code below implements the data model defined in the file `./diagrams/diagram.dbml` (it can be recreated at `https://dbdiagram.io/`) and shown in this diagram:

![Cat](pics/diagram_screenshot.png)

The model consists of two STG tables:
- `stg_revenues_per_day`
- `stg_movies_details`

five DWH dimension tables:
- `dwh_dim__movies_reviewers`
- `dwh_dim__movies`
- `dwh_dim__reviews_results`
- `dwh_dim__distributors`
- `dwh_dim__dates`

and one fact DWH table:
- `dwh_fact__revenues`.

The tables have the prefixes `dwh_` and `stg_` to mimic `schema_name.table_name`. Ideally they should be split between two schemas, but SQLite does not divide a database into schemas.

## Pipeline

The pipeline is divided into a few task groups (using the Airflow nomenclature). Arrows ~~>~~ show the direction of the steps.


![Cat](pics/main_flow.png)


The picture only shows the idea of the order in which the actions are executed. The individual tasks are presented below as consecutive steps that call the appropriate functions. It is crucial to execute them in the order presented in the notebook, because the solution has no mechanism for defining strict dependencies.

In most cases, tasks will be represented as Python functions.
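As a rough illustration of the missing dependency mechanism, the sketch below derives an execution order from a dependency map using the standard-library `graphlib` module. The task group names follow the headings of this notebook; the `DEPENDENCIES` mapping and the `execution_order` helper are hypothetical and not part of the pipeline code.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each task group lists the groups it waits for.
# The names follow the notebook headings; the mapping itself is an assumption.
DEPENDENCIES = {
    "start": set(),
    "creation": {"start"},
    "ingestion": {"creation"},
    "transformation": {"ingestion"},
    "create_views": {"transformation"},
    "stop": {"create_views"},
}


def execution_order(dependencies):
    """Return the task groups in an order that respects every dependency."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(DEPENDENCIES)
print(order)
```

An orchestrator such as Airflow expresses the same idea declaratively; here the order is only computed, not enforced.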

### main.start

Starts and stops are placeholders, like `EmptyOperator` in Airflow.

### main.creation
This function iterates through the lists of ORM models defined in `models.schemas.schema`, creates the corresponding tables in the database and sets up triggers. In a proper pipeline these actions could be executed in parallel.

![Cat](pics/creation_flow.png)

**All tables have the technical fields `created_date` and `modified_date`. All DWH tables additionally have `id` as the primary key.**
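A minimal sketch of what a table with these technical fields and a `modified_date` trigger could look like in plain SQLite. The table name `demo_table`, its `title` column and the trigger name are hypothetical; the real tables are generated from the ORM models by `create_from_orms`, and its exact DDL may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Hypothetical table; only the technical fields mirror the notebook.
    CREATE TABLE IF NOT EXISTS demo_table (
        id INTEGER PRIMARY KEY,
        title TEXT NOT NULL,
        created_date TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        modified_date TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    );

    -- Refresh modified_date whenever a row changes without touching it explicitly.
    CREATE TRIGGER IF NOT EXISTS demo_table_modified_date
    AFTER UPDATE ON demo_table
    FOR EACH ROW
    WHEN NEW.modified_date IS OLD.modified_date
    BEGIN
        UPDATE demo_table
        SET modified_date = strftime('%Y-%m-%d %H:%M:%f', 'now')
        WHERE id = OLD.id;
    END;
    """
)

# Insert a row with old timestamps, then update it to see the trigger fire.
conn.execute(
    "INSERT INTO demo_table (title, created_date, modified_date) VALUES (?, ?, ?)",
    ("Amelie", "2000-01-01 00:00:00", "2000-01-01 00:00:00"),
)
conn.execute("UPDATE demo_table SET title = 'Amélie' WHERE id = 1")
created, modified = conn.execute(
    "SELECT created_date, modified_date FROM demo_table WHERE id = 1"
).fetchone()
print(created, modified)
```

The `WHEN` clause keeps the trigger from firing again on its own `UPDATE`, independent of the `recursive_triggers` setting.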

In [1]:
from sqlmodel import create_engine
from pipeline.creation import create_from_orms
from models.schemas.schema import stg_orms, dwh_orms

engine = create_engine("sqlite:///./db/task.db", echo=False)

#### creation.create_stg

In [2]:
create_from_orms(models=stg_orms, engine=engine)

If not existed, table 'stg_revenues_per_day' created + modified_date trigger
If not existed, table 'stg_movies_details' created + modified_date trigger


<State.SUCCESS: 'SUCCESS'>

#### creation.create_dwh

In [3]:
create_from_orms(models=dwh_orms, engine=engine)

If not existed, table 'dwh_dim__movies_reviewers' created + modified_date trigger
If not existed, table 'dwh_dim__movies' created + modified_date trigger
If not existed, table 'dwh_dim__reviews_results' created + modified_date trigger
If not existed, table 'dwh_dim__distributors' created + modified_date trigger
If not existed, table 'dwh_dim__dates' created + modified_date trigger
If not existed, table 'dwh_fact__revenues' created + modified_date trigger


<State.SUCCESS: 'SUCCESS'>

### main.ingestion

There are two tasks here, and they have to be executed in series: the data gathered in the first step is used in the next one to make the API calls.

![Cat](pics/ingestion_flow.png)

#### ingestion.ingest_revenues

This function loads a CSV file into memory as a pandas DataFrame, based on the parameters defined in the `CSVIngesterDefinition` object (`revenues_per_day_definition`). A temp table is created in the DB from the DataFrame, and its rows are then UPSERTed into the proper `stg_revenues_per_day` table. The temp table is dropped in a separate session.

Initially I introduced a more elegant version based on SQLAlchemy merge, but it turned out to be significantly slower than the temp table with UPSERT. I left the old ingester in the files because it is more generic and could be used to load data into different tables with ORM classes.
The `csv_ingester_revenues` function used here is tailored to this specific table and this specific CSV.
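The temp table plus UPSERT pattern described above can be sketched with the standard library alone. This is not the body of `csv_ingester_revenues` (which uses pandas and the project's definition objects); the column names, the `tmp_revenues` table and the `ingest` helper are assumptions made for illustration, and the UPSERT syntax needs SQLite 3.24 or newer.

```python
import csv
import io
import sqlite3

# Hypothetical CSV content; the real file has different columns.
CSV_TEXT = "title,date,revenue\nAmelie,2001-11-02,100\nAmelie,2001-11-03,150\n"

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stg_revenues_per_day (
        title TEXT NOT NULL,
        date TEXT NOT NULL,
        revenue INTEGER,
        UNIQUE (title, date)
    )
    """
)


def ingest(csv_text):
    """Load rows into a temp table, UPSERT them into the target, drop the temp table."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = [(r["title"], r["date"], int(r["revenue"])) for r in reader]
    with conn:  # commits the inserts on success
        conn.execute("CREATE TEMP TABLE tmp_revenues (title TEXT, date TEXT, revenue INTEGER)")
        conn.executemany("INSERT INTO tmp_revenues VALUES (?, ?, ?)", rows)
        # "WHERE true" avoids a parsing ambiguity between SELECT and ON CONFLICT.
        conn.execute(
            """
            INSERT INTO stg_revenues_per_day (title, date, revenue)
            SELECT title, date, revenue FROM tmp_revenues WHERE true
            ON CONFLICT (title, date) DO UPDATE SET revenue = excluded.revenue
            """
        )
    with conn:
        conn.execute("DROP TABLE tmp_revenues")


ingest(CSV_TEXT)
ingest(CSV_TEXT.replace("150", "175"))  # second run updates instead of duplicating
result = conn.execute(
    "SELECT title, date, revenue FROM stg_revenues_per_day ORDER BY date"
).fetchall()
print(result)
```

Because the merge happens in a single set-based statement, the database does the matching instead of the ORM comparing objects row by row, which is the likely reason for the speed difference noted above.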

In [13]:
from pipeline.ingestion import csv_ingester_revenues
from models.definitions.ingestion import revenues_per_day_definition

csv_ingester_revenues(definition=revenues_per_day_definition, engine=engine)

CSV loaded to memory. Starting db merge


<State.SUCCESS: 'SUCCESS'>

#### ingestion.fetch_and_ingest_movie_details

This function retrieves a list of distinct movie titles from the database, queries the OMDB API for details, and stores the results in a temp table. In case of faulty responses or API errors, it keeps track of the number of failed attempts and stops once a certain threshold is reached. The fetched data is merged into the main table, and the temp table is dropped after successful execution.
The `OMDBAPIFetchDefinition` object (`movies_details_api_fetch_definition`) stores the parameters that define the API (URL, number of faulty attempts, dry run).

Initially I introduced a more elegant version based on SQLAlchemy merge, but it turned out to be significantly slower than the temp table with UPSERT.
The `Movie not found!` error is not counted towards the number of faulty attempts.
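The failure-threshold logic can be sketched as below. The HTTP call is replaced by an injected `fetch_one` function so the example runs offline; `fetch_all`, `fake_fetch`, `max_faulty_attempts` and the response dictionaries (shaped like `{"Response": "False", "Error": "..."}`) are assumptions for illustration, not the actual `fetch_movies_details` implementation.

```python
def fetch_all(titles, fetch_one, max_faulty_attempts=3):
    """Collect details per title; stop once too many faulty responses were seen."""
    details, faulty = {}, 0
    for title in titles:
        try:
            response = fetch_one(title)
        except Exception as error:  # e.g. network problems count as faulty attempts
            response = {"Response": "False", "Error": str(error)}
        if response.get("Response") == "True":
            details[title] = response
            continue
        print(f"Faulty API response for {title}. {response.get('Error')}")
        # A missing movie is an expected outcome, so it does not count.
        if response.get("Error") != "Movie not found!":
            faulty += 1
        if faulty >= max_faulty_attempts:
            break
    return details, faulty


def fake_fetch(title):
    """Stand-in for the HTTP call so the sketch runs offline."""
    canned = {
        "Shrek": {"Response": "True", "Title": "Shrek"},
        "Unknown": {"Response": "False", "Error": "Movie not found!"},
    }
    return canned.get(title, {"Response": "False", "Error": "Request limit reached!"})


details, faulty = fetch_all(["Shrek", "Unknown", "A", "B", "C"], fake_fetch, max_faulty_attempts=2)
print(list(details), faulty)
```

In this run the loop stops after the second counted failure, so the last title is never requested.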

**You can limit the number of API calls for test purposes using `limit_calls`.** <br>
A valid `OMDB_API_KEY` must be passed to the function. **Below you can set your API key as an environment variable.**

**\\/\\/\\/\\/\\/\\/\\/**  `OMDB_API_KEY`  **\\/\\/\\/\\/\\/\\/\\/**

In [11]:
%env OMDB_API_KEY=TO_BE_DEFINED!!

env: OMDB_API_KEY=TO_BE_DEFINED!!


**\\/\\/\\/\\/\\/\\/\\/**  `OMDB_API_KEY`  **\\/\\/\\/\\/\\/\\/\\/**

In [9]:
import os
from pipeline.ingestion import fetch_movies_details
from models.definitions.ingestion import movies_details_api_fetch_definition

In [10]:
fetch_movies_details(definition=movies_details_api_fetch_definition,
                     api_key=os.environ.get('OMDB_API_KEY'),
                     engine=engine,
                     limit_calls=10000
                    )

  1%|██▌                                                                                                                                                                                             | 33/2509 [00:02<03:41, 11.17it/s]

Faulty API response for Poku00e9mon the Movie 2000. Movie not found!
Faulty API response for Poku00e9mon 3 the Movie: Spell of the Unown. Request limit reached!
Faulty API response for Amu00e9lie. Movie not found!
Faulty API response for Beauty and the Beast2000 IMAX Release. Movie not found!
Faulty API response for Peter Pan 2: Return to Never Land. Movie not found!
Faulty API response for E.T. the Extra-Terrestrial20th Anniversary. Request limit reached!
Faulty API response for Van Wilder: Party Liaison. Request limit reached!
Faulty API response for Spider-Man/Men in Black IIDouble Bill. Request limit reached!
Faulty API response for Lawrence of Arabia2002 Re-release. Request limit reached!
Faulty API response for Star Wars: Episode II - Attack of the Clones2002 IMAX Release. Movie not found!
Faulty API response for Singin' in the Rain2002 Re-release. Movie not found!
Faulty API response for The Lion King2002 IMAX Release. Movie not found!
Faulty API response for He Loves Me... He L




<State.SUCCESS: 'SUCCESS'>

### main.transformation

The transformation of data and population of DWH tables is done by executing SQL statements which are stored in `./pipeline/transformation` directory.  

![Cat](pics/transformation_flow.png)

All statements are written as UPSERTs.
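A helper that runs a `.sql` file and reports the outcome could look roughly like this. It is only a sketch: `run_sql_file` is a hypothetical name, the `State` enum is modelled on the `State.SUCCESS` value visible in the cell outputs (the `FAILED` member is an assumption), and the real `populate_using_sql` works with a SQLAlchemy engine rather than a raw `sqlite3` connection.

```python
import sqlite3
import tempfile
from enum import Enum
from pathlib import Path


class State(Enum):
    """Hypothetical result type, modelled on the State.SUCCESS output in this notebook."""

    SUCCESS = "SUCCESS"
    FAILED = "FAILED"  # assumed member, not confirmed by the notebook


def run_sql_file(path, conn):
    """Execute every statement in a .sql file and report the outcome."""
    try:
        conn.executescript(Path(path).read_text(encoding="utf-8"))
    except sqlite3.Error as error:
        print(f"{path} failed: {error}")
        return State.FAILED
    return State.SUCCESS


with tempfile.TemporaryDirectory() as folder:
    good = Path(folder) / "good.sql"
    good.write_text("CREATE TABLE t (x INTEGER); INSERT INTO t VALUES (1);", encoding="utf-8")
    bad = Path(folder) / "bad.sql"
    bad.write_text("INSERT INTO missing_table VALUES (1);", encoding="utf-8")

    conn = sqlite3.connect(":memory:")
    good_state = run_sql_file(good, conn)
    bad_state = run_sql_file(bad, conn)
    conn.close()

print(good_state, bad_state)
```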

In [9]:
from pipeline.transformation import populate_using_sql

#### transformation.populate_dim__dates
This statement collects all distinct dates from the STG sources and transforms them into the `YYYY-MM-DD` format.

**The conversion from format `DD MMM YYYY` is pretty nasty due to the SQLite limitations in that matter!**
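To show why the conversion is awkward, here is one way it can be done with nothing but `substr` and a `CASE` over month abbreviations. This is a sketch, not the contents of `dwh_dim__dates.sql`; it assumes English three-letter month names and a two-digit day, and `to_iso_date` is a hypothetical helper.

```python
import sqlite3

MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

# Build "WHEN 'Jan' THEN '01' ..." once instead of typing twelve branches by hand.
MONTH_BRANCHES = " ".join(
    f"WHEN '{name}' THEN '{number:02d}'" for number, name in enumerate(MONTHS, start=1)
)

# Assumes the input looks like '25 Dec 2001' (two-digit day, one space each).
CONVERT_SQL = f"""
SELECT substr(raw, 8, 4)
       || '-' || CASE substr(raw, 4, 3) {MONTH_BRANCHES} END
       || '-' || substr(raw, 1, 2)
FROM (SELECT ? AS raw)
"""


def to_iso_date(raw):
    """Convert 'DD MMM YYYY' to 'YYYY-MM-DD' inside SQLite; unknown months give None."""
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(CONVERT_SQL, (raw,)).fetchone()[0]
    finally:
        conn.close()


print(to_iso_date("25 Dec 2001"))
```

An unknown month abbreviation makes the `CASE` return `NULL`, and concatenating with `NULL` yields `NULL`, so malformed input does not produce a half-built date.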

In [10]:
populate_using_sql(filename="dwh_dim__dates.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

#### transformation.populate_dim__distributors
This statement collects all distinct movie distributors. The value `-` is treated as `NULL`.
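Treating `-` as `NULL` maps naturally onto SQLite's `NULLIF`. The sketch below uses a hypothetical one-column `stg` table with made-up values; whether the real statement keeps or filters out the resulting `NULL` row is not shown here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg (distributor TEXT)")  # hypothetical staging table
conn.executemany(
    "INSERT INTO stg VALUES (?)",
    [("Distributor A",), ("-",), ("Distributor A",), ("Distributor B",)],
)

# NULLIF(x, '-') returns NULL when x equals '-', otherwise x unchanged.
distributors = conn.execute(
    "SELECT DISTINCT NULLIF(distributor, '-') AS name FROM stg ORDER BY name"
).fetchall()
print(distributors)
```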


In [11]:
populate_using_sql(filename="dwh_dim__distributors.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

#### transformation.populate_dim__movies_reviewers
This statement collects all distinct entities that review and score movies. An additional INSERT adds `IMDb` as a reviewer. Originally the IMDb scores are not included in `Ratings`, but I decided to unify them.


In [12]:
populate_using_sql(filename="dwh_dim__movies_reviewers.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

#### transformation.populate_dim__movies
The step with the biggest number of changes to the data. The data is taken from the JSON stored in the STG table.
- The `Year` field is split into two when it holds a span of years such as `2001-2005`. Both parts are converted into the proper `YYYY-MM-DD` format and stored as relations to the dates dim table: `start_year_date_id` and `end_year_date_id`. A year is represented by its first day, so for example 2001 => 2001-01-01.
- `Rated` and `type` are defined as Enums in the ORM.
- `Released` and `DVD` are transformed into the proper `YYYY-MM-DD` format and stored as relations to the dates dim table: `release_date_id` and `dvd_release_date_id`.
- `Genre`, `Director`, `Writer`, `Actors`, `Language` and `Country` are transformed into JSON arrays. <br><br>
**The conversion from format `DD MMM YYYY` is pretty nasty due to the SQLite limitations in that matter!**
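Two of the transformations listed above, splitting a year span and turning a comma-separated list into a JSON array, can be sketched with plain string functions. This is an illustration rather than the contents of `dwh_dim__movies.sql`: the `transform` helper is hypothetical, the span is assumed to use a single separator character between two four-digit years, and values are assumed to contain no double quotes.

```python
import json
import sqlite3

SPLIT_SQL = """
SELECT substr(year, 1, 4) || '-01-01'                AS start_date,
       NULLIF(substr(year, 6, 4), '') || '-01-01'    AS end_date,   -- NULL when there is no span
       '["' || replace(genre, ', ', '","') || '"]'   AS genres      -- 'A, B' -> '["A","B"]'
FROM (SELECT ? AS year, ? AS genre)
"""


def transform(year, genre):
    """Return (start_date, end_date, genres_json) computed inside SQLite."""
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(SPLIT_SQL, (year, genre)).fetchone()
    finally:
        conn.close()


start, end, genres = transform("2001-2005", "Comedy, Romance")
print(start, end, json.loads(genres))
```

In the real model the two dates would then be looked up in the dates dimension to obtain `start_year_date_id` and `end_year_date_id`.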

In [13]:
populate_using_sql(filename="dwh_dim__movies.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

#### transformation.populate_dim__reviews_results
All ratings are transformed into a percentage value in the range 0-100.
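A possible normalization, assuming the raw ratings arrive as strings in one of three shapes: `8.1/10`, `77/100` or `93%`. Those shapes and the `normalize` helper are assumptions made for this sketch; the actual rules live in `dwh_dim__reviews_results.sql`.

```python
import sqlite3

NORMALIZE_SQL = """
SELECT round(
    CASE
        -- 'numerator/denominator' is rescaled to a 0-100 range
        WHEN value LIKE '%/%' THEN
            100.0 * CAST(substr(value, 1, instr(value, '/') - 1) AS REAL)
                  / CAST(substr(value, instr(value, '/') + 1) AS REAL)
        -- 'NN%' only needs the percent sign removed
        ELSE CAST(replace(value, '%', '') AS REAL)
    END, 1)
FROM (SELECT ? AS value)
"""


def normalize(value):
    """Return the rating as a float between 0 and 100."""
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute(NORMALIZE_SQL, (value,)).fetchone()[0]
    finally:
        conn.close()


for raw in ("8.1/10", "77/100", "93%"):
    print(raw, normalize(raw))
```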

In [14]:
populate_using_sql(filename="dwh_dim__reviews_results.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

#### transformation.populate_fact__revenues
Finally, the fact table is populated together with its 3 foreign keys.
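Populating a fact table usually means looking up the dimension ids by their natural keys. The sketch below assumes that the three foreign keys point to the movie, distributor and date dimensions; all table names, column names and values are simplified, hypothetical stand-ins for the real model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Simplified, hypothetical versions of the dimension, staging and fact tables.
    CREATE TABLE dim_movies (id INTEGER PRIMARY KEY, title TEXT UNIQUE);
    CREATE TABLE dim_distributors (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE dim_dates (id INTEGER PRIMARY KEY, date TEXT UNIQUE);
    CREATE TABLE stg (title TEXT, distributor TEXT, date TEXT, revenue INTEGER);
    CREATE TABLE fact_revenues (
        movie_id INTEGER REFERENCES dim_movies (id),
        distributor_id INTEGER REFERENCES dim_distributors (id),
        date_id INTEGER REFERENCES dim_dates (id),
        revenue INTEGER,
        UNIQUE (movie_id, date_id)
    );
    INSERT INTO dim_movies (title) VALUES ('Movie A');
    INSERT INTO dim_distributors (name) VALUES ('Distributor A');
    INSERT INTO dim_dates (date) VALUES ('2001-05-18');
    INSERT INTO stg VALUES ('Movie A', 'Distributor A', '2001-05-18', 100);
    """
)

# Resolve each natural key to its dimension id, then UPSERT into the fact table.
POPULATE_FACT = """
INSERT INTO fact_revenues (movie_id, distributor_id, date_id, revenue)
SELECT m.id, di.id, da.id, s.revenue
FROM stg AS s
JOIN dim_movies AS m ON m.title = s.title
JOIN dim_dates AS da ON da.date = s.date
LEFT JOIN dim_distributors AS di ON di.name = s.distributor  -- distributor may be missing
WHERE true
ON CONFLICT (movie_id, date_id) DO UPDATE SET revenue = excluded.revenue
"""
conn.execute(POPULATE_FACT)
conn.execute(POPULATE_FACT)  # running twice must not duplicate rows
fact_rows = conn.execute("SELECT * FROM fact_revenues").fetchall()
print(fact_rows)
```

This is also why the dimension steps have to run before the fact step: the joins can only resolve ids that already exist.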

In [15]:
populate_using_sql(filename="dwh_fact__revenues.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

### main.create_views

The create views function is similar to `populate_using_sql`, and in this case (for now) all views can be created in parallel because there are no dependencies between them. Therefore I didn't add a graph.
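For orientation, a view of this kind could be as small as the following sketch. The `fact` table, its values and the aggregation are hypothetical; the real definition of `per_year_revenues.sql` may aggregate differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- Hypothetical, flattened fact table with made-up values.
    CREATE TABLE fact (date TEXT, revenue INTEGER);
    INSERT INTO fact VALUES ('2001-05-18', 100), ('2001-05-19', 300), ('2002-01-01', 50);

    -- Recreate the view so the step can be rerun safely.
    DROP VIEW IF EXISTS per_year_revenues;
    CREATE VIEW per_year_revenues AS
    SELECT substr(date, 1, 4) AS year, SUM(revenue) AS total_revenue
    FROM fact
    GROUP BY year;
    """
)
per_year = conn.execute("SELECT * FROM per_year_revenues ORDER BY year").fetchall()
print(per_year)
```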

In [23]:
from pipeline.create_views import create_using_sql

create_using_sql(filename="actors_revenues.sql", engine=engine)
create_using_sql(filename="countries_revenues.sql", engine=engine)
create_using_sql(filename="directors_revenues.sql", engine=engine)
create_using_sql(filename="movies_revenues.sql", engine=engine)
create_using_sql(filename="per_genre_revenues.sql", engine=engine)
create_using_sql(filename="per_month_revenues.sql", engine=engine)
create_using_sql(filename="per_rating_revenues.sql", engine=engine)
create_using_sql(filename="per_year_revenues.sql", engine=engine)
create_using_sql(filename="writers_revenues.sql", engine=engine)

<State.SUCCESS: 'SUCCESS'>

### main.stop

This is the end of the pipeline.

## Dashboard
The graph shows total revenues (average for dates) with a dropdown that lets you change the context on the fly. Please bear in mind that it is still about the revenues of the movies, not of the individual units selected in the dropdown.
Example: if Daniel Radcliffe, the actor playing Harry Potter, is in first place, it doesn't mean that he is the richest, but that the films in which he played turned out to be the most profitable (in the dataset).

**If you don't have time to run all the commands and fetch a significant amount of data from the API, I left small demo pictures in `./pics/` with the `demo_` prefix.**

In [2]:
import ipywidgets
from ipywidgets.widgets import fixed
from models.definitions.dashboard import dropdown, most_successful_plots_definitions
from pipeline.dashboard.successful import get_most_successful_graph

interactive_output = ipywidgets.interactive(get_most_successful_graph,
                                            selected=dropdown,
                                            engine=fixed(engine),
                                            definition=fixed(most_successful_plots_definitions))
display(interactive_output)

interactive(children=(Dropdown(description='Select', index=1, options=(('Month', 'per_month'), ('Genres', 'gen…

## Summary
- Some records were downloaded under different names than in the original CSV. This is due to an API bug or feature: for example, a query for "Sarnie Zniwo" returns "Sarnie zniwo, czyli Pokusa statuetkowego szlaku", which is fine, but in some cases the match is wrong, e.g. for the title "Together With You" it downloaded "A Christmas Together with You", whereas I assume it should be "Together With You (Well, If We Get Along)". A query that finds these inconsistencies should be prepared and the data brought into compliance (even manually).

- Some titles from the CSV are corrupted or encoded in a way that the API does not recognize properly. These names should also be caught (even manually) and corrected.

- "any_ingester" was left in the code, but it will not perform the DB merge correctly for the current models because of the `created_date` and `modified_date` fields that I introduced later: it would always update the records. To restore its correct operation, two models would have to be created for each table.

- Perhaps the data for generating the chart should be cached in memory, because with more data the chart could load slowly.

- My main goal was to prepare a plot that is somewhat interactive. There were plans to prepare more, but the weekend ended :( . It is a pity that I didn't use the normalized scores in the context of revenues/box office. Plots with the number of theaters would also be interesting.

- Thank you for the task. Very cool. I had fun doing it. Of course, I would still improve many things in it, despite the "primitive" tools.

- If Michael Bay directed a new Harry Potter with Daniel Radcliffe still in the lead role, and the premiere were in the middle of the year, the film would be a financial hit :D