In [9]:
import requests
from dataclasses import dataclass
from typing import List, Optional, Dict
import conf
import os
from datetime import datetime, date, timedelta
from parsing import parse_movie_data
import json


In [10]:
def json_serial(obj):
    """Fallback encoder for ``json.dump``/``json.dumps`` (use as ``default=``).

    Args:
        obj: A value the standard JSON encoder could not serialize.

    Returns:
        The ISO 8601 string of ``obj`` when it is a ``date`` or ``datetime``.

    Raises:
        TypeError: For any other type, naming the offending type.
    """
    # Guard first: anything that is not a date-like value is rejected.
    if not isinstance(obj, (datetime, date)):
        raise TypeError("Type %s not serializable" % type(obj))
    return obj.isoformat()

datetime

In [11]:
# Build the YYYY-MM-DD strings for today and the six following days.
# The previous loop formatted each date and discarded it, so the cell neither
# displayed nor stored anything; collecting them makes the result visible.
start_day = date.today()  # read the clock once so all seven dates share one base
upcoming_days = [
    (start_day + timedelta(days=offset)).strftime("%Y-%m-%d")
    for offset in range(7)
]
upcoming_days

In [12]:
# Print tomorrow's date as YYYY-MM-DD (the format spec is handed to strftime).
print(f"{date.today() + timedelta(days=1):%Y-%m-%d}")

2025-01-20


In [13]:
# Preview the showtimes URL built for every cinema listed in the configuration.
dict_cinema = conf.CINEMAS
url_root = conf.URL_ROOT
for i in range(1):  # i is the day offset from today; only today is previewed
    for key in dict_cinema:
        day_date = format(date.today() + timedelta(days=i), "%Y-%m-%d")
        # URL = root + the cinema's configured value + "/d-<date>/"
        url_path = "".join((url_root, dict_cinema[key], '/d-', day_date, '/'))
        print(f"url path : {url_path}, cinema name : {key}")


url path : https://www.allocine.fr/_/showtimes/theater-C2954/d-2025-01-19/, cinema name : Mk2 bibliothèque
url path : https://www.allocine.fr/_/showtimes/theater-C0020/d-2025-01-19/, cinema name : Filmothèque du quartier Latin
url path : https://www.allocine.fr/_/showtimes/theater-C0026/d-2025-01-19/, cinema name : UGC ciné Bercy
url path : https://www.allocine.fr/_/showtimes/theater-B0045/d-2025-01-19/, cinema name : UGC Cyrano


In [14]:
def merge_seances(list1, list2):
    """Merge two seance lists into one list holding a single entry per cinema.

    Every list item is a dict mapping a cinema id to a dict that provides at
    least the ``'cinemaName'`` and ``'showtimes'`` keys.

    Args:
        list1: First list of seance dicts. ``None`` is treated as empty.
        list2: Second list of seance dicts. ``None`` is treated as empty.

    Returns:
        A new list of ``{cinema_id: {'cinemaName': ..., 'showtimes': [...]}}``
        dicts, one per cinema, in first-seen order. The showtimes of both
        inputs are concatenated (duplicates are kept) and sorted ascending;
        the cinema name is taken from the first entry seen for that cinema.
        The input lists and their showtime lists are not modified.

    Raises:
        KeyError: If a cinema's first entry has no ``'cinemaName'`` or any
            entry has no ``'showtimes'``.
    """
    merged = {}

    for seance_list in (list1, list2):
        # A missing list contributes nothing instead of raising TypeError.
        for item in seance_list or ():
            if not item:
                # None/empty entries are skipped rather than crashing on .items()
                continue
            for cinema_id, data in item.items():
                if cinema_id not in merged:
                    merged[cinema_id] = {
                        'cinemaName': data['cinemaName'],
                        'showtimes': []
                    }
                merged[cinema_id]['showtimes'].extend(data['showtimes'])

    # Sort each cinema once at the end instead of re-sorting after every extend;
    # the resulting order is the same.
    for entry in merged.values():
        entry['showtimes'].sort()

    # Convert back to the original "list of single-key dicts" format
    return [{cinema_id: entry} for cinema_id, entry in merged.items()]

