In [0]:
!pip install tqdm

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import json
import os
from time import sleep

import pandas as pd
import requests
from tqdm.notebook import tqdm, trange

In [0]:
# === Constants ===
ACADEMY_ID = "1991905536"

# SECURITY: an API key stored in a notebook is readable by anyone who can see the
# file or its history. Supply it through the SCHOOX_API_KEY environment variable
# instead; the literal below is only a fallback so existing runs keep working.
# TODO: rotate this key and remove the fallback once the variable is configured.
API_KEY = os.environ.get("SCHOOX_API_KEY", "3b01d96f4b1e0ee735f9a87dde1a67f2")
BASE_URL = "https://app.ca.schoox.com/api/v1"

# Credentials merged into the query string of every API request.
PARAMS = {
    "apikey": API_KEY,
    "acadId": ACADEMY_ID
}

# Value sent as the "limit" query parameter when listing users.
GET_USER_LIMIT = 500

# Target Delta tables, one per dataset written by save_data().
progress_delta_table = 'aecon_dev.aecon_idd.schoox_progress'
users_delta_table = 'aecon_dev.aecon_idd.schoox_users'
courses_delta_table = 'aecon_dev.aecon_idd.schoox_courses'

### Function Definitions

In [0]:
# ========= Main api call function =========

def get_response_json(request, parameters=None, timeout=60):
    """Send a GET request to the API and return the decoded JSON body.

    The credentials in ``PARAMS`` are merged into the query parameters of every
    call (they win over same-named keys in ``parameters``). An attempt that
    returns a non-200 status or raises ``requests.RequestException`` is retried
    up to 5 more times, with a 3-second pause before each retry.

    Args:
        request: Endpoint path appended to ``BASE_URL`` (e.g. ``"users"``).
        parameters: Optional dict of extra query parameters. It is not modified.
        timeout: Seconds to wait for the server on each attempt, so a stalled
            connection cannot hang the notebook forever.

    Returns:
        The parsed JSON payload, or ``None`` when every attempt failed (the
        last failure is printed).
    """
    endpoint = f"{BASE_URL}/{request}"
    # Build a fresh dict: updating the caller's dict in place would leak the
    # API key into it and surprise callers that reuse the dict.
    query = {**(parameters or {}), **PARAMS}

    max_retries = 5
    retry_delay_seconds = 3
    failure = None

    for attempt in range(max_retries + 1):
        if attempt > 0:
            sleep(retry_delay_seconds)
        try:
            response = requests.get(endpoint, params=query, timeout=timeout)
        except requests.RequestException as e:
            # Network-level failures are retried like bad status codes.
            failure = f"❌ Request failed: {e}"
            continue

        if response.status_code == 200:
            return json.loads(response.text)
        failure = f"❌ Error: {response.status_code}: {response.text}"

    print(failure)
    return None

# ==================================================================================


def generate_groups():
    """Build the table of groups ("aboves") annotated with their type name.

    Fetches the ``types`` and ``aboves`` endpoints, keeps the first three
    columns of the ``aboves`` payload, joins each group to its type on
    ``type_id`` and drops rows whose type is "All Users", "Manager" or
    "Organization".

    Returns:
        DataFrame holding the kept ``aboves`` columns plus ``type_name``.
    """
    excluded_types = ["All Users", "Manager", "Organization"]

    # superset types
    type_lookup = pd.DataFrame(get_response_json("types")).rename(
        columns={"id": "type_id", "name": "type_name"}
    )

    # supersets: only the first three columns are used
    supersets = pd.DataFrame(get_response_json("aboves", {"limit": 1000})).iloc[:, :3]

    merged = supersets.merge(type_lookup, on="type_id", suffixes=("_aboves", "_type"))
    is_excluded = merged["type_name"].isin(excluded_types)
    return merged[~is_excluded]


def extract_dept(dict_list):
    """Return the ``'name'`` of the first entry of ``dict_list``, or ``''`` if it is empty."""
    if len(dict_list) == 0:
        return ''
    first_entry = dict_list[0]
    return first_entry['name']

def extract_role(dict_list):
    """Return the name of the first job on the first entry of ``dict_list``.

    Only the first entry is inspected. ``''`` is returned when ``dict_list`` is
    empty, when its first entry has no ``'jobs'`` key, or when that jobs list
    is empty (previously an empty list raised ``IndexError``).
    """
    if len(dict_list) == 0:
        return ''

    first_entry = dict_list[0]
    if 'jobs' not in first_entry:
        return ''

    jobs = first_entry['jobs']
    if not jobs:
        # A unit can carry a 'jobs' key with nothing in it.
        return ''
    return jobs[0]['name']

def extract_worker_type(dict_list, groups_df):
    """Return the first name in ``dict_list`` that is a "Type (Safety)" group.

    Args:
        dict_list: Sequence of dicts, each with a ``'name'`` key.
        groups_df: DataFrame with ``type_name`` and ``name`` columns.

    Returns:
        The first matching name, or ``None`` when nothing matches.
    """
    is_worker_type = groups_df['type_name'] == 'Type (Safety)'
    known_types = list(groups_df[is_worker_type]['name'])
    unit_names = [unit['name'] for unit in dict_list]
    return next((name for name in unit_names if name in known_types), None)

