# Marvel ETL Pipeline


In [2]:
import requests
import hashlib
from datetime import datetime
import pandas as pd
import numpy as np
import os
import fastparquet
import psycopg2

### Definition of necessary functions for the ETL:

In this code block, the functions that will be used in the ETL (Extraction, Transformation, and Loading) process of the data from the Marvel API are defined. Each function is documented for its use and has its own responsibility within the pipeline.


In [3]:
def test_api_connection(endpoint, params):
    """
    Tests the connection to an API endpoint by sending a GET request and checks the connection status.

    Parameters:
    endpoint (str): The full URL of the API endpoint to make the request to.
    params (dict): A dictionary with the parameters to send in the GET request.

    Returns:
    bool: Returns `True` if the connection to the API was successful (status code 200), otherwise returns `False`.

    Exceptions:
    Catches any exception related to HTTP requests via `requests.RequestException` and returns `False`.
    """
    try:
        response = requests.get(endpoint, params=params)
        # Check if the request was successful
        if response.status_code == 200:
            print("Successfully connected to the Marvel API.")
            return True
        else:
            print(f"Error connecting to the Marvel API: {response.status_code} - {response.reason}")
            return False
    except requests.RequestException as e:
        print(f"Connection error to the Marvel API: {e}")
        return False    

def get_data(base_url, endpoint, params=None):
    """
    Performs a paginated GET request to an API to retrieve all available data.

    Parameters:
    base_url (str): The base URL of the API.
    endpoint (str): The specific endpoint of the API to make the request to.
    params (dict, optional): The request parameters, including pagination (limit and offset).

    Returns:
    list: A list with all the results retrieved from the API.
    None: If an error occurs during the request.
    """
    try:
        all_results = []
        limit = 100  # Maximum number of results per page allowed by the API
        offset = 0   # Pagination starts from the first result
        
        while True:
            # Update the 'offset' and 'limit' parameters in each request
            params.update({'limit': limit, 'offset': offset})
            response = requests.get(f"{base_url}/{endpoint}", params=params)
            response.raise_for_status()
            
            data = response.json()
            results = data['data']['results']
            all_results.extend(results)  # Store the retrieved results
            
            # Check if there are more results to fetch
            total_results = data['data']['total']
            if len(all_results) >= total_results:
                break
            
            # Update the offset to retrieve the next page of results
            offset += limit
        
        return all_results
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
        return None

def get_comics(base_url, endpoint, params=None):
    """
    Performs a paginated GET request to the API to retrieve comic data, limiting the total to 500 results.
    This limit of 500 is set to simplify the practice and testing of the data pipeline.

    Parameters:
    base_url (str): The base URL of the API.
    endpoint (str): The specific endpoint of the API for comics.
    params (dict, optional): The request parameters, including pagination (limit and offset).

    Returns:
    list: A list with the results retrieved from the API, limited to 1000 items.
    None: If an error occurs during the request.
    """
    try:
        all_results = []
        limit = 100  # Maximum number of results per page allowed by the API
        offset = 0   # Pagination starts from the first result
        
        while True:
            # Update the 'offset' and 'limit' parameters in each request
            params.update({'limit': limit, 'offset': offset})
            response = requests.get(f"{base_url}/{endpoint}", params=params)
            response.raise_for_status()
            
            data = response.json()
            results = data['data']['results']
            all_results.extend(results)  # Store the retrieved results
            
            # Check if there are more results to fetch
            total_results = data['data']['total']
            #if len(all_results) >= total_results:
            if len(all_results) >= 500:
                break
            
            # Update the offset to retrieve the next page of results
            offset += limit
        
        return all_results
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
        return None
    
def build_table(json_data, record_path=None):
    """
    Builds a pandas DataFrame from JSON formatted data.

    Parameters:
    json_data (dict/list): The data in JSON format obtained from an API.

    Returns:
    DataFrame: A pandas DataFrame containing the data, or None if there's an error.
    """
    try:
        df = pd.json_normalize(json_data, record_path)
        return df
    except ValueError:
        print("The data is not in the expected format")
        return None

