Import All necessary packages

In [85]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine
import pymysql
import mysql.connector

#### Get dependents table from local file system


In [3]:
df_dependents = pd.read_csv('C:/Users/Cato/Downloads/DS2002_project_1/dependents.csv')

In [10]:
df_dependents

Unnamed: 0,dependent_id,first_name,last_name,relationship,employee_id
0,1,Penelope,Gietz,Child,206
1,2,Nick,Higgins,Child,205
2,3,Ed,Whalen,Child,200
3,4,Jennifer,King,Child,100
4,5,Johnny,Kochhar,Child,101
5,6,Bette,De Haan,Child,102
6,7,Grace,Faviet,Child,109
7,8,Matthew,Chen,Child,110
8,9,Joe,Sciarra,Child,111
9,10,Christian,Urman,Child,112


Loading data into MongoDB


In [5]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "GBwon4ring$"

In [6]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [11]:
dataframe = df_dependents
table_name = 'dependents'
primary_key = 'dependent_id'
db_operation = "insert"
dst_dbname = 'project_1'

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [15]:
def populate_dataframe(user_id, pwd, host_name, db_name, sql):
    conn = pymysql.connect(host=host_name, user=user_id, password=pwd, database=db_name)

    df = pd.read_sql(sql, conn)

    return df

In [17]:
atlas_cluster_name = "DS2002"
atlas_default_dbname = "admin"
atlas_user_name = "root"
atlas_password = "gthgthgth"

conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.20qvgzt.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
}

print(conn_str["local"])
print(conn_str["atlas"])

src_dbname = "hr_db"
dst_dbname = "project_1"

mongodb://localhost:27017/
mongodb+srv://root:gthgthgth@DS2002.20qvgzt.mongodb.net/admin?retryWrites=true&w=majority


#### Populate MongoDB with Employee Data
Be certain you run this cell **ONLY ONCE!** 


In [23]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"employees" : 'employees.json'}

for file in json_files:
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()      

Make sure data loaded in and convert it into dataframe

In [18]:
query = {}
collection = "employees"

df_employees = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_employees.head(2)

Unnamed: 0,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9


Get the rest of the tables from MySQL and covert them into dataframes

In [19]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "GBwon4ring$"

src_dbname = "project_1"
#dst_dbname = "project_1dw2"

In [20]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [78]:
sql_countries = "SELECT * FROM project_1.countries;"
df_countries = get_dataframe(user_id, pwd, host_name, src_dbname, sql_countries)
df_countries

Unnamed: 0,country_id,country_name,region_id
0,AR,Argentina,2
1,AU,Australia,3
2,BE,Belgium,1
3,BR,Brazil,2
4,CA,Canada,2
5,CH,Switzerland,1
6,CN,China,3
7,DE,Germany,1
8,DK,Denmark,1
9,EG,Egypt,4


In [22]:
sql_departments = "SELECT * FROM project_1.departments;"
df_departments = get_dataframe(user_id, pwd, host_name, src_dbname, sql_departments)
df_departments.head(2)

Unnamed: 0,department_id,department_name,location_id
0,1,Administration,1700
1,2,Marketing,1800


In [23]:
sql_jobs = "SELECT * FROM project_1.jobs;"
df_jobs = get_dataframe(user_id, pwd, host_name, src_dbname, sql_jobs)
df_jobs.head(2)

Unnamed: 0,job_id,job_title,min_salary,max_salary
0,1,Public Accountant,4200.0,9000.0
1,2,Accounting Manager,8200.0,16000.0


In [24]:
sql_locations = "SELECT * FROM project_1.locations;"
df_locations = get_dataframe(user_id, pwd, host_name, src_dbname, sql_locations)
df_locations.head(2)

Unnamed: 0,location_id,street_address,postal_code,city,state_province,country_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US


In [25]:
sql_regions = "SELECT * FROM project_1.regions;"
df_regions = get_dataframe(user_id, pwd, host_name, src_dbname, sql_regions)
df_regions.head(2)

Unnamed: 0,region_id,region_name
0,1,Europe
1,2,Americas


## Make Fact Table

Fact table needs employees, job, department and location as those are the main details of each transaction. It is an HR database. Can just have the keys to the other tables: region, countries, and dependents.

Merging Location and Department Tables

In [57]:
df_nlocations = pd.merge(df_locations, df_countries, on ='country_id', how = 'inner')
df_nlocations.head(2)

Unnamed: 0,location_id,street_address,postal_code,city,state_province,country_id,country_name,region_id
0,1400,2014 Jabberwocky Rd,26192,Southlake,Texas,US,United States of America,2
1,1500,2011 Interiors Blvd,99236,South San Francisco,California,US,United States of America,2