In [15]:
def merge_movie_data(existing_data: dict, new_movie: dict) -> dict:
    """Merge one parsed movie into the database, grouping its seances by cinema.

    ``new_movie`` is a single-entry dict ``{movie_id: movie_data}`` where
    ``movie_data['seances']`` is a list of ``{cinema_id: {..., 'showtimes': [...]}}``
    dicts (individual entries may be ``None``). In the database the seances of
    a movie are stored as one dict keyed by cinema id.

    Args:
        existing_data: The database, keyed by movie id. Updated in place.
        new_movie: The movie to merge. It is not modified.

    Returns:
        ``existing_data``. It is returned unchanged when ``new_movie`` is empty
        or carries no usable seance. When a cinema is already known for the
        movie, the showtimes are united, de-duplicated and sorted; otherwise
        the cinema entry is added as given.

    Notes:
        ``KeyError``, ``IndexError`` and ``TypeError`` raised while merging are
        reported with ``print`` and swallowed, so one malformed movie does not
        abort a whole scraping run.
    """
    if not new_movie:
        return existing_data

    try:
        movie_id = next(iter(new_movie))
        movie_data = new_movie[movie_id]

        # Keep every usable seance. Checking only the first entry used to drop
        # the whole movie when that single entry happened to be None.
        seances_list = [seance for seance in (movie_data.get('seances') or []) if seance]
        if not seances_list:
            return existing_data

        is_known_movie = movie_id in existing_data
        if is_known_movie:
            # Movie exists: merge straight into its stored per-cinema dict.
            seances_by_cinema = existing_data[movie_id].setdefault('seances', {})
        else:
            # New movie: build the per-cinema dict on the side and publish it
            # only once every seance has been processed without error.
            seances_by_cinema = {}

        for seance in seances_list:
            cinema_id = next(iter(seance))
            cinema_entry = seance[cinema_id]

            if cinema_id in seances_by_cinema:
                # Same cinema seen again (other day, or repeated in this
                # payload): unite the showtimes, drop duplicates, keep sorted.
                known_entry = seances_by_cinema[cinema_id]
                known_entry['showtimes'] = sorted(
                    set(known_entry['showtimes']) | set(cinema_entry['showtimes'])
                )
            else:
                # Shallow copy so later merges never write into the caller's dict.
                seances_by_cinema[cinema_id] = dict(cinema_entry)

        if not is_known_movie:
            # Copy the movie fields instead of rewriting new_movie['seances'].
            existing_data[movie_id] = {**movie_data, 'seances': seances_by_cinema}

        return existing_data

    except (KeyError, IndexError, TypeError) as e:
        print(f"Error in merge_movie_data: {str(e)}")
        return existing_data

In [16]:
def load_existing_database(file_path: str) -> dict:
    """Load the movie database stored as JSON at ``file_path``.

    Args:
        file_path: Path of the JSON file to read (decoded as UTF-8).

    Returns:
        The decoded top-level JSON object. An empty dict is returned when the
        file does not exist, is not valid UTF-8/JSON, or holds valid JSON whose
        top level is not an object (for example ``[]`` or ``null``).

    Raises:
        OSError: For I/O failures other than a missing file (for example a
            permission error); these are not swallowed.
    """
    try:
        with open(file_path, 'r', encoding='utf8') as fp:
            loaded = json.load(fp)
    except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
        # Missing or unreadable content: start from an empty database.
        return {}

    # Callers index the result by movie id, so anything but a dict is unusable.
    return loaded if isinstance(loaded, dict) else {}