def save_to_parquet(df, output_path, partition_cols=None):
    """
    Saves a DataFrame in Parquet format in the specified directory using fastparquet.

    Parameters:
    df (DataFrame): The DataFrame to save.
    path (str): The file path where the DataFrame will be saved.
    partition_cols (list, optional): Columns by which the data will be partitioned.
    """
    directory = os.path.dirname(output_path)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)
    df.to_parquet(output_path, partition_cols=partition_cols, engine='fastparquet')

def get_last_extraction_date(file_path):
    """
    Retrieves the date of the last incremental extraction from a file.

    Parameters:
    file_path (str): The file path that contains the last extraction date.

    Returns:
    datetime: The date of the last extraction, or None if there's an error.
    """
    try:
        with open(file_path, 'r') as file:
            last_extraction_date = file.readline().strip()
            return datetime.strptime(last_extraction_date, '%Y-%m-%d')
    except (FileNotFoundError, ValueError):
        return None

def update_last_extraction_date(file_path, extraction_date):
    """
    Updates the date of the last incremental extraction in a file.

    Parameters:
    file_path (str): The file path where the extraction date will be saved.
    extraction_date (datetime): The date of the current extraction.
    """
    with open(file_path, 'w') as file:
        file.write(extraction_date.strftime('%Y-%m-%d'))

# def get_data(base_url, endpoint, params=None):
#     try:
#         all_results = []
       
#         while True:
#             response = requests.get(f"{base_url}/{endpoint}", params=params)
#             response.raise_for_status()
            
#             data = response.json()
#             results = data['data']['results']
#             all_results.extend(results)  # Almacena los resultados obtenidos
#             return all_results
#     except requests.exceptions.RequestException as e:
#         print(f"Error en la petición: {e}")
#         return None


## Authentication in the Marvel API

### Requirements for connecting to the Marvel API:
To connect to the Marvel API, we must define our account's public and private keys in a `.env` file. These keys are:

- **MARVEL_PUBLIC_KEY**: The public key provided by the API.
- **MARVEL_PRIVATE_KEY**: The corresponding private key.

### Connection Test

The code below performs authentication with the Marvel API and checks if the connection is successful.

### Steps:

1. **Obtain the keys from environment variables:**
   We use `os.getenv` to get the API keys from the environment variables configured in the `.env` file. If the keys are not properly configured, the program will throw an error.

2. **Generate the timestamp and hash for authentication:**
   A `timestamp` is created with the current time, and an MD5 hash is generated by combining the `timestamp`, the private key, and the public key. This hash is required by the API to authenticate the requests.

3. **Define the request parameters:**
   The parameters `ts`, `apikey`, and `hash` are sent in each request to the API to ensure correct authentication.

4. **Check the connection to the Marvel API:**
   We use the `test_api_connection()` function to verify if the Marvel API is accessible and if authentication was successful.


In [4]:
# Requirements for making a request to the Marvel API:

# Obtain the keys from environment variables
public_key = os.getenv('MARVEL_PUBLIC_KEY')
private_key = os.getenv('MARVEL_PRIVATE_KEY')

if not public_key or not private_key:
    raise ValueError("API keys are not configured in environment variables")

# Generate the timestamp using datetime
ts = str(int(datetime.now().timestamp()))

# Generate the MD5 hash
hash_str = ts + private_key + public_key
md5_hash = hashlib.md5(hash_str.encode('utf-8')).hexdigest()  # hexdigest() converts the hash to a hexadecimal representation, which is more readable for humans and commonly used in applications that require hashes.

# Define params
params = {
    'ts': ts,
    'apikey': public_key,
    'hash': md5_hash
}

# Check the connection to the API:

url_api = "http://gateway.marvel.com/v1/public/characters"

if test_api_connection(url_api, params):
    print("The Marvel API is accessible and the connection was successful.")
