In [6]:
import datetime
import time
from urllib.parse import quote

import pandas as pd
import pymysql
import requests
from sqlalchemy import create_engine, text, types

In [13]:
# --- TalentLMS API credentials ------------------------------------------------
# Passed to requests as the HTTP basic-auth pair (username, password) in
# load_lms_users(); the password half is left empty.
# NOTE(review): these look like redacted placeholders. Real secrets should be
# loaded at run time (environment variables, getpass, a secrets manager) rather
# than being committed inside the notebook.
username = 'TALENT_LMS_KEY'
password = ''

# --- MySQL connection settings --------------------------------------------------
mysql_host = 'HOSTNAME'
mysql_root = 'USERNAME'
mysql_password = 'PASSWORD'
mysql_db = 'DATABASE_NAME'

# The user and password are percent-encoded before being placed in the URL:
# characters such as '@', ':' or '/' inside a credential would otherwise be
# read as URL delimiters and the connection string would be parsed wrongly.
mysql_engine = create_engine(
    f'mysql+pymysql://{quote(mysql_root, safe="")}:{quote(mysql_password, safe="")}'
    f'@{mysql_host}:3306/{mysql_db}'
)

In [8]:
def load_lms_users(max_retries=None):
    """Download the TalentLMS user list and return it as a cleaned DataFrame.

    The users endpoint is called with basic auth built from the module-level
    ``username`` / ``password``. Connection failures and timeouts are retried
    with a linearly growing pause (5 s, 10 s, 15 s, ...).

    Parameters
    ----------
    max_retries : int or None, default None
        Maximum number of retries after the first failed attempt. ``None``
        keeps retrying indefinitely.

    Returns
    -------
    pandas.DataFrame
        One row per user from the JSON payload, with two derived datetime
        columns (``updated_at``, ``account_created_at``) added and the columns
        listed in ``COLS_TO_DROP`` removed.

    Raises
    ------
    requests.exceptions.HTTPError
        If the API answers with a 4xx/5xx status.
    requests.exceptions.ConnectionError, requests.exceptions.Timeout
        If the request still fails once ``max_retries`` is exhausted.
    """
    url = 'https://thirdspace.talentlms.com/api/v1/users'
    attempt = 1

    while True:
        try:
            response = requests.get(url, auth=(username, password), timeout=5)
            break
        # Timeout is caught as well: a read timeout triggered by `timeout=5`
        # is not a ConnectionError and would otherwise abort without a retry.
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
            if max_retries is not None and attempt > max_retries:
                raise
            wait = attempt * 5
            print(f'Error! Waiting {wait} secs and re-trying...')
            time.sleep(wait)
            attempt += 1

    # Fail loudly on an HTTP error (e.g. bad API key) instead of trying to
    # normalise an error payload that lacks the expected user fields.
    response.raise_for_status()
    user_data = response.json()

    users_df = pd.json_normalize(user_data)

    users_df['last_updated_timestamp'] = users_df['last_updated_timestamp'].astype('int64')
    # datetime.fromtimestamp() yields naive datetimes in the machine's local
    # time zone; kept as-is so values stay comparable with earlier loads.
    users_df['updated_at'] = users_df['last_updated_timestamp'].apply(datetime.datetime.fromtimestamp)
    # Vectorised parse of the 'dd/mm/YYYY, HH:MM:SS' creation stamp.
    users_df['account_created_at'] = pd.to_datetime(
        users_df['created_on'], format='%d/%m/%Y, %H:%M:%S'
    )

    # Fields that are not loaded into the database, plus the raw timestamp
    # columns that the two derived datetime columns replace.
    COLS_TO_DROP = ['timezone', 'language', 'avatar', 'bio',
                    'login_key', 'custom_field_1', 'restrict_email',
                    'created_on', 'last_updated', 'last_updated_timestamp']
    lms_users = users_df.drop(columns=COLS_TO_DROP)

    return lms_users

