In [None]:
pwd

In [None]:
#!pip install simple_salesforce
#!pip install pymysql
#!pip install sqlalchemy

In [None]:
#import json
from sqlalchemy import create_engine
from datetime import date
import pandas as pd
import pymysql
pymysql.install_as_MySQLdb()

In [None]:
# import credentials
from config import sf_username, sf_password, sf_security_token
from config import remote_db_endpoint, remote_db_port
from config import remote_db_name, remote_db_user, remote_db_pwd

In [None]:
#connect to salesforce
from simple_salesforce import Salesforce
sf = Salesforce(username=sf_username, password=sf_password, security_token=sf_security_token)

In [None]:
#connect to mysql db
engine = create_engine(f"mysql://{remote_db_user}:{remote_db_pwd}@{remote_db_endpoint}:{remote_db_port}/{remote_db_name}")
conn = engine.connect()

## Prepare ETL Data

### Course Table

In [None]:
#reads in course table from mysql
course_data_df = pd.read_sql("SELECT * FROM course", conn)
course_data_df.head(30)

In [None]:
#rename columns from course table
course_data_df.rename(columns={
    'CourseCode':'Course_Code__c',
    'CourseName':'Course_Name__c',
    'CreditHours':'Credit_Hours__c',
    'BootCampCourse':'Boot_Camp_Course__c',    
}, inplace=True)
course_data_df.head()

In [None]:
#Dropping ID_Course
course_data_df = course_data_df[['Course_Code__c', 'Course_Name__c', 'Credit_Hours__c', 'Boot_Camp_Course__c']]
course_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
course_data_records = course_data_df.to_dict('records')
course_data_records

In [None]:
#deletes course data
class_records = sf.query("SELECT Id FROM Course__c")
recs_to_delete = [{'Id': r['Id']} for r in class_records['records']]
recs_to_delete

sf.bulk.Course__c.delete(recs_to_delete)

In [None]:
 #creating records for each dic in dic list
for rec in course_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {
        'Course_Code__c': rec['Course_Code__c'],
        'Course_Name__c': rec['Course_Name__c'],
        'Credit_Hours__c': rec['Credit_Hours__c'],
        'Boot_Camp_Course__c': rec['Boot_Camp_Course__c'],
    } 
    try: 
        sf.Course__c.create(record)
    except Exception as e:
        print(e)

### Student Table

In [None]:
#read in student data
student_data_df = pd.read_sql("SELECT * FROM student", conn)
student_data_df.head(30)

In [None]:
#rename columns from student table
student_data_df.rename(columns={
    'StudentID':'Student_ID__c',
    'LastName':'Last_Name__c',
    'FirstName':'First_Name__c',
    'MiddleName':'Middle_Name__c', 
    'BirthDate':'Birth_Date__c',
    'Gender': 'Gender__c',
}, inplace=True)

#Dropping ID_Student
student_data_df = student_data_df[['Student_ID__c', 'Last_Name__c', 'First_Name__c', 'Middle_Name__c', 'Birth_Date__c', 'Gender__c']]
student_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
student_data_records = student_data_df.to_dict('records')
student_data_records

In [None]:
#delets student data
student_records = sf.query("SELECT Id FROM Student__c")
recs_to_delete = [{'Id': r['Id']} for r in student_records['records']]
recs_to_delete

sf.bulk.Student__c.delete(recs_to_delete)

In [None]:
#creating student records for each dic in dic list
for rec in student_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {
        'Student_ID__c':rec['Student_ID__c'],
        'Last_Name__c':rec['Last_Name__c'],
        'First_Name__c':rec['First_Name__c'],
        'Middle_Name__c':rec['Middle_Name__c'], 
        'Birth_Date__c':rec['Birth_Date__c'],
        'Gender__c': rec['Gender__c'],
    }
    
    try:
        sf.Student__c.create(record)
    except Exception as e:
        print(e)

### Staff Table

In [None]:
#read in staff data
staff_data_df = pd.read_sql("SELECT * FROM staff", conn)
staff_data_df.head(30)

In [None]:
#rename columns from student table
staff_data_df.rename(columns={
    'EmployeeID':'Employee_ID__c',
    'LastName':'Last_Name__c',
    'FirstName':'First_Name__c',
    'MiddleName':'Middle_Name__c', 
    'BirthDate':'Birth_Date__c',
}, inplace=True)

#Dropping ID_Staff
staff_data_df = staff_data_df[['Employee_ID__c', 'Last_Name__c', 'First_Name__c', 'Middle_Name__c', 'Birth_Date__c']]
staff_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
staff_data_records = staff_data_df.to_dict('records')
staff_data_records

