In [13]:
from urllib.parse import quote_plus
import mysql.connector
from mysql.connector import Error
import pandas as pd
from sqlalchemy import create_engine,text

In [14]:
def connection(hostname, username, password, dbname):
    # Initialize cinnection to None
    con = None

    # Encode the password
    password = quote_plus(password)

    # Create connection while checking for any errors
    try:
        con = mysql.connector.connect(host=hostname, user=username, passwd = password, database  = dbname)
        print("Connection successful!")
    except Error as e:
        print(f"The error {e} has occured.")
    
    return con

def engine(hostname, username, password, dbname, port):
    # Create engine
    eng = create_engine(f'mysql+pymysql://{username}:{password}@{hostname}:{port}/{dbname}')
    return eng

In [15]:
def csv_to_staging(datafile):
    # Create engine using the pre defined engine function
    eng = engine("localhost", "root", "Layaldbroot1997", "feedback_source", "3306")

    # Load csv to dataframe
    df = pd.read_csv(datafile, sep=', ', engine='python')

    # Load dataframe to staging schema
    df.to_sql(name="feedback",con=eng, schema="feedback_source",if_exists="replace",index = False)

csv_to_staging("../Data/sentiment-analysis.csv")

def extract_source():
    # Create connection and cursor
    conn = connection("localhost", "root", "Layaldbroot1997", "feedback_source")
    cursor = conn.cursor()

    # The query extracts all data in the staging schema
    query = "Select * from feedback_source.feedback"
    cursor.execute(query)
    data = cursor.fetchall()

    # Define columns names and load cursor data to a dataframe
    column_names = [i[0] for i in cursor.description]
    df = pd.DataFrame(data, columns=column_names)

    # Extract to a csv file
    df.to_csv("../Data/Extracted.csv", index=False)

extract_source()

Connection successful!


In [16]:
def transform_feedback():
    # Extract the data from staging area
    df = pd.read_csv("../Data/Extracted.csv")

    # Take a copy of the df
    df = df.copy()

    # Drop dupicates
    df =  df.drop_duplicates()

    # Rename columns properly
    df.rename(columns={'"Text': 'Text', 'Confidence Score"': 'Confidence Score'}, inplace=True)

    # Remove characters from values
    df.loc[:,'Text'] = df['Text'].str.replace(r'\"', '', regex=True)
    df.loc[:,'Confidence Score'] = df['Confidence Score'].str.replace(r'\"', '', regex=True)
    df.loc[:,'User ID'] = df['User ID'].str.replace(r'@', '', regex=True)

    # Change some data types
    df['Date/Time'] = pd.to_datetime(df['Date/Time'])
    df['Confidence Score'] = df['Confidence Score'].astype(dtype='float')

    # Save transformed data in a csv
    df.to_csv("../Data/Transformed.csv", index=False)

transform_feedback()

In [23]:
def load_sources():
    # Load transformd data into a dataframe
    df = pd.read_csv("../Data/Transformed.csv")

    # Create engine
    eng = engine("localhost", "root", "Layaldbroot1997", "feedback_dwh", "3306")

    # SQL commands to drop and create foreign key
    drop_fk = text("ALTER TABLE fact_feedback DROP FOREIGN KEY source_id;")
    add_fk = text("""
    ALTER TABLE fact_feedback
    ADD CONSTRAINT source_id
    FOREIGN KEY (source_id) REFERENCES dim_source(id);
    """)
    id_type = text("ALTER TABLE dim_source MODIFY COLUMN id INT;")

    # Load distinct sources to a dataframe
    sources = df[['Source']].drop_duplicates()

    # Generate ids for the sources
    sources = sources.reset_index(drop=True)
    sources['id'] = sources.index + 1 

    # Organize the columns
    sources = sources[['id', 'Source']]

    with eng.connect() as connection:
        # Drop foreign key
        connection.execute(drop_fk)

        # Load the sources to the corresponding table in the data warehouse
        sources.to_sql(name="dim_source", con=eng, schema="feedback_dwh", if_exists="replace", index=False)

        # Add foreign key
        connection.execute(id_type)
        connection.execute(add_fk)

    # Save sources into a csv
    sources.to_csv("../Data/sources.csv", index=False)

load_sources()

OperationalError: (pymysql.err.OperationalError) (3780, "Referencing column 'source_id' and referenced column 'id' in foreign key constraint 'source_id' are incompatible.")
[SQL: 
    ALTER TABLE fact_feedback
    ADD CONSTRAINT source_id
    FOREIGN KEY (source_id) REFERENCES dim_source(id);
    ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [37]:
def load_locations():
    # Load transformd data into a dataframe
    df = transform_feedback()

    # Create engine
    eng = engine("localhost", "root", "Layaldbroot1997", "feedback_dwh", "3306")

    # Load distinct locations to a dataframe
    locations = df[['Location']].drop_duplicates().reset_index(drop=True)

    # Generate ids for the locations
    locations = locations.reset_index(drop=True)
    locations['id'] = locations.index + 1

    # Organize the columns
    locations = locations[['id', 'Location']]

    # Load the locations to the corresponding table in the data warehouse
    locations.to_sql(name="dim_location", con=eng, schema="feedback_dwh", if_exists="replace", index=False)

    return
load_locations()

Connection successful!


In [38]:
def load_sentiments():
    # Load transformd data into a dataframe
    df = transform_feedback()

    # Create engine
    eng = engine("localhost", "root", "Layaldbroot1997", "feedback_dwh", "3306")

    # Load distinct locations to a dataframe
    sentiments = df[['Sentiment']].drop_duplicates().reset_index(drop=True)

    # Generate ids for the locations
    sentiments = sentiments.reset_index(drop=True)
    sentiments['id'] = sentiments.index + 1

    # Organize the columns
    sentiments = sentiments[['id', 'Sentiment']]

    # Load the locations to the corresponding table in the data warehouse
    sentiments.to_sql(name="dim_sentiment", con=eng, schema="feedback_dwh", if_exists="replace", index=False)

    return
load_sentiments()

Connection successful!
