## This notebook provides basic queries for exploring data issues regarding the ETL process for integration with SDS.

You'll need an Azure Synapse workspace, and you must grant your Synapse workspace the "Storage Blob Data Contributor" role assignment on your storage account.


In [57]:
data_path = 'abfss://sds@stcontososdsetl.dfs.core.windows.net/2020-10-05T1150_09/profile1'

# The six entities processed by the ETL; every stage below has one CSV per entity.
entities = ['school', 'section', 'teacher', 'teacherroster', 'student', 'studentenrollment']

# (path template relative to data_path, table name template), in load order:
#   1. the inbound data                                        -> <entity>
#   2. the directly invalidated data (missing required fields) -> i_<entity>
#   3. the validated data                                      -> v_<entity>
stages = [
    ('{entity}.csv', '{entity}'),
    ('invalid/invalid_{entity}_records.csv', 'i_{entity}'),
    ('validated/{entity}.csv', 'v_{entity}'),
]


def register_csv_as_table(relative_path, table_name):
    """Read the CSV at data_path/relative_path (first row is the header) and register it as table_name.

    relative_path -- path of the CSV file below data_path
    table_name    -- name under which the resulting DataFrame is registered with sqlContext
    """
    sqlContext.registerDataFrameAsTable(spark.read.csv(f'{data_path}/{relative_path}', header='true'), table_name)


for path_template, table_template in stages:
    for entity in entities:
        register_csv_as_table(path_template.format(entity=entity), table_template.format(entity=entity))

In [58]:
def row_count(table_name):
    """Return the number of rows in the registered table table_name."""
    return sqlContext.sql(f'select count(*) x from {table_name}').first()['x']


# Row counts of the inbound data
school_count = row_count('school')
teacher_count = row_count('teacher')
teacherroster_count = row_count('teacherroster')
section_count = row_count('section')
student_count = row_count('student')
studentenrollment_count = row_count('studentenrollment')

# Row counts of the validated data
v_school_count = row_count('v_school')
v_teacher_count = row_count('v_teacher')
v_teacherroster_count = row_count('v_teacherroster')
v_section_count = row_count('v_section')
v_student_count = row_count('v_student')
v_studentenrollment_count = row_count('v_studentenrollment')

# (label, inbound count, validated count), listed in the order they are reported
count_report = [
    ('schools', school_count, v_school_count),
    ('sections', section_count, v_section_count),
    ('teachers', teacher_count, v_teacher_count),
    ('teacherroster', teacherroster_count, v_teacherroster_count),
    ('student', student_count, v_student_count),
    ('studentenrollment', studentenrollment_count, v_studentenrollment_count),
]

# One line per entity: "<label>: <inbound> -> <validated> (<inbound minus validated>)"
print('\n'.join(f'{label}: {inbound} -> {validated} ({inbound - validated})' for label, inbound, validated in count_report))

schools: 6 -> 6 (0)
sections: 156 -> 142 (14)
teachers: 12 -> 11 (1)
teacherroster: 154 -> 142 (12)
student: 60 -> 60 (0)
studentenrollment: 720 -> 650 (70)

In [59]:
# Get a list of all the sections that were found to be invalid (the delta between what came in and what is being sent to SDS).
# A left anti join keeps every inbound section that has no matching `SIS ID` in v_section.
# It is used instead of `not in (subquery)` because `not in` evaluates to NULL (never true) for an
# inbound row whose `SIS ID` is NULL, and for every row if the subquery returns any NULL, so those
# rows would silently be missing from the delta.
section_delta = sqlContext.sql('select section.* from section left anti join v_section on section.`SIS ID` = v_section.`SIS ID`')
display(section_delta)
# write out the delta to a file
#section_delta.write.mode('overwrite').csv(f'{data_path}/section_delta', header = 'true')

In [61]:
# show all sections and assigned teachers
#df = sqlContext.sql('select section.*, v_teacherroster.`SIS ID` as `teacherroster SIS ID` from section left join v_teacherroster on section.`SIS ID` = v_teacherroster.`Section SIS ID`')

# get a list of sections that don't have a teacher assigned in teacherroster
# (left join keeps every section; a null teacherroster `SIS ID` means no roster row matched it)
df = sqlContext.sql('select section.*, v_teacherroster.`SIS ID` as `teacherroster SIS ID` from section left join v_teacherroster on section.`SIS ID` = v_teacherroster.`Section SIS ID` where v_teacherroster.`SIS ID` is null')

# Count once and reuse the result: every df.count() call runs the query again.
unassigned_count = df.count()

print(f'Total of {unassigned_count} sections with no assigned teacher.')
# Pass the row count so show() prints every row instead of its default limit
df.show(unassigned_count)

# write out the list of sections that don't have an assigned teacher
df.write.mode('overwrite').csv(f'{data_path}/sections_without_teachers', header = 'true')

Total of 14 sections with no assigned teacher.
+------+-------------+--------------------+--------------+-----------+---------------+--------------+------------+-------------+--------------------+-------------+--------------------+--------------------+-------+------+--------------------+
|SIS ID|School SIS ID|        Section Name|Section Number|Term SIS ID|      Term Name|Term StartDate|Term EndDate|Course SIS ID|         Course Name|Course Number|  Course Description|      Course Subject|Periods|Status|teacherroster SIS ID|
+------+-------------+--------------------+--------------+-----------+---------------+--------------+------------+-------------+--------------------+-------------+--------------------+--------------------+-------+------+--------------------+
|   106|         sch5|Technology - Prog...|           106|      term5|  Fall Semester|      9/1/2019|  12/22/2019|     course47|Technology - Prog...|           47|Instruction cover...|Technology - Prog...|      2|Active|       