else:
    print("Failed to connect to the Marvel API.")


Successfully connected to the Marvel API.
The Marvel API is accessible and the connection was successful.


## Data Extraction

### Process Description

This code block performs incremental data extraction from the Marvel API, ensuring that the data is always up-to-date. We define the update frequency and then proceed to extract data on:

1. **Marvel Characters**  
2. **Marvel Comics**

The date of the last extraction is stored and used to determine whether a new extraction needs to be executed. After each successful extraction, this date is updated for future incremental extractions.

### Steps:

1. **Define the update frequency:**
   Specify that the incremental extraction should be executed once per day.

2. **Load the last extraction date:**
   Read the date of the last extraction stored in a text file, which is used to check if enough time has passed to execute the next extraction.

3. **Extract character data:**
   Connect to the Marvel API and download character data. If the extraction is successful, the data is saved in Parquet format.

4. **Extract comics related to characters:**
   Obtain a list of comics in which each character appears and store the character-comic relationship in the Data Lake in Parquet format.

5. **Extract comic data:**
   Download comic information from the Marvel API and save it in Parquet format if new data is obtained.

6. **Update the last extraction date:**
   If new data was obtained, update the last extraction date in the corresponding file.

In [5]:
# Define how often the extraction will be updated

# Path to store the last incremental extraction date
LAST_EXTRACTION_DATE_FILE = "datalake/bronze/marvel_api/last_extraction_date.txt"

# Get the date of the last incremental extraction
last_extraction_date = get_last_extraction_date(LAST_EXTRACTION_DATE_FILE)
current_date = datetime.now()

# Check if incremental extraction should run (once per day)
should_run_incremental = last_extraction_date is None or (current_date - last_extraction_date).days >= 1

# --------- Data Extraction --------- #

url_api = "http://gateway.marvel.com/v1/public"

# Data extraction of characters
endpoint_characters = "characters"

if should_run_incremental:
    data_characters = get_data(url_api, endpoint_characters, params)
    characters_df = build_table(data_characters)

    if not characters_df.empty:
        save_to_parquet(characters_df, "datalake/bronze/marvel_api/characters/characters.parquet")
    
    # Create a dataframe with a list of comics in which each character appears
    # List to store the character-comics relationship
    character_comics = []

    # Iterate over each character in the characters results list
    for character in data_characters:
        # Step 1: Get the character ID
        character_id = character['id']
        
        # Step 2: Get the list of comics in which the character appears
        comics = character['comics']['items']
        
        # Step 3: Create an empty list to store the comic IDs
        comic_ids = []
        
        # Step 4: Iterate over each comic in the comics list
        for comic in comics:
            # Extract the comic URI and split it into parts using '/' as a separator
            parts = comic['resourceURI'].split('/')
            
            # Step 5: Get the last element of the 'parts' list, which is the comic ID
            comic_id = parts[-1]
            
            # Step 6: Add the comic ID to the 'comic_ids' list
            comic_ids.append(comic_id)
        
        # Step 7: Create a relationship between the character and their list of comics
        relationship = {
            'character_id': character_id,
            'comic_ids': comic_ids
        }
        
        # Step 8: Add the relationship to the general 'character_comics' list
        character_comics.append(relationship)
    
    # Create the DataFrame from the list
    character_comics_df = build_table(character_comics)

    if not character_comics_df.empty:
        save_to_parquet(character_comics_df, "datalake/bronze/marvel_api/characters/character_comics.parquet")

# Data extraction of comics
endpoint_comics = "comics"

if should_run_incremental:
    data_comics = get_comics(url_api, endpoint_comics, params)
    comics_df = build_table(data_comics)

    if not comics_df.empty:
        save_to_parquet(comics_df, "datalake/bronze/marvel_api/comics/comics.parquet")

# Update the last incremental extraction date if new data was obtained
if should_run_incremental and not comics_df.empty:
    last_extraction_date = datetime.now()
    update_last_extraction_date(LAST_EXTRACTION_DATE_FILE, last_extraction_date)
    # Print a message indicating that the extraction and data saving were completed
    print("Extraction and data saving completed.")
