In [1]:
import os

In [2]:
# Move the working directory one level up (presumably to the project root,
# given the `src.*` imports and relative config paths used in later cells).
# NOTE: not idempotent — re-running this cell moves up yet another level.
os.chdir("../")

In [3]:
# Display the current working directory to confirm the chdir above took effect.
os.getcwd()

'd:\\Dropbox\\Self-Development\\Coding_Projects\\Taxi_trip_records'

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable settings for the data-ingestion stage.

    Field order matters: it defines the positional order of the generated
    ``__init__``.
    """
    # Directory that downloaded files are written to and listed from.
    root_dir: Path
    # URL template; filled in with ``str.format(year, month)``.
    source_URL: str
    # Local file-name template; filled in with ``str.format(year, month)``.
    local_data_name: str
    # Number of download attempts made per file.
    max_retries: int

In [5]:
from src.utils.exception import CustomException
from src.utils.logger import logger
from src.utils.utils import *
from src.constants import *

class ConfigurationManager:
    """Read the project's YAML config/params files and build per-stage configs.

    Both files are loaded once on construction, and the directory named by
    ``artifacts_root`` in the config file is created.
    """

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH):
        """
        Parameters:
            - config_filepath: Path of the main YAML config file
              (defaults to ``CONFIG_FILE_PATH``).
            - params_filepath: Path of the YAML params file
              (defaults to ``PARAMS_FILE_PATH``).
        """
        self.config = load_yaml(config_filepath)
        self.params = load_yaml(params_filepath)

        create_directories([self.config['artifacts_root']])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the ``DataIngestionConfig`` and make sure its ``root_dir`` exists.

        Values come from the ``data_ingestion`` section of both the config and
        the params file. ``root_dir`` is converted to ``Path`` and
        ``max_retries`` to ``int`` so the returned object actually matches the
        dataclass annotations. (The raw YAML ``root_dir`` is presumably a plain
        string.)

        Returns:
            - DataIngestionConfig: Settings for the ingestion stage.
        """
        config = self.config['data_ingestion']
        params = self.params['data_ingestion']

        create_directories([config['root_dir']])

        return DataIngestionConfig(
            root_dir=Path(config['root_dir']),
            source_URL=config['source_URL'],
            local_data_name=config['local_data_name'],
            max_retries=int(params['max_retries']),
        )

In [6]:
from src.utils.exception import CustomException
from src.utils.logger import logger
from src.utils.utils import *
from src.constants import *

from datetime import datetime
import requests
import time
import sys

