In [1]:
import pandas as pd
import json
import requests
import numpy as np

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
class RestaurantDataProcessor:
    def __init__(self, url, country_code_file, output_file_path):
        """
        Initializes the RestaurantDataProcessor with URLs and file paths.
        
        param url: URL to fetch restaurant JSON data.
        param country_code_file: Path to the CSV file containing country codes.
        param output_file_path: Path where the processed CSV data will be saved.
        """
        self.url = url
        self.country_code_file = country_code_file
        self.output_file_path = output_file_path
        self.main_restaurant_df = None 

    def fetch_json_data(self):
        """
        Fetches JSON data from the specified URL.

        return: Parsed JSON data or None if an error occurs.
        """
        
        response = requests.get(self.url)
        if response.ok:
            return response.json()
        else: 
            return None

    def read_country_code_data(self):
        """
        Reads country code data from a CSV file.

        return: DataFrame containing country code data or None if file not found.
        """
        try:
            return pd.read_csv(self.country_code_file)
        except FileNotFoundError as e:
            print(f"Error reading country code file: {e}")
            return None

    def merge_data(self, parsed_data, country_code_df):
        """
        Merges restaurant JSON data with country code data from the CSV file.

        param parsed_data: JSON data containing restaurant information.
        param country_code_df: DataFrame containing country codes.
        return: Merged DataFrame with restaurant data and country codes.
        """
        self.main_restaurant_df = pd.json_normalize(parsed_data, "restaurants")
        merged_df = pd.merge(
            self.main_restaurant_df, country_code_df, 
            how='left', left_on='restaurant.location.country_id', 
            right_on='Country Code'
        )
        return merged_df

    def process_merged_data(self, merged_df):
        """
        Processes the merged DataFrame by extracting, renaming, and cleaning the data.

        param merged_df: Merged DataFrame with restaurant and country code data.
        return: Processed DataFrame ready for export.
        """

        # Extracts required fields
        extract_restaurant_df = merged_df[[
            'restaurant.R.res_id', 'restaurant.name', 'Country', 
            'restaurant.location.city', 'restaurant.user_rating.votes', 
            'restaurant.user_rating.aggregate_rating', 'restaurant.cuisines'
        ]]
        
        # Dictionary of columns to rename
        renamed_columns = {
            'restaurant.R.res_id': 'Restaurant Id',
            'restaurant.name': 'Restaurant Name',
            'restaurant.location.city': 'City',
            'restaurant.user_rating.votes': 'User Rating Votes',
            'restaurant.user_rating.aggregate_rating': 'User Aggregate Rating',
            'restaurant.cuisines': 'Cuisines'
        }
        
        renamed_restaurant_df = extract_restaurant_df.rename(columns=renamed_columns)
        renamed_restaurant_df['User Aggregate Rating'] = renamed_restaurant_df['User Aggregate Rating'].astype('float64')
        final_restaurant_df = renamed_restaurant_df.dropna()
        return final_restaurant_df

    def export_df_to_csv(self, df):
        """
        Exports the final DataFrame to a CSV file.

        param df: DataFrame to be exported.
        """
        try:
            df.to_csv(self.output_file_path, index=False)
            print(f"Restaurant Data exported successfully to {self.output_file_path}")
        except Exception as e:
            print(f"Error exporting data: {e}")

    def run(self):
        """
        Orchestrates the data fetching, processing, and exporting workflow.
        """

        # Fetches json data from url
        json_data = self.fetch_json_data()
        
        if json_data:
            country_code_df = self.read_country_code_data()
            if country_code_df is not None:
                merged_df = self.merge_data(json_data, country_code_df)
                final_df = self.process_merged_data(merged_df)
                self.export_df_to_csv(final_df)

In [3]:
class EventDataProcessor:
    def __init__(self, main_restaurant_df, specified_year, specified_month, output_file_path):
        """
        Initializes the RestaurantEventsProcessor with the main restaurant DataFrame.
        param main_restaurant_df: DataFrame containing main restaurant data.
        param specified_year: Year to filter the dataset by (in int)
        param specified_month: Month to filter the dataset by (in int)
        """
        self.main_restaurant_df = main_restaurant_df
        self.specified_year = specified_year
        self.specified_month = specified_month
        self.output_file_path = output_file_path

    def expand_and_normalize_events(self):
        """
        Expands the 'restaurant.zomato_events' column in the DataFrame, normalizing the nested structures.
        """

        # Unpack the lists in the events column vertically
        expanded_events_df = self.main_restaurant_df.explode("restaurant.zomato_events")
        
        # Unpack the dictionaries within each element of the column
        unpacked_events_df = pd.json_normalize(expanded_events_df["restaurant.zomato_events"])
        
        # Reset the index of expanded_events_df
        expanded_events_df.reset_index(drop=True, inplace=True)

        # Merge the unpacked DataFrame with the original DataFrame (if there are events to merge)
        events_df = pd.concat([expanded_events_df, unpacked_events_df], axis=1)
        
        return events_df

    def filter_events_by_date(self, events_df, year, month):
        """
        Filters events based on their start and end dates to include only those relevant to the specified month and year.

        param events_df: DataFrame with event data to filter.
        param year: Year to filter events by.
        param month: Month to filter events by.
        return: DataFrame with events filtered by the specified month and year.
        """
        # Ensure event date columns are in datetime format
        events_df['event.start_date'] = pd.to_datetime(events_df['event.start_date'])
        events_df['event.end_date'] = pd.to_datetime(events_df['event.end_date'])

        # Events that start on or before the end of the specified month and year
        start_date_condition = (events_df['event.start_date'].dt.year < year) | \
                            ((events_df['event.start_date'].dt.year == year) & (events_df['event.start_date'].dt.month <= month))
        # Events that end on or after the start of the specified month and year
        end_date_condition = (events_df['event.end_date'].dt.year > year) | \
                            ((events_df['event.end_date'].dt.year == year) & (events_df['event.end_date'].dt.month >= month))

        # Apply conditions
        filtered_events_df = events_df[start_date_condition & end_date_condition]
        
        return filtered_events_df

    def process_events_data(self):
        """
        Processes event data by expanding, normalizing, filtering by date, and preparing for export.
        """
        events_df = self.expand_and_normalize_events()
        filtered_events_df = self.filter_events_by_date(events_df, self.specified_year, self.specified_month)

        # Extract and rename required columns
        final_columns = [
            "event.event_id", "restaurant.id", "restaurant.name", 
            "restaurant.photos_url", "event.title", "event.start_date", "event.end_date"
        ]
        final_events_df = filtered_events_df[final_columns]

        columns_to_rename = {
            "event.event_id": "Event Id", 
            "restaurant.id": "Restaurant Id",
            "restaurant.name": "Restaurant Name", 
            "restaurant.photos_url": "Photo URL",
            "event.title": "Event Title", 
            "event.start_date": "Event Start Date",
            "event.end_date": "Event End Date"
        }
        
        final_events_df = final_events_df.rename(columns=columns_to_rename)
        return final_events_df

    def export_events_to_csv(self, df, file_path):
        """
        Exports the processed events DataFrame to a CSV file.

        param df: DataFrame with the events data to export.
        param file_path: Path to save the exported CSV file.
        """
        try:
            df.to_csv(file_path, index=False)
            print(f"Events data exported successfully to {file_path}")
        except Exception as e:
            print(f"Error exporting events data: {e}")

    def run(self):
        """
        Orchestrates the processing and exporting of restaurant event data.

        param output_file_path: Path where the processed events data CSV will be saved.
        """
        final_events_df = self.process_events_data()
        self.export_events_to_csv(final_events_df, self.output_file_path)