def get_max_updated_at(table_name):
    """Return the most recent ``updated_at`` value stored in ``table_name``.

    The table is looked up in the ``mysql_db`` schema, the same schema that
    upsert_users_table() writes to, so the watermark and the upsert target
    refer to the same table.

    Parameters
    ----------
    table_name : str
        Name of a table with an ``updated_at`` column. It is interpolated
        into the SQL text (identifiers cannot be sent as bound parameters),
        so it must be a trusted, hard-coded name, never user input.

    Returns
    -------
    The maximum ``updated_at`` in the table, or the fallback
    ``'1970-01-01 00:00:01'`` when the table has no rows (``max`` is NULL).
    """
    query = text(
        "select coalesce(max(updated_at),'1970-01-01 00:00:01') "
        f"from {mysql_db}.{table_name}"
    )
    # The context manager returns the connection to the pool even on error.
    with mysql_engine.connect() as conn:
        return conn.execute(query).scalar()

def get_latest_df(df, table_name):
    """Keep only the rows of ``df`` that are at least as new as the database.

    The cut-off is whatever get_max_updated_at(table_name) reports; rows whose
    ``updated_at`` is greater than or equal to it are returned.
    """
    cutoff = get_max_updated_at(table_name)
    # ">=" rather than ">" so rows sharing the cut-off timestamp are included.
    is_current = df['updated_at'] >= cutoff
    return df[is_current]

def create_temp_users_table(df):
    """Write ``df`` to the ``temp_talent_lms_users`` table through ``mysql_engine``.

    The table is replaced on every call (``if_exists='replace'``) and the
    DataFrame index is not written. Column SQL types are set explicitly
    via the mapping below.
    """
    column_types = dict(
        id=types.INTEGER(),
        login=types.VARCHAR(length=128),
        first_name=types.VARCHAR(length=128),
        last_name=types.VARCHAR(length=128),
        email=types.VARCHAR(length=128),
        user_type=types.VARCHAR(length=64),
        status=types.VARCHAR(length=64),
        deactivation_date=types.VARCHAR(length=64),
        level=types.INTEGER(),
        points=types.INTEGER(),
        updated_at=types.TIMESTAMP(),
        account_created_at=types.TIMESTAMP(),
    )

    df.to_sql(
        'temp_talent_lms_users',
        mysql_engine,
        index=False,
        if_exists='replace',
        dtype=column_types,
    )
    
def upsert_users_table():
    """Merge ``temp_talent_lms_users`` into ``talent_lms_users``.

    Every row of the temp table is inserted into the target table. A row
    that collides with an existing unique/primary key instead updates the
    existing row's ``user_type``, ``status``, ``level``, ``points`` and
    ``updated_at`` columns.

    The statement runs inside ``mysql_engine.begin()``, so it is committed on
    success, rolled back on error, and the connection is always released.

    NOTE(review): ``insert ... select *`` maps columns by position, so the
    two tables must share the same column order. Confirm against the target
    table's DDL.
    """
    sql_query = f'''

        insert into {mysql_db}.talent_lms_users  
        select * from {mysql_db}.temp_talent_lms_users 
        on duplicate key update 
            talent_lms_users.user_type = temp_talent_lms_users.user_type,
            talent_lms_users.`status` = temp_talent_lms_users.`status`,
            talent_lms_users.`level` = temp_talent_lms_users.`level`,
            talent_lms_users.points = temp_talent_lms_users.points,
            talent_lms_users.updated_at = temp_talent_lms_users.updated_at;
'''

    # text() is required for textual SQL on SQLAlchemy 2.x, where passing a
    # plain string to Connection.execute() is no longer accepted.
    with mysql_engine.begin() as conn:
        conn.execute(text(sql_query))

In [4]:
# Prerequisite: the `talent_lms_users` table is expected to exist in the
# database already, since it is queried and inserted into by the steps below.

# 1. Extract the user list from the LMS API.
users_df = load_lms_users()

# 2. Keep only the rows at least as recent as what the database already holds.
temp_df = get_latest_df(users_df, 'talent_lms_users')

# 3. Write those rows to the temp table in MySQL.
create_temp_users_table(df=temp_df)

# 4. Upsert the temp table into the users table.
upsert_users_table()

Error! Waiting 5 secs and re-trying...
