In [1]:
# Install psycopg2 if not already installed. You can do this in your
# terminal or command prompt:
# pip install psycopg2-binary

# SQL database setup
import sqlalchemy
import pandas as pd
import os
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table, Column, String, Integer, Float, Boolean, Date, ForeignKey
from sqlalchemy import inspect
from sqlalchemy import exc
import json

# Define the target directory
target_dir = r"C:\Users\asg_a_1p8y6mm\OneDrive\Desktop\WIOA Training\DataAnalytics\Module 13\Module 13; Project 2\Crowdfunding_ETL\Resources"

# Create the target directory if it doesn't exist
os.makedirs(target_dir, exist_ok=True)

# Create the database engine using a variable for the db connection string
db_string = f"postgresql://postgres:postgres@127.0.0.1:5432/crowdfunding_db"
engine_default = create_engine(db_string)


# ### Import the CSV files into DataFrames
category_df = pd.read_csv(os.path.join(target_dir, 'category.csv'))
subcategory_df = pd.read_csv(os.path.join(target_dir, 'subcategory.csv'))
contacts_df = pd.read_csv(os.path.join(target_dir, 'contacts.csv'))
campaign_df = pd.read_csv(os.path.join(target_dir, 'campaign.csv'))


# ### Create the crowdfunding_db database
try:
    conn = engine_default.connect()
    result = conn.execute(sqlalchemy.text(
        "SELECT 1 FROM pg_database WHERE datname='crowdfunding_db';"
    )).fetchone()

    if result is None:  # No existing database
        conn.execution_options(isolation_level="AUTOCOMMIT").execute(sqlalchemy.text("CREATE DATABASE crowdfunding_db;"))
        print("New crowdfunding database created")
    else:
        print("Crowdfunding database already exists")  # Database
    conn.close()
except Exception as e:
    print(f"Error connecting to or creating database: {e}")

# Re-create engine to connect to the new database
engine = create_engine(db_string)
conn = engine.connect()

# Create Metadata object
metadata = MetaData()

# Define the tables
category_table = Table('category', metadata,
    Column('category_id', String, primary_key=True),
    Column('category', String)
)

subcategory_table = Table('subcategory', metadata,
    Column('subcategory_id', String, primary_key=True),
    Column('subcategory', String)
)

contacts_table = Table('contacts', metadata,
    Column('contact_id', Integer, primary_key=True),
    Column('first_name', String),
    Column('last_name', String),
    Column('email', String)
)

campaign_table = Table('campaign', metadata,
    Column('cf_id', Integer, primary_key=True),
    Column('contact_id', Integer, ForeignKey('contacts.contact_id')),
    Column('company_name', String),
    Column('description', String),
    Column('goal', Float),
    Column('pledged', Float),
    Column('outcome', String),
    Column('backers_count', Integer),
    Column('country', String),
    Column('currency', String),
    Column('launched_date', Date),
    Column('end_date', Date),
    Column('category_id', String, ForeignKey('category.category_id')),
    Column('subcategory_id', String, ForeignKey('subcategory.subcategory_id'))
)

# Create all database tables, *replacing* existing ones
# If replacing the tables, drop them in reverse order of dependencies first, before creating
try:
    metadata.drop_all(engine)
    print("Existing tables dropped")
except exc.OperationalError as e:
    print(f"Tables didn't exist: {e}")
metadata.create_all(engine)
print("Tables created")

def infer_mongodb_schema(df):
    """
    Infers a MongoDB-like schema from a Pandas DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame to inspect.

    Returns:
        dict: A dictionary representing the inferred schema.
    """
    schema = {}
    for col in df.columns:
        col_type = str(df[col].dtype)
        
        # Attempt to infer sub-type based on first non-null element
        sample_value = df[col].dropna().iloc[0] if not df[col].dropna().empty else None
        subtype = None
        if sample_value is not None:
           if isinstance(sample_value, str):
               try:
                   pd.to_datetime(sample_value)
                   subtype = "date"
               except:
                   pass 
        
        if col_type.startswith('int'):
            col_type = "int"
        elif col_type.startswith('float'):
            col_type = "float"
        elif col_type.startswith('date'):
          col_type = "date"
        else:
            col_type = "string"

        # Use subtype if one was found
        if subtype:
           schema[col] = {"type": subtype}
        else:
           schema[col] = {"type": col_type}

    return schema

def generate_mongodb_schemas(category_df, subcategory_df, contacts_df, campaign_cleaned):
    """
    Generates MongoDB schemas for all the dataframes

    Args:
       category_df (pd.DataFrame): Category DataFrame to inspect
       subcategory_df (pd.DataFrame): Subcategory DataFrame to inspect
       contacts_df (pd.DataFrame): Contacts DataFrame to inspect
       campaign_cleaned (pd.DataFrame): Cleaned Campaign DataFrame to inspect
    Returns:
        dict: A dictionary that holds each of the dataframes schemas
    """
    schemas = {}
    schemas['category_schema'] = infer_mongodb_schema(category_df)
    schemas['subcategory_schema'] = infer_mongodb_schema(subcategory_df)
    schemas['contacts_schema'] = infer_mongodb_schema(contacts_df)
    schemas['campaign_schema'] = infer_mongodb_schema(campaign_cleaned)
    return schemas

