In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, MapType
from pyspark.sql.functions import *


class EdFi(BaseOEAModule):
    """
    Provides data processing methods for Ed-Fi data.

    Data is expected to be extracted from the Ed-Fi API into stage1np as one
    JSON file per entity. This module reads those files from
    ``<stage1np>/test_data/<Entity>.json`` (see ``self.stage1np_edfi_data``;
    ``self.stage1np`` itself is set by BaseOEAModule), for example:
        -> <stage1np>/test_data/Student.json
        -> <stage1np>/test_data/Course.json
        etc

    In stage2, each entity is written to:
        -> <stage2np>/<Entity>          flattened data, not pseudonymized
        -> <stage2p>/<Entity>           output of oea.pseudonymize
        -> <stage2np>/<Entity>_lookup   lookup returned by oea.pseudonymize

    ``self.schemas[entity]`` is a list of ``[column, type, operation]`` rows,
    where ``operation`` ('no-op', 'hash', 'mask') is handed to
    ``oea.pseudonymize``. A row whose type is 'map' has a fourth element: the
    ``[key, type, operation]`` sub-fields that are pulled out of the map into
    columns named ``<column>_<key>``.
    """

    def __init__(self, oea, source_folder='ed-fi'):
        BaseOEAModule.__init__(self, oea, source_folder)

        self.stage1np_edfi_data = self.stage1np + '/test_data'
        self.flatten_schema = {}

        self.schemas['Student'] = [['Id', 'string', 'no-op'],
                                    ['StudentUniqueId', 'string', 'hash'],
                                    ['BirthCity', 'string', 'no-op'],
                                    ['BirthDate', 'string', 'no-op'],
                                    ['BirthSexDescriptor', 'string', 'no-op'],
                                    ['FirstName', 'string', 'mask'],
                                    ['IdentificationDocuments', 'string', 'no-op'],
                                    ['LastSurname','string','mask'],
                                    ['OtherNames','string', 'no-op'],
                                    ['PersonalTitlePrefix', 'string', 'no-op'],
                                    ['PersonalIdentificationDocuments', 'string', 'no-op'],
                                    ['Visas', 'string', 'no-op']]

        # NOTE(review): 'gradeLevels' is the only camelCase column name here;
        # every other column (and 'GradeLevels' in StaffSchoolAssociation) is
        # PascalCase. Confirm it matches the incoming JSON field name.
        self.schemas['School'] = [['Id','string','no-op'],
                                    ['SchoolId','string','no-op'],
                                    ['NameOfInstitution','string','no-op'],
                                    ['OperationalStatusDescriptor','string','no-op'],
                                    ['ShortNameOfInstitution','string','no-op'],
                                    ['Website','string','no-op'],
                                    ['AdministrativeFundingControlDescriptor','string','no-op'],
                                    ['CharterStatusDescriptor','string','no-op'],
                                    ['SchoolTypeDescriptor','string','no-op'],
                                    ['TitleIPartASchoolDesignationDescriptor','string','no-op'],
                                    ['Addresses','string','no-op'],
                                    ['EducationOrganizationCategories','string','no-op'],
                                    ['IdentificationCodes','string','no-op'],
                                    ['InstitutionTelephones','string','no-op'],
                                    ['InternationalAddresses','string','no-op'],
                                    ['SchoolCategories','string','no-op'],
                                    ['gradeLevels','string','no-op']]
    
        self.schemas['StudentSchoolAssociation'] = [['Id', 'string', 'no-op'],
                                                    ['GraduationPlanReference', 'string', 'no-op'],
                                                    ['SchoolReference', 'map', 'no-op', [
                                                        ['SchoolId', 'string', 'no-op'],
                                                        ['Link','string','no-op']
                                                    ]],
                                                    ['StudentReference', 'map', 'no-op', [
                                                        ['StudentUniqueId', 'string', 'no-op'],
                                                        ['Link', 'string', 'no-op']
                                                    ]],
                                                    ['EntryDate', 'string', 'no-op'],
                                                    ['EntryGradeLevelDescriptor', 'string', 'no-op'],
                                                    ['AlternativeGraduationPlans', 'string', 'no-op'],
                                                    ['EducationPlans', 'string', 'no-op']]

        self.schemas['Calendar'] = [['Id', 'string', 'no-op'],
                                    ['CalendarCode', 'string', 'no-op'],
                                    ['SchoolReference', 'map', 'no-op', [
                                        ['SchoolId', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ['SchoolYearTypeReference', 'map', 'no-op', [
                                        ['SchoolYear', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ['CalendarTypeDescriptor', 'string', 'no-op'],
                                    ['GradeLevel', 'string', 'no-op']]
        
        self.schemas['Course'] = [['Id', 'string', 'no-op'],
                                    ['EducationOrganizationReference', 'map', 'no-op', [
                                        ['EducationOrganizationId', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ['CourseCode', 'string', 'no-op'],
                                    ['AcademicSubjectDescriptor', 'string', 'no-op'],
                                    ['CourseDefinedByDescriptor', 'string', 'no-op'],
                                    ['CourseDescription', 'string', 'no-op'],
                                    ['CourseGPAApplicabilityDescriptor', 'string', 'no-op'],
                                    ['CourseTitle', 'string', 'no-op'],
                                    ['HighSchoolCourseRequirement', 'string', 'no-op'],
                                    ['NumberOfParts', 'string', 'no-op'],
                                    ['CompetencyLevels', 'string', 'no-op'],
                                    ['IdentificationCodes', 'string', 'no-op'],
                                    ['LearningObjectives', 'string', 'no-op'],
                                    ['LearningStandards', 'string', 'no-op'],
                                    ['LevelCharacteristics', 'string', 'no-op'],
                                    ['OfferedGradeLevels', 'string', 'no-op']]

        # NOTE(review): unlike Student (which hashes StudentUniqueId and masks
        # FirstName/LastSurname), every Staff column is 'no-op', so staff
        # identifiers and names reach stage2p unchanged. Confirm this is intended.
        self.schemas['Staff'] = [["Id", 'string', 'no-op'],
                                    ["StaffUniqueId", 'string', 'no-op'],
                                    ["BirthDate", 'string', 'no-op'],
                                    ["FirstName", 'string', 'no-op'],
                                    ["HighestCompletedLevelOfEducationDescriptor", 'string', 'no-op'],
                                    ["HispanicLatinoEthnicity", 'string', 'no-op'],
                                    ["LastSurname", 'string', 'no-op'],
                                    ["LoginId", 'string', 'no-op'],
                                    ["PersonalTitlePrefix", 'string', 'no-op'],
                                    ["SexDescriptor", 'string', 'no-op'],
                                    ["YearsOfPriorProfessionalExperience", 'string', 'no-op'],
                                    ["Addresses", 'string', 'no-op'],
                                    ["AncestryEthnicOrigins", 'string', 'no-op'],
                                    ["Credentials", 'string', 'no-op'],
                                    ["ElectronicMails", 'string', 'no-op'],
                                    ["IdentificationDocuments", 'string', 'no-op'],
                                    ["InternationalAddresses", 'string', 'no-op'],
                                    ["Languages", 'string', 'no-op'],
                                    ["OtherNames", 'string', 'no-op'],
                                    ["PersonalIdentificationDocuments", 'string', 'no-op'],
                                    ["Races", 'string', 'no-op']]

        self.schemas['Session'] = [["Id", 'string', 'no-op'],
                                    ["SchoolReference", 'map', 'no-op', [
                                        ['SchoolId', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ["SchoolYearTypeReference", 'map', 'no-op', [
                                        ['SchoolYear', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ["SessionName", 'string', 'no-op'],
                                    ["BeginDate", 'string', 'no-op'],
                                    ["EndDate", 'string', 'no-op'],
                                    ["TermDescriptor", 'string', 'no-op'],
                                    ["TotalInstructionalDays", 'string', 'no-op'],
                                    ["GradingPeriods", 'string', 'no-op']]

        self.schemas['Section'] = [["Id", 'string', 'no-op'],
                                    ["CourseOfferingReference", 'map', 'no-op', [
                                        ['SchoolId', 'string', 'no-op'],
                                        ['LocalCourseCode', 'string', 'no-op'],
                                        ['SchoolYear', 'string', 'no-op'],
                                        ['SessionName', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ["LocationReference", 'map', 'no-op', [
                                        ['SchoolId', 'string', 'no-op'],
                                        ['ClassroomIdentificationCode', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ["LocationSchoolReference", 'map', 'no-op', [
                                        ['SchoolId', 'string', 'no-op'],
                                        ['Link','string','no-op']
                                    ]],
                                    ["SectionIdentifier", 'string', 'no-op'],
                                    ["AvailableCredits", 'string', 'no-op'],
                                    ["EducationalEnvironmentDescriptor", 'string', 'no-op'],
                                    ["SectionName", 'string', 'no-op'],
                                    ["SequenceOfCourse", 'string', 'no-op'],
                                    ["Characteristics", 'string', 'no-op'],
                                    ["ClassPeriods", 'string', 'no-op'],
                                    ["CourseLevelCharacteristics", 'string', 'no-op'],
                                    ["OfferedGradeLevels", 'string', 'no-op'],
                                    ["Programs", 'string', 'no-op']]

        self.schemas['StudentSectionAssociation'] = [['Id', 'string', 'no-op'],
                                                        ['SectionReference', 'map', 'no-op', [
                                                            ['SchoolId', 'string', 'no-op'],
                                                            ['LocalCourseCode', 'string', 'no-op'],
                                                            ['SchoolYear', 'string', 'no-op'],
                                                            ['SectionIdentifier', 'string', 'no-op'],
                                                            ['SessionName', 'string', 'no-op'],
                                                            ['Link','string','no-op']
                                                        ]],
                                                        ['StudentReference', 'map', 'no-op', [
                                                            ['StudentUniqueId', 'string', 'no-op'],
                                                            ['Link', 'string', 'no-op']
                                                        ]],
                                                        ['BeginDate', 'string', 'no-op'],
                                                        ['EndDate', 'string', 'no-op'],
                                                        ['HomeroomIndicator', 'string', 'no-op']]

        self.schemas['StaffSectionAssociation'] = [['Id', 'string', 'no-op'],
                                                    ['SectionReference', 'map', 'no-op', [
                                                            ['SchoolId', 'string', 'no-op'],
                                                            ['LocalCourseCode', 'string', 'no-op'],
                                                            ['SchoolYear', 'string', 'no-op'],
                                                            ['SectionIdentifier', 'string', 'no-op'],
                                                            ['SessionName', 'string', 'no-op'],
                                                            ['Link','string','no-op']
                                                        ]],
                                                    ['StaffReference', 'map', 'no-op', [
                                                        ['StaffUniqueId', 'string', 'no-op'],
                                                        ['Link', 'string', 'no-op']
                                                    ]],
                                                    ['BeginDate', 'string', 'no-op'],
                                                    ['ClassroomPositionDescriptor', 'string', 'no-op'],
                                                    ['EndDate', 'string', 'no-op']]

        self.schemas['StaffSchoolAssociation'] = [['Id', 'string', 'no-op'],
                                                    ['SchoolReference', 'map', 'no-op', [
                                                        ['SchoolId', 'string', 'no-op'],
                                                        ['Link','string','no-op']
                                                    ]],
                                                    ['StaffReference', 'map', 'no-op', [
                                                        ['StaffUniqueId', 'string', 'no-op'],
                                                        ['Link', 'string', 'no-op']
                                                    ]],
                                                    ['ProgramAssignmentDescriptor', 'string', 'no-op'],
                                                    ['AcademicSubjects', 'string', 'no-op'],
                                                    ['GradeLevels', 'string', 'no-op']]

    @staticmethod
    def _flattened_column_name(parent, child):
        """Returns the column name used for key `child` of map column `parent`."""
        return parent + '_' + child

    def json_to_spark_schema(self, schema):
        """Builds the Spark read schema for one entity.

        Args:
            schema: list of ``[column, type, operation, (sub-fields)]`` rows as
                stored in ``self.schemas``. Type names are case-insensitive.

        Returns:
            A StructType with one nullable field per row. 'map' rows become
            ``MapType(StringType(), StringType(), True)``; the sub-field types
            of a map row are not used here.

        Raises:
            ValueError: if a row uses a type name that is not supported.
        """
        # Explicit mapping instead of a globals() lookup, so an unknown type
        # name fails clearly instead of resolving to an arbitrary global.
        scalar_types = {
            'string': StringType,
            'integer': IntegerType,
            'double': DoubleType,
        }
        fields = []
        for row in schema:
            column, type_name = row[0], row[1].lower()
            if type_name == 'map':
                dtype = MapType(StringType(), StringType(), True)
            elif type_name in scalar_types:
                dtype = scalar_types[type_name]()
            else:
                raise ValueError(f"Unsupported type '{row[1]}' for column '{column}'")
            fields.append(StructField(column, dtype, True))
        return StructType(fields)
    
    def get_operation_schema(self, schema):
        """Returns the flat ``[column, type, operation]`` list for pseudonymization.

        Rows with three elements are kept as they are. Each map row is replaced
        by one row per sub-field, named like the columns that
        ``flatten_references`` produces (``<column>_<key>``).
        """
        fields = []
        for row in schema:
            if len(row) == 3:
                fields.append(row)
            else:
                fields += [[self._flattened_column_name(row[0], child[0]), child[1], child[2]]
                           for child in row[3]]
        return fields

    def process_data_from_stage1(self):
        """Processes the Ed-Fi entity files found in stage1np into stage2.

        Lists ``self.stage1np_edfi_data`` and keeps every ``<Entity>.json`` file
        whose entity has a schema in ``self.schemas``. Each kept entity is then
        processed with a batch Spark read (this is not structured streaming).
        Other items, such as directories, checksum/hidden files and entities
        without a schema, are skipped with a log message instead of aborting
        the run with a KeyError.

        The processed entity names are stored in ``self.entities``;
        ``process_data_from_stage2`` relies on that attribute.
        """
        logger.debug("Processing ed-fi api data from: " + self.stage1np_edfi_data)
        suffix = '.json'
        self.entities = []
        for item in mssparkutils.fs.ls(self.stage1np_edfi_data):
            if not item.name.endswith(suffix):
                logger.debug(f"Skipping non-json item: {item.name}")
                continue
            entity = item.name[:-len(suffix)]
            if entity not in self.schemas:
                logger.warning(f"Skipping {item.name}: no schema defined for entity '{entity}'")
                continue
            self.entities.append(entity)
        for entity in self.entities:
            self._process_edfi_entity(entity)

    def _process_edfi_entity(self, entity):
        """Reads one entity's JSON from stage1np and writes it to stage2.

        Writes the flattened data to ``<stage2np>/<entity>``, the pseudonymized
        data to ``<stage2p>/<entity>`` and the pseudonymization lookup to
        ``<stage2np>/<entity>_lookup``. A pseudonymized or lookup frame with no
        columns is not written. Every write overwrites the existing output.
        """
        schema = self.schemas[entity]
        spark_schema = self.json_to_spark_schema(schema)
        source_path = self.stage1np_edfi_data + '/' + entity + '.json'
        df = spark.read.format('json').load(path=source_path, schema=spark_schema)
        df = self.flatten_references(df, entity)
        df.write.format('parquet').mode('overwrite').save(self.stage2np + '/' + entity)

        df_pseudo, df_lookup = self.oea.pseudonymize(df, self.get_operation_schema(schema))
        if df_pseudo.columns:
            df_pseudo.write.format('parquet').mode('overwrite').save(self.stage2p + '/' + entity)
        if df_lookup.columns:
            df_lookup.write.format('parquet').mode('overwrite').save(self.stage2np + '/' + entity + '_lookup')

    def flatten_references(self, df, entity):
        """Replaces each map column of `entity` with one column per declared key.

        For every map row in ``self.schemas[entity]``, adds a column
        ``<column>_<key>`` holding ``df[column][key]`` for each declared
        sub-field, then drops the original map column.
        """
        for row in self.schemas[entity]:
            if len(row) == 4:
                parent = row[0]
                for child in row[3]:
                    df = df.withColumn(self._flattened_column_name(parent, child[0]),
                                       df[parent].getItem(child[0]))
                df = df.drop(parent)
        return df

    def process_data_from_stage2(self):
        """Registers the stage2 parquet outputs as Spark SQL tables.

        For each entity in ``self.entities``, which is set by
        ``process_data_from_stage1`` and so requires that method to have run
        first, creates ``s2_edfi.<entity>`` over ``<stage2p>/<entity>`` and
        ``s2np_edfi.<entity>`` over ``<stage2np>/<entity>``.
        """
        logger.debug("Load data stored in stage2p and stage2np to spark db")
        for entity in self.entities:
            self.create_table('s2_edfi', entity, self.stage2p + '/' + entity)
            self.create_table('s2np_edfi', entity, self.stage2np + '/' + entity)

    def reset_edfi_data(self):
        """Resets all data in stage2p and stage2np"""
        self.oea.rm_if_exists(self.stage2np)
        self.oea.rm_if_exists(self.stage2p)
        logger.info(f"Deleted {self.stage2p} and {self.stage2np}")

    def create_table(self, db_name, table, source_path):
        """(Re)creates `db_name.table` as a PARQUET table at `source_path`.

        Creates the database if it is missing, drops any existing table with
        the same name and then creates the table at the given location.
        """
        spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
        spark.sql(f"DROP TABLE IF EXISTS {db_name}.{table}")
        spark.sql(f"create table if not exists {db_name}.{table} using PARQUET location '{source_path}'")
