In [None]:
import pandas as pd

# Read the course export into a pandas DataFrame.
df = pd.read_csv('course_details_csv.csv')

# Keep one row per distinct (name, course_id) pair so a course is counted once.
total_courses = df.drop_duplicates(subset=['value.name', 'value.course_id'])

# Restrict the de-duplicated courses to those whose workflow state is 'active'.
active_courses = total_courses[total_courses['value.workflow_state'] == 'active']

# Share of active courses; an empty export would otherwise raise
# ZeroDivisionError, so report a rate of 0 in that case.
active_courses_rate = (
    len(active_courses) / len(total_courses) if len(total_courses) else 0.0
)

# Print the results
print(f"Total Courses: {len(total_courses)}")
print(f"Active Courses: {len(active_courses)}")
print(f"Active Courses Rate: {active_courses_rate:.2%}")



: 

In [None]:
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap
from bokeh.palettes import Viridis256

# Two bars: the reference bar (rate 1) and the measured active-course rate.
data = pd.DataFrame({'Courses': ['Total Courses', 'Active Courses'],
                     'Rate': [1, active_courses_rate]})

# Categorical factors as a plain list of strings, shared by the x range and
# the colour mapper.
course_factors = list(data['Courses'])

# Create a ColumnDataSource from the data
source = ColumnDataSource(data)

# Create a figure with a categorical x axis (the stray token after
# data['Courses'] in the original call was a syntax error).
p = figure(x_range=course_factors, title="Active Courses Rate")

# factor_cmap assigns palette entries to factors in order, so handing it the
# full Viridis256 would give both bars its first two, near-identical colours.
# Use the two ends of the palette so the bars are distinguishable.
bar_palette = [Viridis256[0], Viridis256[-1]]

# Create bars for the data
p.vbar(x='Courses', top='Rate', width=0.5, source=source,
       line_color='white',
       fill_color=factor_cmap('Courses', palette=bar_palette, factors=course_factors))

# Customize the plot: rates are fractions, so pin the y axis to [0, 1].
p.y_range.start = 0
p.y_range.end = 1
p.xaxis.major_label_orientation = 0.5
p.xaxis.axis_label = "Courses"
p.yaxis.axis_label = "Rate"

# Show the plot
show(p)

: 

1. Connect to Canvas API 
2. Transform data (only the KPIs that will be in the dashboard or in the report) 
3. Connect to local DB (SQL Server)
4. Load both the transformed data and the raw, untransformed data into the local DB (tutorial)
5. Connect to local DB
6. Design of dashboards 
7. Extract needed information for KPIs
8. Generate plots for report 
9. Generate dashboards
10. Send the information via email or publish it at a URL

In [72]:
import requests as req
from typing import Optional, List, Dict
import time
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Access environment variables
BASE_URL = "https://api-gateway.instructure.com"
CLIENT_ID = os.getenv("API_KEY")
CLIENT_SECRET = os.getenv("API_SECRET")