campaign_cleaned = campaign_df.copy()
campaign_cleaned = campaign_cleaned.rename(columns={"blurb": "description"})

# Correctly dropping columns
columns_to_drop = ['category', 'sub-category', 'category_x','category_y', 'subcategory', 'category & sub-category', 'sub-category']
campaign_cleaned = campaign_cleaned.drop(columns=[col for col in columns_to_drop if col in campaign_cleaned.columns])

# Keep only necessary columns
campaign_cleaned = campaign_cleaned[['cf_id', 'contact_id', 'company_name', 'description', 'goal', 'pledged', 'outcome',
                                    'backers_count', 'country', 'currency', 'launched_date', 'end_date',
                                    'category_id', 'subcategory_id']]
# Map category_id to match those in the category table
category_mapping = {
    'catl': '1',
    'cat2': '2',
    'cat3': '3',
    'cat4': '4',
    'cat5': '5',
    'cat6': '6',
    'cat7': '7',
    'cat8': '8',
    'cat9': '9'
}
campaign_cleaned['category_id'] = campaign_cleaned['category_id'].map(category_mapping)

# Map subcategory_id to match those in the subcategory table
subcategory_mapping = {
    'subcatl': '1',
    'subcat2': '2',
    'subcat3': '3',
    'subcat4': '4',
    'subcat5': '5',
    'subcat6': '6',
    'subcat7': '7',
    'subcat8': '8',
    'subcat9': '9',
    'subcat10': '10',
    'subcat11': '11',
    'subcat12': '12',
    'subcat13': '13',
    'subcat14': '14',
    'subcat15': '15',
    'subcat16': '16',
    'subcat17': '17',
    'subcat18': '18',
    'subcat19': '19',
    'subcat20': '20',
    'subcat21': '21',
    'subcat22': '22',
    'subcat23': '23',
    'subcat24': '24'
}
campaign_cleaned['subcategory_id'] = campaign_cleaned['subcategory_id'].map(subcategory_mapping)

# Convert launched_date and end_date to date without specifying unit
campaign_cleaned["launched_date"] = pd.to_datetime(campaign_cleaned["launched_date"]).dt.date
campaign_cleaned["end_date"] = pd.to_datetime(campaign_cleaned["end_date"]).dt.date

# Generate schemas
schemas = generate_mongodb_schemas(category_df, subcategory_df, contacts_df, campaign_cleaned)

# Save the MongoDB-like schema to a JSON file in the target directory
json_filepath = os.path.join(target_dir, 'crowdfunding_db_schema.json')
with open(json_filepath, 'w') as f:
    json.dump(schemas, f, indent=4)
print(f"MongoDB-like schema saved to '{json_filepath}'")

# Save the Postgres schema to a SQL file in the target directory
sql_filepath = os.path.join(target_dir, 'crowdfunding_db_schema.sql')
with open(sql_filepath, 'w') as f:
    for table in metadata.sorted_tables:
        f.write(str(sqlalchemy.schema.CreateTable(table).compile(engine)) + ";\n")
print(f"Postgres schema saved to '{sql_filepath}'")

# Import the DataFrames
category_df.to_sql('category', con=conn, if_exists='append', index=False)
subcategory_df.to_sql('subcategory', con=conn, if_exists='append', index=False)
contacts_df.to_sql('contacts', con=conn, if_exists='append', index=False)

# Verify that the data was imported correctly
print("Category table data:")
print(pd.read_sql_query("SELECT * FROM category", engine))
print("\nSubcategory table data:")
print(pd.read_sql_query("SELECT * FROM subcategory", engine))
print("\nContacts table data:")
print(pd.read_sql_query("SELECT * FROM contacts", engine))


# Import the cleaned DataFrame into the campaign table
try:
    campaign_cleaned.to_sql('campaign', con=conn, if_exists='append', index=False)
    print("\nCampaign table data:")
    print(pd.read_sql_query("SELECT * FROM campaign", engine))
except Exception as e:
    print(f"Error importing campaign data: {e}")

conn.close()

# Verify that the tables were created
inspector = inspect(engine)
print("Tables in the database: ", inspector.get_table_names())

Crowdfunding database already exists
Existing tables dropped
Tables created
MongoDB-like schema saved to 'C:\Users\asg_a_1p8y6mm\OneDrive\Desktop\WIOA Training\DataAnalytics\Module 13\Module 13; Project 2\Crowdfunding_ETL\Resources\crowdfunding_db_schema.json'
Postgres schema saved to 'C:\Users\asg_a_1p8y6mm\OneDrive\Desktop\WIOA Training\DataAnalytics\Module 13\Module 13; Project 2\Crowdfunding_ETL\Resources\crowdfunding_db_schema.sql'
Category table data:
  category_id      category
0           1          food
1           2         music
2           3    technology
3           4       theater
4           5  film & video
5           6    publishing
6           7         games
7           8   photography
8           9    journalism

Subcategory table data:
   subcategory_id        subcategory
0               1        food trucks
1               2               rock
2               3                web
3               4              plays
4               5        documentary
5          