def process_cinemas(dict_cinema: dict, url_root: str, days_range: int = 2,
                    database_path: str = './result.json',
                    timeout: float = 10.0) -> dict:
    """Fetch showtimes for several cinemas and days and build the movie database.

    For every day offset in ``range(days_range)`` (0 = today) and every cinema,
    the URL ``f"{url_root}{cinema_id}/d-{YYYY-MM-DD}/"`` is requested. Each
    element of the response's ``"results"`` list is parsed with
    ``parse_movie_data`` and merged with ``merge_movie_data``.

    Args:
        dict_cinema: Mapping of cinema name to the cinema id placed in the URL.
        url_root: URL prefix the cinema id is appended to.
        days_range: Number of consecutive days to fetch, starting today.
        database_path: File the resulting database is written to as JSON.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The database built during this run, keyed by movie id. It always
        starts empty; when it is non-empty at the end it is written to
        ``database_path``, overwriting any previous content of that file.

    Notes:
        Request failures, non-200 responses, undecodable JSON and write errors
        are reported with ``print`` and skipped; they are not raised.
    """
    # Every run starts from scratch: nothing is read back from database_path.
    movie_database = {}

    # The same request headers are used for every call.
    headers = {'Accept': 'application/json'}

    # Process each day in range
    for i in range(days_range):
        day_date = (date.today() + timedelta(days=i)).strftime("%Y-%m-%d")

        # Process each cinema
        for cinema_name, cinema_id in dict_cinema.items():
            url_path = f"{url_root}{cinema_id}/d-{day_date}/"

            cinema_info = {
                'cinema_id': cinema_id,
                'cinema_name': cinema_name,
                'url': url_path
            }

            try:
                # Without a timeout a stalled server would block the run forever.
                response = requests.get(url_path, headers=headers, timeout=timeout)
                if response.status_code == 200:
                    data = response.json()
                    if "results" in data:
                        for element in data["results"]:
                            movie_info = parse_movie_data(element, cinema_info)
                            if movie_info:
                                movie_database = merge_movie_data(movie_database, movie_info)
                else:
                    print(f"Failed to fetch data for {cinema_name} on {day_date}: Status code {response.status_code}")
            except json.JSONDecodeError as e:
                # Must come before RequestException: since requests 2.27 the
                # error raised by response.json() subclasses both, and the
                # broader handler would otherwise report it as a fetch error.
                print(f"Error parsing JSON for {cinema_name}: {str(e)}")
            except requests.RequestException as e:
                print(f"Error fetching data for {cinema_name}: {str(e)}")

    # Save database only if we have data
    if movie_database:
        try:
            with open(database_path, 'w', encoding='utf8') as fp:
                # json_serial turns date/datetime values into ISO strings.
                json.dump(movie_database, fp, default=json_serial)
        except IOError as e:
            print(f"Error saving database: {str(e)}")

    return movie_database

if __name__ == "__main__":
    # Read the cinema table and the base URL from the project configuration.
    dict_cinema, url_root = conf.CINEMAS, conf.URL_ROOT

    # Fetch every configured cinema with the default day range and keep the
    # returned database for inspection.
    updated_database = process_cinemas(dict_cinema=dict_cinema, url_root=url_root)

In [17]:
# Preview only the first few movies: echoing the whole database floods the
# cell output and bloats the saved notebook.
dict(list(updated_database.items())[:3])

{'TW92aWU6MzIzOTI4': {'title': 'Babygirl',
  'synopsis': 'Romy, PDG d’une grande entreprise, a tout pour être heureuse : un mari aimant, deux filles épanouies et une carrière réussie. Mais un jour, elle rencontre un jeune stagiaire dans la société qu’elle dirige à New York. Elle entame avec lui une liaison torride, quitte à tout risquer pour réaliser ses fantasmes les plus enfouis…',
  'posterUrl': 'https://fr.web.img2.acsta.net/img/0f/3f/0f3f8ece7cfe8d3a8524359c6261475c.jpg',
  'runtime': '1h 54min',
  'genre': ['Erotique', 'Thriller'],
  'languages': ['ENGLISH'],
  'stats': {'userRating': {'score': 2.64, 'count': 495},
   'pressRating': {'score': 3.26, 'count': 35}},
  'certificate': None,
  'directors': [{'lastName': 'Reijn',
    'firstName': 'Halina',
    'pictureUrl': 'https://fr.web.img4.acsta.net/pictures/19/07/18/10/42/5823366.jpg',
    'position': 'DIRECTOR'}],
  'actors': [{'lastName': 'Kidman',
    'firstName': 'Nicole',
    'pictureUrl': 'https://fr.web.img4.acsta.net/pictu