In [None]:
import pandas as pd
from prefect import flow, task

Extract

Define helper functions to read in data for each school, specifying column types where necessary.
Rename columns.
Add column for source file.

These helper functions are combined into a single Prefect task.

School A

In [None]:
def extract_a():
    """Read School A's raw CSV and return it with standardized column names.

    Student and school identifiers are read as strings and the two score
    columns as nullable integers. A ``source_file`` column holding
    ``"schoolA.csv"`` is added to every row.
    """
    column_types = {
        "StudentID": str,
        "SchoolID": str,
        "Mathematics_Score": pd.Int64Dtype(),
        "Reading_Score": pd.Int64Dtype(),
    }
    # Only these headers differ from the standardized names; the score and
    # growth-target columns are already named as required.
    renames = {
        "StudentID": "Student_id",
        "SchoolID": "School_code",
        "Test_Date": "Assessment_date",
    }

    raw = pd.read_csv("raw data/schoolA.csv", dtype=column_types)
    standardized = raw.rename(columns=renames)
    standardized["source_file"] = "schoolA.csv"

    return standardized

School B

In [None]:
def extract_b():
    """Read School B's raw CSV and return it with standardized column names.

    School B's file uses the headers ``Student_ID`` and ``TestDate``.
    Identifiers are read as strings and the two score columns as nullable
    integers. A ``source_file`` column holding ``"schoolB.csv"`` is added to
    every row.
    """
    column_types = {
        "Student_ID": str,
        "SchoolID": str,
        "Mathematics_Score": pd.Int64Dtype(),
        "Reading_Score": pd.Int64Dtype(),
    }
    # Only these headers differ from the standardized names; the score and
    # growth-target columns are already named as required.
    renames = {
        "Student_ID": "Student_id",
        "SchoolID": "School_code",
        "TestDate": "Assessment_date",
    }

    raw = pd.read_csv("raw data/schoolB.csv", dtype=column_types)
    standardized = raw.rename(columns=renames)
    standardized["source_file"] = "schoolB.csv"

    return standardized

School C

In [None]:
def extract_c():
    """Read School C's raw CSV and return it with standardized column names.

    Student and school identifiers are read as strings and the two score
    columns as nullable integers. A ``source_file`` column holding
    ``"schoolC.csv"`` is added to every row.
    """
    column_types = {
        "StudentID": str,
        "SchoolID": str,
        "Mathematics_Score": pd.Int64Dtype(),
        "Reading_Score": pd.Int64Dtype(),
    }
    # Only these headers differ from the standardized names; the score and
    # growth-target columns are already named as required.
    renames = {
        "StudentID": "Student_id",
        "SchoolID": "School_code",
        "Test_Date": "Assessment_date",
    }

    raw = pd.read_csv("raw data/schoolC.csv", dtype=column_types)
    standardized = raw.rename(columns=renames)
    standardized["source_file"] = "schoolC.csv"

    return standardized

School D

In [None]:
def extract_d():
    """Read School D's raw CSV and return it with standardized column names.

    Student and school identifiers are read as strings and the two score
    columns as nullable integers. A ``source_file`` column holding
    ``"schoolD.csv"`` is added to every row.
    """
    column_types = {
        "StudentID": str,
        "SchoolID": str,
        "Mathematics_Score": pd.Int64Dtype(),
        "Reading_Score": pd.Int64Dtype(),
    }
    # Only these headers differ from the standardized names; the score and
    # growth-target columns are already named as required.
    renames = {
        "StudentID": "Student_id",
        "SchoolID": "School_code",
        "Test_Date": "Assessment_date",
    }

    raw = pd.read_csv("raw data/schoolD.csv", dtype=column_types)
    standardized = raw.rename(columns=renames)
    standardized["source_file"] = "schoolD.csv"

    return standardized

School E

In [None]:
def extract_e():
    """Read School E's raw file and return it with standardized column names.

    Unlike the other schools' readers, this one uses ``pd.read_table``.
    Identifiers are read as strings and the two score columns as nullable
    integers. A ``source_file`` column holding ``"schoolE.csv"`` is added to
    every row.
    """
    column_types = {
        "StudentID": str,
        "SchoolID": str,
        "Mathematics_Score": pd.Int64Dtype(),
        "Reading_Score": pd.Int64Dtype(),
    }
    # Only these headers differ from the standardized names; the score and
    # growth-target columns are already named as required.
    renames = {
        "StudentID": "Student_id",
        "SchoolID": "School_code",
        "Test_Date": "Assessment_date",
    }

    # NOTE(review): read_table splits on tabs by default, so this presumably
    # relies on schoolE.csv being tab-delimited despite its extension - confirm.
    raw = pd.read_table("raw data/schoolE.csv", dtype=column_types)
    standardized = raw.rename(columns=renames)
    standardized["source_file"] = "schoolE.csv"

    return standardized

