In [7]:
import requests
import json
import io
import zipfile
import pandas as pd
import time
from pyspark.sql import SparkSession
import re
from datetime import datetime, timedelta

StatementMeta(, 8716d90b-d69c-41b9-a264-0f26a951aa71, 9, Finished, Available, Finished)

In [8]:
spark = SparkSession.builder.getOrCreate()
tokens_df = spark.table("my3de_survey_bronze.tokens")
token_json = tokens_df.collect()[0][0]
token_data = json.loads(token_json)
access_token = token_data['access_token']

StatementMeta(, 8716d90b-d69c-41b9-a264-0f26a951aa71, 10, Finished, Available, Finished)

In [15]:
BASE_URL = "https://learning.my3de.org"
DATASET_ID = "d3668759-2309-471c-b959-d3835936f5e8" # Survey Results ADS

previous_date = datetime.now() - timedelta(days=1) # Get the previous date
START_DATE = '2023-01-01' # Format the start of the day
END_DATE = datetime.now().strftime("%Y-%m-%d") # Format the end of the day


#END_DATE = '2025-04-28'


PARENT_ORG_UNIT_ID = 6668 

# Headers for API requests
HEADERS = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
    "Accept": "application/json"
}

StatementMeta(, 8716d90b-d69c-41b9-a264-0f26a951aa71, 20, Finished, Available, Finished)

In [16]:
export_job_id = create_export_job(DATASET_ID, START_DATE, END_DATE, PARENT_ORG_UNIT_ID, 123)
print(f"Export job created with ID: {export_job_id}")


poll_export_job_status(export_job_id)


df = download_export_job(export_job_id)

StatementMeta(, 8716d90b-d69c-41b9-a264-0f26a951aa71, 21, Finished, Available, Finished)

Export job created with ID: 5776937a-0e99-47a7-a643-4e43f968f750
Export job status: 1. Polling again in 30 seconds...
Export job status: 1. Polling again in 30 seconds...
Export job completed successfully.


In [17]:
display(df)

StatementMeta(, 8716d90b-d69c-41b9-a264-0f26a951aa71, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b0fd638d-fa0f-44c0-8551-214f97b8aeb6)

In [1]:
def create_export_job(dataset_id, start_date, end_date, parent_org_unit_id, role):
    """
    Create an export job for the specified dataset and date range.
    """
    url = f"{BASE_URL}/d2l/api/lp/1.13/dataExport/create"
    payload = {
        "DataSetId": dataset_id,
        "Filters": [
            {"Name": "startDate", "Value": start_date},
            {"Name": "endDate", "Value": end_date},
            {"Name": "parentOrgUnitId", "Value": parent_org_unit_id},
            {"Name": "roles", "Value": role} # role ID for students
        ]
    }
    response = requests.post(url, headers=HEADERS, data=json.dumps(payload))
    response.raise_for_status()
    return response.json()["ExportJobId"]

def poll_export_job_status(export_job_id):
    """
    Poll the status of the export job until it completes.
    """
    url = f"{BASE_URL}/d2l/api/lp/1.13/dataExport/jobs/{export_job_id}"
    while True:
        response = requests.get(url, headers=HEADERS)
        response.raise_for_status()
        status = response.json()["Status"]
        if status == 2:  # Status 2 means the job is complete
            print("Export job completed successfully.")
            return
        elif status == 3:  # Status 3 means the job failed
            raise Exception("Export job failed.")
        else:
            print(f"Export job status: {status}. Polling again in 30 seconds...")
            time.sleep(30)

def download_export_job(export_job_id):
    """
    Download the completed export job, process the CSV content, and clean column names.
    """
    url = f"{BASE_URL}/d2l/api/lp/1.13/dataExport/download/{export_job_id}"
    response = requests.get(url, headers=HEADERS, stream=True)
    response.raise_for_status()
    
    zip_file = io.BytesIO(response.content)
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        for file in zip_ref.namelist():
            if file.endswith('.csv'):
                # Read CSV content with error handling for mixed types
                df = pd.read_csv(io.BytesIO(zip_ref.read(file)), low_memory=False)  # This addresses the DtypeWarning
                
                # Clean column names
                df.columns = [re.sub(r'[^a-zA-Z0-9_]', '_', col_name).strip().lower() for col_name in df.columns]
                
                return df
    return None  # If no CSV found

