# Project 1 for Peter Saliba and Qais Youssef

### Import libraries

In [None]:
import pandas as pd
import pymongo
import os
from sqlalchemy import create_engine, text
import pymysql

# Using the sakila database (MySQL) preloaded in the Azure remote desktop

## Connecting to the database

In [None]:
# Connection settings and helpers for the local MySQL server.
# NOTE(review): credentials are hard-coded in the notebook; consider loading
# them from the environment before sharing this file.
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "UVA!1819"
srcDB = "sakila"
destDB = "destDB"


def createConnectionString(database):
    """Build the SQLAlchemy URL for ``database`` on the configured server.

    The user, host and port come from the module-level settings above rather
    than being repeated as literals, and the credentials are percent-encoded
    so characters that are special in a URL (``@``, ``/``, ``:`` ...) cannot
    corrupt it.

    Args:
        database: Name of the schema to connect to.

    Returns:
        A ``mysql+pymysql://`` connection URL string.
    """
    from urllib.parse import quote

    user = quote(user_id, safe="")
    password = quote(pwd, safe="")
    return f"mysql+pymysql://{user}:{password}@{host_name}:{port}/{database}"
def createDatabase(database):
    """Drop ``database`` if it exists and create it again, empty.

    The connection is opened against the ``sys`` schema, so the target
    database does not have to exist beforehand.

    Args:
        database: Name of the database to (re)create.
    """
    conn_str = createConnectionString("sys")
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        with sqlEngine.connect() as conn:
            conn.execute(text(f"DROP DATABASE IF EXISTS `{database}`;"))
            conn.execute(text(f"CREATE DATABASE `{database}`;"))
            # Quoted like the two statements above so the same names work.
            conn.execute(text(f"USE `{database}`;"))
    finally:
        # The engine is single-use; release its pooled connections.
        sqlEngine.dispose()
        
def pullDF(table, database):
    """Return every row of ``table`` in ``database`` as a DataFrame.

    Args:
        table: Name of the table to read (interpolated into the query as-is).
        database: Name of the database that holds the table.

    Returns:
        A pandas DataFrame with the result of ``SELECT * FROM table``.
    """
    conn_str = createConnectionString(database)
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the read fails.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(f"SELECT * FROM {table}"), connection)
    finally:
        # The engine is single-use; release its pooled connections.
        sqlEngine.dispose()

def pushDF(df, table, database, pk):
    """Write ``df`` to ``table`` in ``database`` and set its primary key.

    Any existing table with the same name is replaced.

    Args:
        df: DataFrame to store; its index is not written.
        table: Name of the destination table.
        database: Name of the destination database.
        pk: Column (or comma-separated columns) to use as the primary key;
            interpolated into the ``ALTER TABLE`` statement as-is.
    """
    conn_str = createConnectionString(database)
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # begin() commits on success and rolls back on error, so the insert
        # does not depend on an implicit commit.
        with sqlEngine.begin() as conn:
            df.to_sql(table, con=conn, index=False, if_exists='replace')
            conn.execute(text(f"ALTER TABLE {table} ADD PRIMARY KEY ({pk})"))
    finally:
        # The engine is single-use; release its pooled connections.
        sqlEngine.dispose()

### Get the payment table from the sakila db

In [None]:
# Read the whole payment table from the source database; this frame is the
# one exported to CSV further down.
payment_tocsv = pullDF("payment",srcDB)
# Preview the first row.
payment_tocsv.head(1)

### Get the staff table from the sakila db

In [None]:
# Read the whole staff table from the source database.
staff_df = pullDF("staff",srcDB)
# Preview the first row.
staff_df.head(1)

### Get the store table from the sakila db

In [None]:
# Read the whole store table from the source database; this frame is the
# one inserted into MongoDB further down.
store_tomongo = pullDF("store",srcDB)
# Preview the first row.
store_tomongo.head(1)

## To csv

In [None]:
# Export the payment table as a tab-separated file. index=False keeps the
# DataFrame's positional index out of the file, so reading it back does not
# yield a spurious "Unnamed: 0" column.
payment_tocsv.to_csv("sakila_payment.csv", sep='\t', encoding='utf-8', index=False)

In [None]:
# Load the tab-separated payment export back into a DataFrame.
payment_df = pd.read_csv("sakila_payment.csv", sep="\t")
# Preview the first row.
payment_df.head(1)

## To MongoDB

