In [1]:
## !pip install openpyxl

import os
import gluestick as gs
import pandas as pd
from glob import glob
from datetime import datetime
import time
from utilities import create_directories, to_snake_case, rename_columns_to_snake_case
from stream_utils import determine_stream_name

In [2]:
# Get export format
EXPORT_FORMAT = os.environ.get("DEFAULT_EXPORT_FORMAT", "singer")

# Get tenant id
TENANT_ID = os.environ.get("USER_ID", os.environ.get("TENANT", "default"))

# Establish standard directories for hotglue
ROOT_DIR = os.environ.get("ROOT_DIR", ".")
INPUT_DIR = f"{ROOT_DIR}/sync-output"
OUTPUT_DIR = f"{ROOT_DIR}/etl-output"

In [3]:
create_directories(INPUT_DIR, OUTPUT_DIR)

In [4]:
fallback_version = False
try:
    input = gs.Reader()
except AttributeError:
    input = gs.read_csv_folder(INPUT_DIR)
    fallback_version = True

In [5]:
def process_dataframe(df, file_name):
    print(f"Processing data from: {file_name}")
    
    df = rename_columns_to_snake_case(df)
    
    stream_name = determine_stream_name(df)
    
    if stream_name == "unknown_stream":
        print(f"Unknown data type in {file_name}. Skipping.")
        return
    
    df['index'] = df.index
    df['tenant'] = TENANT_ID
    df['timestamp'] = str(time.time())
    df['id'] = df['index'].astype(str) + "-" + df['tenant'].astype(str) + "-" + df['timestamp'].astype(str)
    df['created'] = datetime.utcnow().isoformat()
    
    if stream_name == "my_stream":
        # Add any specific processing here
        pass
    
    output_file_path = f"{OUTPUT_DIR}/{stream_name}.csv"
    df.to_csv(output_file_path, index=False, mode='a', header=not os.path.exists(output_file_path))
    
    gs.to_singer(df, stream_name, OUTPUT_DIR, keys=["id"])
    
    print(f"Processed {stream_name}")
    print(df.head())

In [6]:
# Get all Excel and CSV files in the input directory
input_files = glob(f"{INPUT_DIR}/*.xlsx") + glob(f"{INPUT_DIR}/*.csv")

for file_path in input_files:
    file_extension = os.path.splitext(file_path)[1].lower()
    
    if file_extension == '.xlsx':
        # Read all sheets from the Excel file
        input_data = pd.read_excel(file_path, sheet_name=None)
        
        # Process each sheet in the Excel file
        for sheet_name, df in input_data.items():
            process_dataframe(df, f"{file_path} - {sheet_name}")
    
    elif file_extension == '.csv':
        # Read the CSV file
        df = pd.read_csv(file_path)
        process_dataframe(df, file_path)
    
    else:
        print(f"Unsupported file type: {file_path}")

print("All files processed.")

Processing data from: ./sync-output/members.xlsx - Members
Processed members_stream
                 customer                         email           phone  \
0    William Cannon-NOTES  william.cannon1977@yahoo.com  (580) 284-0168   
1          Bethany  Davis         bethndavis4@gmail.com  (270) 307-4698   
2  Ken & Virginia Moulton           kenhmoulton@aol.com  (580) 531-2466   
3   Martine & Henry Poppe           henry.poppe@att.net  (580) 248-3103   
4          Susan Pueschel          sepueschel@gmail.com  (580) 585-1648   

  active_member                         street apt_suite    city state  \
0           Yes       325 Mountain Ridge Place       NaN  Lawton    OK   
1           Yes  283 Northeast Creekside Drive       NaN   Elgin    OK   
2           Yes       7825 Northwest Brady Way       NaN  Lawton    OK   
3           Yes       418 Southwest 23rd Place       NaN  Lawton    OK   
4           Yes  639 Northwest Waterford Drive       NaN  Lawton    OK   

   zip_code membersh