Concatenate into single table.

In [None]:
@task
def extract() -> pd.DataFrame:
    """Read every school's file and stack the results into one table.

    Returns:
        A DataFrame holding the rows from schools A through E, in that order,
        restricted to the standardized columns and carrying a fresh
        0..n-1 row index.

    Raises:
        KeyError: If any of the standardized columns is absent from the
            combined table.
    """
    output_columns = [
        "Student_id",
        "School_code",
        "Assessment_date",
        "Mathematics_Score",
        "Mathematics_Met_Growth_Target",
        "Reading_Score",
        "Reading_Met_Growth_Target",
        "source_file",
    ]
    per_school = [extract_a(), extract_b(), extract_c(), extract_d(), extract_e()]

    # Each per-school frame arrives with its own default row index, so a plain
    # concat would repeat the same labels once per school. Renumbering keeps
    # the labels unique for any later label-based selection or alignment.
    combined = pd.concat(per_school, ignore_index=True)

    return combined.loc[:, output_columns]

Transform

Pad student id and school id with zeros on left for consistency.
Standardize assessment dates.
Recode values for met growth target to boolean.

In [None]:
@task
def transform(s_concat: pd.DataFrame) -> pd.DataFrame:
    """Standardize identifiers, assessment dates, and growth-target flags.

    Args:
        s_concat: Combined table with ``Student_id``, ``School_code``,
            ``Assessment_date``, ``Mathematics_Met_Growth_Target`` and
            ``Reading_Met_Growth_Target`` columns. It is not modified.

    Returns:
        A new DataFrame in which:

        * ``Student_id`` is left-padded with zeros to 10 characters and
          ``School_code`` to 6 characters;
        * ``Assessment_date`` is parsed to datetimes;
        * both growth-target columns use the nullable ``"boolean"`` dtype,
          with ``Yes``/``Yes*`` as True, ``No``/``No*`` as False, and any
          other or missing value as ``<NA>``.
    """
    # Work on a copy so the caller's frame is left untouched.
    cleaned = s_concat.copy()

    cleaned["Student_id"] = cleaned["Student_id"].str.pad(
        width=10, side="left", fillchar="0"
    )
    cleaned["School_code"] = cleaned["School_code"].str.pad(
        width=6, side="left", fillchar="0"
    )

    # format="mixed" lets each value's date format be inferred individually.
    cleaned["Assessment_date"] = pd.to_datetime(
        cleaned["Assessment_date"], format="mixed"
    )

    condition_metgrowth = {"Yes": True, "Yes*": True, "No": False, "No*": False}
    for column in ("Mathematics_Met_Growth_Target", "Reading_Met_Growth_Target"):
        # ``map`` yields NaN for anything not in the mapping, including
        # missing values. Casting that to plain ``bool`` would turn NaN into
        # True, so use the nullable dtype and keep such rows as <NA>.
        cleaned[column] = cleaned[column].map(condition_metgrowth).astype("boolean")

    return cleaned

Load

Write the transformed table to a CSV file.

In [None]:
@task
def load(s_concat: pd.DataFrame) -> None:
    """Write the given table to ``schoolsAll.csv`` without the row index."""
    output_path = "schoolsAll.csv"
    s_concat.to_csv(output_path, index=False)

Error File

Create error flags for null values.

In [None]:
@task
def error(s_concat: pd.DataFrame) -> pd.DataFrame:
    """Flag rows with null key fields and write the flagged table to disk.

    One boolean column per checked field records whether that field is null,
    and ``is_error`` is True when any of those flags is set. Every row of the
    input, flagged or not, is written to ``error.csv``.

    Args:
        s_concat: Table containing the identifier, date and score columns.

    Returns:
        A copy of ``s_concat`` with the flag columns and ``is_error`` appended.
    """
    # Flag column name -> the column whose nulls it reports.
    checked_fields = {
        "student_isnull": "Student_id",
        "school_isnull": "School_code",
        "date_isnull": "Assessment_date",
        "math_isnull": "Mathematics_Score",
        "reading_isnull": "Reading_Score",
    }

    null_flags = {
        flag: s_concat[column].isnull() for flag, column in checked_fields.items()
    }
    flagged = s_concat.assign(**null_flags)

    # A row is in error as soon as any single null flag is set.
    flagged["is_error"] = flagged[list(checked_fields)].any(axis=1)

    flagged.to_csv("error.csv", index=False)

    return flagged

Assemble pipeline from extract, transform, load, and error tasks.

In [None]:
@flow(name="etl", log_prints=True)
def etl():
    """Run extract and transform, then write the output and error files."""
    cleaned = transform(extract())
    load(cleaned)
    error(cleaned)


# Run the flow only when this file is executed directly, not when imported.
if __name__ == "__main__":
    etl()