# Data Persistent Loader - HDFS + Parquet

In [70]:
from simpledbf import Dbf5
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import os
from tqdm.notebook import tqdm
import subprocess
import re
from datetime import datetime

In [71]:
# Temporal landing zone that holds the DBF files waiting to be converted
folder = '../../../data/temporal_landing/x/'
# Full path (folder + file name) of every regular file in the landing zone
# whose name ends in '.DBF'
dbf_files = [
    path
    for path in (os.path.join(folder, entry) for entry in os.listdir(folder))
    if os.path.isfile(path) and path.endswith('.DBF')
]

In [72]:
# Create a parquet file for every DBF file.
# NOTE: every entry of `dbf_files` is already a complete path (folder + file
# name), so it is used as-is; prefixing it with `folder` a second time only
# resolves correctly when the '..' segments happen to cancel the folder depth.
for file in tqdm(dbf_files):
    try:
        # Parse the DBF file into a dataframe
        batch = Dbf5(file, codec='latin-1').to_dataframe()
        # Add the batch week column (first run of digits in the file name)
        batch['BATCH_WEEK'] = re.search(r'\d+', os.path.basename(file)).group()
        # Add the loading date column
        batch['LOAD_DATE'] = datetime.today().strftime('%Y%m%d')

        # Get all the available boarding dates' years, most recent first
        years = sorted(pd.to_datetime(batch['FEMB'], format='%Y%m%d').dt.year.unique().tolist(), reverse=True)
        if not years:
            # Fail with an explicit message instead of an opaque IndexError
            raise ValueError('no FEMB dates found, nothing to write')

        # Convert the dataframe into a pyarrow table
        batch = pa.Table.from_pandas(batch)
        # Text form of the FEMB column; it does not depend on the year, so it
        # is computed once instead of once per row group
        femb_text = [str(value) for value in batch.column('FEMB').to_pylist()]

        # One row group per year, holding every column of the rows of that year
        my_row_groups = []
        for year in years:
            year_prefix = str(year)
            mask = [text.startswith(year_prefix) for text in femb_text]
            my_row_groups.append(batch.filter(mask))

        # Refuse to continue if some rows matched no year: the DBF is deleted
        # on success, so a lossy conversion would silently lose those rows
        if sum(rg.num_rows for rg in my_row_groups) != batch.num_rows:
            raise ValueError('some rows have a FEMB value that matches no year')

        # Create the Parquet file next to the DBF; the context manager closes
        # the writer even if writing one of the row groups fails
        parquet_path = folder + os.path.basename(file).split('.')[0] + '.parquet'
        with pq.ParquetWriter(parquet_path, batch.schema) as parquet_writer:
            # Add every row group
            for rg in my_row_groups:
                parquet_writer.write_table(rg)

    except Exception as e:
        # Report the failure and keep the DBF so the file can be retried
        print(f"Error generating parquet file for '{file}':{type(e).__name__}: {str(e)}")

    else:
        # Delete the DBF file from the temporal landing zone
        os.remove(os.path.abspath(file))

  0%|          | 0/317 [00:00<?, ?it/s]

In [73]:
def add_file_to_hdfs(file_path, hdfs_directory, ingestion_type):
    """Upload a local file to HDFS, log the load and delete the local copy.

    The file is pushed with ``hadoop fs -put`` and then checked with
    ``hadoop fs -test -f``. Only when both commands succeed is a row appended
    to ``./log.csv`` (columns: filename, type, date, folder) and the local
    file removed. On any failure an error is printed and the local file is
    left untouched.

    Args:
        file_path: Path of the local file to upload.
        hdfs_directory: Destination directory in HDFS.
        ingestion_type: Label stored in the ``type`` column of the log.

    Returns:
        None.
    """
    file_name = os.path.basename(file_path)
    # Join with exactly one '/' so the check also works when the directory is
    # given without a trailing slash
    hdfs_path = hdfs_directory.rstrip('/') + '/' + file_name

    error_detail = ''
    try:
        # Argument lists (no shell) keep paths containing spaces or shell
        # metacharacters intact
        upload = subprocess.run(['hadoop', 'fs', '-put', file_path, hdfs_directory],
                                capture_output=True, text=True)
        uploaded = upload.returncode == 0
        if uploaded:
            # Double-check that the file is really present in HDFS
            check = subprocess.run(['hadoop', 'fs', '-test', '-f', hdfs_path],
                                   capture_output=True, text=True)
            uploaded = check.returncode == 0
        else:
            # A failed put (e.g. the target already exists) must not be logged
            # as a load nor trigger the deletion of the local file
            error_detail = (upload.stderr or '').strip()
    except OSError as e:
        # The hadoop executable could not be started at all
        uploaded = False
        error_detail = str(e)

    if not uploaded:
        print('Error uploading {}'.format(file_name))
        if error_detail:
            print(error_detail)
        return

    # Log to register the load
    logs = pd.DataFrame(
        [[file_name, ingestion_type, datetime.today().date(), hdfs_directory]],
        columns=['filename', 'type', 'date', 'folder'],
    )
    # Write the header only when the log file is being created
    logs.to_csv('./log.csv', mode='a', index=False,
                header=not os.path.exists('./log.csv'))
    # Delete the parquet file from the temporal landing zone
    os.remove(file_path)


In [74]:
# HDFS directory that receives the files
hdfs_directory = '/thesis/peru/exports/'

# Absolute path of every parquet file found in the temporal landing zone
parquet_files = [
    os.path.abspath(os.path.join(folder, entry))
    for entry in os.listdir(folder)
    if entry.endswith('.parquet')
]

# Upload the files one by one, registering each as a 'historical' ingestion
for file in tqdm(parquet_files):
    add_file_to_hdfs(file, hdfs_directory, ingestion_type='historical')


  0%|          | 0/317 [00:00<?, ?it/s]