In [4]:
class RatingStatisticsProcessor:
    def __init__(self, main_restaurant_df, output_file_path):
        """
        Initializes the RatingStatisticsProcessor with the main restaurant DataFrame and the output path for the JSON file.
        
        param main_restaurant_df: DataFrame containing main restaurant data.
        param output_file_path: The path where the rating statistics JSON will be saved.
        """
        self.main_restaurant_df = main_restaurant_df
        self.output_file_path = output_file_path

    def filter_and_convert_ratings(self, specified_texts):
        """
        Filters the DataFrame for specified rating texts and converts the aggregate rating column to float.
        
        param specified_texts: List of rating texts to filter by.
        return: DataFrame filtered for specified rating texts with aggregate rating as float.
        """
        filtered_df = self.main_restaurant_df[
            self.main_restaurant_df['restaurant.user_rating.rating_text'].isin(specified_texts)
        ]
        filtered_df['restaurant.user_rating.aggregate_rating'] = filtered_df['restaurant.user_rating.aggregate_rating'].astype("float64")
        return filtered_df

    def analyze_rating_distribution(self, filtered_df):
        """
        Analyzes the distribution of aggregate ratings for each rating text.
        
        param filtered_df: DataFrame filtered by specified rating texts.
        return: DataFrame with the distribution statistics of ratings.
        """
        aggregates = ['min', 'max']
        rating_statistics = filtered_df.groupby('restaurant.user_rating.rating_text', as_index=False)['restaurant.user_rating.aggregate_rating'].agg(aggregates)
        
        # Rename user rating column
        rating_statistics = rating_statistics.rename(columns={"restaurant.user_rating.rating_text": "User Rating"})
        return rating_statistics

    def export_to_json(self, rating_statistics):
        """
        Exports the rating statistics DataFrame to a JSON file.
        
        param rating_statistics: DataFrame containing rating statistics.
        """
        try:
            rating_statistics.to_json(self.output_file_path, orient='records')
            print(f"Rating statistics exported successfully")
        except Exception as e:
            print(f"Error exporting rating statistics: {e}")

    def run(self, specified_texts):
        """
        Orchestrates the filtering, analysis, and exporting of restaurant rating data.
        
        param specified_texts: List of rating texts to filter by.
        """
        filtered_df = self.filter_and_convert_ratings(specified_texts)
        rating_statistics = self.analyze_rating_distribution(filtered_df)
        self.export_to_json(rating_statistics)

In [5]:
if __name__ == "__main__":
    restaurant_data_url = "https://raw.githubusercontent.com/Papagoat/brain-assessment/main/restaurant_data.json"
    country_code_file = "Country-Code.csv"
    restaurants_output_file_path = "restaurants.csv"

    restaurant_processor = RestaurantDataProcessor(restaurant_data_url, country_code_file, restaurants_output_file_path)
    restaurant_processor.run()


    # Specified year and month to filter the event dataset (in int)
    event_year = 2019
    event_month = 4
    
    events_output_file_path = "restaurant_events.csv"

    event_processor = EventDataProcessor(restaurant_processor.main_restaurant_df, event_year, event_month, events_output_file_path)
    event_processor.run()

    statistics_output_file_path = "restaurant_data.json"
    specified_texts = ['Excellent', 'Very Good', 'Good', 'Average', 'Poor']
    
    rating_processor = RatingStatisticsProcessor(restaurant_processor.main_restaurant_df, statistics_output_file_path)
    rating_processor.run(specified_texts)

Restaurant Data exported successfully to restaurants.csv
Events data exported successfully to restaurant_events.csv
Rating statistics exported successfully


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  filtered_df['restaurant.user_rating.aggregate_rating'] = filtered_df['restaurant.user_rating.aggregate_rating'].astype("float64")