In [None]:
# MongoDB connection settings.
# NOTE(review): the Atlas credentials are hard-coded in the notebook; consider
# loading them from the environment before sharing this file.
atlas_cluster_name = "sandbox.zibbf"
atlas_user_name = "local"
atlas_password = "peterqais"

# Connection URIs keyed by deployment: the local server and the Atlas cluster.
conn_str = {
    "local": "mongodb://localhost:27017/",
    "atlas": f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net",
}

# Print the URI itself; wrapping it in braces printed a one-element set.
print(conn_str['local'])

def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Run ``query`` against a MongoDB collection and return a DataFrame.

    Args:
        connect_str: MongoDB connection URI.
        db_name: Name of the database to read from.
        collection: Name of the collection to query.
        query: Filter document passed to ``find``.

    Returns:
        A DataFrame with one row per matching document and the ``_id``
        column removed. It is empty when nothing matches.
    """
    # Create a connection to MongoDB
    client = pymongo.MongoClient(connect_str)
    try:
        # Query MongoDB and collect the documents that will form the DataFrame
        documents = list(client[db_name][collection].find(query))
    finally:
        # Close the client even if the query fails.
        client.close()

    dframe = pd.DataFrame(documents)
    # An empty result has no '_id' column; skip it instead of raising KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')

In [None]:
# Stage the store table in the "store" collection of the local MongoDB server.
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["local"]
# Remove documents left by earlier runs so re-running this cell does not
# duplicate every store.
db["store"].delete_many({})
db["store"].insert_many(store_tomongo.to_dict('records'))

In [None]:
# Read the "store" collection of the local MongoDB back into a DataFrame;
# the empty filter {} is passed straight to find().
store_df = get_mongo_dataframe(conn_str["local"],"local","store",{})
# Preview the first row.
store_df.head(1)

## Reset the destination DB

In [None]:
# Drop the destination database if it exists and create it again, so
# everything previously stored in it is removed.
createDatabase(destDB)

In [None]:
# Take a one-row look at each of the three source frames.
# NOTE(review): presumably only the last expression of a notebook cell is
# rendered, so just the payment preview is shown — confirm.
store_df.head(1)
staff_df.head(1)
payment_df.head(1)

# Create the date dimension table (`dim_date`) in `destDB` at this point by running the accompanying SQL script; the next cell reads it back

In [None]:
# Load the date dimension from the destination database and keep only the
# two columns used to look up date keys.
try:
    date_dim = pullDF("dim_date", destDB)
    date_dim["full_date"] = date_dim["full_date"].astype('datetime64[ns]')
    date_dim = pd.DataFrame(date_dim, columns=["full_date", "date_key"])
except Exception as exc:
    # Keep the original hint, but also show the cause and let
    # KeyboardInterrupt / SystemExit propagate.
    print("something wrong with dimension table. rerun the sql that creates the dimension table.")
    print(f"details: {exc!r}")

# Transform and Load

In [None]:
def getDate(df,field):
    """Return, for each value in ``df[field]``, the text before its first space.

    Values are converted with ``str`` first, so a timestamp rendered as
    ``"YYYY-MM-DD HH:MM:SS"`` yields its ``"YYYY-MM-DD"`` part.
    """
    def _leading_token(value):
        pieces = str(value).split(" ")
        return pieces[0]

    column = df[field]
    return column.apply(_leading_token)

def getTime(df,field):
    """Return, for each value in ``df[field]``, the second space-separated token.

    Values are converted with ``str`` first, so a timestamp rendered as
    ``"YYYY-MM-DD HH:MM:SS"`` yields its ``"HH:MM:SS"`` part. A value without
    a space raises ``IndexError``.
    """
    def _second_token(value):
        pieces = str(value).split(" ")
        return pieces[1]

    column = df[field]
    return column.apply(_second_token)

In [None]:
# --- Store dimension --------------------------------------------------------
store_df["Store Update Date"] = getDate(store_df, "last_update").astype('datetime64[ns]')
store_df["Store Update Time"] = getTime(store_df, "last_update")
# Key the dimension on the source store_id. staff_dim.store_key below is the
# staff row's store_id, so a positional (0-based) key would not line up with
# it when the fact table is joined on store_key.
store_df["store_key"] = store_df["store_id"]
store_df = store_df.merge(date_dim, how="left", left_on="Store Update Date", right_on="full_date")
store_dim = pd.DataFrame(store_df,
                        columns = ["store_key", "date_key", "Store Update Time"])

# --- Staff dimension --------------------------------------------------------
# Same reasoning: payment_dim.staff_key is the payment's staff_id, so the
# staff key must be the source staff_id rather than the row position.
staff_df["staff_key"] = staff_df["staff_id"]
staff_df["Staff Update Date"] =  getDate(staff_df, "last_update").astype('datetime64[ns]')
staff_df["Staff Update Time"] = getTime(staff_df, "last_update")
staff_df = staff_df.merge(date_dim, how="left", left_on="Staff Update Date", right_on="full_date")
staff_dim = pd.DataFrame(staff_df, columns = ["staff_key", "first_name", "last_name", "store_id", "active", "last_update"]) #drop picture column
staff_dim.rename(columns ={"first_name": "First Name", "last_name": "Last Name", "active": "Active",
                           "last_update": "Staff Updated", "store_id":"store_key"}, inplace= True)

# --- Payment dimension ------------------------------------------------------
# payment_key is assigned once, from payment_id; the later overwrite with the
# row position is dropped.
payment_df["payment_key"] = payment_df["payment_id"]
payment_df["Payment Update Date"] =  getDate(payment_df, "last_update").astype('datetime64[ns]')
payment_df["Payment Update Time"] = getTime(payment_df, "last_update")
payment_df = payment_df.merge(date_dim, how="left", left_on="Payment Update Date", right_on="full_date")
payment_dim = pd.DataFrame(payment_df,
                          columns = ["payment_key", "staff_id","Payment Update Date",
                                     "Payment Update Time", "amount"])
# Only staff_id is present among the selected columns, so it is the only
# entry the rename map needs.
payment_dim.rename(columns = {"staff_id":"staff_key"}, inplace = True)

In [None]:
# Preview the first row of the store dimension.
store_dim.head(1)

In [None]:
# Preview the first row of the staff dimension.
staff_dim.head(1)

In [None]:
# Preview the first row of the payment dimension.
payment_dim.head(1)

In [None]:
# Attach staff attributes to every payment. The right join keeps all payment
# rows, so staff members without any payment do not appear in the result.
store_staff = staff_dim.merge(payment_dim, how="right", on="staff_key") # we don't care about staff who haven't sold anything
# Preview the first row.
store_staff.head(1)

In [None]:
# Build the fact table by joining the payment/staff rows to the store
# dimension; the inner join keeps only rows whose store_key exists in both.
facts_table = store_staff.merge(store_dim, how="inner", on="store_key")
# Use the row position of the merged frame as the fact table's key.
facts_table["facts_table_key"] = facts_table.index
# Preview the first row.
facts_table.head(1)

# Load Database in MySQL

In [None]:
# Write each frame to the destination database, replacing any table of the
# same name, and make the named column its primary key.
pushDF(store_dim,"store_dim",destDB, "store_key")
pushDF(staff_dim,"staff_dim",destDB, "staff_key")
pushDF(payment_dim,"payment_dim",destDB, "payment_key")
pushDF(facts_table,"facts_table",destDB, "facts_table_key")

In [None]:
def runSQL(sql, database):
    """Run a SQL query against ``database`` and return the result.

    Args:
        sql: The SQL text to execute.
        database: Name of the database to run it against.

    Returns:
        A pandas DataFrame holding the query result.
    """
    conn_str = createConnectionString(database)
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query fails.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql), connection)
    finally:
        # The engine is single-use; release its pooled connections.
        sqlEngine.dispose()

# SQL group by statement

In [None]:
# Total payment amount per staff member, aggregated from the fact table.
# The query is a single statement: the stray second ';' was removed because
# it made the string two statements.
runSQL(
    """
    SELECT `First Name`, `Last Name`, SUM(amount) AS `Total Sold`
    FROM facts_table
    GROUP BY `First Name`, `Last Name`;
    """,
    destDB).head(1)

## SQL queries

In [None]:
# Read staff_dim back from the destination database and preview its first row.
runSQL("SELECT * FROM staff_dim;", destDB).head(1)

In [None]:
# Read store_dim back from the destination database and preview its first row.
runSQL("SELECT * FROM store_dim;", destDB).head(1)

In [None]:
# Read payment_dim back from the destination database and preview its first row.
runSQL("SELECT * FROM payment_dim;", destDB).head(1)

In [None]:
# Read facts_table back from the destination database and preview its first row.
runSQL("SELECT * FROM facts_table;", destDB).head(1)