def extract_company(dict_list, groups_df):
    """Return the first name in ``dict_list`` that is a "Company" group.

    Args:
        dict_list: Sequence of dicts, each with a ``'name'`` key.
        groups_df: DataFrame with ``type_name`` and ``name`` columns.

    Returns:
        The first matching name, or ``None`` when nothing matches.
    """
    company_rows = groups_df[groups_df['type_name'] == 'Company']
    known_companies = list(company_rows['name'])
    candidate_names = [entry['name'] for entry in dict_list]
    return next(
        (candidate for candidate in candidate_names if candidate in known_companies),
        None,
    )

# Final function
def get_all_users():
    """Fetch the user list and derive per-user group attributes.

    Requests ``users`` with ``limit=GET_USER_LIMIT``, then derives
    ``worker_type`` and ``company`` (matched against ``generate_groups()``),
    ``department`` and ``role`` from each user's ``above_units``, and
    ``sub_team`` from ``units``.

    Returns:
        DataFrame with the columns id, firstname, lastname, email,
        worker_type, company, role, sub_team and username.
    """
    print("\n⚙️ Fetching full user data.")

    query = {"limit": GET_USER_LIMIT, **PARAMS}
    users_df = pd.DataFrame(get_response_json("users", query))

    groups = generate_groups()
    above_units = users_df['above_units']

    # Group-based attributes need the groups table as a second argument.
    users_df['worker_type'] = above_units.apply(lambda units: extract_worker_type(units, groups))
    users_df['company'] = above_units.apply(lambda units: extract_company(units, groups))

    users_df['department'] = above_units.apply(extract_dept)
    users_df['role'] = above_units.apply(extract_role)
    users_df['sub_team'] = users_df['units'].apply(extract_dept)

    print("✅ User data loaded.")
    output_columns = [
        'id', 'firstname', 'lastname', 'email',
        'worker_type', 'company', 'role', 'sub_team', 'username',
    ]
    return users_df[output_columns]


def extract_names(dict_item):
    """Join the ``'firstname'`` and ``'lastname'`` values of ``dict_item`` with a space."""
    first = dict_item['firstname']
    last = dict_item['lastname']
    return "{} {}".format(first, last)

def extract_categories(dict_list):
    """Return the ``'name'`` value of every entry in ``dict_list``, in order."""
    names = []
    for category in dict_list:
        names.append(category['name'])
    return names

# Final function
def get_all_courses(page_size=100):
    """Download every course page by page and return a flattened table.

    Pages are requested from ``courses`` with ``limit=page_size`` and an
    increasing ``start`` offset until an empty page comes back. Categories are
    flattened to one comma-separated string (hyphens inside category names are
    also replaced by ``', '``), the instructor dict becomes "first last", and
    ``scope`` is cast to ``str``.

    Args:
        page_size: Number of courses requested per page.

    Returns:
        DataFrame with the columns id, title, instructor, students, level,
        categories and scope. It has no rows when the API reports no courses.

    Raises:
        RuntimeError: if a page request fails (``get_response_json`` returned
            ``None``), so a partial course list is never returned as complete.
    """
    print("\n⚙️ Fetching full course data.")
    output_columns = ['id','title','instructor','students','level','categories','scope']

    # Collect the pages and concatenate once; concatenating inside the loop
    # re-copies all previously fetched rows on every page.
    pages = []
    start = 0
    while True:
        page = get_response_json("courses", {"limit": page_size, "start": start, **PARAMS})
        if page is None:
            raise RuntimeError(
                f"Course request failed at start={start}; aborting instead of returning partial data."
            )
        if len(page) == 0:
            break
        pages.append(pd.DataFrame(page))
        start += page_size

    if not pages:
        print("✅ Course data loaded.")
        return pd.DataFrame(columns=output_columns)

    data_df = pd.concat(pages, ignore_index=True)

    data_df['instructor'] = data_df['instructor'].apply(extract_names)
    data_df['categories'] = data_df['categories'].apply(
        lambda categories: ', '.join(extract_categories(categories)).replace('-', ', ')
    )
    data_df['scope'] = data_df['scope'].astype(str)

    print("✅ Course data loaded.")

    return data_df[output_columns]


def get_user_details(user_id:str, parameters=None):
    """Fetch ``users/{user_id}`` and return the response as a DataFrame.

    Args:
        user_id: Identifier placed in the request path.
        parameters: Optional dict of extra query parameters. It is copied, so
            the caller's dict is never modified.

    Returns:
        ``pd.DataFrame`` built from the JSON response (empty when the request
        failed and ``get_response_json`` returned ``None``).
    """
    # Merge into a new dict instead of updating the caller's dict in place,
    # which would leak the API key into it.
    query = None if parameters is None else {**parameters, **PARAMS}

    response_json = get_response_json(f"users/{user_id}", query)
    return pd.DataFrame(response_json)


