In [2]:

from dotenv import load_dotenv
import jikanpy
import os
import requests
import pandas as pd
from datetime import datetime, timezone
import plotly.express as px
import plotly.graph_objects as go
load_dotenv()

secret = os.getenv("secret")
id = os.getenv("id")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_KEY")
jikan = jikanpy.Jikan()

In [3]:
from boto3.dynamodb.types import TypeDeserializer
import boto3
from boto3.dynamodb.conditions import Key, Attr

def get_analytics_data():
    # Connect to dynamodb
    dynamodb = boto3.client(
        "dynamodb",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name="us-west-2"
    )
    df_dicts = {}
    
    
    # get top 25 data
    response_top_25 = dynamodb.scan(TableName="anime-dashboard-top-25", Limit=100)
    
    # convert to pandas dataframe
    df_dicts['top_25'] = parse_analytics_data(response_top_25['Items'])
    
    
    return df_dicts


def _deserialize(data):
    deserializer = TypeDeserializer()
    return {k: deserializer.deserialize(v) for k, v in data.items()}

def parse_analytics_data(data):
    deserialized_data = [_deserialize(x) for x in data]
    data = [pd.json_normalize(x, 'data', ['timestamp']) for x in deserialized_data]
    return pd.concat(data, ignore_index=True)

In [8]:
from decimal import Decimal
def get_top_data():
    url = "https://api.jikan.moe/v4/top/anime"
    try:
        # Make a GET request to the Jikan API
        response = requests.get(url)
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx and 5xx)

        # Parse the JSON response
        top_25 = response.json()
        res = []
        for data in top_25['data'][0:25]:  # Get only the top 25 entries
            item = {
                'mal_id': data['mal_id'],
                'title': data['title'],
                'url': data['url'],
                'image_url': data['images']['webp']['image_url'],
                'title_english': data.get('title_english'),  # Use .get() to handle missing keys
                'title_japanese': data.get('title_japanese'),
                'licensors': {
                    'names': [x['name'] for x in data['licensors']],
                    'urls':  [x['url'] for x in data['licensors']]
                },
                'studios': {
                    'names': [x['name'] for x in data['studios']],
                    'urls': [x['url'] for x in data['studios']]
                },
                'genres': [x['name'] for x in data['genres']],
                'statistics': {
                    'rank': Decimal(str(data['rank'])),
                    'score': Decimal(str(data['score'])),
                    'members': Decimal(str(data['members'])),
                    'favorites': Decimal(str(data['favorites'])),
                    'scored_by': Decimal(str(data['scored_by'])),
                    'popularity': Decimal(str(data['popularity']))
                }
            }
            res.append(item)
        print(res)
        return res
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"Error fetching data from Jikan API: {str(e)}")

In [16]:
snapshot = get_top_data()
snapshot[0]

