In [31]:
# Importing the required modules and packages
import numpy as np
import boto3
from config import aws_access_key_id, aws_secret_access_key, bucket_name
import requests
from config import password
import os
import pandas as pd
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import IntegrityError

In [32]:
# Importing the required modules and packages
import boto3
from config import aws_access_key_id, aws_secret_access_key, bucket_name
import pandas as pd
import os

# Setting up the S3 bucket and creating a connection to it using the credentials
bucket_name = bucket_name
aws_access_key_id = aws_access_key_id
aws_secret_access_key = aws_secret_access_key
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

# Looping through each file in the bucket and reading them into DataFrames
dataframe_dict = {}
for obj in s3.list_objects(Bucket=bucket_name)['Contents']:
    file_name = obj['Key']
    s3_object = s3.get_object(Bucket=bucket_name, Key=file_name)
    file_extension = os.path.splitext(file_name)[1]
    if file_extension == '.xlsx':
        data_dict = pd.read_excel(s3_object['Body'], sheet_name=None)
        data_list = []
        for key in data_dict:
            data_list.append(data_dict[key])
        data = pd.concat(data_list, ignore_index=True)
        dataframe_dict[file_name] = data
    elif file_extension == '.csv':
        data = pd.read_csv(s3_object['Body'])
        dataframe_dict[file_name] = data
    else:
        raise ValueError(f"Unsupported file type: {file_extension}")
dataframe_dict