class CanvasAPI:
    """Small client for the Instructure API gateway.

    Logs in with client credentials, keeps the resulting bearer token on the
    instance, and offers helpers to list tables, read a table schema and
    request the data of a table.
    """

    def __init__(self, client_id: str, client_secret: str, auto_login: bool = True) -> None:
        """Store the credentials and optionally log in right away.

        Args:
            client_id: Client id used for HTTP basic auth on login.
            client_secret: Client secret used for HTTP basic auth on login.
            auto_login: When true, call ``login()`` during construction.
        """
        self.url = BASE_URL
        self.client_id = client_id
        self.client_secret = client_secret
        # Always define the attribute so the ``is None`` checks in the other
        # methods work when the instance is created with auto_login=False.
        self.access_token: Optional[str] = None
        if auto_login:
            self.login()

    def login(self) -> Optional[str]:
        """Request an access token and store it on the instance.

        Returns:
            The ``access_token`` field of the login response.

        Raises:
            Exception: If the login endpoint does not answer with status 200.
        """
        url = f"{self.url}/ids/auth/login"
        auth = req.auth.HTTPBasicAuth(self.client_id, self.client_secret)
        data = {"grant_type": "client_credentials"}
        response = req.post(url, auth=auth, data=data)

        if response.status_code != 200:
            raise Exception(f"Login failed with status code: {response.status_code}, response text: {response.text}")

        self.access_token = response.json().get("access_token")  # Store the token
        return self.access_token

    def make_request_with_auth(self, method: str, endpoint: str, auto_login=True, **kwargs) -> req.Response:
        """Makes a request to the Canvas API with authentication.

        Args:
            method: HTTP method name passed to ``requests.request``.
            endpoint: Path relative to the base URL; a leading slash is ignored.
            auto_login: When true and no token is stored, log in first.
            **kwargs: Forwarded unchanged to ``requests.request``.

        Returns:
            The response object (status 200 whenever asserts are enabled).

        Raises:
            Exception: If no token is available or the automatic login fails.
            AssertionError: If the response status is not 200.
        """
        if self.access_token is None and auto_login:
            try:
                self.login()
            except Exception as exc:
                # Catch Exception (not a bare except) so KeyboardInterrupt and
                # SystemExit still propagate; keep the cause for debugging.
                raise Exception("Could not login and access token is expired.") from exc

        if self.access_token is None:
            # Avoid sending a meaningless "Bearer None" header.
            raise Exception("Not logged in. Please call login() first.")

        # Strip a leading slash so callers may pass "/dap/..." or "dap/..."
        # without producing a double slash in the URL.
        url = f"{self.url}/{endpoint.lstrip('/')}"
        headers = {"Authorization": f"Bearer {self.access_token}"}
        response = req.request(method, url, headers=headers, **kwargs)
        # NOTE: while this assert is active, every non-200 response raises
        # here, so the status-code fallbacks in the callers below are only
        # reached when Python runs with asserts disabled (-O).
        assert response.status_code == 200, f"Request failed with status code: {response.status_code}, response text: {response.text}"
        return response

    def get_list_of_tables(self) -> Optional[List]:
        """Return the ``tables`` entry of the table-listing response.

        Raises:
            Exception: If no access token is stored on the instance.
        """
        if self.access_token is None:
            raise Exception("Not logged in. Please call login() first.")
        response = self.make_request_with_auth("GET", "dap/query/canvas/table")
        if response.status_code == 200:
            return response.json()["tables"]
        return [{"status code": response.status_code, "msg": response.text}]

    def get_schema_of_table(self, table_name: str) -> Optional[Dict]:
        """Return the decoded JSON schema response for ``table_name``."""
        response = self.make_request_with_auth("GET", f"dap/query/canvas/table/{table_name}/schema")
        if response.status_code == 200:
            return response.json()
        return {"status code": response.status_code, "msg": response.text}

    def get_id_for_table(self, table_name: str) -> Optional[str]:
        """Request the data of ``table_name`` and wait until it is ready.

        Returns:
            The ``id`` of the first entry in ``objects`` once the reported
            status is ``"complete"``. When the status is ``"failed"`` (or the
            initial request is not 200) an error-message string is returned
            instead, so callers cannot tell an id from an error by type.
        """
        endpoint = f"dap/query/canvas/table/{table_name}/data"
        body = {"format": "jsonl"}
        response = self.make_request_with_auth("POST", endpoint, json=body)

        if response.status_code != 200:
            return f"Initial request failed with status code: {response.status_code},response text: {response.text}"

        # NOTE(review): each poll re-sends the same POST rather than querying
        # a job-status endpoint — confirm this is the intended way to poll.
        while True:
            response = self.make_request_with_auth("POST", endpoint, json=body)
            status = response.json().get("status")

            if status == "failed":
                return f"Request failed with status code: {response.status_code}, response text: {response.text}"
            if status == "complete":
                return response.json()["objects"][0]["id"]
            time.sleep(5)  # Wait 5 seconds before trying again

    def get_table_by_name(self, table_name: str):
        """Resolve ``table_name`` to the URL of its first data object.

        Returns:
            The URL string on success, a message string when no table id was
            obtained, or ``None`` after printing the error when the URL
            request is not 200.
        """
        endpoint = "dap/object/url"

        table_id = self.get_id_for_table(table_name)
        # NOTE(review): get_id_for_table reports failures as non-empty
        # strings, which this truthiness check does not catch — confirm how
        # failures should be surfaced.
        if not table_id:
            return f"Failed to get table ID for table: {table_name}"

        body = [{"id": str(table_id)}]
        response = self.make_request_with_auth("POST", endpoint, json=body)

        if response.status_code == 200:
            return response.json()["urls"][table_id]["url"]

        print(f"Request failed with status code: {response.status_code}, response test: {response.text}")
        return None

In [73]:
# Build a client that logs in immediately with the credentials from the environment.
canvas_api = CanvasAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, auto_login=True)

# Resolve the "wikis" table first to its object id, then to its download URL.
table_id = canvas_api.get_id_for_table(table_name="wikis")
table_url = canvas_api.get_table_by_name(table_name="wikis")