[{'mal_id': 52991, 'title': 'Sousou no Frieren', 'url': 'https://myanimelist.net/anime/52991/Sousou_no_Frieren', 'image_url': 'https://cdn.myanimelist.net/images/anime/1015/138006.webp', 'title_english': "Frieren: Beyond Journey's End", 'title_japanese': '葬送のフリーレン', 'licensors': {'names': ['Crunchyroll'], 'urls': ['https://myanimelist.net/anime/producer/1468/Crunchyroll']}, 'studios': {'names': ['Madhouse'], 'urls': ['https://myanimelist.net/anime/producer/11/Madhouse']}, 'genres': ['Adventure', 'Drama', 'Fantasy'], 'statistics': {'rank': Decimal('1'), 'score': Decimal('9.31'), 'members': Decimal('1018122'), 'favorites': Decimal('61814'), 'scored_by': Decimal('591629'), 'popularity': Decimal('164')}}, {'mal_id': 5114, 'title': 'Fullmetal Alchemist: Brotherhood', 'url': 'https://myanimelist.net/anime/5114/Fullmetal_Alchemist__Brotherhood', 'image_url': 'https://cdn.myanimelist.net/images/anime/1208/94745.webp', 'title_english': 'Fullmetal Alchemist: Brotherhood', 'title_japanese': '鋼の錬金

{'mal_id': 52991,
 'title': 'Sousou no Frieren',
 'url': 'https://myanimelist.net/anime/52991/Sousou_no_Frieren',
 'image_url': 'https://cdn.myanimelist.net/images/anime/1015/138006.webp',
 'title_english': "Frieren: Beyond Journey's End",
 'title_japanese': '葬送のフリーレン',
 'licensors': {'names': ['Crunchyroll'],
  'urls': ['https://myanimelist.net/anime/producer/1468/Crunchyroll']},
 'studios': {'names': ['Madhouse'],
  'urls': ['https://myanimelist.net/anime/producer/11/Madhouse']},
 'genres': ['Adventure', 'Drama', 'Fantasy'],
 'statistics': {'rank': Decimal('1'),
  'score': Decimal('9.31'),
  'members': Decimal('1018122'),
  'favorites': Decimal('61814'),
  'scored_by': Decimal('591629'),
  'popularity': Decimal('164')}}

In [7]:
# df contains the data from the dynamodb table for testing purposes

# load 3 tables
dynamodb = boto3.resource(
        "dynamodb",
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name="us-west-2"
    )
facts_table = dynamodb.Table("anime_top_25_facts")
anime_dimension = dynamodb.Table("anime_dimension")
top_25_rank = dynamodb.Table("top_25_rank_history")

In [14]:
def get_timestamp():
    timestamp = datetime.now(timezone.utc).isoformat(timespec='seconds').replace("+00:00", "Z")
    return timestamp

def update_fact_table(mal_id, snapshot_timestamp, statisitcs):
    if mal_id is None or statisitcs is None:
        print("mal_id or statistics is None")
        return
    item = {
        'mal_id': mal_id,
        'snapshot_timestamp': snapshot_timestamp,
        'favorites': Decimal(statisitcs['favorites']),
        'scored_by': Decimal(statisitcs['scored_by']),
        'members': Decimal(statisitcs['members']),
        'popularity': Decimal(statisitcs['popularity']),
        'score': Decimal(statisitcs['score']),
    }
    facts_table.put_item(Item=item)
    print(f"Inserted new record for mal_id: {mal_id}")
    return

def update_anime_dimension_item(mal_id, snapshot_item):
    if mal_id is None or snapshot_item is None:
        print("mal_id or snapshot_item is None")
        return
    
    # check if mal_id already exists in the table
    response = anime_dimension.query(
        KeyConditionExpression=Key('mal_id').eq(mal_id)
    )
    
    # if mal_id does not exist, insert a new record
    if response['Count'] == 0:
        item = {
            'mal_id': mal_id,
            'title': snapshot_item['title'],
            'title_english': snapshot_item['title_english'],
            'title_japanese': snapshot_item['title_japanese'],
            'image_url': snapshot_item['image_url'],
            'url': snapshot_item['url'],
            'genres': snapshot_item['genres'],
            'licensors_names': snapshot_item['licensors']['names'],
            'licensors_urls': snapshot_item['licensors']['urls'],
            'studios_names': snapshot_item['studios']['names'],
            'studios_urls': snapshot_item['studios']['urls'],
        }
        anime_dimension.put_item(Item=item)
        print(f"Inserted new record for mal_id: {mal_id}")
        return
    print(f"Record already exists for mal_id: {mal_id}")
    return


# SCD Type 2 handling
def update_top_25_rank(mal_id, item_rank, snapshot_timestamp):
    # query to get the current rank
    response = top_25_rank.query(
        KeyConditionExpression=Key('mal_id').eq(mal_id),
        FilterExpression=Attr('current_flag').eq(True)
    )
    
    # get the current record
    current_record = response.get('Items', [])
    
    # if there is no current record, insert a new record
    if len(current_record) == 0:
        item = {
            'mal_id': mal_id,
            'rank': item_rank,
            'effective_date': snapshot_timestamp,
            'end_date': None,
            'current_flag': True
        }
        top_25_rank.put_item(Item=item)
        print(f"Inserted new record for mal_id: {mal_id} with rank: {item_rank}")
        return
    
    # if there is a current record, get the current record
    current_record = current_record[0]

    # check if the rank has changed
    if current_record['rank'] != item_rank:
        # update the current record
        top_25_rank.update_item(
            Key={
                'mal_id': mal_id,
                'effective_date': current_record['effective_date']
            },
            UpdateExpression="SET current_flag = :f, end_date = :ed",
            ExpressionAttributeValues={
                ':f': False,
                ':ed': snapshot_timestamp
            }
        )
        
        # insert a new record with updated rank
        item = {
            'mal_id': mal_id,
            'rank': item_rank,
            'effective_date': snapshot_timestamp,
            'end_date': None,
            'current_flag': True
        }
        top_25_rank.put_item(Item=item)
        print(f"Updated record for mal_id: {mal_id}")
    else:
        print(f"Rank has not changed for mal_id: {mal_id}")
    
    return
        

In [17]:
for item in snapshot:
    mal_id = item.get('mal_id') # mal_id of the anime
    snapshot_timestamp = get_timestamp() # timestamp of the snapshot
    
    print(f"Handling facts table for mal_id: {mal_id}")
    # handle facts table
    update_fact_table(mal_id, snapshot_timestamp, item.get('statistics'))
    
    print(f"Handling anime dimension table for mal_id: {mal_id}")
    # handle anime dimension table
    update_anime_dimension_item(mal_id, item)
        
    print(f"Handling top 25 rank table for mal_id: {mal_id}")
    # handle top 25 rank table (SCD)
    item_rank = item.get('statistics', {}).get('rank')
    if item_rank is not None:    
        update_top_25_rank(mal_id, item_rank, snapshot_timestamp)
    else:
        raise ValueError(f"Rank is missing for mal_id: {mal_id}")
        
    
    

Handling facts table for mal_id: 52991
Inserted new record for mal_id: 52991
Handling anime dimension table for mal_id: 52991
Record already exists for mal_id: 52991
Handling top 25 rank table for mal_id: 52991
Rank has not changed for mal_id: 52991
Handling facts table for mal_id: 5114
Inserted new record for mal_id: 5114
Handling anime dimension table for mal_id: 5114
Record already exists for mal_id: 5114
Handling top 25 rank table for mal_id: 5114
Rank has not changed for mal_id: 5114
Handling facts table for mal_id: 9253
Inserted new record for mal_id: 9253
Handling anime dimension table for mal_id: 9253
Record already exists for mal_id: 9253
Handling top 25 rank table for mal_id: 9253
Rank has not changed for mal_id: 9253
Handling facts table for mal_id: 60022
Inserted new record for mal_id: 60022
Handling anime dimension table for mal_id: 60022
Record already exists for mal_id: 60022
Handling top 25 rank table for mal_id: 60022
Rank has not changed for mal_id: 60022
Handling fac

# Use airflow to orchestrate the pipeline