# SIPSA PROJECT
### Developed by [Sébastien Lozano-Forero](https://www.linkedin.com/in/sebastienlozanoforero/)


### Imports

In [2]:
# Standard library
import datetime
import logging
import os
import re
import warnings
from io import BytesIO
from pathlib import Path
from typing import List

# Third-party
import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import sqlalchemy
from botocore.exceptions import ClientError
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm

# Project-local
from src.logging_setup import setup_logger
from src.DataCollector import DataCollector
from src.FileNameBuilder import FileNameBuilder

warnings.filterwarnings("ignore")

## v1.0 full

Until early September, this was the most complete version, handling data transformation for both report formats.

### DataCollector

In [3]:
# Set up logging
logger = logging.getLogger('DataCollectorLogger')
logger.setLevel(logging.INFO)

# logging.getLogger returns the same object every time, so re-running this
# cell would stack one more FileHandler on it and every message would be
# written twice (then three times, ...).  Detach and close any handler that
# already targets the log file before creating a fresh one.
_log_path = os.path.abspath('data_collector.log')
for _stale_handler in list(logger.handlers):
    if isinstance(_stale_handler, logging.FileHandler) and _stale_handler.baseFilename == _log_path:
        logger.removeHandler(_stale_handler)
        _stale_handler.close()

# Create a file handler for logging (mode='w' truncates the file on each run)
file_handler = logging.FileHandler('data_collector.log', mode='w')
file_handler.setLevel(logging.INFO)

# Create a logging format: timestamp - level - message
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)

# Add the file handler to the logger
logger.addHandler(file_handler)

class DataCollector:
    """
    Scrape DANE's SIPSA weekly wholesale bulletins and mirror them to S3.

    The collector reads the yearly index links from ``self.url``, follows each
    one to find the ``Anexo`` attachments, and uploads every attachment to
    ``reports/<year>/`` in an S3 bucket.  A ``files_tracker.csv`` object in the
    same bucket records which files were already uploaded so that later runs
    skip them.
    """

    # Seconds to wait on the web server before abandoning an HTTP request.
    # Without a timeout ``requests`` may block forever on a stalled connection.
    request_timeout = 30

    def __init__(self, s3: boto3.resource) -> None:
        """
        Initialize the DataCollector with S3 resource and configuration parameters.

        Args:
            s3 (boto3.resource): boto3 S3 resource used for every bucket operation.
        """
        self.url_base = 'https://www.dane.gov.co'
        self.url = 'https://www.dane.gov.co/index.php/estadisticas-por-tema/agropecuario/sistema-de-informacion-de-precios-sipsa/mayoristas-boletin-semanal-1'
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
        }
        self.s3 = s3
        self.files_tracker_name = 'files_tracker.csv'
        self.logfile_name = 'logfile'

    def all_years_links(self) -> list:
        """
        Get the set of year links available on the DANE webpage.

        Returns:
            list: The ``<a>`` tags (BeautifulSoup elements, not plain strings)
                  whose visible text consists only of digits.

        Raises:
            requests.RequestException: If the page cannot be fetched in time.
        """
        response = requests.get(self.url, headers=self.headers, timeout=self.request_timeout)
        soup = BeautifulSoup(response.content, "html.parser")
        link_years = soup.find_all(lambda tag: tag.name == 'a' and re.match(r'^\d+$', tag.get_text().strip()))
        return link_years

    def links_per_year(self, link) -> list:
        """
        Get all report links for a specific year.

        Args:
            link: A year ``<a>`` tag as returned by ``all_years_links``; its
                  ``href`` is appended to ``self.url_base``.

        Returns:
            list: Tags with ``target="_blank"`` whose text contains 'Anexo'.

        Raises:
            requests.RequestException: If the page cannot be fetched in time.
        """
        r = requests.get(self.url_base + link['href'], headers=self.headers, timeout=self.request_timeout)
        logger.info(f"Working on {link['href'][-4:]} files")
        soup_year = BeautifulSoup(r.content, "html.parser")
        one_year_links = [item for item in soup_year.find_all(target='_blank') if 'Anexo' in item.text]
        return one_year_links

    def check_file_exists_in_s3(self, bucket_name: str, file_name: str) -> bool:
        """
        Check if a file already exists in the S3 bucket.

        Returns:
            bool: True when the object's metadata loads, False on a 404.

        Raises:
            ClientError: For any S3 error other than a 404.
        """
        try:
            self.s3.Object(bucket_name, file_name).load()
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return False
            raise
        return True

    def load_files_tracker(self, bucket_name: str) -> pd.DataFrame:
        """
        Load the files_tracker.csv from S3 or create a new DataFrame if it does not exist.

        Returns:
            pd.DataFrame: The tracker contents, or an empty frame with the
                          columns 'file', 'link' and 'date_added'.

        Raises:
            ClientError: For any S3 error other than a missing object.
        """
        try:
            # Check if files_tracker.csv exists in the bucket
            obj = self.s3.Object(bucket_name, self.files_tracker_name)
            response = obj.get()
            tracker_df = pd.read_csv(BytesIO(response['Body'].read()))
            logger.info("Loaded existing files tracker from S3.")
        except self.s3.meta.client.exceptions.NoSuchKey:
            # If files_tracker.csv does not exist, initialize an empty DataFrame
            tracker_df = pd.DataFrame(columns=['file', 'link', 'date_added'])
            logger.info("No existing files tracker found in S3. Creating a new one.")
        except ClientError as e:
            if e.response['Error']['Code'] != '404':
                raise
            tracker_df = pd.DataFrame(columns=['file', 'link', 'date_added'])
            logger.info("No existing files tracker found in S3. Creating a new one.")
        return tracker_df

    def update_files_tracker(self, df: pd.DataFrame, bucket_name: str):
        """
        Update the files_tracker.csv in S3.

        Args:
            df (pd.DataFrame): The full tracker to write (replaces the stored object).
            bucket_name (str): Destination bucket.
        """
        buffer = BytesIO()
        df.to_csv(buffer, index=False)
        buffer.seek(0)
        self.s3.Bucket(bucket_name).put_object(Body=buffer, Key=self.files_tracker_name)
        logger.info("Files tracker updated successfully in S3.")

    def upload_or_update_dataframe_to_s3(self, df: pd.DataFrame, bucket_name: str, file_name: str):
        """
        Upload or update a DataFrame as a CSV file to an S3 bucket.

        Args:
            df (pd.DataFrame): Data to serialise (without the index).
            bucket_name (str): Destination bucket.
            file_name (str): Object key to write.
        """
        # Convert DataFrame to CSV in memory
        buffer = BytesIO()
        df.to_csv(buffer, index=False)
        buffer.seek(0)
        self.s3.Bucket(bucket_name).upload_fileobj(buffer, file_name, ExtraArgs={'ContentType': 'text/csv'})
        logger.info(f"DataFrame {file_name} uploaded successfully to S3 bucket {bucket_name}.")

    def download_files_per_year(self, link, bucket_name: str = None):
        """
        Download all files for a specific year and optionally upload them directly to an S3 bucket.

        Files already listed in the bucket's tracker are skipped.  Download and
        upload failures are logged and the loop moves on to the next file.

        Args:
            link: The year ``<a>`` tag (as returned by ``all_years_links``).
            bucket_name (str, optional): The name of the S3 bucket to upload the files to. Defaults to None,
                in which case nothing is stored.
        """
        links_per_year = self.links_per_year(link)
        n = len(links_per_year)
        year_label = link['href'][-4:]

        tracker_df = self.load_files_tracker(bucket_name) if bucket_name else None

        new_files_count = 0

        with tqdm(total=n, desc=f"Processing {year_label} files", unit='file') as pbar:
            for i, report_link in enumerate(links_per_year):
                # Week numbers count down from n for the first link on the page.
                file_name = f'week_{n - i}_{report_link["href"].split("/")[-1]}'
                file_link = self.url_base + report_link['href']

                if bucket_name and file_name in tracker_df['file'].values:
                    pbar.update(1)
                    continue

                try:
                    with requests.get(file_link, headers=self.headers, stream=True,
                                      timeout=self.request_timeout) as result:
                        result.raise_for_status()

                        if bucket_name:
                            # Read the body through requests rather than handing
                            # ``result.raw`` to boto3: ``content`` undoes any
                            # Content-Encoding and reports mid-transfer failures as
                            # ``RequestException`` subclasses, which are handled below.
                            payload = BytesIO(result.content)
                            destination_key = f'reports/{link.text.strip()}/{file_name}'
                            self.s3.Bucket(bucket_name).upload_fileobj(payload, destination_key)

                            new_entry = pd.DataFrame({
                                'file': [file_name],
                                'link': [file_link],
                                'date_added': [datetime.datetime.today().strftime('%Y-%m-%d')]
                            })
                            tracker_df = pd.concat([tracker_df, new_entry], ignore_index=True)
                            new_files_count += 1

                except requests.RequestException as e:
                    logger.error(f"Failed to download file from {report_link['href']}: {e}")
                except ClientError as e:
                    logger.error(f"Failed to upload file {file_name} to S3 bucket {bucket_name}: {e}")
                finally:
                    pbar.update(1)

        if bucket_name:
            self.update_files_tracker(tracker_df, bucket_name)
            logger.info(f"Year {year_label} processed: {new_files_count} new files uploaded to S3 bucket {bucket_name}.")

    def get_files(self, bucket_name: str = None):
        """
        Download all files from all years and optionally upload them to an S3 bucket.

        When a bucket is given, the local 'data_collector.log' file is uploaded
        afterwards under the key ``self.logfile_name``.
        """
        for link in self.all_years_links():
            self.download_files_per_year(link, bucket_name)

        # Upload log file to S3 after processing
        if bucket_name:
            try:
                self.s3.Bucket(bucket_name).upload_file('data_collector.log', self.logfile_name)
                logger.info(f"Log file {self.logfile_name} uploaded successfully to S3 bucket {bucket_name}.")
            except ClientError as e:
                logger.error(f"Failed to upload log file to S3 bucket {bucket_name}: {e}")

    def display_files_tracker(self, bucket_name: str) -> pd.DataFrame:
        """
        Display the DataFrame contained in the files_tracker.csv file from the S3 bucket.

        Unlike ``load_files_tracker`` this never raises on S3 errors: any
        failure is logged and an empty DataFrame is returned.

        Args:
            bucket_name (str): The name of the S3 bucket where files_tracker.csv is stored.

        Returns:
            pd.DataFrame: The contents of files_tracker.csv as a pandas DataFrame.
        """
        try:
            # Check if files_tracker.csv exists in the bucket
            obj = self.s3.Object(bucket_name, self.files_tracker_name)
            response = obj.get()
            tracker_df = pd.read_csv(BytesIO(response['Body'].read()))
            logger.info("Loaded files_tracker.csv from S3 successfully.")
        except self.s3.meta.client.exceptions.NoSuchKey:
            logger.warning("files_tracker.csv does not exist in S3.")
            return pd.DataFrame()  # Return an empty DataFrame if the file does not exist
        except ClientError as e:
            logger.error(f"Failed to load files_tracker.csv from S3: {e}")
            return pd.DataFrame()  # Return an empty DataFrame if there was an error

        return tracker_df


### FileNameBuilder

In [4]:
import logging
from pathlib import Path
import boto3