In [None]:
#deletes staff data
staff_records = sf.query("SELECT Id FROM Staff__c")
recs_to_delete = [{'Id': r['Id']} for r in staff_records['records']]
recs_to_delete

sf.bulk.Staff__c.delete(recs_to_delete)

In [None]:
#creating staff records for each dic in dic list
for rec in staff_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {
        'Employee_ID__c':rec['Employee_ID__c'],
        'Last_Name__c':rec['Last_Name__c'],
        'First_Name__c':rec['First_Name__c'],
        'Middle_Name__c':rec['Middle_Name__c'], 
        'Birth_Date__c':str(rec['Birth_Date__c'])
    } 
    try:
        sf.Staff__c.create(record)
    except Exception as e:
        print(e)

## Look up Tables

### Link Class to Course Table

In [None]:
#create course lookup table
course_lookup_list = []

# The `Name` column in the primary key in Salesforce objects
data = sf.query_all_iter("SELECT Course_Code__c, Name FROM Course__c")
for row in data:
    rec = {
        'ID_Course__c': row['Name'], # this is a critical line of code
        'Course_Code__c': row['Course_Code__c']
    }
    course_lookup_list.append(rec)
    
course_lookup_list

In [None]:
#convert to dataframe
course_lookup_df = pd.DataFrame(course_lookup_list)
#course_lookup_df

In [None]:
# Query the `Class` table from MySQL
query = '''
    SELECT 
        co.CourseCode,
        cl.*
    FROM 
        class cl
        INNER JOIN course co
        ON cl.ID_Course = co.ID_Course

'''

class_data_df = pd.read_sql(query, conn)
class_data_df.head()

In [None]:
#rename columns from class table
class_data_df.rename(columns={
    'ID_Course':'ID_Course__c',
    'Section':'Section__c',
    'StartDate':'Start_Date__c',
    'EndDate':'End_Date__c',
    'CourseCode':'Course_Code__c'
}, inplace=True)


class_data_df = class_data_df[['ID_Course__c','Course_Code__c', 'Section__c', 'Start_Date__c', 'End_Date__c']]
class_data_df

In [None]:
#merge MySQL Class table to salesforce FK lookup
class_data_df = pd.merge(class_data_df, course_lookup_df, how='left', on='Course_Code__c')
#class_data_df.drop(columns = ['ID_Class','ID_Course','CourseName','CreditHours','BootCampCourse','Course_Code__c'], inplace=True)

class_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
class_data_records = class_data_df.to_dict('records')
class_data_records

In [None]:
#delets class data
class_records = sf.query("SELECT Id FROM Class__c")
recs_to_delete = [{'Id': r['Id']} for r in class_records['records']]
recs_to_delete

sf.bulk.Class__c.delete(recs_to_delete)

In [None]:
#creating class records for each dic in dic list
#Insert records into salesforce
for rec in class_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {
        'ID_Course__c': rec['ID_Course__c_y'],
        'Section__c':rec['Section__c'],
        'Start_Date__c':str(rec['Start_Date__c']),
        'End_Date__c':str(rec['End_Date__c'])
    } 
    try:
        sf.Class__c.create(record)
    except Exception as e:
        print(e)

### Link Class Participant to Class and Student Tables

In [None]:
#create class lookup table 
class_lookup_list = []

# The `Name` column in the primary key in Salesforce objects
data = sf.query_all_iter("SELECT Section__c, Name FROM Class__c")
for row in data:
    rec = {
        'ID_Class__c': row['Name'], # this is a critical line of code
        'Section__c': row['Section__c']
    }
    class_lookup_list.append(rec)

#convert to dataframe
class_lookup_df = pd.DataFrame(class_lookup_list)
class_lookup_df

In [None]:
#create student lookup table
student_lookup_list = []

# The `Name` column in the primary key in Salesforce objects
data = sf.query_all_iter("SELECT Student_ID__c, Name FROM Student__c")
for row in data:
    rec = {
        'ID_Student__c': row['Name'], # this is a critical line of code
        'Student_ID__c': row['Student_ID__c']
    }
    student_lookup_list.append(rec)
    
#convert to dataframe
student_lookup_df = pd.DataFrame(student_lookup_list)
student_lookup_df

In [None]:
# Query the `classparticpant, class, student` tables from MySQL
query = '''
    SELECT 
        cp.StartDate,
        cp.EndDate,
        s.StudentID,
        c.Section
    FROM classparticipant cp
    INNER JOIN student s
    ON s.ID_Student = cp.ID_Student
    INNER JOIN class c
    ON c.ID_Class = cp.ID_Class

'''

