## Check and load partitions

This notebook defines a Python function that checks a local folder against a DataFrame of folder names and their associated file lists, and loads every file found locally that is not already in the corresponding list. The function returns a dictionary of loaded DataFrames, one per folder.

The function returns a dictionary (dataframe_dicc) containing loaded DataFrames, where each key is the name of the corresponding folder.



## Google Drive API: Pip install assistance

In [3]:
pip install gdown google-api-python-client google-auth google-auth-oauthlib google-auth-httplib2

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 5, Finished, Available)

Collecting gdown
  Downloading gdown-5.1.0-py3-none-any.whl (17 kB)
Collecting google-api-python-client
  Downloading google_api_python_client-2.118.0-py2.py3-none-any.whl (12.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.1/12.1 MB[0m [31m150.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting google-auth-httplib2
  Downloading google_auth_httplib2-0.2.0-py2.py3-none-any.whl (9.3 kB)
Collecting httplib2<1.dev0,>=0.15.0 (from google-api-python-client)
  Downloading httplib2-0.22.0-py3-none-any.whl (96 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m96.9/96.9 kB[0m [31m38.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting google-api-core!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0,<3.0.0.dev0,>=1.31.5 (from google-api-python-client)
  Downloading google_api_core-2.17.1-py3-none-any.whl (137 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m137.0/137.0 kB[0m [31m55.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting uritemplate<5,

## Libraries Import

In [4]:
import os
import io
import pandas as pd
import builtin.utils as ut

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 6, Finished, Available)

## Check and Load Function

 `check_and_load_partitions(local_folder_path, check_df_path)`

Checks and loads partitions from the specified local folder based on a provided DataFrame with folder names and associated file lists, and returns a dictionary of loaded DataFrames.

#### Parameters:
- `local_folder_path` (str): Local path of the folder to analyze.
- `check_df_path` (str): Path to the DataFrame file containing folder names and file lists.

#### Returns:
- `dataframe_dicc` (dict): Dictionary containing loaded DataFrames with corresponding folder names.

#### Example:
```python
check_and_load_partitions('/local/path/to/folder', '/path/to/check_dataframe.parquet')
```


In [5]:
# Load every file of a folder that is not yet registered in a list of known files
def _load_unlisted_files(folder_path, known_files):
    """
    Reads every file in a folder whose name is not in `known_files` as JSON Lines
    and returns the rows of all those files as a single DataFrame.

    Parameters:
    - folder_path (str): Folder whose files are inspected.
    - known_files (set): File names to skip.

    Returns:
    - (pd.DataFrame): Concatenation of the files read, or an empty DataFrame
      when the folder has no unlisted file.
    """
    # Sorted so the row order of the result does not depend on the arbitrary
    # order in which os.listdir returns the entries.
    new_frames = [
        pd.read_json(os.path.join(folder_path, file_name), lines=True)
        for file_name in sorted(os.listdir(folder_path))
        if file_name not in known_files
    ]

    # pd.concat raises ValueError on an empty list, so an empty (or fully
    # registered) folder is answered with an empty DataFrame instead.
    if not new_frames:
        return pd.DataFrame()

    return pd.concat(new_frames, axis=0, ignore_index=True)


# Check and load partitions based on a DataFrame with folder names and file lists
def check_and_load_partitions(local_folder_path, check_df_path):
    """
    Checks and loads partitions from the specified local folder based on a provided DataFrame
    with folder names and associated file lists, and returns a dictionary of loaded DataFrames.

    For every row of the check DataFrame (columns `name` and `files`):
    - if `<local_folder_path>/<name>` is a directory, every file in it that is not
      listed in `files` is read as JSON Lines; when the result is not empty a
      'date' column is derived from its 'time' column;
    - otherwise, if `name` equals the last component of `local_folder_path`, the
      files of `local_folder_path` itself are loaded the same way (no 'date'
      column is added in this case);
    - otherwise the row is skipped.

    Parameters:
    - local_folder_path (str): Local path of the folder to analyze. A trailing
      path separator is accepted.
    - check_df_path (str): Path to the parquet file containing folder names and file lists.

    Returns:
    - dataframe_dicc (dict): Dictionary containing loaded DataFrames with corresponding
      folder names. A folder without unlisted files maps to an empty DataFrame.

    Example:
    - check_and_load_partitions('/local/path/to/folder', '/path/to/check_dataframe.parquet')
    """

    # Dictionary of loaded DataFrames, keyed by folder name
    dataframe_dicc = {}

    # Read the check DataFrame from the specified path
    check_df = pd.read_parquet(check_df_path)

    # normpath drops a trailing separator, which would otherwise make the
    # last path component an empty string.
    main_folder_name = os.path.basename(os.path.normpath(local_folder_path))

    for check_name, check_list in zip(check_df['name'], check_df['files']):
        # Built once per row so each membership test is O(1).
        # A missing file list is treated as "nothing registered yet".
        known_files = set(check_list) if check_list is not None else set()
        aux_folder_path = os.path.join(local_folder_path, check_name)

        if os.path.isdir(aux_folder_path):
            # The row names a sub-folder (one partition per sub-folder)
            dataframe_object = _load_unlisted_files(aux_folder_path, known_files)
            if not dataframe_object.empty:
                # ut.mili_to_datetime comes from builtin.utils; presumably it
                # converts epoch milliseconds to a datetime - TODO confirm.
                dataframe_object['date'] = dataframe_object['time'].apply(ut.mili_to_datetime)

        elif check_name == main_folder_name:
            # The row names the main folder itself
            dataframe_object = _load_unlisted_files(local_folder_path, known_files)

        else:
            # Neither an existing sub-folder nor the main folder: nothing to load
            continue

        # Store the DataFrame in the dictionary with the folder name as the key
        dataframe_dicc[check_name] = dataframe_object
        print(f'{check_name}: Partition Loaded 100%')

    return dataframe_dicc


StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 7, Finished, Available)

## Update Database Function

 `update_database(dataframe_dicc, database_path)`

Updates a database with DataFrames from a dictionary and returns the updated dictionary.

#### Parameters:
- `dataframe_dicc` (dict): Dictionary containing DataFrames to be added to the database.
- `database_path` (str): Path to the database folder.

#### Returns:
- `dicc` (dict): Updated dictionary containing DataFrames.

#### Example:
```python
update_database({'review-parquet1': df1, 'review-parquet2': df2}, '/path/to/database')
```


In [6]:
# Update a database with DataFrames from a dictionary
def update_database(dataframe_dicc, database_path):
    """
    Updates a database with DataFrames from a dictionary and returns the updated dictionary.

    Every file in `database_path` is read as parquet and the matching new rows from
    `dataframe_dicc` are appended to it. The new rows for a file `<name>.parquet` are
    looked up under the key `<name>` first and under `review-<name>` second, so both
    key conventions used by the loaded partitions are supported.

    Parameters:
    - dataframe_dicc (dict): Dictionary containing DataFrames to be added to the database.
    - database_path (str): Path to the database folder.

    Returns:
    - dicc (dict): Updated dictionary containing DataFrames, keyed by the database file
      name without its extension. A stored file with no (or empty) new data is returned
      unchanged. An empty database folder yields an empty dictionary.

    Example:
    - update_database({'review-parquet1': df1, 'review-parquet2': df2}, '/path/to/database')
    """

    # Updated DataFrames, keyed by database file name without extension
    dicc = {}

    # Sorted so the result does not depend on the arbitrary order of os.listdir
    for parquet in sorted(os.listdir(database_path)):
        # splitext keeps dots that belong to the name itself ('a.b.parquet' -> 'a.b')
        aux_name = os.path.splitext(parquet)[0]

        # Read the existing DataFrame from the current file in the database
        db_df = pd.read_parquet(os.path.join(database_path, parquet))

        # A partition that was not loaded this run simply has no new rows;
        # .get avoids a KeyError for it.
        new_data = dataframe_dicc.get(aux_name)
        if new_data is None:
            new_data = dataframe_dicc.get(f"review-{aux_name}")

        # Only non-empty new data takes part in the concatenation
        frames = [db_df]
        if new_data is not None and len(new_data) > 0:
            frames.append(pd.DataFrame(new_data))

        dicc[aux_name] = pd.concat(frames, axis=0, ignore_index=True)

    return dicc


StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 8, Finished, Available)

## Exercise Test

In [7]:
# Test run for the site metadata.
# Locations: output database root, partition folder, check list and stored data.
prueba_path = '/lakehouse/default/Files/otra_prueba/databasePrueba'
sitios_path = '/lakehouse/default/Files/otra_prueba/metadataSitiosPrueba'
sitios_check = '/lakehouse/default/Files/otra_prueba/pruebaList/metadataSitiosPrueba.parquet'
database_sitios = '/lakehouse/default/Files/otra_prueba/databasePrueba/Metadata_sitios_parquet'

# Load the files not yet in the check list and merge them into the stored data
sitios_dicc = update_database(
    check_and_load_partitions(sitios_path, sitios_check),
    database_sitios,
)

# Write the merged data back through the project helper
ut.dataframe_to_parquet(sitios_dicc, 'Metadata_sitios_parquet', prueba_path)

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 9, Finished, Available)

metadataSitiosPrueba: Partition Loaded 100%
Dataframes saved successfully in df_database/Metadata_sitios_parquet


In [8]:
# Test run for the reviews by state.
# Locations: output database root, partition folder, check list and stored data.
prueba_path = '/lakehouse/default/Files/otra_prueba/databasePrueba'
estados_path = '/lakehouse/default/Files/otra_prueba/reviewEstadosPrueba'
estados_check = '/lakehouse/default/Files/otra_prueba/pruebaList/reviewEstadosPrueba.parquet'
database_estados = '/lakehouse/default/Files/otra_prueba/databasePrueba/Review_estados_parquet'

# Load the files not yet in the check list and merge them into the stored data
estados_dicc = update_database(
    check_and_load_partitions(estados_path, estados_check),
    database_estados,
)

# Write the merged data back through the project helper
ut.dataframe_to_parquet(estados_dicc, 'Review_estados_parquet', prueba_path)

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 10, Finished, Available)

review-Texas: Partition Loaded 100%
review-Washington: Partition Loaded 100%
review-Wyoming: Partition Loaded 100%
Dataframes saved successfully in df_database/Review_estados_parquet
Dataframes saved successfully in df_database/Review_estados_parquet


## Original Routing:

From this point on, all paths refer to the original database. This section gathers every path needed to link the information extracted from the requested Drive folder with the database that was created.

## Review - Estados Partition Update

In [9]:
# Disabled production run for the reviews by state: the whole cell is wrapped
# in a triple-quoted string, so it is evaluated as a plain string literal and
# none of the statements inside are executed.
'''estado_path = '/lakehouse/default/Files/original/reviews-estados'
estado_listCheck = '/lakehouse/default/Files/notes_and_list/reviews-estados.parquet'

estadoDicc = check_and_load_partitions(estado_path, estado_listCheck)
df_database = '/lakehouse/default/Files/df_database'

#Data Update
estado_database = '/lakehouse/default/Files/df_database/Review_estados_parquet'
estadoDicc = update_database(estadoDicc, estado_database)

#Upload Data
ut.dataframe_to_parquet(estadoDicc,'Review_estados_parquet',df_database)'''

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 11, Finished, Available)

"estado_path = '/lakehouse/default/Files/original/reviews-estados'\nestado_listCheck = '/lakehouse/default/Files/notes_and_list/reviews-estados.parquet'\n\nestadoDicc = check_and_load_partitions(estado_path, estado_listCheck)\ndf_database = '/lakehouse/default/Files/df_database'\n\n#Data Update\nestado_database = '/lakehouse/default/Files/df_database/Review_estados_parquet'\nestadoDicc = update_database(estadoDicc, estado_database)\n\n#Upload Data\nut.dataframe_to_parquet(estadoDicc,'Review_estados_parquet',df_database)"

## Metadata - Sitios Partition Update

In [10]:
# Disabled production run for the site metadata: the whole cell is wrapped in
# a triple-quoted string, so it is evaluated as a plain string literal and
# none of the statements inside are executed.
# NOTE(review): inside the string, the lines that assign metadataDicc and
# df_database are commented out, while the later statements still use both
# names. If this cell is re-enabled, those two lines must be restored or the
# names must already be defined elsewhere in the session.
'''metadata_path = '//lakehouse/default/Files/original/metadata-sitios'
metadata_listCheck = '/lakehouse/default/Files/notes_and_list/metadata-sitios.parquet'

#metadataDicc = check_and_load_partitions(metadata_path, metadata_listCheck)
#df_database = '/lakehouse/default/Files/df_database'

#Data Update
metadata_database = '/lakehouse/default/Files/df_database/Metadata_sitios_parquet'
metadataDicc = update_database(metadataDicc, metadata_database)

#Upload Data
ut.dataframe_to_parquet(metadataDicc,'Metadata_sitios_parquet',df_database)'''

StatementMeta(, 75228702-638b-401d-af11-858e354c4c4d, 12, Finished, Available)

"metadata_path = '//lakehouse/default/Files/original/metadata-sitios'\nmetadata_listCheck = '/lakehouse/default/Files/notes_and_list/metadata-sitios.parquet'\n\n#metadataDicc = check_and_load_partitions(metadata_path, metadata_listCheck)\n#df_database = '/lakehouse/default/Files/df_database'\n\n#Data Update\nmetadata_database = '/lakehouse/default/Files/df_database/Metadata_sitios_parquet'\nmetadataDicc = update_database(metadataDicc, metadata_database)\n\n#Upload Data\nut.dataframe_to_parquet(metadataDicc,'Metadata_sitios_parquet',df_database)"