## Processing and Storing Consumer Complaint Data in GCP PostgreSQL 

### The provided code connects to a PostgreSQL database, creates a new table, loads data from a CSV file into a pandas DataFrame, and inserts the data from the DataFrame into the table. The code then demonstrates different methods of fetching data from the database into a pandas DataFrame.

In [None]:
# Required modules are imported.
import psycopg2
from psycopg2 import extras
import pandas as pd

In [None]:
# PostgreSQL connection settings. The values below are placeholders.
database = "cfpb"
host = "ip_address"
user = "user name"
password = "pwd"

# Open the server connection from the settings above.
connection_settings = {
    "dbname": database,
    "user": user,
    "password": password,
    "host": host,
}
conn = psycopg2.connect(**connection_settings)

# Cursor used by the following cells to execute SQL statements.
cur = conn.cursor()

In [None]:
# Smoke-test the connection by asking the server for its current timestamp.
cur.execute("SELECT NOW()")
result = cur.fetchone()
server_time = result[0]  # the query returns a single one-column row
print(f"Current time: {server_time}")

In [None]:
# Read the complaints CSV into a DataFrame; later cells clean it and load it
# into the database table.
complaint_file = "you complaint file here, should called complaints.csv"
cfpb = pd.read_csv(filepath_or_buffer=complaint_file)

# Print the (rows, columns) shape as a quick check that the load worked.
rows_and_columns = cfpb.shape
print("Sanity check： ", rows_and_columns)

In [None]:
# Converting date formats and handling missing data.
def convert_date_format(date):
    """Reformat an ``MM/DD/YY`` date as a ``YYYY-MM-DD`` string.

    Args:
        date: Value to convert; it is parsed with the ``%m/%d/%y`` format.

    Returns:
        The ``YYYY-MM-DD`` string when parsing and formatting succeed.
        If either step raises ``ValueError``, ``date`` is returned unchanged.
    """
    try:
        parsed = pd.to_datetime(date, format='%m/%d/%y')
        iso_date = parsed.strftime('%Y-%m-%d')
    except ValueError:
        # Leave values that do not fit the expected format untouched.
        return date
    else:
        return iso_date

# Normalize both date columns with convert_date_format; values it cannot
# parse are left as they were.
for date_column in ('Date received', 'Date sent to company'):
    cfpb[date_column] = cfpb[date_column].apply(convert_date_format)

# Replace missing complaint IDs with 0 and cast the column to float.
# The result is assigned back to the DataFrame instead of calling
# fillna(..., inplace=True) on the selected column: that chained in-place
# form emits a FutureWarning in pandas 2.x and does not update the
# DataFrame once Copy-on-Write is enabled.
# NOTE(review): the float dtype is kept from the original code even though
# the table column is INT - confirm whether an integer dtype is wanted.
cfpb['Complaint ID'] = cfpb['Complaint ID'].fillna(0).astype(float)

# Rename columns to make them SQL-friendly (remove spaces and special characters).
rename_dict = {
    'Date received': 'Date_received',
    'Product': 'Product',
    'Sub-product': 'Sub_product',
    'Issue': 'Issue',
    'Sub-issue': 'Sub_issue',
    'Consumer complaint narrative': 'Consumer_complaint_narrative',
    'Company public response': 'Company_public_response',
    'Company': 'Company',
    'State': 'State',
    'ZIP code': 'ZIP_code',
    'Tags': 'Tags',
    'Consumer consent provided?': 'Consumer_consent_provided',
    'Submitted via': 'Submitted_via',
    'Date sent to company': 'Date_sent_to_company',
    'Company response to consumer': 'Company_response_to_consumer',
    'Timely response?': 'Timely_response',
    'Consumer disputed?': 'Consumer_disputed',
    'Complaint ID': 'Complaint_ID',
}
cfpb.rename(columns=rename_dict, inplace=True)