else:
    print("No updates")

No updates



### Data Transformations:

In this block, several transformations are performed on the data from the `characters` and `comics` tables obtained from the `bronze` layer of the data lake. These transformations include data loading, DataFrame merging, removal of duplicate and null values, optimization of data types for performance improvement, date manipulation, and renaming of columns for clarity. Finally, the transformed data is stored in the `silver` layer in Parquet format.

1. **`characters` Table**:
   - Load the Parquet files for `characters` and `character_comics`.
   - Add a new column `character_comic_appearances` to the `characters` DataFrame by joining the DataFrames based on `character_id`.
   - Filter the relevant columns for analysis.
   - Replace empty strings with null values (`NaN`) and impute default values for null fields.
   - Optimize numerical column types to smaller types (`int16`) to reduce memory usage, and convert some text columns to `categories`.
   - Format the `modified` date column to `datetime`, handling invalid values and setting a default date for null values.
   - Rename columns for better clarity.
   - Finally, store the resulting DataFrame in the `silver` layer in Parquet format.

2. **`comics` Table**:
   - Load the Parquet file for the `comics` table.
   - Filter the relevant columns for analysis.
   - Replace empty strings with null values (`NaN`) and impute default values for null fields.
   - Optimize numerical column types to smaller types (`int16` or `int32`) and convert some text columns to `categories`.
   - Format the `modified` date column to `datetime`, handling invalid values and setting a default date for null values.
   - Rename columns for better clarity.
   - Finally, store the resulting DataFrame in the `silver` layer in Parquet format.

In [12]:
# -------------- Transformations df characters -------------- #

# Fetch the characters dataframe
characters_df = pd.read_parquet("datalake/bronze/marvel_api/characters/characters.parquet")

# Fetch the character_comics dataframe
character_comics_df = pd.read_parquet("datalake/bronze/marvel_api/characters/character_comics.parquet")

# Add the 'comic_ids' column to the characters DataFrame by merging the DataFrames based on the 'character_id' column
characters_df = characters_df.merge(character_comics_df, left_on='id', right_on='character_id', how='left')

# Filter the columns of interest
filter_columns = ['id', 'name', 'description', 'thumbnail.path', 'thumbnail.extension', 'comics.available', 'comic_ids', 'series.available', 'stories.available', 'modified']
characters_df = characters_df[filter_columns]

# Replace empty strings with NaN in the 'description', 'comics.available', etc. columns
characters_df['description'].replace('', np.nan, inplace=True)
characters_df['comics.available'].replace('', np.nan, inplace=True)
characters_df['series.available'].replace('', np.nan, inplace=True)
characters_df['stories.available'].replace('', np.nan, inplace=True)

# Null value handling
imputation_mapping = {
    "comics.available": 0,
    "series.available": 0,
    "stories.available": 0,
    "description": 'Description not available'
}
characters_df = characters_df.fillna(imputation_mapping)

# Column type conversions, change the data type of numeric columns to a smaller type
conversion_mapping = {
    "comics.available": "int16",
    "series.available": "int16",
    "stories.available": "int16",
    "thumbnail.extension": "category"
}
characters_df = characters_df.astype(conversion_mapping)

# Date type conversions
# From the 'modified' column, select only the first 10 characters which is just the date
characters_df['modified'] = characters_df['modified'].str.slice(0, 10)
# Convert 'modified' column to datetime, ignoring errors
characters_df['modified'] = pd.to_datetime(characters_df['modified'], errors='coerce')
# Replace NaT values with a predetermined date, which should be greater than 1970 for .parquet compatibility
characters_df['modified'] = characters_df['modified'].fillna(pd.Timestamp('1971-01-01'))