def get_user_course_details(user_id:str, parameters=None):
    """Fetch ``courses`` filtered by ``userId`` and return it as a DataFrame.

    Args:
        user_id: Sent as the ``userId`` query parameter. Previously it was
            dropped whenever ``parameters`` was supplied, so the request was no
            longer scoped to the user.
        parameters: Optional dict of extra query parameters. A ``userId`` key
            in it takes precedence over ``user_id``. The dict is not modified.

    Returns:
        ``pd.DataFrame`` built from the JSON response (empty when the request
        failed and ``get_response_json`` returned ``None``).
    """
    # Order matters: explicit parameters override the default userId, and the
    # credentials in PARAMS override everything.
    query = {"userId": user_id, **(parameters or {}), **PARAMS}

    response_json = get_response_json("courses", query)
    return pd.DataFrame(response_json)


def get_user_progress(user_id):
    """Fetch ``dashboard/users/{user_id}/courses`` as a DataFrame tagged with the user.

    Returns:
        ``pd.DataFrame`` of the response with an added ``user_id`` column
        holding ``user_id`` on every row.
    """
    course_rows = get_response_json(f"dashboard/users/{user_id}/courses")
    return pd.DataFrame(course_rows).assign(user_id=user_id)


# Final function
def get_all_progress():
    """Fetch course progress for every listed user and stack it into one table.

    The user list comes from ``users`` with ``limit=GET_USER_LIMIT``; for each
    user, ``dashboard/users/{id}/courses`` is requested with
    ``dropped_out='false'``. A user whose request returns nothing is skipped,
    and a user whose response cannot be turned into a DataFrame is reported
    and skipped, so one bad record does not abort the run.

    Returns:
        DataFrame restricted to the fixed column list below, with ``user_id``
        identifying the owner of each row.

    Raises:
        RuntimeError: if the user list itself could not be fetched.
    """
    print("\n⚙️ Fetching full progress data.")
    cols = [
            'user_id','id','title','url','progress','total_time','due_date','is_due',
            'last_progress','completions_count','completed_by_admin','assignee_first_name',
            'assignee_last_name','enroll_date','timecompleted','certificates','enrolled',
            'archived','completed_as_equivalent','external_id','required','compliance_course'
            ]

    users = get_response_json("users", {'limit':GET_USER_LIMIT})
    if users is None:
        raise RuntimeError("Could not fetch the user list; progress data was not loaded.")
    id_list = [user['id'] for user in users]
    print(f"User count: {len(id_list)}")

    # Seed with an empty frame carrying every expected column, so the final
    # column selection works even when a column never appears in a response.
    # Frames are concatenated once at the end rather than inside the loop,
    # which re-copied all previously collected rows for every user.
    frames = [pd.DataFrame(columns=cols)]

    for user_id in tqdm(id_list):
        try:
            response = get_response_json(f"dashboard/users/{user_id}/courses",{'dropped_out':'false'})
            if not response:
                # Empty list or failed request: nothing to add for this user.
                continue
            response_df = pd.DataFrame(response)
            response_df['user_id'] = user_id
            frames.append(response_df)
        except Exception as E:
            # Deliberate best-effort: report the user and keep going.
            print(f"{user_id}: {E}")

    data_df = pd.concat(frames, ignore_index=True)

    print("✅ Progress data loaded.")

    return data_df[cols]


def save_data(df,delta_table_name):
    """Overwrite the Delta table ``delta_table_name`` with the contents of ``df``.

    ``df`` is converted with ``spark.createDataFrame`` and written in
    ``overwrite`` mode with column mapping by name and ``mergeSchema`` enabled.
    Any exception is printed rather than raised, so the function always
    returns ``None``.
    """
    try:
        writer = (
            spark.createDataFrame(df)
            .write
            .format("delta")
            .option("delta.columnMapping.mode", "name")
            .option("mergeSchema", "true")
            .mode("overwrite")
        )
        writer.saveAsTable(delta_table_name)
        print(f"Data Saved to {delta_table_name}.")
    except Exception as e:
        print(e)

### Main

In [0]:
# One flag character per dataset, in this order: users, courses, progress.
# Set a position to '1' to fetch that dataset and overwrite its Delta table.
execution = "000"

if execution[0] == '1':
    users = get_all_users()
    # Exclude the 'solutions.architect' account and any @schoox.com address.
    # regex=False matches the text literally (as a regex, '.' matches any
    # character); na=False treats a missing email as "no match" so the
    # negated mask stays boolean instead of failing on NaN.
    has_schoox_email = users['email'].str.contains('@schoox.com', regex=False, na=False)
    users = users[(users['username']!='solutions.architect') & (~has_schoox_email)]
    save_data(users,users_delta_table)

if execution[1] == '1':
    courses = get_all_courses()
    save_data(courses,courses_delta_table)

if execution[2] == '1':
    progress = get_all_progress()
    save_data(progress,progress_delta_table)


⚙️ Fetching full course data.
✅ Course data loaded.
Data Saved to aecon_dev.aecon_idd.schoox_courses.


### Testing Area