1. Load data from S3
2. Concatenate the pipe-delimited files into a single DataFrame
3. Merge the two DataFrames on the new ID column
4. Save the merged result to PostgreSQL

* df1 data is pipe-delimited; its three partial ID columns (ID1, ID2, ID3) must be concatenated into a single ID column that matches the ID column in df2
* df2 data is fixed-width

In [4]:
import pandas as pd
import s3fs
from sqlalchemy import create_engine
import psycopg2
from credentials import *

In [5]:
# FUNCTION DEFINITIONS

def concat_dfs(csvs, sep=',', fwf=False):
    """
    Read each file in `csvs` into a pandas DataFrame and stack them vertically.

    * csvs is an iterable of file paths / file-like objects accepted by pandas
    * sep is the delimiter used when the files are delimited (ignored when fwf is True)
    * fwf selects pd.read_fwf (fixed width) instead of pd.read_csv (delimited)

    Prints every individual frame, then returns one DataFrame whose index is
    renumbered from 0.
    """

    def _read(source):
        # choose the reader once per file based on the fwf flag
        if fwf:
            return pd.read_fwf(source)
        return pd.read_csv(source, sep=sep)

    frames = list(map(_read, csvs))

    print("Original dataframes:")
    for frame in frames:
        print(frame)

    # ignore_index=True discards each file's own 0..n index in favour of a single running index
    return pd.concat(frames, ignore_index=True)

def load_from_s3():
    """
    Load both input datasets and return them as [pipe_delimited_df, fixed_width_df].

    * PIPE_DELIMITED_FILES is read file by file with '|' as separator and concatenated
    * FIXED_WIDTH_FILE is read as a single fixed-width file

    Both names are not defined in this cell -- presumably they come from the
    `credentials` star import; TODO confirm.
    """
    pipe_delimited = concat_dfs(PIPE_DELIMITED_FILES, sep='|')
    fixed_width = pd.read_fwf(FIXED_WIDTH_FILE)

    print("Loaded files from s3")
    for frame in (pipe_delimited, fixed_width):
        print(frame)

    return [pipe_delimited, fixed_width]


def merge_dfs(df1, df2):
    """
    Build a combined ID for df1 and merge it with df2 on that ID.

    * df1 must contain the columns ID1, ID2, ID3, A and B
    * df2 must contain an ID column to merge on

    The ID is ID1 (cast to str) + ID2 + ID3. Only ID, A and B are kept from df1;
    the partial ID columns are dropped. The inner merge result is sorted by ID,
    printed, and returned.

    df1 is NOT modified: the ID column is added to a copy, so this function can
    be called repeatedly on the same input (e.g. when re-running a notebook cell)
    without raising "cannot insert ID, already exists".
    """
    # concatenate the 3 separate ID columns of df1 into one key
    combined_id = df1['ID1'].astype('str') + df1['ID2'] + df1['ID3']

    # work on a copy of the payload columns so the caller's frame is left untouched
    left = df1[['A', 'B']].copy()
    left.insert(0, 'ID', combined_id)

    # merge the dataframes on ID and sort by ID
    result = left.merge(df2, on='ID').sort_values(by='ID')

    print("Merged dataframes")
    print(result)

    return result

def save_to_rds(df):
    """
    Append `df` to the Postgres table 'table1'.

    A psycopg2 connection is opened first as a connectivity check; the actual
    insert goes through a SQLAlchemy engine built from the same settings
    (DATABASE_ENDPOINT, DATABASE_NAME, USERNAME, PASSWORD, PORT -- not defined
    in this cell; presumably from the `credentials` star import, TODO confirm).

    Errors are reported by printing rather than raising:
    * psycopg2.OperationalError (e.g. connection failure) -> the error is printed
    * any failure during the insert -> the original hint plus the real error is printed,
      and "Saved to database" is NOT printed

    The connection and engine are always released, even when an error occurs.
    Returns None.
    """
    # local import: percent-encode credentials so characters such as '@', ':' or '/'
    # in the username/password cannot corrupt the connection URL
    from urllib.parse import quote

    conn = None
    engine = None
    try:
        # connect to database
        conn = psycopg2.connect(
            host=DATABASE_ENDPOINT,
            database=DATABASE_NAME,
            user=USERNAME,
            password=PASSWORD,
            port=PORT
        )
        print("Connected to database")

        # create engine
        user = quote(str(USERNAME), safe='')
        password = quote(str(PASSWORD), safe='')
        engine = create_engine(
            f'postgresql://{user}:{password}@{DATABASE_ENDPOINT}:{PORT}/{DATABASE_NAME}'
        )

        # insert data
        # NOTE(review): to_sql defaults to index=True, so the DataFrame index is written
        # as an extra column -- confirm that 'table1' expects it.
        try:
            df.to_sql('table1', con=engine, if_exists='append', chunksize=20000)
        except Exception as e:
            # keep the best-effort behaviour, but surface the real cause instead of guessing
            print("Data format doesn't match database format")
            print(e)
        else:
            # pandas commits the insert through the engine itself; only report success
            # when the insert actually succeeded
            print("Saved to database")

    except psycopg2.OperationalError as e:
        print(e)

    finally:
        # release resources on every path
        if engine is not None:
            engine.dispose()
        if conn is not None:
            conn.close()
            print("Connection closed")

In [7]:
# LOAD DATA FROM S3 AND FORMAT
# load_from_s3() returns a two-element list:
#   dfs[0] -> the concatenated pipe-delimited data
#   dfs[1] -> the fixed-width data

dfs = load_from_s3()

Original dataframes:
   ID1 ID2   ID3  A  B
0    8   z  text  9  7
1    4   z  text  6  1
2    3   z  text  8  3
3    0   z  text  7  4
   ID1 ID2   ID3  A  B
0    9   z  text  1  2
1    5   z  text  3  5
2    2   z  text  9  4
   ID1 ID2   ID3  A  B
0    1   z  text  1  2
1    6   z  text  6  3
2    7   z  text  7  2
Loaded files from s3
   ID1 ID2   ID3  A  B
0    8   z  text  9  7
1    4   z  text  6  1
2    3   z  text  8  3
3    0   z  text  7  4
4    9   z  text  1  2
5    5   z  text  3  5
6    2   z  text  9  4
7    1   z  text  1  2
8    6   z  text  6  3
9    7   z  text  7  2
       ID  D  E  F
0  0ztext  1  2  3
1  1ztext  4  5  6
2  2ztext  7  8  9
3  3ztext  1  2  3
4  4ztext  4  5  6
5  5ztext  7  8  9
6  6ztext  1  2  3
7  7ztext  4  5  6
8  8ztext  7  8  9
9  9ztext  1  2  3


In [8]:
# MERGE THE 2 DATAFRAMES
# dfs[0] is the pipe-delimited frame (ID1/ID2/ID3 are combined into ID),
# dfs[1] is the fixed-width frame that already has an ID column

df = merge_dfs(dfs[0], dfs[1])

Merged dataframes
       ID  A  B  D  E  F
3  0ztext  7  4  1  2  3
7  1ztext  1  2  4  5  6
6  2ztext  9  4  7  8  9
2  3ztext  8  3  1  2  3
1  4ztext  6  1  4  5  6
5  5ztext  3  5  7  8  9
8  6ztext  6  3  1  2  3
9  7ztext  7  2  4  5  6
0  8ztext  9  7  7  8  9
4  9ztext  1  2  1  2  3


In [10]:
# SAVE TO POSTGRES
# save_to_rds appends to 'table1' (if_exists='append'), so re-running this cell
# inserts the same rows again

save_to_rds(df)

Connected to database
Saved to database
Connection closed