class FileNameBuilder(DataCollector):
    """
    Split the report objects stored in S3 into the two spreadsheet layouts.

    Keys are expected to have the year as their second path component and a
    file name whose second underscore-separated token is the week number
    (as in 'reports/2018/week_20_Sem_12may__18may_2018.xlsx').  Reports up to
    and including week 19 of 2018 use the first layout; later ones the second.
    """

    # Years that contain first-layout reports (2018 only up to the cutover week).
    _FIRST_FORMAT_YEARS = frozenset({'2012', '2013', '2014', '2015', '2016', '2017', '2018'})
    # Year and last week number published in the first layout.
    _CUTOVER_YEAR = 2018
    _CUTOVER_WEEK = 19

    def __init__(self, s3: boto3.resource):
        """
        Initialize FileNameBuilder with S3 resource.

        Args:
            s3 (boto3.resource): The boto3 S3 resource to interact with S3.
        """
        super().__init__(s3)

    @classmethod
    def _is_first_format(cls, year: str, week: int) -> bool:
        """Return True for reports published up to the cutover week."""
        if year not in cls._FIRST_FORMAT_YEARS:
            return False
        return not (year == str(cls._CUTOVER_YEAR) and week > cls._CUTOVER_WEEK)

    @classmethod
    def _is_second_format(cls, year: str, week: int) -> bool:
        """Return True for reports published after the cutover week.

        Any four-digit year after the cutover year qualifies, so new yearly
        folders are picked up without having to extend a hard-coded list.
        """
        if len(year) != 4 or not year.isdecimal():
            return False
        year_number = int(year)
        if year_number == cls._CUTOVER_YEAR:
            return week > cls._CUTOVER_WEEK
        return year_number > cls._CUTOVER_YEAR

    def _collect_paths(self, bucket_name: str, label: str, belongs) -> list:
        """List the keys in *bucket_name* for which ``belongs(year, week)`` is true.

        Keys whose year or week cannot be parsed are logged and skipped.
        """
        logger.info(f"Fetching {label} format paths from bucket: {bucket_name}")
        bucket = self.s3.Bucket(bucket_name)
        object_names = [obj.key for obj in bucket.objects.all()]

        selected_paths = []
        for path in object_names:
            try:
                year = str(Path(path).parts[1])
                week = int(Path(path).stem.split('_')[1])
            except (IndexError, ValueError) as e:
                logger.warning(f"Error processing path {path}: {e}")
                continue

            if belongs(year, week):
                selected_paths.append(path)
                logger.debug(f"File added to {label} format: {path}")

        logger.info(f"Found {len(selected_paths)} files for the {label} format.")
        return selected_paths

    def first_format_paths(self, bucket_name: str) -> list:
        """
        Get the paths of files in the S3 bucket that match the first format criteria.

        Args:
            bucket_name (str): The name of the S3 bucket.

        Returns:
            list: A list of file paths that match the first format
                  (years 2012-2017, plus 2018 up to week 19).
        """
        return self._collect_paths(bucket_name, 'first', self._is_first_format)

    def second_format_paths(self, bucket_name: str) -> list:
        """
        Get the paths of files in the S3 bucket that match the second format criteria.

        Args:
            bucket_name (str): The name of the S3 bucket.

        Returns:
            list: A list of file paths that match the second format
                  (2018 from week 20 onwards, and every later year).
        """
        return self._collect_paths(bucket_name, 'second', self._is_second_format)


### DataWrangler