In [13]:
import os
from dap.api import DAPClient
from dap.dap_types import Credentials, Format, SnapshotQuery
from dotenv import load_dotenv

# Load environment variables from .env file
def load_env_vars():
    """Load the .env file and return the DAP connection settings.

    Returns:
        A ``(base_url, client_id, client_secret)`` tuple read from the
        ``DAP_API_URL``, ``DAP_CLIENT_ID`` and ``DAP_CLIENT_SECRET``
        environment variables.

    Raises:
        KeyError: If any of the three variables is not set.
    """
    load_dotenv()
    required_keys = ("DAP_API_URL", "DAP_CLIENT_ID", "DAP_CLIENT_SECRET")
    return tuple(os.environ[key] for key in required_keys)

# Create credentials
def create_credentials() -> Credentials:
    """Build a ``Credentials`` object from the client id and secret in the environment."""
    env_values = load_env_vars()
    # The first entry of the tuple is the base URL, which is not needed here.
    return Credentials.create(client_id=env_values[1], client_secret=env_values[2])

# Get table schema
async def get_table_schema(namespace: str, table: str, credentials:Credentials = None):
    """Fetch the schema of one table through a DAP client session.

    Args:
        namespace: Namespace passed to the client (e.g. ``"canvas"``).
        table: Name of the table whose schema is requested.
        credentials: Credentials for the session; built from the environment
            via ``create_credentials()`` when omitted.

    Returns:
        Whatever ``session.get_table_schema`` returns for the table.
    """
    if credentials is None:
        credentials = create_credentials()
    # Hand the resolved credentials to the client; otherwise an explicitly
    # supplied ``credentials`` argument would be silently ignored.
    async with DAPClient(credentials=credentials) as session:
        return await session.get_table_schema(namespace, table)

# Download all table schemas
async def download_all_table_schemas(namespace: str, output_directory: str, credentials:Credentials = None):
    """Download the schema of every table in ``namespace``.

    Args:
        namespace: Namespace whose tables are listed (e.g. ``"canvas"``).
        output_directory: Directory handed to ``download_table_schema``.
        credentials: Credentials for the session; built from the environment
            via ``create_credentials()`` when omitted.
    """
    if credentials is None:
        credentials = create_credentials()
    # Hand the resolved credentials to the client; otherwise an explicitly
    # supplied ``credentials`` argument would be silently ignored.
    async with DAPClient(credentials=credentials) as session:
        tables = await session.get_tables(namespace)
        for table in tables:
            await session.download_table_schema(namespace=namespace, table=table, output_directory=output_directory)
            print(f"Downloaded schema for '{table}'")

# Download table data
async def download_table_data(namespace: str, table: str, output_directory: str, credentials: Credentials = None):
    """Download a CSV snapshot of one table into ``output_directory``.

    Args:
        namespace: Namespace of the table (e.g. ``"canvas"``).
        table: Name of the table to download.
        output_directory: Directory handed to ``download_table_data``.
        credentials: Credentials for the session; built from the environment
            via ``create_credentials()`` when omitted.
    """
    if credentials is None:
        # Call the factory: the original assigned the function object itself.
        credentials = create_credentials()
    # Hand the resolved credentials to the client; otherwise an explicitly
    # supplied ``credentials`` argument would be silently ignored.
    async with DAPClient(credentials=credentials) as session:
        query = SnapshotQuery(format=Format.CSV, mode=None)
        await session.download_table_data(
            namespace=namespace, table=table, query=query, output_directory=output_directory, decompress=True
        )

# Example usage:
if __name__ == "__main__":
    import asyncio

    # Alternative entry points, each runnable on its own:
    # asyncio.run(get_table_schema("canvas", "accounts"))
    # asyncio.run(download_all_table_schemas("canvas", os.getcwd()))

    # Download the "courses" table into the current working directory.
    asyncio.run(
        download_table_data(namespace="canvas", table="courses", output_directory=os.getcwd())
    )



In [12]:




# Fetch the schema of the "accounts" table (the result is not kept).
asyncio.run(get_table_schema(namespace="canvas", table="accounts"))

# Everything below is written to the current working directory.
output_directory = os.getcwd()
asyncio.run(download_all_table_schemas(namespace="canvas", output_directory=output_directory))
asyncio.run(download_table_data(namespace="canvas", table="courses", output_directory=output_directory))


  for i in range(size):


KeyboardInterrupt: 

In [11]:
# NOTE(review): presumably needed so the asyncio.run(...) calls in the other
# cells work inside the notebook's already-running event loop — confirm, and
# make sure this cell is executed before those cells on a fresh kernel.
import nest_asyncio
nest_asyncio.apply()