# Rename columns
characters_df.rename(columns={
    "id": "character_id",
    "name": "character_name",
    "thumbnail.path": "thumbnail_path",
    "thumbnail.extension": "thumbnail_extension",
    "comics.available": "total_comics",
    "comic_ids": "character_comic_appearances",
    "series.available": "total_series",	
    "stories.available": "total_stories"
}, inplace=True)

characters_df['character_comic_appearances'] = characters_df['character_comic_appearances'].astype(str)

characters_df.info(memory_usage='deep')

# Store data in parquet format in the silver layer
if not characters_df.empty:
    save_to_parquet(characters_df, "datalake/silver/marvel_api/characters/characters.parquet")

# -------------- Transformations comics table -------------- #
comics_df = pd.read_parquet("datalake/bronze/marvel_api/comics/comics.parquet")
comics_filter_columns = ['id', 'digitalId', 'title', 'issueNumber', 'variantDescription', 'description', 'isbn', 'upc', 'diamondCode', 'characters.items', 'stories.available', 'events.available', 'modified']
comics_df = comics_df[comics_filter_columns]
comics_df

# Replace empty strings with NaN in the columns
comics_df['variantDescription'].replace('', np.nan, inplace=True)
comics_df['description'].replace('', np.nan, inplace=True)
comics_df['isbn'].replace('', np.nan, inplace=True)
comics_df['upc'].replace('', np.nan, inplace=True)
comics_df['diamondCode'].replace('', np.nan, inplace=True)

# Null value handling
imputation_mapping = {
    "variantDescription": 'Description not available',
    "description": 'Description not available',
    "isbn": 'No',
    "upc": 'No',
    "diamondCode": 'No'
}
comics_df = comics_df.fillna(imputation_mapping)

# Column type conversions, change the data type of numeric columns to a smaller type
conversion_mapping = {
    "id": "int32",
    "digitalId": "int32",
    "title": "category",
    "description": "category",
    "isbn": "category",
    "upc": "category",
    "diamondCode": "category",
    "issueNumber": "int16",
    "stories.available": "int16",
    "events.available": "int16"
}
comics_df = comics_df.astype(conversion_mapping)

comics_df['characters.items'] = comics_df['characters.items'].astype(str)

# Date type conversions
# Convert 'modified' column to datetime, handling invalid values
comics_df['modified'] = comics_df['modified'].str.slice(0, 10)
# Convert 'modified' column to datetime, ignoring errors
comics_df['modified'] = pd.to_datetime(comics_df['modified'], errors='coerce')
# Replace NaT values with a predetermined date, which should be greater than 1970 for .parquet compatibility
comics_df['modified'] = comics_df['modified'].fillna(pd.Timestamp('1970-01-01'))

# Rename columns
comics_df.rename(columns={
    "id": "comic_id",
    "digitalId": "comic_digitalId",
    "variantDescription": "variant_description",
    "diamondCode": "diamond_code",
    "issueNumber": "issue_number",	
    "stories.available": "total_stories",
    "events.available": "total_events",
    "characters.items": "characters_items"
}, inplace=True)