In [58]:
df_ndepartments = pd.merge(df_departments, df_nlocations, on ='location_id', how = 'inner')
df_ndepartments.head(2)

Unnamed: 0,department_id,department_name,location_id,street_address,postal_code,city,state_province,country_id,country_name,region_id
0,1,Administration,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2
1,3,Purchasing,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2


Merging Department and Employee Tables

In [59]:
df_nemployees = pd.merge(df_employees, df_ndepartments,  on ='department_id', how = 'inner')
df_nemployees.head(2)

Unnamed: 0,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id,department_name,location_id,street_address,postal_code,city,state_province,country_id,country_name,region_id
0,100,Steven,King,steven.king@sqltutorial.org,515.123.4567,1987-06-17,4,24000.0,,9,Executive,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2
1,101,Neena,Kochhar,neena.kochhar@sqltutorial.org,515.123.4568,1989-09-21,5,17000.0,100.0,9,Executive,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2


In [66]:
df_ndependents = pd.merge(df_dependents, df_nemployees,  on ='employee_id', how = 'inner')
drop_columns = ['first_name_x', 'last_name_x', 'relationship']
df_ndependents.drop(drop_columns, axis=1, inplace = True)
df_ndependents.rename(columns={"first_name_y":"first_name", "last_name_y":"last_name"}, inplace=True)
df_ndependents.head(2)

Unnamed: 0,dependent_id,employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,manager_id,department_id,department_name,location_id,street_address,postal_code,city,state_province,country_id,country_name,region_id
0,1,206,William,Gietz,william.gietz@sqltutorial.org,515.123.8181,1994-06-07,1,8300.0,205.0,11,Accounting,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2
1,2,205,Shelley,Higgins,shelley.higgins@sqltutorial.org,515.123.8080,1994-06-07,2,12000.0,101.0,11,Accounting,1700,2004 Charade Rd,98199,Seattle,Washington,US,United States of America,2


Re-Order Columns in Table

In [68]:
reorder_columns = ['location_id',
        'country_id',
        'department_id',
        'employee_id',
        'job_id',
        'manager_id',
        'dependent_id',
        'street_address',
        'postal_code',
        'city',
        'state_province',
        'department_name',
        'first_name',
        'last_name',
        'email',
        'phone_number',
        'hire_date',
        'salary']

df_factTableOrdered = df_ndependents[reorder_columns]

df_factTableOrdered.insert(0, "fact_key", range(1, df_factTableOrdered.shape[0]+1))
df_factTableOrdered.head(2)

Unnamed: 0,fact_key,location_id,country_id,department_id,employee_id,job_id,manager_id,dependent_id,street_address,postal_code,city,state_province,department_name,first_name,last_name,email,phone_number,hire_date,salary
0,1,1700,US,11,206,1,205.0,1,2004 Charade Rd,98199,Seattle,Washington,Accounting,William,Gietz,william.gietz@sqltutorial.org,515.123.8181,1994-06-07,8300.0
1,2,1700,US,11,205,2,101.0,2,2004 Charade Rd,98199,Seattle,Washington,Accounting,Shelley,Higgins,shelley.higgins@sqltutorial.org,515.123.8080,1994-06-07,12000.0


insert fact table into MySQL database

In [70]:
dataframe = df_factTableOrdered
table_name = 'fact_table'
primary_key = 'fact_key'
db_operation = "insert"
dst_dbname = 'project_1'

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

Create Dimensional Report for names, country, and max salary sorted by date hired

In [84]:
query = """
    select first_name, last_name, country_name as country, max(max_salary) as max_salary
from project_1.fact_table
inner join project_1.countries
on project_1.fact_table.country_id = project_1.countries.country_id
inner join project_1.jobs
on project_1.fact_table.job_id = project_1.jobs.job_id
group by hire_date;
"""
df_query = get_dataframe(user_id, pwd, host_name, src_dbname, query)
df_query

Unnamed: 0,first_name,last_name,country,max_salary
0,William,Gietz,United States of America,16000.0
1,Jennifer,Whalen,United States of America,6000.0
2,Steven,King,United States of America,40000.0
3,Neena,Kochhar,United States of America,30000.0
4,Lex,De Haan,United States of America,30000.0
5,Daniel,Faviet,United States of America,9000.0
6,John,Chen,United States of America,9000.0
7,Ismael,Sciarra,United States of America,9000.0
8,Jose Manuel,Urman,United States of America,9000.0
9,Luis,Popp,United States of America,9000.0