def store_in_lakehouse(df, table_name):
    """
    Store the DataFrame into a Lakehouse table using PySpark with column mapping.
    Ensures that all columns in the new data match the existing schema before appending.
    """
    spark = SparkSession.builder.getOrCreate()

    # Get the current date and format it
    current_date = datetime.now().strftime('%Y-%m-%d')
    
    # Add 'data_retrieval_date' column to the DataFrame
    df['data_retrieval_date'] = current_date

    # Convert pandas DataFrame to Spark DataFrame
    spark_df = spark.createDataFrame(df)

    # Define the target table path
    table_path = f"my3de_survey_bronze.{table_name}"

    try:
        # Get existing table schema
        existing_schema = spark.table(table_path).schema

        # Cast new data to match existing schema
        for field in existing_schema:
            if field.name in spark_df.columns:
                spark_df = spark_df.withColumn(field.name, spark_df[field.name].cast(field.dataType))
            else:
                # Add missing columns with null values if necessary
                spark_df = spark_df.withColumn(field.name, lit(None).cast(field.dataType))

    except AnalysisException:
        # Table doesn't exist yet, so proceed with creation
        pass

    # Apply column mapping and ensure table exists
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_path} 
    USING DELTA 
    TBLPROPERTIES (
        'delta.columnMapping.mode' = 'name',
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5'
    )
    """)

    # Write the DataFrame to the Delta table
    spark_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(table_path)

StatementMeta(, 0688178a-c963-4a96-ba22-d85c2f8317a9, 3, Finished, Available, Finished)

In [11]:
# Survey Results - students
try:
    # Step 1: Create the export job
    print("Creating export job...")
    export_job_id = create_export_job(DATASET_ID, START_DATE, END_DATE, PARENT_ORG_UNIT_ID, 123)
    print(f"Export job created with ID: {export_job_id}")

    # Step 2: Poll the export job status
    print("Polling export job status...")
    poll_export_job_status(export_job_id)

    # Step 3: Download and directly process the CSV file
    print("Downloading and processing dataset...")
    df = download_export_job(export_job_id)
    if df is not None:
        # Step 4: Store directly in Lakehouse
        table_name = "ads_student_survey_results"
        store_in_lakehouse(df, table_name)
        print(f"Data stored in Lakehouse table: {table_name}")
    else:
        print("No CSV file found in the export.")

except requests.exceptions.HTTPError as err:
    print(f"HTTP error occurred: {err}")
    print(f"Response content: {err.response.text}")
except Exception as err:
    print(f"An error occurred: {err}")

StatementMeta(, 03f6b7c2-764f-490c-a517-602a15739615, 13, Finished, Available, Finished)

Creating export job...
Export job created with ID: cf6ae39f-b0d5-494a-a84d-266c66744951
Polling export job status...
Export job status: 1. Polling again in 30 seconds...
Export job completed successfully.
Downloading and processing dataset...
Data stored in Lakehouse table: ads_student_survey_results


In [12]:
# Survey Results - teachers
try:
    # Step 1: Create the export job
    print("Creating export job...")
    export_job_id = create_export_job(DATASET_ID, START_DATE, END_DATE, PARENT_ORG_UNIT_ID, 121)
    print(f"Export job created with ID: {export_job_id}")

    # Step 2: Poll the export job status
    print("Polling export job status...")
    poll_export_job_status(export_job_id)

    # Step 3: Download and directly process the CSV file
    print("Downloading and processing dataset...")
    df = download_export_job(export_job_id)
    if df is not None:
        # Step 4: Store directly in Lakehouse
        table_name = "ads_teacher_survey_results"
        store_in_lakehouse(df, table_name)
        print(f"Data stored in Lakehouse table: {table_name}")
    else:
        print("No CSV file found in the export.")

except requests.exceptions.HTTPError as err:
    print(f"HTTP error occurred: {err}")
    print(f"Response content: {err.response.text}")
except Exception as err:
    print(f"An error occurred: {err}")

StatementMeta(, 03f6b7c2-764f-490c-a517-602a15739615, 14, Finished, Available, Finished)

Creating export job...
Export job created with ID: fb1f267e-8006-4930-8c14-7ae346b419a3
Polling export job status...
Export job status: 1. Polling again in 30 seconds...
Export job completed successfully.
Downloading and processing dataset...
An error occurred: can not infer schema from empty dataset


In [13]:
# Survey Results - 3DE Staff
try:
    # Step 1: Create the export job
    print("Creating export job...")
    export_job_id = create_export_job(DATASET_ID, START_DATE, END_DATE, PARENT_ORG_UNIT_ID, 143)
    print(f"Export job created with ID: {export_job_id}")

    # Step 2: Poll the export job status
    print("Polling export job status...")
    poll_export_job_status(export_job_id)

    # Step 3: Download and directly process the CSV file
    print("Downloading and processing dataset...")
    df = download_export_job(export_job_id)
    if df is not None:
        # Step 4: Store directly in Lakehouse
        table_name = "3destaff_survey_results"
        store_in_lakehouse(df, table_name)
        print(f"Data stored in Lakehouse table: {table_name}")
    else:
        print("No CSV file found in the export.")

except requests.exceptions.HTTPError as err:
    print(f"HTTP error occurred: {err}")
    print(f"Response content: {err.response.text}")
except Exception as err:
    print(f"An error occurred: {err}")

StatementMeta(, 03f6b7c2-764f-490c-a517-602a15739615, 15, Finished, Available, Finished)

Creating export job...
Export job created with ID: 91cc0856-2ff2-42d0-bdc9-a57a36a52732
Polling export job status...
Export job status: 1. Polling again in 30 seconds...
Export job completed successfully.
Downloading and processing dataset...
An error occurred: can not infer schema from empty dataset


In [22]:
df = spark.sql("SELECT * FROM my3de_survey_bronze.ads_student_survey_results")
test = df.toPandas()
test['data_retrieval_date'].unique()

StatementMeta(, 0fab8ffd-2f75-42b9-b956-544c1ca9fbe2, 24, Finished, Available, Finished)

array(['2025-03-20'], dtype=object)