In [9]:
id = 'OMDB-Bronze'

In [10]:
from common_function import readConfig, appendMode, absPath, updatePipelineStatus
import pandas as pd
import os
from datetime import datetime
import time
import requests
from dotenv import load_dotenv
from pathlib import Path



In [11]:
config = readConfig(id)
config

✓ Config loaded successfully: OMDB-Bronze.json


{'pipeline_id': 'OMDB-Bronze',
 'source': {'type': 'api'},
 'validation_path': 'data\\01_bronze\\revenues',
 'target': {'database': 'bronze',
  'table': 'omdb',
  'format': 'parquet',
  'name': 'omdb',
  'path': 'data\\01_bronze',
  'mode': 'append',
  'partition_by': ['_tf_ingestion_time']}}

In [12]:


# Load environment variables
load_dotenv()

def fetchOMDBData(titles: list) -> pd.DataFrame:
    """
    Fetch movie data from OMDB API for multiple titles.
    Handles rate limits gracefully - stops when limit hit, returns what was fetched.
    
    Args:
        titles: List of movie titles to fetch
        
    Returns:
        pd.DataFrame: Movie metadata from OMDB (partial if rate limited)
    """
    
    api_key = os.getenv('OMDB_API_KEY')
    
    if not api_key:
        print("⚠️  OMDB_API_KEY not found in environment")
        return pd.DataFrame()
    
    base_url = "http://www.omdbapi.com/"
    results = []
    rate_limited = False
    
    for i, title in enumerate(titles, 1):
        if rate_limited:
            print(f"⚠️  Rate limit reached - stopping. {len(results)}/{len(titles)} titles fetched")
            break
            
        print(f"Fetching {i}/{len(titles)}: {title}")
        
        try:
            # Make API request
            response = requests.get(base_url, params={
                'apikey': api_key,
                't': title,
                'type': 'movie'
            })
            
            data = response.json()
            
            # Check for rate limit error
            if data.get('Response') == 'False':
                error = data.get('Error', '')
                if 'limit' in error.lower() or 'request limit' in error.lower():
                    print(f"  ⚠️  API rate limit reached: {error}")
                    rate_limited = True
                    break
                else:
                    print(f"  ⚠️  Not found: {title} - {error}")
                    continue
            
            # Parse Ratings array
            ratings_list = data.get('Ratings', [])
            imdb_rating = None
            rotten_tomatoes = None
            metacritic = None
            
            for rating in ratings_list:
                source = rating.get('Source', '')
                value = rating.get('Value', '')
                if 'Internet Movie Database' in source or 'IMDb' in source:
                    imdb_rating = value
                elif 'Rotten Tomatoes' in source:
                    rotten_tomatoes = value
                elif 'Metacritic' in source:
                    metacritic = value
            
            results.append({
                'title': data.get('Title'),
                'year': data.get('Year'),
                'rated': data.get('Rated'),
                'released': data.get('Released'),
                'runtime': data.get('Runtime'),
                'genre': data.get('Genre'),
                'director': data.get('Director'),
                'writer': data.get('Writer'),
                'actors': data.get('Actors'),
                'plot': data.get('Plot'),
                'language': data.get('Language'),
                'country': data.get('Country'),
                'awards': data.get('Awards'),
                'poster': data.get('Poster'),
                'imdb_rating': imdb_rating or data.get('imdbRating'),
                'rotten_tomatoes': rotten_tomatoes,
                'metacritic': metacritic,
                'metascore': data.get('Metascore'),
                'imdb_votes': data.get('imdbVotes'),
                'imdb_id': data.get('imdbID'),
                'box_office': data.get('BoxOffice'),
                'production': data.get('Production'),
                'website': data.get('Website')
            })
            
            # Rate limiting - 5 requests per second max
            time.sleep(0.25)
            
        except requests.exceptions.RequestException as e:
            print(f"  ⚠️  Network error fetching {title}: {e}")
            # Continue to next title on network errors
            continue
        except Exception as e:
            print(f"  ⚠️  Error fetching {title}: {e}")
            continue
    
    df = pd.DataFrame(results)
    
    if rate_limited:
        print(f"\n⚠️  Rate limit hit - fetched {len(df)}/{len(titles)} movies")
        print(f"   Remaining {len(titles) - len(df)} titles will be fetched on next run")
    else:
        print(f"\n✓ Fetched {len(df)} movies from OMDB")
    
    return df



def getNewTitlesToFetch(revenues_path: str, omdb_path: str) -> list:
    """
    Get list of new titles from revenues that don't exist in OMDB bronze layer.
    
    Args:
        revenues_path: Path to bronze revenues parquet
        omdb_path: Path to bronze OMDB parquet
        
    Returns:
        list: Distinct titles that need to be fetched from OMDB API
    """
    
    # Load distinct titles from revenues
    df_revenues = pd.read_parquet(revenues_path, engine='fastparquet')
    titles_in_revenues = set(df_revenues['title'].unique())
    print(f"✓ Found {len(titles_in_revenues)} distinct titles in revenues")
    
    # Load existing titles from OMDB (if exists)
    if Path(omdb_path).exists():
        df_omdb = pd.read_parquet(omdb_path, engine='fastparquet')
        titles_in_omdb = set(df_omdb['title'].unique())
        print(f"✓ Found {len(titles_in_omdb)} existing titles in OMDB bronze")
    else:
        titles_in_omdb = set()
        print("✓ No existing OMDB data found (first run)")
    
    # Find new titles (set difference)
    new_titles = titles_in_revenues - titles_in_omdb
    
    print(f"✓ {len(new_titles)} new titles to fetch from OMDB API")
    
    return sorted(list(new_titles))

In [13]:
#generate titles that are not present in db

revenues_path = os.path.join(absPath(), config["validation_path"])
omdb_path = os.path.join(absPath(), config["target"]["path"],config["target"]["name"])
titles = getNewTitlesToFetch(revenues_path,omdb_path)

✓ Found 6547 distinct titles in revenues
✓ Found 918 existing titles in OMDB bronze
✓ 5638 new titles to fetch from OMDB API


In [14]:

df = fetchOMDBData(titles)

# print(df_omdb[['title', 'year', 'genre', 'imdb_rating']])

Fetching 1/5638: 2.0
  ⚠️  API rate limit reached: Request limit reached!

⚠️  Rate limit hit - fetched 0/5638 movies
   Remaining 5638 titles will be fetched on next run


In [15]:
df["_tf_ingestion_time"] = int(time.time())  # Unix timestamp
df["_tf_ingestion_date"] = datetime.now().strftime("%Y-%m-%d")

appendMode(df, os.path.join(absPath(),f"{config['target']['path']}\\{config['target']['name']}")
           , format="parquet"
           ,partition_cols = config['target']["partition_by"])

✓ Appended 0 records to C:\Users\rogoz\Documents\own_projects\futuremind-assesment\data\01_bronze\omdb (total: 0)


In [16]:
updatePipelineStatus(id, status='success')

✓ Updated pipeline status: OMDB-Bronze - success