{'cardio_cancer_resp.csv':        FIPS                  County  cardio_death  total_cancer  bladder  \
 0         0                      US         216.3         449.4     19.4   
 1      2013  Aleutians East, Alaska          95.1         142.2      0.0   
 2      2016  Aleutians West, Alaska          89.8         378.4      0.0   
 3      2020       Anchorage, Alaska         177.0         440.1     17.9   
 4      2050          Bethel, Alaska         263.4         397.4      0.0   
 ...     ...                     ...           ...           ...      ...   
 3130  55113       Sawyer, Wisconsin         227.8         495.6     28.5   
 3131  55115      Shawano, Wisconsin         218.7         446.9     19.7   
 3132  55119       Taylor, Wisconsin         206.7         412.9     14.0   
 3133  55125        Vilas, Wisconsin         227.9         504.3     31.4   
 3134  55129     Washburn, Wisconsin         204.0         457.9     19.9   
 
       brain  breast  breast_insitu  cervix  col

In [33]:
# Creating a connection to the PostgreSQL database using the password and the database name

engine = create_engine(f'postgresql://postgres:{password}@localhost:5432/swatdatabase')

In [34]:
# Loop through each file in dataframe_dict
for file_name, df in dataframe_dict.items():
    # Check if 'df' is empty
    if df.empty:
        continue

    # CLEANING PART 1
    # Clean each df before adding them to the new_df
    # Specific case because of repetitive numbers. Check if both 'geo' and 'id' are in column names
    geo_id_cols = [col for col in df.columns if 'geo' in col.lower() and 'id' in col.lower()]

    # Loop through each geo_id column and determine which has a first value with a shorter length
    drop_col = None
    for col in geo_id_cols:
        first_val = str(df[col][0])
        if drop_col is None or len(first_val) > len(str(df[drop_col][0])):
            drop_col = col

    # Drop the column associated with the longer first value
    if drop_col is not None:
        df.drop(columns=[drop_col], inplace=True)

    # CLEANING PART 2
    df.columns = df.columns.to_series().apply(lambda x: x.strip().lower())
    df.columns = df.columns.str.replace('ovay', 'ovary', regex=True)
    df.columns = df.columns.str.replace("(?i)fips|geo[_ ]?id", "fips", regex=True)
    df.columns = df.columns.str.replace("[-\s\:]+", "_", regex=True)
    df.columns = df.columns.str.replace("\.", "_", regex=True)


    # CREATE DATABASE TABLE
    # Determine the data type of each column
    dtype_dict = {}
    for col in df.columns:
        if df[col].dtype == "int":
            dtype_dict[col] = "INTEGER"
        elif df[col].dtype == "float":
            dtype_dict[col] = "FLOAT"
        else:
            dtype_dict[col] = "TEXT"

    # Create SQL query to create the database table
    query = f"CREATE TABLE IF NOT EXISTS {file_name.split('.')[0]} ("
    for col in df.columns:
        query += f"{col} {dtype_dict[col]}, "
    query = query[:-2] + ");"

    # Execute SQL query to create the database table
    with engine.connect() as conn:
        conn.execute(query)

    # INSERT DATA INTO DATABASE TABLE
    try:
        df.to_sql(file_name.split('.')[0], engine, if_exists="append", index=False)
    except IntegrityError as e:
        print(f"Error inserting data into table {file_name.split('.')[0]}: {e}")
        continue
    

In [35]:
# Create an empty dataframe to store the joined data
joined_df = None

# Loop through each dataframe in the dictionary
for key, df in dataframe_dict.items():

    # Check if the dataframe has a 'fips' column
    if 'fips' not in df.columns:
        continue

    # Select columns that contain 'fips' in their name
    fips_cols = [col for col in df.columns if 'fips' in col.lower()]

    # If there are at least two 'fips' columns, join them into a single column
    if len(fips_cols) >= 2:
        # Create a new dataframe with only the 'fips' columns
        fips_df = df[fips_cols]

        # Join the 'fips' columns into a single column
        joined_fips = fips_df.apply(lambda x: ''.join(x.astype(str)), axis=1)

        # Rename the joined column to 'fips'
        df.rename(columns={joined_fips.name: 'fips'}, inplace=True)

    # Append the cleaned dataframe to the joined dataframe
    if joined_df is None:
        joined_df = df
    else:
        joined_df = joined_df.merge(df, on='fips', how='outer')

# Print the joined dataframe
joined_df
# CREATE A TABLE FROM THE JOINED DATAFRAME
table_name = 'us_df'
if inspect(engine).has_table(table_name):
    print(f"Table {table_name} already exists, skipping table creation")
else:
    # Determine the data type of each column
    dtype_dict = {}
    for col in joined_df.columns:
        if joined_df[col].dtype == "int64":
            dtype_dict[col] = "INTEGER"
        elif joined_df[col].dtype == "float64":
            dtype_dict[col] = "FLOAT"
        else:
            dtype_dict[col] = "TEXT"

    # Create SQL query to create the database table
    query = f"CREATE TABLE {table_name} ("
    for col in joined_df.columns:
        query += f"{col} {dtype_dict[col]}, "
    query = query[:-2] + ");"

    # Execute SQL query to create the database table
    with engine.connect() as conn:
        conn.execute(query)

# INSERT DATA INTO DATABASE TABLE
try:
    joined_df.to_sql(table_name, engine, if_exists="append", index=False)
except IntegrityError as e:
    print(f"Error inserting data into table {table_name}: {e}")

In [36]:
# Create an empty dataframe to store the joined data
joined_df = None

# Loop through each dataframe in the dictionary
for key, df in dataframe_dict.items():

    # Check if the dataframe has a 'fips' column and a 'rate' column
    if 'fips' not in df.columns or 'rate' not in df.columns:
        continue

    # Drop rows where the rate column is zero or null
    df = df[(df['rate'].notnull()) & (df['rate'] != 0)]

    # Select columns that contain 'fips' in their name
    fips_cols = [col for col in df.columns if 'fips' in col.lower()]

    # If there are at least two 'fips' columns, join them into a single column
    if len(fips_cols) >= 2:
        # Create a new dataframe with only the 'fips' columns
        fips_df = df[fips_cols]

        # Join the 'fips' columns into a single column
        joined_fips = fips_df.apply(lambda x: ''.join(x.astype(str)), axis=1)

        # Rename the joined column to 'fips'
        df.rename(columns={joined_fips.name: 'fips'}, inplace=True)

    # Append the cleaned dataframe to the joined dataframe
    if joined_df is None:
        joined_df = df
    else:
        joined_df = joined_df.merge(df, on='fips', how='outer')

# Print the joined dataframe
joined_df

# CREATE A TABLE FROM THE JOINED DATAFRAME
table_name = 'us_no_null_rate_df'
if inspect(engine).has_table(table_name):
    print(f"Table {table_name} already exists, skipping table creation")
else:
    # Determine the data type of each column
    dtype_dict = {}
    for col in joined_df.columns:
        if joined_df[col].dtype == "int64":
            dtype_dict[col] = "INTEGER"
        elif joined_df[col].dtype == "float64":
            dtype_dict[col] = "FLOAT"
        else:
            dtype_dict[col] = "TEXT"

    # Create SQL query to create the database table
    query = f"CREATE TABLE {table_name} ("
    for col in joined_df.columns:
        query += f"{col} {dtype_dict[col]}, "
    query = query[:-2] + ");"

    # Execute SQL query to create the database table
    with engine.connect() as conn:
        conn.execute(query)

# INSERT DATA INTO DATABASE TABLE
try:
    joined_df.to_sql(table_name, engine, if_exists="append", index=False)
except IntegrityError as e:
    print(f"Error inserting data into table {table_name}: {e}")


AttributeError: 'NoneType' object has no attribute 'columns'

In [None]:
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy import inspect
import pandas as pd

# Create an empty dataframe to store the joined data
joined_df = None

# Loop through each dataframe in the dictionary
for key, df in dataframe_dict.items():

    # Check if the dataframe has a 'fips' column
    if 'fips' not in df.columns:
        continue

    # Select columns that contain 'fips' in their name
    fips_cols = [col for col in df.columns if 'fips' in col.lower()]

    # If there are at least two 'fips' columns, join them into a single column
    if len(fips_cols) >= 2:
        # Create a new dataframe with only the 'fips' columns
        fips_df = df[fips_cols]

        # Join the 'fips' columns into a single column
        joined_fips = fips_df.apply(lambda x: ''.join(x.astype(str)), axis=1)

        # Rename the joined column to 'fips'
        df.rename(columns={joined_fips.name: 'fips'}, inplace=True)

    # Drop the 'latitude' and 'longitude' columns if they exist
    if 'latitude' in df.columns and 'longitude' in df.columns:
        df.drop(columns=['latitude', 'longitude'], inplace=True)

    # Drop all rows containing null or zero values
    df = df.replace({0: np.nan}).dropna()

    # Append the cleaned dataframe to the joined dataframe
    if joined_df is None:
        joined_df = df
    else:
        joined_df = joined_df.merge(df, on='fips', how='outer')

# Print the joined dataframe
joined_df = joined_df.dropna()

# CREATE A TABLE FROM THE JOINED DATAFRAME
table_name = 'us_no_nulls_no_zeroes_df'
if inspect(engine).has_table(table_name):
    print(f"Table {table_name} already exists, skipping table creation")
else:
    # Determine the data type of each column
    dtype_dict = {}
    for col in joined_df.columns:
        if joined_df[col].dtype == "int64":
            dtype_dict[col] = "INTEGER"
        elif joined_df[col].dtype == "float64":
            dtype_dict[col] = "FLOAT"
        else:
            dtype_dict[col] = "TEXT"

    # Create SQL query to create the database table
    query = f"CREATE TABLE {table_name} ("
    for col in joined_df.columns:
        query += f"{col} {dtype_dict[col]}, "
    query = query[:-2] + ");"

    # Execute SQL query to create the database table
    with engine.connect() as conn:
        conn.execute(query)

# INSERT DATA INTO DATABASE TABLE
try:
    joined_df.to_sql(table_name, engine, if_exists="append", index=False)
except IntegrityError as e:
    print(f"Error inserting data into table {table_name}: {e}")

{'us_no_nulls_no_zeroes_df':         fips                  county_x  cardio_death  total_cancer  bladder  \
 0     6001.0       Alameda, California         169.6         383.9     14.9   
 1     6001.0       Alameda, California         169.6         383.9     14.9   
 2     6001.0       Alameda, California         169.6         383.9     14.9   
 3     6001.0       Alameda, California         169.6         383.9     14.9   
 4     6001.0       Alameda, California         169.6         383.9     14.9   
 ..       ...                       ...           ...           ...      ...   
 912  44007.0  Providence, Rhode Island         208.0         460.8     22.8   
 913  44007.0  Providence, Rhode Island         208.0         460.8     22.8   
 914  44007.0  Providence, Rhode Island         208.0         460.8     22.8   
 915  44007.0  Providence, Rhode Island         208.0         460.8     22.8   
 916  44007.0  Providence, Rhode Island         208.0         460.8     22.8   
 
      brai