# Store data in parquet format in the silver layer
if not comics_df.empty:
    save_to_parquet(comics_df, "datalake/silver/marvel_api/comics/comics.parquet")


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20 entries, 0 to 19
Data columns (total 10 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   character_id                 20 non-null     int64         
 1   character_name               20 non-null     object        
 2   description                  20 non-null     object        
 3   thumbnail_path               20 non-null     object        
 4   thumbnail_extension          20 non-null     category      
 5   total_comics                 20 non-null     int16         
 6   character_comic_appearances  20 non-null     object        
 7   total_series                 20 non-null     int16         
 8   total_stories                20 non-null     int16         
 9   modified                     20 non-null     datetime64[ns]
dtypes: category(1), datetime64[ns](1), int16(3), int64(1), object(4)
memory usage: 9.5 KB


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  characters_df['description'].replace('', np.nan, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  characters_df['comics.available'].replace('', np.nan, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate 

### Data Loading into AWS Redshift:

This block handles loading the transformed data, located in the `silver` layer of the datalake, into an AWS Redshift database. The process includes the following steps:

1. **Loading Data from the `silver` Layer**:
   - The corresponding DataFrames for the `characters` and `comics` tables are loaded from Parquet files located in the datalake.

2. **Accessing Redshift Credentials**:
   - The credentials needed to connect to Redshift (host, database, user, password, and port) are obtained via environment variables.

3. **Connecting to Redshift**:
   - A connection to the Redshift database is established using `psycopg2`. If the connection is successful, a confirmation message is printed.

4. **Creating Tables in Redshift**:
   - SQL queries are defined and executed to create the `characters` and `comics` tables in Redshift, if they do not already exist. These tables include the transformed and optimized columns from the previous step.

5. **Inserting Data**:
   - A function is defined to insert data from the DataFrames into the Redshift tables. The data from each DataFrame is converted into a list of tuples and inserted into Redshift using dynamically generated SQL queries.

6. **Closing the Connection**:
   - The cursor and the connection to Redshift are closed, and a message is printed indicating that the data insertion has been completed.


In [11]:
characters_df = pd.read_parquet("datalake/silver/marvel_api/characters/characters.parquet")
comics_df = pd.read_parquet("datalake/silver/marvel_api/comics/comics.parquet")

# Access environment variables
redshift_host = os.getenv('REDSHIFT_HOST')
redshift_db = os.getenv('REDSHIFT_DB')
redshift_user = os.getenv('REDSHIFT_USER')
redshift_password = os.getenv('REDSHIFT_PASSWORD')
redshift_port = os.getenv('REDSHIFT_PORT')

# Establish connection with Redshift using psycopg2
conn = psycopg2.connect(
    host=redshift_host,
    dbname=redshift_db,
    user=redshift_user,
    password=redshift_password,
    port=redshift_port
)

if conn:
    print('successful connection')
else:
    print("Error connecting to Redshift")

# Create a cursor to execute queries
cur = conn.cursor()

# Define queries to create tables in Redshift
queries = [
    """
    CREATE TABLE IF NOT EXISTS public.characters (
        character_id INT NOT NULL PRIMARY KEY,
        character_name VARCHAR(255),
        description VARCHAR(MAX),
        thumbnail_path VARCHAR(255),
        thumbnail_extension VARCHAR(50),
        total_comics SMALLINT,
        character_comic_appearances VARCHAR(MAX),
        total_series SMALLINT,
        total_stories SMALLINT,
        modified TIMESTAMP
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS public.comics (
        comic_id INT NOT NULL PRIMARY KEY,
        comic_digitalId INT NOT NULL,
        title VARCHAR(255),
        issue_number SMALLINT,
        variant_description VARCHAR(MAX),
        description VARCHAR(MAX),
        isbn VARCHAR(50),
        upc VARCHAR(50),
        diamond_code VARCHAR(50),
        characters_items VARCHAR(MAX),
        total_stories SMALLINT,
        total_events SMALLINT,
        modified TIMESTAMP
    );
    """
]

# Execute the queries to create the tables in Redshift
for query in queries:
    cur.execute(query)

# Commit the operations
conn.commit()

# Function to insert data using psycopg2
def insert_data(cursor, conn, df, table_name):
    # Convert the DataFrame into a list of tuples
    data_tuples = list(df.itertuples(index=False, name=None))
    
    # Generate the SQL for data insertion
    placeholders = ', '.join(['%s'] * len(df.columns))
    columns = ', '.join(df.columns)
    insert_query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    # Execute the insertion into Redshift
    cursor.executemany(insert_query, data_tuples)
    conn.commit()

# Save the DataFrames to the Redshift database
insert_data(cur, conn, characters_df, 'characters')
insert_data(cur, conn, comics_df, 'comics')

# Close the cursor and the connection
cur.close()
conn.close()

# Print a message indicating the data insertion was completed
print("Data insertion into Redshift database completed.")



conexion exitosa
Inserción de datos en la base de datos Redshift completada.