In [7]:
class DataWrangler(FileNameBuilder):
    def __init__(self, bucket_name: str, s3: boto3.resource):
        """
        Set up the wrangler for one bucket.

        Args:
            bucket_name (str): Name of the S3 bucket that holds the report files.
            s3 (boto3.resource): boto3 S3 resource, also handed to the parent class.
        """
        super().__init__(s3)
        self.s3 = s3
        self.bucket_name = bucket_name
        # Both lookup tables are read from names that must already exist at
        # module level when the instance is created; they are not defined here.
        self.categories_dict = categories_dict
        self.city_to_region = city_to_region


    def first_format_data_extraction(self,
                                    file_path:str)-> pd.DataFrame:
        """
        Read a first-format Excel report from S3 and keep only its text-labelled rows.

        The object ``file_path`` is fetched from ``self.bucket_name`` and parsed
        with pandas, trying the ``openpyxl`` engine first and ``xlrd`` second.
        The file extension is deliberately ignored: some files only open with
        the engine normally associated with the other extension.

        Args:
            file_path (str): The key (path) of the file within the S3 bucket.

        Returns:
            pd.DataFrame: The first sheet of the workbook, restricted to rows
                          whose first column is not NaN and contains at least
                          one ASCII letter.  An empty DataFrame is returned
                          when neither engine can read the file.

        Notes:
            - Errors while fetching the object from S3 are not handled here
              and propagate to the caller.
            - Only Excel workbooks are supported.
        """
        bucket = self.s3.Bucket(self.bucket_name)
        xls_data = bucket.Object(file_path).get()['Body'].read()

        dataframe = None
        for engine in ('openpyxl', 'xlrd'):
            try:
                dataframe = pd.read_excel(BytesIO(xls_data), engine=engine)
                break
            except Exception:
                # Each engine raises its own exception types for a workbook it
                # cannot parse, so any failure just means "try the next one".
                continue

        if dataframe is None:
            # Neither engine could parse the bytes: honour the documented
            # contract instead of failing later on an unbound variable.
            print(f'[INFO] {file_path} failed to download')
            return pd.DataFrame()

        if not dataframe.empty:
            first_column = dataframe[dataframe.columns[0]]
            # str(NaN) is 'nan', which contains letters, hence the explicit notna check.
            has_text = first_column.apply(lambda x: bool(re.search(r'[a-zA-Z]', str(x))) and pd.notna(x))
            dataframe = dataframe[has_text]

        return dataframe

    def first_format_data_transformation(self,
                                         dataframe: pd.DataFrame, 
                                         file_path:str) -> pd.DataFrame:
        """
        Reshape a raw first-format sheet into one row per product, city and week.

        The first-format spreadsheets stack every food category in a single
        sheet.  Categories are separated by title rows containing the word
        'cuadro'; inside a category, a row with a blank 'precio_minimo' starts a
        new product and carries the product name in its first column.

        Args:
            dataframe (pd.DataFrame): The raw data extracted from an Excel file.
                Only its first five columns are used.
            file_path (str): The file path or S3 key the data came from.  The
                second underscore-separated token of the file name is parsed as
                the week number, and the last four characters of the stem are
                taken as the year.

        Returns:
            pd.DataFrame: The collected rows.  When every expected column is
                present the columns are, in order: 'producto', 'ciudad',
                'precio_minimo', 'precio_maximo', 'precio_medio', 'tendencia',
                'categoria', 'mercado', 'semana_no', 'anho'.  Otherwise a
                message is printed and the frame is returned as built.

        Processing Steps:
        - Keep the first five columns, rename them and lowercase 'ciudad'.
        - Drop rows where 'ciudad' is null.
        - Locate the 'cuadro' title rows and slice one block per category.
        - Inside each category, slice one block per product using the rows with
          a blank 'precio_minimo'.
        - Strip parenthesised text from 'ciudad', then split it on ',' into the
          city (first part) and the marketplace (second part).
        - Stamp week number and year taken from ``file_path``.

        Raises:
            ValueError / IndexError: If the week number cannot be parsed from
                the file name (that statement is outside any try block).

        Notes:
            - Any exception raised while processing a category is swallowed by a
              bare ``except`` and that category is skipped silently.
            - 'reports/2015/week_16_Anexo_13_17abr_2015.xls' fails
              (it has no 'cuadro' titles).
        """
        # Keep only the first five columns and rename them
        dataframe = dataframe.iloc[:, 0:5]
        dataframe.columns = ['ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio', 'tendencia']
        dataframe['ciudad'] = dataframe['ciudad'].str.lower().str.replace('bogotá, d.c.', 'bogota')

        # Remove rows where 'ciudad' is null
        dataframe = dataframe[~dataframe['ciudad'].isnull()]

        # This layout has all food categories within the same spreadsheet, divided only by a big title.
        # That title includes the word 'cuadro'. So, to separate categories, we look for blocks of data
        # contained between two consecutive appearances of that word.

        # Index labels of the rows where the word 'cuadro' is present
        index_cuadro = dataframe[dataframe['ciudad'].str.contains('cuadro')].index

        # Creating target dataframe for all data
        df_final = pd.DataFrame()

        # Iterating over food categories (one more block than there are 'cuadro' rows).
        for i_categoria in range(len(index_cuadro) + 1):
            # week 16 of 2015 does not have the 'cuadro' titles, so indexing index_cuadro fails there.
            try:
                # NOTE(review): the three slices below use plain [] with integers, which pandas
                # treats as positional row slicing, while index_cuadro holds index labels. The two
                # only coincide if the frame still has a default 0..n-1 index - confirm against the
                # extraction step, which filters rows without resetting the index.
                # Capturing first category
                if i_categoria == 0:
                    dataframe_categoria = dataframe[1:index_cuadro[i_categoria]]
                # Capturing intermediate categories (hard-coded to block numbers 1 through 6)
                elif (i_categoria <= 6) and (i_categoria > 0):
                    dataframe_categoria = dataframe[index_cuadro[i_categoria - 1] + 2:index_cuadro[i_categoria]]
                # Capturing last category
                else:
                    dataframe_categoria = dataframe[index_cuadro[i_categoria - 1] + 2:]

                # Within each category block, add category name (categories_dict is looked up from 1)
                dataframe_categoria['categoria'] = self.categories_dict[i_categoria + 1]

                # Within each category block, there are several products. A product usually spans several
                # rows (same food item in different locations). What identifies a product block is that the
                # precio_minimo column is blank on its first row, so the data for one product lies between
                # two consecutive blank prices.
                index_producto = dataframe_categoria[dataframe_categoria['precio_minimo'].isnull()].index

                # Creating target data frame for this category
                df_categoria_final = pd.DataFrame()

                # Iterating over products within food category
                for i_producto in range(len(index_producto)):

                    # Capturing the first product in the category.
                    # The label slice starts one label before the first blank-price row.
                    # NOTE(review): index_producto[i_producto + 1] raises IndexError when the category
                    # has a single product; the outer except then drops the whole category - confirm
                    # whether that is intended.
                    if i_producto == 0:
                        dataframe_producto = dataframe_categoria.loc[
                                            index_producto[i_producto] - 1:index_producto[i_producto + 1] - 1].reset_index(
                            drop=True)

                    # Capturing all intermediate products
                    elif i_producto < len(index_producto) - 1:
                        dataframe_producto = dataframe_categoria.loc[
                                        index_producto[i_producto]:index_producto[i_producto + 1] - 1].reset_index(
                                drop=True)

                    # Capturing last product within category
                    else:
                        dataframe_producto = dataframe_categoria.loc[index_producto[i_producto]:].reset_index(drop=True)

                    # The product name is the 'ciudad' cell of the block's first row (label 0 after the reset)
                    dataframe_producto['producto'] = dataframe_producto['ciudad'][0]

                    # Remove any parenthesised text from the ciudad column
                    dataframe_producto['ciudad'] = dataframe_producto['ciudad'].str.replace(r'\s*\([^)]*\)', '', regex=True)

                    # The name of the marketplace is included in some of the city names, after a comma.
                    # This fallback relies on a module-level `np` (numpy) being in scope.
                    try:
                        dataframe_producto['mercado'] = dataframe_producto['ciudad'].str.split(',').str[1].str.strip()
                    except:
                        dataframe_producto['mercado'] = np.nan

                    # Getting a clean version of city name (text before the first comma)
                    try:
                        dataframe_producto['ciudad'] = dataframe_producto['ciudad'].str.split(',').str[0].str.strip()
                    except:
                        None

                    # Dropping first row (the one the product name was taken from)
                    dataframe_producto = dataframe_producto.drop(0)

                    # Putting together all data for products within food category
                    df_categoria_final = pd.concat([df_categoria_final, dataframe_producto], ignore_index=True)

                # Putting together all data
                df_final = pd.concat([df_final, df_categoria_final], ignore_index=True)
            except: 
                None
            # Time stamps: year and week number. These two assignments sit inside the category loop,
            # so they are re-applied to the whole frame on every iteration.
            df_final['semana_no'] = int(Path(file_path).name.split('_')[1])  # file_path.stem[5:7]
            df_final['anho'] = Path(file_path).stem[-4:]
        # Put the columns in their final order; a missing column is reported but not raised
        try: 
            df_final = df_final[['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio',
           'tendencia', 'categoria', 'mercado', 'semana_no', 'anho']]
        except: 
            print(f'[INFO] {file_path} has no all columns')

        return df_final



    def second_format_data_extraction(self, 
                                      file_path:str) -> pd.DataFrame:
        """
        Read a second-format Excel report from S3 and return its sheets as one table.

        Second-format workbooks hold one food category per sheet.  The sheets
        at positions 1 to 8 are read (position 0 is skipped), cleaned and
        concatenated.  The workbook is opened with ``openpyxl`` first and
        ``xlrd`` second, because the extension does not reliably tell which
        engine works.

        Args:
            file_path (str): The key (path) of the file within the S3 bucket.
                The second underscore-separated token of the file name is
                parsed as the week number and the last four characters of the
                stem are taken as the year.

        Returns:
            pd.DataFrame: One row per product and city with the columns
                'producto', 'ciudad', 'precio_minimo', 'precio_maximo',
                'precio_medio', 'tendencia', 'categoria', 'mercado',
                'semana_no' and 'anho'.  An empty DataFrame is returned when
                the workbook cannot be opened or no sheet yields data.

        Processing Steps:
        - Open the workbook once and parse each sheet from it.
        - Skip the header rows (data starts at row 9, or row 10 when the first
          cell of row 9 is empty) and keep the first six columns.
        - Drop rows without a city, remove parenthesised text, lowercase the
          city and split 'city, marketplace' on the comma.
        - Add the category (``self.categories_dict`` keyed by sheet position),
          the week number and the year.

        Raises:
            ValueError / IndexError: If the week number cannot be parsed from
                the file name.
            KeyError: If ``self.categories_dict`` has no entry for a sheet position.

        Notes:
            - Sheets that are missing, unreadable or too small to contain data
              are reported with ``print`` and skipped, so one bad sheet never
              contributes rows under the wrong category.
            - Errors while fetching the object from S3 propagate to the caller.
        """
        bucket = self.s3.Bucket(self.bucket_name)
        #  Read the Excel file from S3
        xls_data = bucket.Object(file_path).get()['Body'].read()

        xl = None
        for engine in ('openpyxl', 'xlrd'):
            try:
                xl = pd.ExcelFile(BytesIO(xls_data), engine=engine)
                break
            except Exception:
                # Each engine raises its own exception types for a workbook it
                # cannot parse, so any failure just means "try the next one".
                continue
        if xl is None:
            print(f'[INFO] {file_path} failed to download')
            return pd.DataFrame()

        sheet_names = xl.sheet_names

        # File-level metadata is the same for every sheet, so compute it once.
        semana_no = int(Path(file_path).name.split('_')[1])
        anho = Path(file_path).stem[-4:]

        sheet_frames = []
        for index in range(1, 9):
            if index >= len(sheet_names):
                print(f'[INFO] {file_path} has no sheet at position {index}')
                continue

            try:
                # Parse from the already-open workbook instead of re-reading
                # the whole file for every sheet.
                dataframe = xl.parse(sheet_names[index])
            except Exception:
                print(f'[INFO] {file_path} failed to download')
                continue

            if file_path == 'reports/2018/week_20_Sem_12may__18may_2018.xlsx':
                # This workbook has a plain header row and a combined
                # 'Mercado mayorista' column.  Feed that column to the shared
                # clean-up below so the marketplace is split out there; splitting
                # it here first would leave nothing after the comma and the
                # marketplace would end up empty.
                dataframe['ciudad'] = dataframe['Mercado mayorista']
                dataframe.columns = dataframe.columns.str.lower().str.replace(' ','_').str.replace('í','i').str.replace('á','a')
            else:
                if dataframe.shape[0] <= 9 or dataframe.shape[1] < 6:
                    print(f'[INFO] {file_path} sheet {sheet_names[index]} is too small to contain data')
                    continue
                first_data_row = 10 if pd.isnull(dataframe.iloc[9, 0]) else 9
                dataframe = dataframe.iloc[first_data_row:, :6]
                dataframe.columns = ['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio', 'tendencia']

            # Work on a copy so the column assignments below do not target a slice.
            dataframe = dataframe[~dataframe['ciudad'].isnull()].copy()
            if dataframe.empty:
                continue

            ciudad = dataframe['ciudad'].str.replace(r'\s*\([^)]*\)', '', regex=True)
            # Literal replacement: the comma inside "bogotá, d.c." must not be
            # mistaken for the city/marketplace separator.
            ciudad = ciudad.str.lower().str.replace('bogotá, d.c.', 'bogota', regex=False)
            dataframe['ciudad'] = ciudad

            # Adding categoria info
            dataframe['categoria'] = self.categories_dict[index]

            ciudad_parts = ciudad.str.split(',')

            # The name of the marketplace is included on some of the city names.
            try:
                dataframe['mercado'] = ciudad_parts.str[1].str.strip()
            except AttributeError:
                # Raised by the .str accessor when no value has a second part.
                dataframe['mercado'] = float('nan')

            # Getting a clean version of city name
            try:
                dataframe['ciudad'] = ciudad_parts.str[0].str.strip()
            except AttributeError:
                pass

            # Time stamps: week number and year
            dataframe['semana_no'] = semana_no
            dataframe['anho'] = anho
            sheet_frames.append(dataframe)

        if sheet_frames:
            # DataFrame.append was removed in pandas 2.0; concat is the replacement.
            full_dataframe = pd.concat(sheet_frames, ignore_index=True)
        else:
            full_dataframe = pd.DataFrame()

        try:
            full_dataframe = full_dataframe[['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio',
                                             'tendencia', 'categoria', 'mercado', 'semana_no', 'anho']]
        except KeyError:
            print(f'[INFO] {file_path} has not all columns')
        return full_dataframe

    def building_complete_report(self):
        """
        Build the complete report from both file formats stored in the S3 bucket.

        Two batches of files are processed. Per the project's format split, the
        first batch holds files up to and including week 19 of 2018 and the second
        batch holds the files after that week; the actual selection is delegated to
        ``FileNameBuilder``.

        Workflow:
        - ``FileNameBuilder.first_format_paths()`` and
          ``FileNameBuilder.second_format_paths()`` list the files of each format
          in ``self.bucket_name``.
        - Every first-format file goes through ``first_format_data_extraction()``
          and then ``first_format_data_transformation()``.
        - Every second-format file goes through ``second_format_data_extraction()``.
        - The per-file frames of each batch are concatenated, and the two batch
          frames are concatenated into the final report.

        Returns:
            pd.DataFrame: All data from both formats, concatenated. Per-file row
            indexes are kept as-is (no ``ignore_index``), as in the original
            implementation. If a batch has no files it contributes an empty
            DataFrame.

        Example:
            report = self.building_complete_report()
        """
        file_names = FileNameBuilder(s3=self.s3)
        first_format_paths_aws = file_names.first_format_paths(bucket_name=self.bucket_name)
        second_format_paths_aws = file_names.second_format_paths(bucket_name=self.bucket_name)

        # Frames are collected in lists and concatenated once per batch:
        # DataFrame.append was removed in pandas 2.0, and appending inside a loop
        # re-copies the accumulated frame on every iteration.
        print('[INFO] First batch of files')
        first_format_frames = []
        for file_path in tqdm(first_format_paths_aws):
            dataframe = self.first_format_data_extraction(file_path=file_path)
            first_format_frames.append(
                self.first_format_data_transformation(dataframe=dataframe,
                                                      file_path=file_path)
            )
        # pd.concat raises ValueError on an empty list, so fall back to an empty frame.
        first_format_final = pd.concat(first_format_frames) if first_format_frames else pd.DataFrame()

        print('[INFO] second batch of files')
        second_format_frames = [
            self.second_format_data_extraction(file_path=file_path)
            for file_path in tqdm(second_format_paths_aws)
        ]
        second_format_final = pd.concat(second_format_frames) if second_format_frames else pd.DataFrame()

        return pd.concat([first_format_final, second_format_final])

## v2.0 development 

### Logging 

In [9]:
# %%writefile src/logging_setup.py

# import logging
# import sys
# from pathlib import Path
# from datetime import datetime

# # Configure logging
# def setup_logger():
#     """
#     Sets up and configures a logger for the application.

#     This function creates a logger that outputs log messages both to the console (stdout) and a log file. 
#     It ensures the log directory and file are created if they do not already exist.

#     The log messages include the timestamp, log level, and message. The log level is set to `INFO` for 
#     both console and file handlers, meaning that messages of level `INFO` and higher will be recorded.

#     Returns:
#         logger (logging.Logger): Configured logger object for the application.

#     Logging Details:
#         - Log Directory: A directory named 'logs' is created in the current working directory if it does not exist.
#         - Log File: Logs are written to 'sipsa_process_<MM_DD_YYYY>.log' (named after today's date) in the 'logs' directory.
#         - Log Format: '%(asctime)s - %(levelname)s - %(message)s'
#         - Log Levels: Both console and file handlers are set to `INFO`.
#         - Handlers:
#             - Console (`stdout`) Handler: Outputs logs to the console.
#             - File Handler: Outputs logs to 'sipsa_process_<MM_DD_YYYY>.log' with overwrite mode (`mode='w'`).
    
#     Example Usage:
#         >>> logger = setup_logger()
#         >>> logger.info("Logger is successfully configured.")

#     Last update:
#         Sept 28, 2024. 
#     """
    
#     # Create logfile path
#     log_path = Path.cwd()/'logs'
#     if not log_path.exists():
#         print(f'{log_path} created successfully')
#         log_path.mkdir(parents = True, 
#                    exist_ok = True)
        
#     logger = logging.getLogger(__name__)
#     logger.setLevel(logging.INFO)

#     # Create handlers
#     today_date = datetime.today().strftime(format = '%m_%d_%Y')
#     c_handler = logging.StreamHandler(sys.stdout)
#     f_handler = logging.FileHandler(log_path/f'sipsa_process_{today_date}.log', mode='w')

#     # Set levels
#     c_handler.setLevel(logging.INFO)
#     f_handler.setLevel(logging.INFO)

#     # Create formatters and add them to handlers
#     formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
#     c_handler.setFormatter(formatter)
#     f_handler.setFormatter(formatter)

#     # Add handlers to the logger
#     logger.addHandler(c_handler)
#     logger.addHandler(f_handler)

#     return logger


Overwriting src/logging_setup.py


### DataCollector

In [7]:
# %%writefile src/DataCollector.py