cp_data_df = pd.read_sql(query, conn)
#cp_data_df.head()

In [None]:
#rename columns from classparticipant table
cp_data_df.rename(columns={
    'StartDate':'Start_Date__c',
    'EndDate':'End_Date__c',
    'StudentID':'Student_ID__c',
    'Section':'Section__c'
}, inplace=True)

cp_data_df.head()

In [None]:
#merge MySQL cp table to salesforce FK lookup tables
#classparticipant and class
claspart_data_df = pd.merge(cp_data_df, class_lookup_df, how='left', on='Section__c')

#merged cp data with student
claspart_data_df = pd.merge(claspart_data_df, student_lookup_df, how='left', on='Student_ID__c')

claspart_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
claspart_data_records = claspart_data_df.to_dict('records')
claspart_data_records

In [None]:
#delets classparticipant data
class_records = sf.query("SELECT Id FROM Class_Participant__c")
recs_to_delete = [{'Id': r['Id']} for r in class_records['records']]
recs_to_delete

sf.bulk.Class_Participant__c.delete(recs_to_delete)

In [None]:
#creating records for each dic in dic list
#Insert records into salesforce
for rec in claspart_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {}
    record['ID_Class__c'] = rec['ID_Class__c']
    record['ID_Student__c'] = rec['ID_Student__c']
    if rec['Start_Date__c'] != None:
        record['Start_Date__c'] = str(rec['Start_Date__c'])
    if rec['End_Date__c'] != None:
        record['End_Date__c'] = str(rec['End_Date__c'])
    
    try:
        sf.Class_Participant__c.create(record)
    except Exception as e:
        print(e)

### Link Staff Assignmentt to Class and Staff Tables

In [None]:
#create staff lookup table
staff_lookup_list = []

# The `Name` column in the primary key in Salesforce objects
# The Salesforce query language is called SOQL 
data = sf.query_all_iter("SELECT Employee_ID__c, Name FROM Staff__c")
for row in data:
    rec = {
        'ID_Staff__c': row['Name'], # this is a critical line of code
        'Employee_ID__c': row['Employee_ID__c']
    }
    staff_lookup_list.append(rec)
    
#convert to dataframe
staff_lookup_df = pd.DataFrame(staff_lookup_list)
staff_lookup_df

In [None]:
# Query the `staff assignment, class, staff` tables from MySQL
query = '''
    SELECT 
        sa.StartDate,
        sa.EndDate,
        sa.Role,
        s.EmployeeID,
        c.Section
    FROM staffassignment sa
    INNER JOIN staff s
    ON s.ID_Staff = sa.ID_Staff
    INNER JOIN class c
    ON c.ID_Class = sa.ID_Class
'''

sa_data_df = pd.read_sql(query, conn)
sa_data_df.head()

In [None]:
#rename columns from staff assign table
sa_data_df.rename(columns={
    'StartDate':'Start_Date__c',
    'EndDate':'End_Date__c',
    'Role':'Role__c',
    'EmployeeID':'Employee_ID__c',
    'Section':'Section__c'
}, inplace=True)

sa_data_df.head()

In [None]:
#merge MySQL staff assign table to salesforce FK lookup tables
#staff assignment and class
staffass_data_df = pd.merge(sa_data_df, class_lookup_df, how='left', on='Section__c')

#merged cp data with student
staffass_data_df = pd.merge(staffass_data_df, staff_lookup_df, how='left', on='Employee_ID__c')

staffass_data_df.head()

In [None]:
#convert dataframe to a list of dictionaries
staffass_data_records = staffass_data_df.to_dict('records')
staffass_data_records

In [None]:
#delets staff assignment data
class_records = sf.query("SELECT Id FROM Staff_Assignment__c")
recs_to_delete = [{'Id': r['Id']} for r in class_records['records']]
recs_to_delete

sf.bulk.Staff_Assignment__c.delete(recs_to_delete)

In [None]:
#creating records for each dic in dic list
#Insert records into salesforce
for rec in staffass_data_records:
    #taking a key out of dictionary putting into new dic format
    record = {}
    record['ID_Class__c'] = rec['ID_Class__c']
    record['ID_Staff__c'] = rec['ID_Staff__c']
    record['Role__c'] = rec['Role__c']
    if rec['Start_Date__c'] != None:
        record['Start_Date__c'] = str(rec['Start_Date__c'])
    if rec['End_Date__c'] != None:
        record['End_Date__c'] = str(rec['End_Date__c'])
    
    try:
        sf.Staff_Assignment__c.create(record)
    except Exception as e:
        print(e)