# **Workshop #2**

### *Data Pipeline - `spotify` and `the_grammy_awards` dataset*
---

## ***Setting the project directory***
This cell tries to change the current working directory to the project root.
If the target directory is not found, it assumes the notebook is already running from the project root and prints a message saying so.

In [1]:
import os

try:
    os.chdir("../../Workshop #2")
except FileNotFoundError:
    print("You are already in the correct directory.")

## ***Importing dependencies***

**Modules for extraction:**
* **src.extract.spotify_extract**
* **src.extract.grammys_extract**: uses `src.database.db_operations`

**Modules for transformation:**
* **src.transform.spotify_transform**
* **src.transform.grammys_transform**

**Modules for merge:**
* **src.transform.merge**

**Modules for load and store:**
* **src.load_and_store.load**
* **src.load_and_store.store**

---

**For this environment we are using:**
* ***Pandas*** >= 2.2.2

**From the `src.database.db_operations` module, we are also using:**
* ***SQLAlchemy*** >= 2.0.32
    * *SQLAlchemy Utils* >= 0.41.2
* ***python-dotenv*** >= 1.0.1

In [2]:
from src.extract.spotify_extract import extracting_spotify_data
from src.extract.grammys_extract import extracting_grammys_data

from src.transform.spotify_transform import transforming_spotify_data
from src.transform.grammys_transform import transforming_grammys_data
from src.transform.merge import merging_datasets

from src.load_and_store.load import loading_clean_data
from src.load_and_store.store import store_merged_data

import pandas as pd

# import matplotlib.pyplot as plt
# import seaborn as sns
# plt.style.use("ggplot")

## ***Extracting the data***
---

### **Spotify dataset**
In this section we extract the CSV with the `spotify_extract` module. Wrapping the extraction in module functions keeps the ETL process compact, and the same functions can be reused later when we create the tasks in Apache Airflow.

In [None]:
spotify_data = extracting_spotify_data("./data/raw/spotify_dataset.csv")

In [None]:
spotify_data.head()

### **Grammys dataset**

The extraction from the PostgreSQL database is performed by the `grammys_extract` module, which also makes it easier to generate logs for our ETL process. There is no need to create or dispose of the connection engine from the notebook, since the module already handles it.

In [None]:
grammys_data = extracting_grammys_data()

In [None]:
grammys_data.head()

## ***Transforming the data***
---

### *Spotify transformations*
  
- Created a `transforming_spotify_data` function to clean and transform the Spotify DataFrame by:

  - Removing unnecessary columns (e.g., `"Unnamed: 0"`).

  - Eliminating null values and resetting the DataFrame index.

  - Removing duplicates through several steps:
    - Dropped exact duplicate rows.
    - Removed duplicates based on the `"track_id"` column.
    - Mapped detailed genres to broader categories using a predefined genre mapping dictionary.
    - Dropped duplicates based on song names and artists, keeping the most popular entries.

  - Generated new columns for enhanced data analysis:
    - **`duration_min`**: Converted song duration from milliseconds to minutes.
    - **`duration_category`**: Categorized songs based on their duration.
    - **`popularity_category`**: Categorized songs based on their popularity scores.
    - **`track_mood`**: Identified the mood of songs using valence scores.
    - **`live_performance`**: Flagged songs with a high likelihood of being live performances.

  - Dropped irrelevant columns to streamline the dataset (e.g., `"loudness"`, `"mode"`, `"tempo"`).
  
  - Included logging statements to document the cleaning and transformation process, as well as to catch any potential errors.
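
As a rough illustration of the derived columns listed above, the sketch below builds them on a toy DataFrame. It is not the code in `src.transform.spotify_transform`: the bin edges, the category labels, and the `liveness > 0.8` threshold are assumptions chosen for the example, as are the sample rows.

```python
import pandas as pd

# Toy rows; column names follow the Spotify dataset, values are made up.
df = pd.DataFrame({
    "track_name": ["a", "b", "c"],
    "duration_ms": [90_000, 210_000, 420_000],
    "popularity": [15, 55, 90],
    "valence": [0.2, 0.5, 0.9],
    "liveness": [0.1, 0.85, 0.3],
})

# Milliseconds to minutes.
df["duration_min"] = df["duration_ms"] / 60_000

# Assumed bin edges: up to 2.5 min, 2.5 to 5 min, over 5 min.
df["duration_category"] = pd.cut(
    df["duration_min"],
    bins=[0, 2.5, 5, float("inf")],
    labels=["short", "average", "long"],
)

# Assumed popularity bands on the 0-100 scale.
df["popularity_category"] = pd.cut(
    df["popularity"],
    bins=[-1, 30, 70, 100],
    labels=["low", "medium", "high"],
)

# Assumed valence bands on the 0-1 scale.
df["track_mood"] = pd.cut(
    df["valence"],
    bins=[-0.01, 0.33, 0.66, 1.0],
    labels=["sad", "neutral", "happy"],
)

# Assumed threshold for flagging a likely live recording.
df["live_performance"] = df["liveness"] > 0.8
```

`pd.cut` keeps each rule in one declarative call, so changing a category boundary only means editing the `bins` list.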

In [None]:
spotify_df = transforming_spotify_data(spotify_data)

In [None]:
spotify_df.head()

### *Grammys Transformations*

- Created a `transforming_grammys_data` function to clean and transform the Grammy Awards DataFrame by:

  - Renaming the column `winner` to `is_nominated`.

  - Dropping unnecessary columns (e.g., `published_at`, `updated_at`, `img`).

  - Removing rows with null values in `nominee`.

  - Handling cases where both `artist` and `workers` are null:
    - Filtered out specific categories listed in the `categories` list.
    - For the remaining rows, filled `artist` with the value from `nominee`.

  - Populating the `artist` column by applying several functions:
    - **`extract_artist`**: Extracted artist names within parentheses from the `workers` column.
    - **`move_workers_to_artist`**: Moved data from `workers` to `artist` if `artist` is null and `workers` doesn't contain semicolons or commas.
    - **`extract_artists_before_semicolon`**: Extracted artist names before semicolons in `workers`, excluding any roles of interest.
    - **`extract_roles_based_on_interest`**: Extracted names associated with specific roles defined in the `roles_of_interest` list from `workers`.

  - Dropped rows with null values in `artist`.

  - Replaced certain values in the `artist` column (e.g., changing `(Various Artists)` to `Various Artists`).

  - Dropped the `workers` column as it was no longer needed.

  - Included logging statements to document the cleaning and transformation process, as well as to catch any potential errors.
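
The first two `artist` rules above can be sketched as follows. The function name `extract_artist` comes from the list, but the body shown here is a guess at the behavior, not the module's actual implementation, and the sample rows are invented.

```python
import re

import pandas as pd


def extract_artist(workers):
    """Return the text inside the first pair of parentheses, or None."""
    if pd.isna(workers):
        return None
    match = re.search(r"\(([^)]+)\)", workers)
    return match.group(1).strip() if match else None


# Invented rows covering the three situations described above.
grammys = pd.DataFrame({
    "nominee": ["Song A", "Song B", "Song C"],
    "artist": ["Artist One", None, None],
    "workers": [None, "Jane Doe, songwriter (Artist Two)", "Solo Artist"],
})

# Rule 1: fill a missing artist from the parenthesised name in workers.
missing = grammys["artist"].isna()
grammys.loc[missing, "artist"] = grammys.loc[missing, "workers"].apply(extract_artist)

# Rule 2: if artist is still missing and workers holds a single name
# (no semicolon or comma), move that value over as the artist.
workers = grammys["workers"].fillna("")
single_name = (
    grammys["artist"].isna()
    & (workers != "")
    & ~workers.str.contains("[;,]")
)
grammys.loc[single_name, "artist"] = grammys.loc[single_name, "workers"]
```

Applying the rules in this order means the more specific pattern (a name in parentheses) wins before the fallback copies the whole `workers` value.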

In [None]:
grammys_df = transforming_grammys_data(grammys_data)

In [None]:
grammys_df.head()

## ***Merging the data***
---

### *Merging Spotify and Grammys Datasets*

- Created a `merging_datasets` function to combine the Spotify and Grammy Awards DataFrames by:

  - **Cleaning key columns for accurate merging**:
    - Converted the `track_name` column in the Spotify DataFrame to lowercase and stripped whitespace, creating a new column `track_name_clean`.
    - Converted the `nominee` column in the Grammys DataFrame to lowercase and stripped whitespace, creating a new column `nominee_clean`.

  - **Merging the datasets**:
    - Performed a left join on the cleaned columns `track_name_clean` and `nominee_clean` to merge the DataFrames.
    - Used suffixes to differentiate overlapping columns, appending `_grammys` to columns from the Grammys DataFrame when necessary.

  - **Handling missing values**:
    - Filled null values in the `title` and `category` columns with `"Not applicable"`.
    - Filled null values in the `is_nominated` column with `False`.

  - **Dropping unnecessary columns**:
    - Removed columns that were no longer needed after the merge, such as `"year"`, `"artist"`, `"nominee"`, `"nominee_clean"`, and `"track_name_clean"`.

- Included helper functions to streamline the data processing:

  - **`fill_null_values`**: Filled null values in specified columns with a given value to ensure data completeness.

  - **`drop_columns`**: Dropped specified columns from the DataFrame to eliminate redundancy and maintain a clean dataset.

- **Used logging throughout** the process to monitor the merging steps and catch any potential issues, enhancing traceability and debugging capability.
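
The merge steps above can be sketched on two toy DataFrames. This is a minimal approximation under stated assumptions, not the code in `src.transform.merge`: the sample rows are invented, and the `year` column mentioned above is left out of the toy data.

```python
import pandas as pd

# Invented rows; column names follow the two datasets.
spotify = pd.DataFrame({
    "track_id": ["t1", "t2"],
    "track_name": ["  Hello ", "Goodbye"],
    "artists": ["A", "B"],
})
grammys = pd.DataFrame({
    "nominee": ["hello", "Other Song"],
    "artist": ["A", "C"],
    "title": ["Ceremony X", "Ceremony X"],
    "category": ["Record Of The Year", "Song Of The Year"],
    "is_nominated": [True, True],
})

# Clean the key columns: lowercase and strip surrounding whitespace.
spotify["track_name_clean"] = spotify["track_name"].str.lower().str.strip()
grammys["nominee_clean"] = grammys["nominee"].str.lower().str.strip()

# Left join keeps every Spotify row, matched or not.
merged = spotify.merge(
    grammys,
    how="left",
    left_on="track_name_clean",
    right_on="nominee_clean",
    suffixes=("", "_grammys"),
)

# Fill the gaps left by unmatched rows.
merged[["title", "category"]] = merged[["title", "category"]].fillna("Not applicable")
merged["is_nominated"] = (
    merged["is_nominated"].astype("boolean").fillna(False).astype(bool)
)

# Drop helper and redundant columns.
merged = merged.drop(columns=["artist", "nominee", "nominee_clean", "track_name_clean"])
```

With `how="left"`, tracks without a Grammy match survive the merge and end up with `"Not applicable"` and `False` in the Grammy columns.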

In [None]:
merged_df = merging_datasets(spotify_df, grammys_df)

In [None]:
merged_df.query("is_nominated == True").head()

After the merge, the number of rows increases by approximately 600 records. Why is this happening?

In [None]:
merged_df.info()

When we look at the number of unique values for *track_id*, we see that it is still the same as in the Spotify dataset. This confirms that the rise in the number of rows comes from tracks that now appear more than once in our DataFrame.

In [None]:
merged_df["track_id"].nunique()
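
The effect described above is easy to reproduce in isolation. In the toy example below (all names and values are invented), one track matches three rows on the right side of a left join, so the row count grows while the number of unique `track_id` values stays the same.

```python
import pandas as pd

tracks = pd.DataFrame({
    "track_id": ["t1", "t2"],
    "name": ["hello", "bye"],
})
nominations = pd.DataFrame({
    "name": ["hello", "hello", "hello"],
    "category": ["Cat 1", "Cat 2", "Cat 3"],
})

joined = tracks.merge(nominations, how="left", on="name")

rows_before = len(tracks)                  # rows on the left side
rows_after = len(joined)                   # one row per (track, match) pair
unique_ids = joined["track_id"].nunique()  # unchanged by the join
extra_rows = rows_after - unique_ids       # rows added by repeated matches
```

Comparing `rows_after` with `unique_ids` is the same check the notebook performs with `info()` and `nunique()`.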

We group the duplicated songs to find out what is driving this increase in the number of records.

In [15]:
duplicated_counts = (merged_df
                     .groupby(["track_id", "track_name", "artists", "album_name"])
                     .size()
                     .reset_index(name="duplicate_count")
                     .sort_values(by="duplicate_count", ascending=False))

duplicated_values = duplicated_counts.query("duplicate_count > 1")

In [None]:
duplicated_values.head()

**Looking at the results for this song, we can see that:**

* A record may repeat several times because the song has several nominations.

* **However, the merge criteria can also lead to confusion**: here we find that Elvis is nominated for *Best R&B Vocal Performance, Female*. Elvis is so ubiquitous that he ended up being nominated in a female category.

    * This is caused by matching values in *track_name*. To avoid it, we should add one more criterion to the merge, but that would require a much more careful and specific cleanup of the *artist* columns in both datasets.

In [None]:
merged_df.query("track_id == '0gaZi6J3Pk7FG7GNMHsK5o'").head()
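
As a sketch of the extra merge criterion discussed above, the example below compares a join on the track name alone with a join on track name plus artist. The key column names (`name_key`, `artist_key`), the `normalize` helper, and the sample rows are all hypothetical, and lowercasing plus stripping is only the simplest possible cleanup; real artist strings would need more work before they match reliably.

```python
import pandas as pd

# Two different tracks that share a title; only one was nominated.
spotify = pd.DataFrame({
    "track_id": ["t1", "t2"],
    "track_name": ["Same Title", "Same Title"],
    "artists": ["Artist One", "Artist Two"],
})
grammys = pd.DataFrame({
    "nominee": ["Same Title"],
    "artist": ["artist two "],
    "category": ["Hypothetical Category"],
})


def normalize(series):
    """Lowercase and strip whitespace so keys compare consistently."""
    return series.str.lower().str.strip()


spotify["name_key"] = normalize(spotify["track_name"])
spotify["artist_key"] = normalize(spotify["artists"])
grammys["name_key"] = normalize(grammys["nominee"])
grammys["artist_key"] = normalize(grammys["artist"])

# Name only: both tracks pick up the nomination.
by_name = spotify.merge(grammys, how="left", on="name_key")

# Name and artist: only the matching artist keeps it.
by_name_and_artist = spotify.merge(
    grammys, how="left", on=["name_key", "artist_key"]
)

nominated_ids = by_name_and_artist.loc[
    by_name_and_artist["category"].notna(), "track_id"
].tolist()
```

The stricter key trades false positives for false negatives: any spelling difference between the two artist columns now drops a genuine nomination.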

## ***Loading the data***
---

The `loading_clean_data` function loads a pandas DataFrame into a database. It logs the process, creates an engine, and tries to load the data using `load_clean_data`. If the load succeeds, it logs a success message; otherwise, it logs the error. The `load_clean_data` function checks whether the table exists, creates it if needed, and loads the data.
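
The check-create-load flow can be sketched with an in-memory SQLite database standing in for PostgreSQL. This is a substitute chosen so the example runs without a server or credentials. The function below reuses the name `load_clean_data` only for readability; its signature and body are assumptions, not the code in `src.database.db_operations`.

```python
import logging
import sqlite3

import pandas as pd

logger = logging.getLogger("load_sketch")


def load_clean_data(conn, df, table_name):
    """Append df to table_name, creating the table on first use.

    Returns True if the table already existed before the load.
    """
    existed = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone() is not None
    # if_exists="append" creates the table when it is missing.
    df.to_sql(table_name, conn, if_exists="append", index=False)
    return existed


conn = sqlite3.connect(":memory:")  # stand-in for the PostgreSQL engine
df = pd.DataFrame({"track_id": ["t1", "t2"], "is_nominated": [True, False]})

try:
    existed_before = load_clean_data(conn, df, "merged_data")
    logger.info("Loaded %d rows into merged_data", len(df))
except Exception:
    logger.exception("Loading merged_data failed")
    raise

row_count = conn.execute("SELECT COUNT(*) FROM merged_data").fetchone()[0]
conn.close()
```

Against PostgreSQL, the connection would be a SQLAlchemy engine instead, and the existence check would go through the database's own catalog rather than `sqlite_master`.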

In [None]:
loading_clean_data(merged_df, "merged_data")

## ***Storing the data***
---

The `store_merged_data` function uploads the merged DataFrame to Google Drive as a CSV file. It authenticates with Google Drive, converts the DataFrame to CSV format, and creates a new file in the specified Google Drive folder. Once the content is uploaded, it logs a success message.

In [None]:
store_merged_data("merged_data", merged_df, "1x3tS43kSxC2oKhq7xCiJFzXqGerGvcy7")