# import boto3
# import requests
# import re
# from bs4 import BeautifulSoup
# import pandas as pd
# import datetime
# from io import BytesIO
# from typing import List
# from botocore.exceptions import ClientError
# from pathlib import Path
# from tqdm import tqdm
# import logging

# class DataCollector:    
#     """
#     A class used to collect, manage, and store data files from the DANE website into an S3 bucket.

#     This class interacts with the DANE website to fetch and download data files. It checks the existence
#     of files in an S3 bucket and updates a file tracker to ensure no duplicate downloads occur. It also provides
#     methods to upload or update files in the S3 bucket, as well as methods to display the tracking information.
    

#     Attributes:
#         s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#         url_base (str): The base URL of the DANE website.
#         url (str): The URL of the DANE webpage for the data.
#         headers (dict): HTTP headers used for web requests.
#         files_tracker_name (str): The name of the file tracker CSV in S3.
#         logfile_name (str): The name of the log file.
#         logger (logging.Logger): Logger instance for logging messages.

#     Methods:
#         __init__(s3: boto3.resource, logger: logging.Logger) -> None:
#             Initializes the DataCollector with S3 resource and logger.

#         all_years_links() -> List[BeautifulSoup]:
#             Fetches the set of year links available on the DANE webpage.

#         links_per_year(link: BeautifulSoup) -> List[BeautifulSoup]:
#             Retrieves all report links for a specific year.

#         check_file_exists_in_s3(bucket_name: str, file_name: str) -> bool:
#             Checks if a specific file already exists in the S3 bucket.

#         load_files_tracker(bucket_name: str) -> pd.DataFrame:
#             Loads the files_tracker.csv from S3 or creates a new DataFrame if it does not exist.

#         update_files_tracker(df: pd.DataFrame, bucket_name: str):
#             Updates the files_tracker.csv in S3.

#         upload_or_update_dataframe_to_s3(df: pd.DataFrame, bucket_name: str, file_name: str):
#             Uploads or updates a DataFrame as a CSV file to an S3 bucket.

#         download_files_per_year(link: BeautifulSoup, bucket_name: str = None):
#             Downloads all files for a specific year and optionally uploads them to an S3 bucket.

#         get_files(bucket_name: str = None):
#             Downloads all files from all years and optionally uploads them to an S3 bucket.

#         display_files_tracker(bucket_name: str) -> pd.DataFrame:
#             Displays the DataFrame contained in the files_tracker.csv file from the S3 bucket.
#             """
    
#     def __init__(self, s3: boto3.resource, logger: logging.Logger) -> None:
#         """
#         Initializes the DataCollector class with the provided S3 resource and logger.

#         Args:
#             s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#             logger (logging.Logger): Logger instance for logging messages.
#             """
#         self.url_base = 'https://www.dane.gov.co'
#         self.url = 'https://www.dane.gov.co/index.php/estadisticas-por-tema/agropecuario/sistema-de-informacion-de-precios-sipsa/mayoristas-boletin-semanal-1'
#         self.headers = {
#             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
#         }
#         self.s3 = s3
#         self.files_tracker_name = 'files_tracker.csv'
#         self.logfile_name = 'logfile'
#         self.logger = logger

#     def all_years_links(self) -> List[BeautifulSoup]:
#         """
#         Retrieves the set of year links available on the DANE webpage.

#         Returns:
#             List[BeautifulSoup]: A list of BeautifulSoup objects representing the links for each year.
#         """
#         try:
#             response = requests.get(self.url, headers=self.headers)
#             response.raise_for_status()
#         except requests.RequestException as e:
#             self.logger.error(f"Failed to fetch URL {self.url}: {e}")
#             return []

#         soup = BeautifulSoup(response.content, "html.parser")
#         link_years = soup.find_all(lambda tag: tag.name == 'a' and re.match(r'^\d+$', tag.get_text().strip()))
#         return link_years

#     def links_per_year(self, link: BeautifulSoup) -> List[BeautifulSoup]:
#         """
#         Retrieves all report links for a specific year.

#         Args:
#             link (BeautifulSoup): The BeautifulSoup object containing the link to the year page.

#         Returns:
#             List[BeautifulSoup]: A list of BeautifulSoup objects representing the report links for the specified year.
#         """
#         try:
#             r = requests.get(self.url_base + link['href'], headers=self.headers)
#             r.raise_for_status()
#         except requests.RequestException as e:
#             self.logger.error(f"Failed to fetch URL {self.url_base + link['href']}: {e}")
#             return []

#         self.logger.info(f"Working on {link['href'][-4:]} files")
#         soup_year = BeautifulSoup(r.content, "html.parser")
#         one_year_links = [item for item in soup_year.find_all(target='_blank') if 'Anexo' in item.text]
#         return one_year_links

#     def check_file_exists_in_s3(self, bucket_name: str, file_name: str) -> bool:
#         """
#         Checks if a specific file already exists in the S3 bucket.

#         Args:
#             bucket_name (str): The name of the S3 bucket.
#             file_name (str): The name of the file to check.

#         Returns:
#             bool: True if the file exists in the S3 bucket, False otherwise.
#         """
#         try:
#             self.s3.Object(bucket_name, file_name).load()
#             return True
#         except ClientError as e:
#             if e.response['Error']['Code'] == '404':
#                 return False
#             else:
#                 self.logger.error(f"ClientError when checking {file_name} in bucket {bucket_name}: {e}")
#                 raise

#     def load_files_tracker(self, bucket_name: str) -> pd.DataFrame:
#         """
#         Loads the files_tracker.csv from S3 or creates a new DataFrame if it does not exist.

#         Args:
#             bucket_name (str): The name of the S3 bucket containing the files_tracker.csv.

#         Returns:
#             pd.DataFrame: A DataFrame containing the file tracking information.
#         """
#         try:
#             obj = self.s3.Object(bucket_name, self.files_tracker_name)
#             response = obj.get()
#             tracker_df = pd.read_csv(BytesIO(response['Body'].read()))
#             self.logger.info("Loaded existing files tracker from S3.")
#         except ClientError as e:
#             if e.response['Error']['Code'] == 'NoSuchKey':
#                 tracker_df = pd.DataFrame(columns=['file', 'link', 'date_added'])
#                 self.logger.info("No existing files tracker found in S3. Creating a new one.")
#             else:
#                 self.logger.error(f"ClientError when accessing {self.files_tracker_name}: {e}")
#                 raise
#         return tracker_df

#     def update_files_tracker(self, df: pd.DataFrame, bucket_name: str):
#         """
#         Updates the files_tracker.csv in S3 with new file information.

#         Args:
#             df (pd.DataFrame): The DataFrame containing file tracking information to update.
#             bucket_name (str): The name of the S3 bucket where the files_tracker.csv is stored.
#         """
#         buffer = BytesIO()
#         df.to_csv(buffer, index=False)
#         buffer.seek(0)
#         try:
#             self.s3.Bucket(bucket_name).put_object(Body=buffer, Key=self.files_tracker_name)
# #             self.logger.info("Files tracker updated successfully in S3.")
#         except ClientError as e:
#             self.logger.error(f"Failed to update files tracker in S3 bucket {bucket_name}: {e}")
#             raise

#     def upload_or_update_dataframe_to_s3(self, df: pd.DataFrame, bucket_name: str, file_name: str):
#         """
#         Uploads or updates a DataFrame as a CSV file to an S3 bucket.

#         Args:
#             df (pd.DataFrame): The DataFrame to be uploaded.
#             bucket_name (str): The name of the S3 bucket to upload to.
#             file_name (str): The name of the file to be uploaded.
#         """
#         buffer = BytesIO()
#         df.to_csv(buffer, index=False)
#         buffer.seek(0)
#         try:
#             self.s3.Bucket(bucket_name).upload_fileobj(buffer, file_name, ExtraArgs={'ContentType': 'text/csv'})
#             self.logger.info(f"DataFrame {file_name} uploaded successfully to S3 bucket {bucket_name}.")
#         except ClientError as e:
#             self.logger.error(f"Failed to upload DataFrame {file_name} to S3 bucket {bucket_name}: {e}")
#             raise

#     def download_files_per_year(self, link: BeautifulSoup, bucket_name: str = None):
#         """
#         Downloads all files for a specific year and optionally uploads them to an S3 bucket.

#         Args:
#             link (BeautifulSoup): The BeautifulSoup object representing the year link.
#             bucket_name (str, optional): The name of the S3 bucket to upload the files to. Defaults to None.
#         """
#         links_per_year = self.links_per_year(link)
#         n = len(links_per_year)

#         tracker_df = self.load_files_tracker(bucket_name) if bucket_name else pd.DataFrame(columns=['file', 'link', 'date_added'])

#         new_files_count = 0

#         with tqdm(total=n, desc=f"Processing {link['href'][-4:]} files", unit='file') as pbar:
#             for i, file in enumerate(links_per_year):
#                 file_name = f'week_{n - i}_{file["href"].split("/")[-1]}'
#                 file_link = self.url_base + file['href']

#                 if bucket_name and file_name in tracker_df['file'].values:
#                     pbar.update(1)
#                     continue

#                 try:
#                     with requests.get(file_link, headers=self.headers, stream=True) as result:
#                         result.raise_for_status()

#                         if bucket_name:
#                             destination_key = f'reports/{link.text.strip()}/{file_name}'
#                             self.s3.Bucket(bucket_name).upload_fileobj(result.raw, destination_key)

#                             new_entry = pd.DataFrame({
#                                 'file': [file_name],
#                                 'link': [file_link],
#                                 'date_added': [datetime.datetime.today().strftime('%Y-%m-%d')]
#                             })
#                             tracker_df = pd.concat([tracker_df, new_entry], ignore_index=True)
#                             new_files_count += 1

#                 except requests.RequestException as e:
#                     self.logger.error(f"Failed to download file from {file_link}: {e}")
#                     continue
#                 except ClientError as e:
#                     self.logger.error(f"Failed to upload file {file_name} to S3 bucket {bucket_name}: {e}")
#                     continue
#                 finally:
#                     pbar.update(1)

#         if bucket_name:
#             self.update_files_tracker(tracker_df, bucket_name)
#             self.logger.info(f"Year {link['href'][-4:]} processed: {new_files_count} new files uploaded to S3 bucket {bucket_name}.")

#     def get_files(self, bucket_name: str = None):
#         """
#         Download all files from all years and optionally upload them to an S3 bucket.
#         """
#         all_years_links = self.all_years_links()
#         for link in all_years_links:
#             self.download_files_per_year(link, bucket_name)

#         # Upload log file to S3 after processing
#         if bucket_name:
#             try:
#                 self.logger.info(f"Log file {self.logfile_name} uploaded successfully to S3 bucket {bucket_name}.")
#             except ClientError as e:
#                 self.logger.error(f"Failed to upload log file to S3 bucket {bucket_name}: {e}")

#     def display_files_tracker(self, bucket_name: str) -> pd.DataFrame:
#         """
#         Display the DataFrame contained in the files_tracker.csv file from the S3 bucket.
#         """
#         tracker_df = self.load_files_tracker(bucket_name)
#         return tracker_df


Overwriting src/DataCollector.py


### FileNameBuilder

In [10]:
# %%writefile src/FileNameBuilder.py

# from typing import List
# import boto3
# import logging
# from pathlib import Path

# class FileNameBuilder:
#     """
#     A class used to build file paths for data files stored in an S3 bucket based on specific format criteria.