In [None]:
# Create the cfpb table with the schema below if it does not exist yet.
# IF NOT EXISTS makes this cell safe to re-run: a plain CREATE TABLE raises
# an error when the table is already there, which also leaves the open
# transaction in a failed state until it is rolled back.
# Column names are double-quoted, so they are case-sensitive in later queries.
table_creation_query = """
    CREATE TABLE IF NOT EXISTS cfpb (
        id SERIAL PRIMARY KEY,
        "Date_received" DATE,
        "Product" VARCHAR(255),
        "Sub_product" VARCHAR(255),
        "Issue" VARCHAR(255),
        "Sub_issue" VARCHAR(255),
        "Consumer_complaint_narrative" TEXT,
        "Company_public_response" VARCHAR(255),
        "Company" VARCHAR(255),
        "State" VARCHAR(255),
        "ZIP_code" VARCHAR(255),
        "Tags" VARCHAR(255),
        "Consumer_consent_provided" VARCHAR(255),
        "Submitted_via" VARCHAR(255),
        "Date_sent_to_company" DATE,
        "Company_response_to_consumer" VARCHAR(255),
        "Timely_response" VARCHAR(255),
        "Consumer_disputed" VARCHAR(255),
        "Complaint_ID" INT UNIQUE
    );
"""
cur.execute(table_creation_query)
# Commit so the new table is persisted.
conn.commit()

In [None]:
# Insert the DataFrame rows into the cfpb table, skipping any row whose
# "Complaint_ID" is already stored.

# Single source of truth for the column order: it drives both the INSERT
# column list and the order of the values inside each row tuple, so the two
# cannot drift apart if the DataFrame's column order differs from the table's.
insert_columns = [
    "Date_received",
    "Product",
    "Sub_product",
    "Issue",
    "Sub_issue",
    "Consumer_complaint_narrative",
    "Company_public_response",
    "Company",
    "State",
    "ZIP_code",
    "Tags",
    "Consumer_consent_provided",
    "Submitted_via",
    "Date_sent_to_company",
    "Company_response_to_consumer",
    "Timely_response",
    "Consumer_disputed",
    "Complaint_ID",
]

# Build one plain tuple per row. Missing values (NaN) are converted to None,
# which psycopg2 sends as SQL NULL, instead of passing NaN floats through to
# text and date columns.
result = [
    tuple(None if pd.isna(value) else value for value in row)
    for row in cfpb[insert_columns].itertuples(index=False, name=None)
]

# execute_values expands the single %s placeholder into the list of row tuples.
quoted_columns = ", ".join(f'"{column}"' for column in insert_columns)
insert_query_base = f"""
INSERT INTO cfpb ({quoted_columns})
VALUES %s ON CONFLICT ("Complaint_ID") DO NOTHING
"""
psycopg2.extras.execute_values(cur, insert_query_base, result)
conn.commit()

The cell below demonstrates one way to fetch the result of a query into a pandas DataFrame, using `pd.read_sql_query`.

In [None]:
# Sample code to demonstrate how one might interact with a sidebar tool to run SQL queries.

# Placeholder selection so the cell runs on its own.
# NOTE(review): presumably the real value comes from the sidebar tool as a
# list of state codes - confirm and replace this assignment.
state = ["CA", "NY"]

# The filter column is "State": it was created with a quoted, case-sensitive
# name, so it must be quoted here too. The selected states are passed as a
# bound parameter (adapted by psycopg2 to a PostgreSQL array) rather than
# being formatted into the SQL text, which avoids SQL injection.
query = """
SELECT *
FROM cfpb
WHERE "State" = ANY(%(states)s)
ORDER BY random()
LIMIT 100;
"""

# Run the query and load up to 100 randomly ordered matching rows.
df = pd.read_sql_query(query, conn, params={"states": list(state)})

In [None]:
# Release database resources: the cursor first, then the connection.
for resource in (cur, conn):
    resource.close()