class DataIngestion:
    """Download monthly Parquet files into ``config.root_dir``.

    Remote URLs and local file names are produced by calling
    ``str.format(year, month)`` on ``config.source_URL`` and
    ``config.local_data_name`` respectively.
    """

    # Base delay (seconds) for exponential backoff between attempts: after
    # failed attempt ``i`` (0-based) the wait is _BACKOFF_FACTOR * 2**i.
    _BACKOFF_FACTOR = 0.3

    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_parquet_file(self, year: int, month: int, max_retries=None,
                              timeout: float = 60.0):
        """
        Downloads a Parquet file for a specific year and month.

        Parameters:
            - year (int): The year for which to download the Parquet file.
            - month (int): The month for which to download the Parquet file.
            - max_retries (int | None): Maximum number of request attempts on
              connection errors. Defaults to ``self.config.max_retries``.
            - timeout (float): Timeout in seconds forwarded to ``requests.get``
              so a stalled connection cannot block forever.

        Raises:
            - ValueError: If ``max_retries`` is less than 1.
            - CustomException: If every attempt fails with a connection error,
              or if the server answers with a status code other than 200.
        """
        if max_retries is None:
            max_retries = self.config.max_retries

        parquet_url = self.config.source_URL.format(year, month)
        response = self._get_with_retries(parquet_url, year, month, max_retries, timeout)

        if response.status_code != 200:
            logger.error(f"Failed to download yellow_tripdata_{year}-{month:02d}.parquet. Status code: {response.status_code}")
            raise CustomException(f"Failed to download yellow_tripdata_{year}-{month:02d}.parquet. Status code: {response.status_code}", sys)

        file_name = self.config.local_data_name.format(year, month)
        file_path = os.path.join(self.config.root_dir, file_name)

        with open(file_path, 'wb') as file:
            file.write(response.content)

        logger.info('File {} successfully downloaded to {}'.format(file_name, self.config.root_dir))

    def _get_with_retries(self, url, year, month, max_retries, timeout):
        """GET ``url``, retrying connection-level failures with exponential backoff.

        Returns the first response received, whatever its status code; the
        caller decides what a non-200 status means. ``year``/``month`` are
        used only in log and error messages.

        Raises:
            - ValueError: If ``max_retries`` is less than 1.
            - CustomException: If all ``max_retries`` attempts raise
              ``requests.RequestException``.
        """
        if max_retries < 1:
            raise ValueError(f"max_retries must be >= 1, got {max_retries}")

        for attempt in range(max_retries):
            try:
                # Return on the first response so a successful download is not
                # repeated on later iterations.
                return requests.get(url, timeout=timeout)
            except requests.RequestException as e:
                logger.error(f'Connection error with yellow_tripdata_{year}-{month:02d}.parquet: {str(e)}')
                if attempt == max_retries - 1:
                    # Raised inside the except block and chained so the
                    # underlying connection error stays attached (and visible
                    # via sys.exc_info() should CustomException inspect it).
                    raise CustomException(
                        f"Failed to download yellow_tripdata_{year}-{month:02d}.parquet after {max_retries} attempt(s): {e}",
                        sys,
                    ) from e
                # Back off before the next attempt only; the final failure
                # raises above instead of sleeping pointlessly.
                time.sleep(self._BACKOFF_FACTOR * (2 ** attempt))

    def extract_date(self, filename):
        """
        Extracts the year and month information from the filename.

        Expects names shaped like ``<a>_<b>_<YYYY>-<MM>.<ext>``: the third
        ``_``-separated token, up to its first ``.``, must be ``YYYY-MM``.

        Parameters:
            - filename (str): The name of the file.

        Returns:
            - tuple: ``(year, month)`` as integers.

        Raises:
            - IndexError / ValueError: If the name does not follow that pattern.
        """
        year_month = filename.split('_')[2].split('.')[0]
        year = int(year_month.split('-')[0])
        month = int(year_month.split('-')[1])
        return year, month

    def download_missing_parquet_files(self, start_year: int = 2009):
        """
        Download every monthly file from ``start_year``-01 up to, but
        excluding, the current month that is not already in ``root_dir``.

        Files in ``root_dir`` whose names ``extract_date`` cannot parse are
        logged and ignored. A failure for one month is logged and does not
        stop the remaining downloads (best effort).

        Parameters:
            - start_year (int): First year to consider. Defaults to 2009.
        """
        # Collect (year, month) pairs already on disk, skipping stray files
        # instead of letting one unexpected name abort the whole run.
        existing_dates = set()
        for name in os.listdir(self.config.root_dir):
            try:
                existing_dates.add(self.extract_date(name))
            except (IndexError, ValueError):
                logger.info(f"Skipping unrecognised file in {self.config.root_dir}: {name}")

        # Snapshot "now" once so every comparison uses the same current month.
        now = datetime.now()
        current = (now.year, now.month)
        full_dates = [
            (year, month)
            for year in range(start_year, now.year + 1)
            for month in range(1, 13)
            if (year, month) < current
        ]

        for year, month in full_dates:
            if (year, month) in existing_dates:
                continue
            try:
                self.download_parquet_file(year, month, self.config.max_retries)
            except Exception as e:
                # Best effort: record the failure and continue with the next month.
                logger.error(str(e))
                    

In [8]:
# Run the data-ingestion stage end to end: load configuration, build the stage
# config, and download any monthly files not already on disk.
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    data_ingestion.download_missing_parquet_files()
except Exception as e:
    # Per-file download errors are already logged and swallowed inside
    # download_missing_parquet_files; this mainly catches setup failures.
    # Chain the original so it is reported as the direct cause.
    raise CustomException(e, sys) from e

[2024-01-27 11:19:15,484: INFO: utils: yaml file: config\config.yaml loaded successfully]
[2024-01-27 11:19:15,487: INFO: utils: yaml file: params.yaml loaded successfully]
[2024-01-27 11:19:15,488: INFO: utils: created directory at: artifacts]
[2024-01-27 11:19:15,489: INFO: utils: created directory at: artifacts/data_ingestion/]
[2024-01-27 11:19:15,822: ERROR: 2886654169: Connection error with yellow_tripdata_2023-11.parquet: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))]
[2024-01-27 11:19:16,450: ERROR: 2886654169: Connection error with yellow_tripdata_2023-11.parquet: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None))]
[2024-01-27 11:19:17,686: ERROR: 2886654169: Connection error with yellow_tripdata_2023-11.parquet: ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly cl