#     This class interacts with an S3 bucket to identify and categorize files according to predefined format
#     specifications. It uses criteria such as year and file type to segregate files into 'first format' and 
#     'second format' categories. This comes from DANE changing format of the report throughout the years. 
    
#     DANE first format refers to a single tab file with information displayed with several titles through it
#     DANE second format refers to a file containing several tabs (per food class)

#     Attributes:
#         s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#         logger (logging.Logger): Logger instance for logging messages.

#     Methods:
#         __init__(s3: boto3.resource, logger: logging.Logger):
#             Initializes the FileNameBuilder class with the provided S3 resource and logger.

#         first_format_paths(bucket_name: str) -> List[str]:
#             Retrieves file paths from the S3 bucket that match the first format criteria.

#         second_format_paths(bucket_name: str) -> List[str]:
#             Retrieves file paths from the S3 bucket that match the second format criteria.
#     """
#     def __init__(self, 
#                  s3: boto3.resource, 
#                  logger:logging.Logger):
#         """
#         Initializes the FileNameBuilder class with the provided S3 resource and logger.

#         Args:
#             s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#             logger (logging.Logger): Logger instance for logging messages.
#         """
#         self.s3 = s3
#         self.logger = logger

#     def first_format_paths(self, bucket_name: str) -> List[str]:
#         """
#         Retrieves file paths from the S3 bucket that match the first format criteria.

#         The first format includes files from the years 2012 to 2017, and files from the year 2018 up to 
#         and including week 19. It filters files based on their extensions (.xls, .xlsx).

#         Args:
#             bucket_name (str): The name of the S3 bucket containing the files.

#         Returns:
#             List[str]: A list of file paths matching the first format criteria.
#         """
#         self.logger.info(f"Fetching first format paths from bucket: {bucket_name}")
#         bucket = self.s3.Bucket(bucket_name)
#         object_names = [obj.key for obj in bucket.objects.all() if obj.key.endswith(('.xls', '.xlsx'))]

#         first_format_years = {'2012', '2013', '2014', '2015', '2016', '2017'}
#         final_files_paths_first = []

#         for path in object_names:
#             try:
#                 parts = Path(path).parts
#                 if len(parts) < 2:
#                     continue
#                 year = parts[1]
#                 week = int(Path(path).stem.split('_')[1])

#                 # Check for files in years prior to 2018
#                 if year in first_format_years:
#                     final_files_paths_first.append(path)
#                     self.logger.debug(f"File added to first format: {path}")
#                 elif year == '2018' and week <= 19:
#                     final_files_paths_first.append(path)
#                     self.logger.debug(f"File added to first format: {path}")

#             except (IndexError, ValueError) as e:
#                 self.logger.warning(f"Error processing path {path}: {e}")

#         self.logger.info(f"Found {len(final_files_paths_first)} files for the first format.")
#         return final_files_paths_first

#     def second_format_paths(self, bucket_name: str) -> List[str]:
#         """
#         Retrieves file paths from the S3 bucket that match the second format criteria.

#         The second format includes files from the years 2018 (after week 19) to 2024. It filters files based 
#         on their extensions (.xls, .xlsx).

#         Args:
#             bucket_name (str): The name of the S3 bucket containing the files.

#         Returns:
#             List[str]: A list of file paths matching the second format criteria.
#         """
#         self.logger.info(f"Fetching second format paths from bucket: {bucket_name}")
#         bucket = self.s3.Bucket(bucket_name)
#         object_names = [obj.key for obj in bucket.objects.all() if obj.key.endswith(('.xls', '.xlsx'))]

#         second_format_years = {'2018', '2019', '2020', '2021', '2022', '2023', '2024'}
#         final_files_paths_second = []

#         for path in object_names:
#             try:
#                 parts = Path(path).parts
#                 if len(parts) < 2:
#                     continue
#                 year = parts[1]
#                 week = int(Path(path).stem.split('_')[1])

#                 if year in second_format_years:
#                     if year == '2018' and week <= 19:
#                         continue
#                     final_files_paths_second.append(path)
#                     self.logger.debug(f"File added to second format: {path}")

#             except (IndexError, ValueError) as e:
#                 self.logger.warning(f"Error processing path {path}: {e}")

#         self.logger.info(f"Found {len(final_files_paths_second)} files for the second format.")
#         return final_files_paths_second


### DataWrangler

In [10]:
# %%writefile src/DataWrangler.py


# import pandas as pd
# from src.FileNameBuilder import FileNameBuilder
# from config import CATEGORIES_DICT, CITY_TO_REGION
# import boto3
# import logging
# from pathlib import Path
# from io import BytesIO
# from tqdm import tqdm
# import re


# class DataWrangler(FileNameBuilder):
#     """
#     A class to handle data extraction and transformation from Excel files stored in an S3 bucket.

#     This class extends the FileNameBuilder class to include data wrangling functionalities such as 
#     extracting and transforming data from different formats of Excel files, constructing complete reports,
#     and categorizing data based on predefined schemas.

#     Attributes:
#         bucket_name (str): The name of the S3 bucket to interact with.
#         s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#         logger (logging.Logger): Logger instance for logging messages.
#         categories_dict (dict): A dictionary mapping category indices to category names.
#         city_to_region (dict): A dictionary mapping city names to their respective regions.

#     Methods:
#         __init__(bucket_name: str, s3: boto3.resource, logger: logging.Logger):
#             Initializes the DataWrangler class with S3 resource, bucket name, and logger.

#         first_format_data_extraction(file_path: str) -> pd.DataFrame:
#             Extracts and processes data from an Excel file stored in an S3 bucket using the first format.

#         first_format_data_transformation(dataframe: pd.DataFrame, file_path: str) -> pd.DataFrame:
#             Transforms the raw data extracted from a file into a structured format with relevant categories and products.

#         second_format_data_extraction(file_path: str) -> pd.DataFrame:
#             Extracts and processes data from an Excel file stored in an S3 bucket using multiple sheets for the second format.

#         building_complete_report() -> pd.DataFrame:
#             Constructs a complete report by extracting and transforming data from two different file formats stored in an S3 bucket.
#     """
#     def __init__(self, 
#                  bucket_name: str, 
#                  s3: boto3.resource, 
#                  logger:logging.Logger):
#         """
#         Initializes the DataWrangler class with S3 resource, bucket name, and logger.

#         Args:
#             bucket_name (str): The name of the S3 bucket to interact with.
#             s3 (boto3.resource): An S3 resource object to interact with AWS S3.
#             logger (logging.Logger): Logger instance for logging messages.
#         """
#         FileNameBuilder.__init__(self, s3, logger)
#         self.bucket_name = bucket_name
#         self.s3 = s3
#         self.logger = logger
#         self.categories_dict = CATEGORIES_DICT
#         self.city_to_region = CITY_TO_REGION
        
#     def first_format_data_extraction(self, file_path: str) -> pd.DataFrame:
#         """
#         Extracts and processes data from an Excel file stored in an S3 bucket using the first format.

#         This method reads Excel files stored in an S3 bucket, handling different file formats and engines.
#         It returns a DataFrame with the extracted data or an empty DataFrame if extraction fails.

#         Args:
#             file_path (str): The path of the file in the S3 bucket.

#         Returns:
#             pd.DataFrame: A DataFrame containing the extracted data, or an empty DataFrame if extraction fails.
#         """
#         bucket = self.s3.Bucket(self.bucket_name)
#         obj = bucket.Object(file_path)
#         xls_data = obj.get()['Body'].read()

#         dataframe = None
#         try:
#             dataframe = pd.read_excel(BytesIO(xls_data), engine='openpyxl')
#         except Exception as e:
#             self.logger.debug(f"openpyxl failed for {file_path}: {e}")
#         if dataframe is None:
#             try:
#                 dataframe = pd.read_excel(BytesIO(xls_data), engine='xlrd')
#             except Exception as e:
#                 self.logger.error(f"Failed to read Excel file {file_path} with xlrd: {e}")
#                 return pd.DataFrame()  # Return empty DataFrame if reading fails

#         if dataframe.empty:
#             self.logger.warning(f"No data found in {file_path}")
#             return pd.DataFrame()

#         dataframe = dataframe[dataframe[dataframe.columns[0]].apply(
#             lambda x: bool(re.search(r'[a-zA-Z]', str(x))) and pd.notna(x))]

#         return dataframe

#     def first_format_data_transformation(self, dataframe: pd.DataFrame, file_path: str) -> pd.DataFrame:
#         """
#         Transforms the raw data extracted from a file into a structured format with relevant categories and products.

#         This method processes the extracted data by renaming columns, identifying product categories, and cleaning city names.
#         It returns a structured DataFrame containing the transformed data.

#         Args:
#             dataframe (pd.DataFrame): The raw extracted data as a DataFrame.
#             file_path (str): The path of the file in the S3 bucket.

#         Returns:
#             pd.DataFrame: A DataFrame containing the transformed data, or an empty DataFrame if transformation fails.
#         """
#         # Keep only the first five columns and rename them
#         dataframe = dataframe.iloc[:, 0:5]
#         dataframe.columns = ['ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio', 'tendencia']
#         dataframe['ciudad'] = dataframe['ciudad'].str.lower().str.replace('bogotá, d.c.', 'bogota')

#         # Remove rows where 'ciudad' is null
#         dataframe = dataframe[~dataframe['ciudad'].isnull()]

#         # Get row indexes where the word 'cuadro' is present
#         index_cuadro = dataframe[dataframe['ciudad'].str.contains('cuadro', case=False, na=False)].index.tolist()

#         if not index_cuadro:
#             self.logger.warning(f"No 'cuadro' titles found in {file_path}")
#             return pd.DataFrame()  # Return empty DataFrame if no categories found

#         # Create target dataframe for all data
#         df_final = pd.DataFrame()

#         # Iterate over food categories
#         for i_categoria in range(len(index_cuadro) + 1):
#             try:
#                 if i_categoria == 0:
#                     dataframe_categoria = dataframe.iloc[1:index_cuadro[i_categoria]]
#                 elif i_categoria <= 6:
#                     dataframe_categoria = dataframe.iloc[index_cuadro[i_categoria - 1] + 2:index_cuadro[i_categoria]]
#                 else:
#                     dataframe_categoria = dataframe.iloc[index_cuadro[i_categoria - 1] + 2:]

#                 # Add category name
#                 dataframe_categoria = dataframe_categoria.copy()
#                 dataframe_categoria['categoria'] = self.categories_dict.get(i_categoria + 1, 'unknown')

#                 # Identify products within category
#                 index_producto = dataframe_categoria[dataframe_categoria['precio_minimo'].isnull()].index.tolist()
#                 if not index_producto:
#                     continue

#                 df_categoria_final = pd.DataFrame()

#                 for i_producto in range(len(index_producto)):
#                     if i_producto < len(index_producto) - 1:
#                         start_idx = index_producto[i_producto]
#                         end_idx = index_producto[i_producto + 1]
#                     else:
#                         start_idx = index_producto[i_producto]
#                         end_idx = None

#                     dataframe_producto = dataframe_categoria.loc[start_idx:end_idx].reset_index(drop=True)

#                     # Add product name
#                     producto_name = dataframe_producto.at[0, 'ciudad']
#                     dataframe_producto['producto'] = producto_name

#                     # Clean city names
#                     dataframe_producto['ciudad'] = dataframe_producto['ciudad'].str.replace(r'\s*\([^)]*\)', '', regex=True)

#                     # Extract marketplace if present
#                     dataframe_producto['mercado'] = dataframe_producto['ciudad'].str.extract(r',\s*(.*)')[0]
#                     dataframe_producto['ciudad'] = dataframe_producto['ciudad'].str.split(',').str[0].str.strip()
#                     dataframe_producto = dataframe_producto[~dataframe_producto['precio_medio'].isnull()]
                    
#                     # Drop first row (product name)
#                     dataframe_producto = dataframe_producto.iloc[1:].reset_index(drop=True)

#                     df_categoria_final = pd.concat([df_categoria_final, dataframe_producto], ignore_index=True)

#                 df_final = pd.concat([df_final, df_categoria_final], ignore_index=True)

#             except Exception as e:
#                 self.logger.error(f"Error processing category {i_categoria} in file {file_path}: {e}")
#                 continue

#         if df_final.empty:
#             self.logger.warning(f"No data extracted from {file_path} after transformation.")
#             return df_final

#         # Add timestamps
#         try:
#             df_final['semana_no'] = int(Path(file_path).stem.split('_')[1])
#             df_final['anho'] = Path(file_path).stem[-4:]
#         except Exception as e:
#             self.logger.error(f"Error extracting week and year from {file_path}: {e}")
#             df_final['semana_no'] = None
#             df_final['anho'] = None

#         # Reorder columns
#         df_final = df_final[['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio',
#                              'tendencia', 'categoria', 'mercado', 'semana_no', 'anho']]
        
#         df_final= df_final[~df_final['precio_medio'].isnull()]
        
#         return df_final

#     def second_format_data_extraction(self, file_path: str) -> pd.DataFrame:
#         """
#         Extracts and processes data from an Excel file stored in an S3 bucket using multiple sheets for the second format.

#         This method reads Excel files stored in an S3 bucket, handling different file formats and sheets. 
#         It returns a structured DataFrame containing the extracted data.

#         Args:
#             file_path (str): The path of the file in the S3 bucket.

#         Returns:
#             pd.DataFrame: A DataFrame containing the extracted data, or an empty DataFrame if extraction fails.
#         """
#         bucket = self.s3.Bucket(self.bucket_name)
#         obj = bucket.Object(file_path)
#         xls_data = obj.get()['Body'].read()

#         xl = None
#         try:
#             xl = pd.ExcelFile(BytesIO(xls_data), engine='openpyxl')
#         except Exception as e:
#             self.logger.debug(f"openpyxl failed for {file_path}: {e}")
#         if xl is None:
#             try:
#                 xl = pd.ExcelFile(BytesIO(xls_data), engine='xlrd')
#             except Exception as e:
#                 self.logger.error(f"Failed to read Excel file {file_path} with xlrd: {e}")
#                 return pd.DataFrame()

#         full_dataframe = pd.DataFrame()
#         for index in range(1, 9):
#             sheet_name = xl.sheet_names[index]
#             dataframe = None
#             try:
#                 dataframe = pd.read_excel(BytesIO(xls_data), sheet_name=sheet_name)
#             except Exception as e:
#                 self.logger.error(f"Failed to read sheet {sheet_name} in {file_path}: {e}")
#                 continue

#             if dataframe.empty:
#                 self.logger.warning(f"No data found in sheet {sheet_name} of {file_path}")
#                 continue

#             if file_path == 'reports/2018/week_20_Sem_12may__18may_2018.xlsx':
#                 dataframe['mercado'] = dataframe['Mercado mayorista'].str.split(',').str[1].str.strip()
#                 dataframe['ciudad'] = dataframe['Mercado mayorista'].str.split(',').str[0].str.strip()
#                 dataframe.columns = dataframe.columns.str.lower().str.replace(' ','_').str.replace('í','i').str.replace('á','a')
#             else:

#                 if pd.isnull(dataframe.iloc[9, 0]):
#                     dataframe = dataframe.iloc[10:, :6]
#                 else:
#                     dataframe = dataframe.iloc[9:, :6]
#                 dataframe.columns = ['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio', 'tendencia']
#             dataframe = dataframe[~dataframe['ciudad'].isnull()]
#             dataframe['ciudad'] = dataframe['ciudad'].str.replace(r'\s*\([^)]*\)', '', regex=True)
#             dataframe['ciudad'] = dataframe['ciudad'].str.lower().str.replace('bogotá, d.c.', 'bogota')
#             dataframe['ciudad'] = dataframe['ciudad'].str.replace(r'\s*\([^)]*\)', '', regex=True)

#             # Adding categoria and ciudad info
#             dataframe['categoria'] = self.categories_dict[index]


#             # The name of the marketplaces is included on some of the city names. So we try to retrieve it
#             try:
#                 dataframe['mercado'] = dataframe['ciudad'].str.split(',').str[1].str.strip()
#             except:
#                 dataframe['mercado'] = np.nan

#             # Getting a clean version of city name
#             try:
#                 dataframe['ciudad'] = dataframe['ciudad'].str.split(',').str[0].str.strip()
#             except:
#                 pass
#             # Once data per file is complete, time stamps are added: year and week number
#             dataframe['semana_no'] = int(Path(file_path).name.split('_')[1])  # file_path.stem[5:7]
#             dataframe['anho'] = Path(file_path).stem[-4:]
          

#             # Add to full_dataframe
#             full_dataframe = pd.concat([full_dataframe, dataframe], ignore_index=True)

#         if full_dataframe.empty:
#             self.logger.warning(f"No data extracted from {file_path} after processing all sheets.")
#             return full_dataframe

#         # Reorder columns
#         full_dataframe = full_dataframe[['producto', 'ciudad', 'precio_minimo', 'precio_maximo', 'precio_medio',
#                                          'tendencia', 'categoria', 'mercado', 'semana_no', 'anho']]
#         return full_dataframe

#     def building_complete_report(self) -> pd.DataFrame:
#         """
#         Constructs a complete report by extracting and transforming data from two different file formats stored in an S3 bucket.

#         This method consolidates data from the two different file formats (first and second format) stored in an S3 bucket.
#         It combines and transforms the data into a structured report for analysis or further processing.

#         Returns:
#             pd.DataFrame: A complete report DataFrame containing data from both file formats.
#         """
#         first_format_paths_aws = self.first_format_paths(bucket_name=self.bucket_name)
#         second_format_paths_aws = self.second_format_paths(bucket_name=self.bucket_name)

#         first_format_final = pd.DataFrame()
#         self.logger.info('[INFO] First batch of files')

#         for file_path in tqdm(first_format_paths_aws):
#             dataframe = self.first_format_data_extraction(file_path)
#             if not dataframe.empty:
#                 transformed_df = self.first_format_data_transformation(dataframe, file_path)
#                 first_format_final = pd.concat([first_format_final, transformed_df], ignore_index=True)

#         self.logger.info('[INFO] Second batch of files')
#         second_format_final = pd.DataFrame()
#         for file_path in tqdm(second_format_paths_aws):
#             dataframe = self.second_format_data_extraction(file_path)
#             second_format_final = pd.concat([second_format_final, dataframe], ignore_index=True)

#         complete_report = pd.concat([first_format_final, second_format_final], ignore_index=True)
#         return complete_report


Overwriting src/DataWrangler.py


In [11]:
import pandas as pd
# Print the pandas version installed in the notebook's environment.
print(pd.__version__)

1.3.5


### DataValidator

In [30]:
# %%writefile src/DataValidator.py

# import pandas as pd
# import re
# from unidecode import unidecode
# import logging

# class DataValidator:
#     """
#     The DataValidator class is responsible for validating and cleaning data from a DataFrame.
#     It provides methods to validate city names, product names, prices, trends, and categories.
#     It also has a function to remove accents and format text, ensuring consistency and validity of the data.

#     Attributes:
#         valid_cities (list): A list of valid Colombian city names for validation.
#         valid_products (list): A list of valid product names for validation.
#         valid_tendencias (list): A list of valid trends ('tendencia') for validation.
#         valid_categorias (list): A list of valid categories for validation.
#         logger (logging.Logger): A logger instance to log information, warnings, and errors.

#     Methods:
#         validate_city(city: str) -> bool:
#             Checks if the provided city name is valid.

#         validate_product(product: str) -> bool:
#             Checks if the provided product name is valid.

#         validate_price(price) -> bool:
#             Checks if the provided price is a non-negative number.

#         validate_tendencia(tendencia: str) -> bool:
#             Checks if the provided trend ('tendencia') is valid.

#         validate_categoria(categoria: str) -> bool:
#             Checks if the provided category is valid.

#         remove_accents_trails_caps(text: str) -> str:
#             Lowercases text, replaces spaces with underscores, strips commas and parentheses, and removes accents.

#         validate_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
#             Validates the entire DataFrame, removing rows that fail validation, and returns a cleaned DataFrame.
#     """
#     def __init__(self, 
#                  logger: logging.Logger):
#         """
#         Initializes the DataValidator with predefined reference data for validation and a logger instance.

#         Args:
#             logger (logging.Logger): A logger instance for logging messages.
#         """
#         # Load or define reference data for validation
#         self.valid_cities = [
#             'bogota', 'bucaramanga', 'cali', 'cartagena', 'duitama', 'ibague',
#        'ipiales', 'medellin', 'neiva', 'pamplona', 'pasto', 'sogamoso',
#        'tunja', 'villavicencio', 'barranquilla', 'buenaventura',
#        'cartago', 'cucuta', 'manizales', 'monteria', 'palmira', 'pereira',
#        'popayan', 'rionegro', 'san_gil', 'sincelejo', 'socorro', 'tulua',
#        'valledupar', 'chiquinquira', 'marinilla', 'cajamarca',
#        'carmen_de_viboral', 'el_santuario', 'la_ceja', 'san_vicente',
#        'sonson', 'armenia', 'penol', 'santa_barbara', 'yarumal',
#        'la_virginia', 'la_union', 'la_parada', 'la_dorada', 'charala',
#        'guepsa', 'moniquira', 'puente_nacional', 'santana', 'velez',
#        'caparrapi', 'nocaima', 'villeta', 'honda', 'ubate',
#        'cartagena', 'yolombo',
#        'yopal', 'malambo', 'el_carmen_de_viboral', 'santa_marta',
#        'florencia', 'tuquerres', 'san_andres_de_tumaco', 'arauca',
#        'consaca', 'sandona', 'ancuya', 'tibasosa',
#        'san_sebastian_de_mariquita', 'san_vicente_ferrer'
#         ]
        
#         self.valid_products = [
#             'acelga', 'ahuyama', 'ajo', 'ajo_importado', 'aji_dulce',
#        'aji_topito_dulce', 'apio', 'arveja_verde_en_vaina'
#         ]
        
#         self.valid_tendencias = ['+', '-', '=', '++', '--', '+++', '---']
#         self.valid_categorias = [
#             'verduras_hortalizas', 'frutas_frescas', 'tuberculos_raices_platanos', 'granos_cereales',
#             'huevos_lacteos', 'carnes', 'pescados', 'productos_procesados'
#         ]
#         self.logger = logger
        
#     def validate_city(self, city: str) -> bool:
#         """
#         Check if the provided city name is valid.

#         Args:
#             city (str): The city name to validate.

#         Returns:
#             bool: True if the city is valid, otherwise False.
#         """
#         return city in self.valid_cities
# #         return True

    
#     def validate_product(self, product: str) -> bool:
#         """
#         Check if the provided product name is valid.

#         Args:
#             product (str): The product name to validate.

#         Returns:
#             bool: True if the product is valid, otherwise False.
#         """
# #         return product in self.valid_products
#         return True

#     def validate_price(self, price) -> bool:
#         """
#         Check if the provided price is a non-negative number.

#         Args:
#             price (int or float): The price value to validate.

#         Returns:
#             bool: True if the price is greater than or equal to zero; False if it is
#                 negative or cannot be compared with a number.
#         """
#         try:
#             return price >= 0
#         except TypeError:
#             return False

#     def validate_tendencia(self, tendencia: str) -> bool:
#         """
#         Check if the provided trend ('tendencia') is valid.

#         Args:
#             tendencia (str): The trend value to validate.

#         Returns:
#             bool: True if the trend is valid, otherwise False.
#         """
#         return tendencia in self.valid_tendencias

#     def validate_categoria(self, categoria: str) -> bool:
#         """
#         Check if the provided category is valid.

#         Args:
#             categoria (str): The category value to validate.

#         Returns:
#             bool: True if the category is valid, otherwise False.
#         """
#         return categoria in self.valid_categorias
#     # Function to remove accents
    
#     def remove_accents_trails_caps(self, text):
#         """
#         Converts text to lowercase, replaces spaces with underscores, strips commas and
#         parentheses, and removes accents.

#         Args:
#             text (str): The text to be cleaned.

#         Returns:
#             str: Cleaned text: all lowercase, spaces replaced by underscores, no commas,
#                 parentheses, or accents.
#         """
#         return unidecode(text.lower().replace(' ','_').replace(',','').replace('(','').replace(')',''))

#     def validate_dataframe(self, dataframe: pd.DataFrame) -> pd.DataFrame:
#         """
#         Validates the entire DataFrame, removing rows that fail validation.
#         It applies all individual validation methods to ensure that the data is consistent.

#         Args:
#             dataframe (pd.DataFrame): The DataFrame to validate.

#         Returns:
#             pd.DataFrame: A DataFrame containing only valid rows. Returns an empty DataFrame if no rows
#                 are valid or if any error is raised during validation.
#         """
#         try: 
#             # Validate each column and store the valid rows in a new DataFrame
#             dataframe['ciudad'] = dataframe['ciudad'].apply(self.remove_accents_trails_caps)
#             dataframe['producto'] = dataframe['producto'].apply(self.remove_accents_trails_caps)

#             valid_df = dataframe[
#                 dataframe['ciudad'].apply(self.validate_city) &
#                 dataframe['producto'].apply(self.validate_product) &
#                 dataframe['precio_minimo'].apply(self.validate_price) &
#                 dataframe['precio_maximo'].apply(self.validate_price) &
#                 dataframe['precio_medio'].apply(self.validate_price) &
#                 dataframe['tendencia'].apply(self.validate_tendencia) &
#                 dataframe['categoria'].apply(self.validate_categoria)
#             ]

#             # Log the rows that were removed
#             invalid_rows = dataframe[~dataframe.index.isin(valid_df.index)]
#             if not invalid_rows.empty:
#                 self.logger.warning(f"Invalid rows found and removed")

#             return valid_df
#         except: 
#             dataframe = pd.DataFrame()
#             return dataframe

Overwriting src/DataValidator.py


### DataIngestor

In [21]:
# table_name = 'product_prices'
# Render the CREATE TABLE statement for the prices table and print it for inspection.
# The ten columns match the column order produced by the data wrangling step
# (producto ... anho).
# NOTE(review): `table_name` is not assigned in this cell (the assignment above is
# commented out), so it must already exist in the notebook session — confirm the
# cell execution order before re-running.
text = f"""
CREATE TABLE {table_name} (
    producto VARCHAR(255),
    ciudad VARCHAR(255),
    precio_minimo INT,
    precio_maximo INT,
    precio_medio INT,
    tendencia VARCHAR(10),
    categoria VARCHAR(255),
    mercado VARCHAR(255),
    semana_no INT,
    anho INT
);
"""
print(text)


CREATE TABLE product_prices (
    producto VARCHAR(255),
    ciudad VARCHAR(255),
    precio_minimo INT,
    precio_maximo INT,
    precio_medio INT,
    tendencia VARCHAR(10),
    categoria VARCHAR(255),
    mercado VARCHAR(255),
    semana_no INT,
    anho INT
);



In [11]:
# %%writefile src/DataIngestor.py

# import sqlalchemy
# from sqlalchemy import create_engine
# import logging
# import pandas as pd
# from sqlalchemy.exc import SQLAlchemyError

# class DataIngestor:
#     """
#     The DataIngestor class is responsible for inserting data from a Pandas DataFrame into a PostgreSQL database.
#     It provides methods to handle the insertion of large DataFrames into a specified table using the SQLAlchemy engine.

#     Attributes:
#         engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine used to connect to the PostgreSQL database.
#         logger (logging.Logger): A logger instance to log information, warnings, and errors.

#     Methods:
#         insert_dataframe_to_db(dataframe: pd.DataFrame, table_name: str) -> None:
#             Inserts the contents of a DataFrame into a specified PostgreSQL table in chunks.
#     """
#     def __init__(self, 
#                  engine:sqlalchemy.engine.base.Engine,
#                 logger:logging.Logger)-> None:
#         """
#         Initializes the DataIngestor with a SQLAlchemy engine and a logger instance.

#         Args:
#             engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine for the database connection.
#             logger (logging.Logger): A logger instance for logging messages.
#         """
#         self.engine = engine
#         self.logger = logger
        
#     def insert_dataframe_to_db(self,
#                                dataframe: pd.DataFrame, 
#                                table_name: str)-> None:
    
#         """
#         Inserts a DataFrame into a PostgreSQL table. Handles large DataFrames by inserting data in chunks.

#         Args:
#             dataframe (pd.DataFrame): The DataFrame to be inserted into the database.
#             table_name (str): The name of the PostgreSQL table where the data will be inserted.

#         Returns:
#             None

#         Raises:
#             SQLAlchemyError: If an error occurs while inserting data into the database.

#         Example:
#             # Create an engine and logger
#             engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydatabase")
#             logger = logging.getLogger(__name__)
            
#             # Initialize DataIngestor
#             ingestor = DataIngestor(engine, logger)
            
#             # Example DataFrame to insert
#             dataframe = pd.DataFrame({'column1': [1, 2], 'column2': ['A', 'B']})
            
#             # Insert DataFrame into table 'my_table'
#             ingestor.insert_dataframe_to_db(dataframe=dataframe, table_name='my_table')
#         """

#         try:
#             # Insert data in chunks to handle large DataFrames
#             dataframe.to_sql(table_name, 
#                              self.engine, 
#                              if_exists='append', 
#                              index=False, 
#                              chunksize=500)

#             self.logger.info(f"Data successfully inserted into {table_name}.")

#         except SQLAlchemyError as e:
#             self.logger.error(f"Error inserting data: {e}")
#             raise

#         finally:
#             # Dispose of the engine's connection pool
#             self.engine.dispose()


Overwriting src/DataIngestor.py


### ProcessHandler

In [6]:
# %%writefile src/ProcessHandler.py

# from src.DataCollector import DataCollector
# from src.DataWrangler import DataWrangler
# from src.FileNameBuilder import FileNameBuilder
# from src.DataValidator import DataValidator
# from src.DataIngestor import DataIngestor

# import pandas as pd
# from tqdm import tqdm
# from pathlib import Path
# import boto3
# import logging
# import sqlalchemy

# class ProcessHandler(DataWrangler, DataIngestor, DataCollector, DataValidator, FileNameBuilder):
#     """
#     The ProcessHandler class orchestrates the entire process of collecting, transforming, validating, 
#     and ingesting data from S3 into a PostgreSQL database. It combines functionalities from multiple classes 
#     to handle different stages of data processing, ensuring efficient and robust data management.

#     Inheritance:
#         DataWrangler: For data extraction and transformation.
#         DataIngestor: For data insertion into PostgreSQL.
#         DataCollector: For collecting and tracking files from S3.
#         DataValidator: For validating data against predefined criteria.
#         FileNameBuilder: For constructing paths for files in the S3 bucket.

#     Attributes:
#         s3 (boto3.resource): The S3 resource for interacting with AWS S3.
#         engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine for the PostgreSQL database connection.
#         bucket_name (str): The name of the S3 bucket to interact with.
#         table_name (str): The name of the PostgreSQL table to which data will be ingested.
#         logger (logging.Logger): Logger instance for logging process information.
#         files_tracker_df (pd.DataFrame): DataFrame tracking processed files and their status.

#     Methods:
#         __init__(self, s3, engine, bucket_name, table_name, logger):
#             Initializes the ProcessHandler with necessary attributes and loads the file tracker.

#         executing_process(self, output_dataframe: bool = False) -> pd.DataFrame:
#             Executes the data processing workflow, including data extraction, transformation, validation, and ingestion.

#         update_files_tracker_with_rds_load(self, file_name: str):
#             Updates the file tracker in S3 with the status of files loaded into the RDS.

#         querying_db(self, query: str) -> pd.DataFrame:
#             Executes a query on the PostgreSQL database and returns the result as a DataFrame.
#     """
#     def __init__(self, 
#                  s3:boto3.resource, 
#                  engine:sqlalchemy.engine.base.Engine, 
#                  bucket_name:str, 
#                  table_name:str, 
#                  logger:logging.Logger):
#         """
#         Initializes the ProcessHandler with necessary resources and configurations.

#         Args:
#             s3 (boto3.resource): The S3 resource to interact with AWS S3.
#             engine (sqlalchemy.engine.base.Engine): The SQLAlchemy engine for the PostgreSQL database connection.
#             bucket_name (str): The name of the S3 bucket.
#             table_name (str): The name of the table in the PostgreSQL database.
#             logger (logging.Logger): Logger instance for logging information.

#         Initializes the base classes and sets up the files tracker.
#         """
#         # Initialize DataCollector and other base classes
        
#         DataCollector.__init__(self, s3, logger)
#         DataIngestor.__init__(self, engine, logger)
#         DataWrangler.__init__(self, bucket_name, s3, logger)
#         DataValidator.__init__(self, logger)
#         FileNameBuilder.__init__(self, s3, logger)
#         # Set class attributes
#         self.s3 = s3
#         self.engine = engine
#         self.bucket_name = bucket_name
#         self.table_name = table_name
#         self.logger = logger

#         # Load files tracker after initializing the DataCollector
#         self.files_tracker_df = self.load_files_tracker(self.bucket_name)  
        
#         # Ensure 'rds_load' column exists in the files_tracker_df
#         if 'rds_load' not in self.files_tracker_df.columns:
#             self.files_tracker_df['rds_load'] = 'no'
        
# #         self.data_validator = DataValidator()  # Initialize the DataValidator

#     def executing_process(self, output_dataframe: bool = False) -> pd.DataFrame:
#         """
#         Executes the complete data processing workflow, including data extraction, transformation, validation, 
#         and ingestion into the PostgreSQL database. Skips files whose 'rds_load' status in the files tracker is already 'yes'.

#         Args:
#             output_dataframe (bool): If True, returns the final concatenated DataFrame from all processed files.

#         Returns:
#             pd.DataFrame: The concatenated DataFrame of all processed files if output_dataframe is True. 
#                           Otherwise, returns None.
#         """
#         # Fetch all files from the source
#         self.get_files(self.bucket_name)

#         # Generate paths for the different file formats
#         first_format_paths_aws = self.first_format_paths(bucket_name=self.bucket_name)
#         second_format_paths_aws = self.second_format_paths(bucket_name=self.bucket_name)

#         first_format_final = pd.DataFrame()
#         self.logger.info('Started working on first batch of files')

#         # Process files in the first format
#         for file_path in tqdm(first_format_paths_aws):
            
#             file_name = Path(file_path).name
#             # Skip files that are already loaded into RDS
#             if not self.files_tracker_df.empty and self.files_tracker_df.loc[self.files_tracker_df['file'] == file_name, 'rds_load'].values[0] == 'yes':
# #                 logger.info(f"[INFO] Skipping file {file_name} as it is already loaded into RDS.")
#                 continue

#             dataframe = self.first_format_data_extraction(file_path)
#             if not dataframe.empty:
#                 transformed_df = self.first_format_data_transformation(dataframe, file_path)

#                 # Validate DataFrame before inserting into the database
#                 valid_df = self.validate_dataframe(transformed_df)

#                 if output_dataframe:
#                     first_format_final = pd.concat([first_format_final, transformed_df], ignore_index=True)
#                 self.insert_dataframe_to_db(dataframe=valid_df, table_name=self.table_name)
#                 self.update_files_tracker_with_rds_load(file_name)  # Update tracker after successful load
#         self.logger.info('Started working on second batch of files')
#         second_format_final = pd.DataFrame()
        
#         # Process files in the second format
#         for file_path in tqdm(second_format_paths_aws):
#             file_name = Path(file_path).name
#             # Skip files that are already loaded into RDS
#             if not self.files_tracker_df.empty and self.files_tracker_df.loc[self.files_tracker_df['file'] == file_name, 'rds_load'].values[0] == 'yes':
# #                 logger.info(f"[INFO] Skipping file {file_name} as it is already loaded into RDS.")
#                 continue

#             dataframe = self.second_format_data_extraction(file_path)
            
#             # Validate DataFrame before inserting into the database
#             valid_df = self.validate_dataframe(dataframe)
            
#             self.insert_dataframe_to_db(dataframe=valid_df, table_name=self.table_name)
#             self.update_files_tracker_with_rds_load(file_name)  # Update tracker after successful load
#             if output_dataframe:
#                 second_format_final = pd.concat([second_format_final, dataframe], ignore_index=True)

#         if output_dataframe:
#             complete_report = pd.concat([first_format_final, second_format_final], ignore_index=True)
#             return complete_report


#     def update_files_tracker_with_rds_load(self, file_name: str):
#         """
#         Updates the 'rds_load' status in the files tracker to 'yes' after successful insertion into the RDS.

#         Args:
#             file_name (str): The name of the file to update in the files tracker.

#         Returns:
#             None
#         """
#         # Check if the file is already present in the tracker and update it
#         if file_name in self.files_tracker_df['file'].values:
#             self.files_tracker_df.loc[self.files_tracker_df['file'] == file_name, 'rds_load'] = 'yes'
#         else:
#             # If not present, add a new entry
#             new_entry = pd.DataFrame({'file': [file_name], 'rds_load': ['yes']})
#             self.files_tracker_df = pd.concat([self.files_tracker_df, new_entry], ignore_index=True)
        
#         # Update the tracker in S3
#         self.update_files_tracker(self.files_tracker_df, self.bucket_name)

#     def querying_db(self, query: str) -> pd.DataFrame:
#         """
#         Executes a SQL query on the PostgreSQL database and returns the result as a DataFrame.

#         Args:
#             query (str): The SQL query to execute.

#         Returns:
#             pd.DataFrame: The result of the query as a Pandas DataFrame.

#         Example:
#             query = "SELECT * FROM product_prices WHERE ciudad = 'bogota'"
#             result_df = self.querying_db(query=query)
#         """
#         # Running query and importing it 
#         with self.engine.begin() as conn:
#             df = pd.read_sql(sql=query, con=conn)

#         print(f'[Info] Data Frame with {df.shape[0]} rows and {df.shape[1]} columns imported successfully.')
#         return df

Overwriting src/ProcessHandler.py


In [4]:
# %%writefile main.py

# from src.logging_setup import setup_logger
# from src.ProcessHandler import ProcessHandler
# from dotenv import load_dotenv
# from sqlalchemy import create_engine
# import boto3
# import os

# # Loading credentials
# load_dotenv()
# aws_access_key_id = os.environ['aws_access_key_id']
# aws_secret_access_key = os.environ['aws_secret_access_key']
    
# db_user = os.environ['db_user']
# db_pass = os.environ['db_pass']
# db_host = os.environ['db_host']
# db_port = os.environ['db_port']
# db_name = os.environ['db_name']

# table_name = os.environ['table_name']
# bucket_name = os.environ['bucket_name']


# # Creating connection to database
# engine = create_engine(f'postgresql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}')

# # Creating boto3 session (access the S3 bucket)
# s3 = boto3.resource('s3',
#                     aws_access_key_id = aws_access_key_id, 
#                     aws_secret_access_key = aws_secret_access_key)

# # Initialize logger
# logger = setup_logger()


# sipsa_process = ProcessHandler(s3 = s3, 
#                                engine = engine, 
#                                bucket_name = bucket_name, 
#                                table_name = table_name, 
#                                logger = logger)

# sipsa_process.executing_process()

Writing main.py


## testing playground

In [33]:
# names = FileNameBuilder(s3 = s3, logger = logger)
# first_format = names.first_format_paths(bucket_name = bucket_name)
# second_format = names.second_format_paths(bucket_name = bucket_name)

In [34]:
# j = random.randint(a = 0, b = len(second_format))
# file_path = second_format[j]

In [35]:
# testing_validation = DataWrangler(bucket_name = bucket_name, 
#                                   s3 = s3, 
#                                   logger = logger )
# validator = DataValidator(logger=logger)

In [36]:
# import random
# i = random.randint(a = 0, b = len(first_format))
# file_path = first_format[i]
# # file_path = 'reports/2015/week_16_Anexo_13_17abr_2015.xls'
# # dataframe = first_format_data_extraction(file_path = file_path)
# dataframe = testing_validation.first_format_data_extraction(file_path = file_path)

# dataframe = testing_validation.first_format_data_transformation(dataframe = dataframe, 
#                                                     file_path = file_path)

# dataframe = validator.validate_dataframe(dataframe = dataframe)

# dataframe

In [37]:
# i = random.randint(a = 0, b = len(second_format))
# file_path = second_format[i]
# dataframe = testing_validation.second_format_data_extraction(file_path = file_path)
# dataframe = validator.validate_dataframe(dataframe = dataframe)
# dataframe

In [16]:

import os

import boto3
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

from src.logging_setup import setup_logger
from src.ProcessHandler import ProcessHandler

# Load credentials from the local .env file into the process environment.
# A missing variable raises KeyError right here, naming the absent key.
load_dotenv()
aws_access_key_id = os.environ['aws_access_key_id']
aws_secret_access_key = os.environ['aws_secret_access_key']

db_user = os.environ['db_user']
db_pass = os.environ['db_pass']
db_host = os.environ['db_host']
db_port = os.environ['db_port']
db_name = os.environ['db_name']

table_name = os.environ['table_name']
bucket_name = os.environ['bucket_name']


# Creating connection to database.
# The URL is assembled from its components rather than interpolated into a
# string, so credentials containing reserved characters ('@', '/', ':', '%')
# are escaped correctly instead of corrupting the connection string.
db_url = URL.create(
    drivername='postgresql',
    username=db_user,
    password=db_pass,
    host=db_host,
    # An empty port keeps the driver default, as the string form did.
    port=int(db_port) if db_port else None,
    database=db_name,
)
engine = create_engine(db_url)

# Creating boto3 session (access the S3 bucket)
s3 = boto3.resource(
    's3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Initialize logger
logger = setup_logger()

# Wire the S3 resource, database engine and logger into the process handler.
sipsa_process = ProcessHandler(
    s3=s3,
    engine=engine,
    bucket_name=bucket_name,
    table_name=table_name,
    logger=logger,
)



2024-10-02 13:44:01,298 - INFO - Loaded existing files tracker from S3.
2024-10-02 13:44:01,298 - INFO - Loaded existing files tracker from S3.


In [17]:
# Select every row and every column of the configured table.
query = f"\nSELECT * FROM {table_name} \n"

In [18]:
    # Run the query through the ProcessHandler instance and bind the result to `df`.
    # NOTE(review): the leading indentation appears to be tolerated by IPython, but it
    # would be an IndentationError in a plain .py script - dedent if this cell is exported.
    df = sipsa_process.querying_db(query = query)


[Info] Data Frame with 2627989 rows and 10 columns imported successfully.


In [19]:
# Distinct values of the `producto` column in the queried data.
df["producto"].unique()

array(['acelga', 'ahuyama', 'ajo', 'ajo_importado', 'aji_dulce',
       'aji_topito_dulce', 'apio', 'arveja_verde_en_vaina',
       'arveja_verde_en_vaina_pastusa', 'berenjena', 'brocoli',
       'calabacin', 'calabaza', 'cebolla_cabezona_blanca',
       'cebolla_cabezona_blanca_bogotana',
       'cebolla_cabezona_blanca_importada',
       'cebolla_cabezona_blanca_pastusa',
       'cebolla_cabezona_blanca_peruana',
       'cebolla_cabezona_roja_importada', 'cebolla_cabezona_roja_ocanera',
       'cebolla_cabezona_roja_peruana', 'cebolla_junca',
       'cebolla_junca_aquitania', 'cebolla_junca_berlin',
       'cebolla_junca_tenerife', 'cebolla_junca_pastusa',
       'cebolla_puerro', 'cebollin_chino', 'chocolo_mazorca', 'cidra',
       'cilantro', 'coles', 'coliflor', 'espinaca', 'frijol_verde_bolo',
       'frijol_verde_cargamanto', 'frijol_verde_en_vaina', 'haba_verde',
       'habichuela', 'habichuela_larga', 'lechuga_batavia',
       'lechuga_crespa_morada', 'lechuga_crespa_verde', 

In [21]:
# Show every row recorded for one specific product.
is_target_product = df["producto"] == "mostaza_doy_pack"
df.loc[is_target_product]

Unnamed: 0,producto,ciudad,precio_minimo,precio_maximo,precio_medio,tendencia,categoria,mercado,semana_no,anho
1181462,mostaza_doy_pack,cartagena,16667,17500,16875,=,productos_procesados,,20,2018
1186113,mostaza_doy_pack,cartagena,16667,17500,16875,=,productos_procesados,bazurto,21,2018
1190753,mostaza_doy_pack,cartagena,16667,17500,16875,=,productos_procesados,bazurto,22,2018
1195383,mostaza_doy_pack,cartagena,16667,17500,16875,=,productos_procesados,bazurto,23,2018
1200012,mostaza_doy_pack,cartagena,16667,17500,16875,=,productos_procesados,bazurto,24,2018
...,...,...,...,...,...,...,...,...,...,...
2623467,mostaza_doy_pack,barranquilla,13598,13750,13662,+,productos_procesados,granabastos,8,2024
2623468,mostaza_doy_pack,sincelejo,14545,15985,15303,+,productos_procesados,nuevo mercado,8,2024
2627788,mostaza_doy_pack,barranquilla,13580,14098,13891,+,productos_procesados,barranquillita,9,2024
2627789,mostaza_doy_pack,barranquilla,13258,13674,13460,-,productos_procesados,granabastos,9,2024
