In [1]:
import pandas as pd
import datetime
import csv


# Paths to the source (CSV) and raw (Parquet) extracts of each table
PATH_1_1 = 'source/carriers.csv'
PATH_1_2 = 'raw/carriers/part-00001-366e67ad-4fd6-41cd-af99-1ff7b3e314db-c000.snappy.parquet'
PATH_2_1 = 'source/airports.csv'
PATH_2_2 = 'raw/airports/part-00001-a9aee747-6f56-4317-bf6b-075fe3b3ed5f-c000.snappy.parquet'
PATH_3_1 = 'source/flights.csv'
PATH_3_2 = 'raw/flights/part-00000-55c5be74-a9db-4265-8f2c-bceb8279269e-c000.snappy.parquet'

# Read the files: suffix _1 = source (CSV), suffix _2 = raw (Parquet)
df_1_1 = pd.read_csv(PATH_1_1)
df_1_2 = pd.read_parquet(PATH_1_2)
df_2_1 = pd.read_csv(PATH_2_1)
df_2_2 = pd.read_parquet(PATH_2_2)
df_3_1 = pd.read_csv(PATH_3_1)
df_3_2 = pd.read_parquet(PATH_3_2)

# Table names used in the report
table_1 = 'carriers'
table_2 = 'airports'
table_3 = 'flights'

# Path for the result file. str(datetime.now()) contains a space and colons
# (e.g. "2024-01-01 12:00:00.123456"); colons are not allowed in Windows file
# names, so format a filesystem-safe timestamp instead.
file_path = f"./result_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

# Report rows, one list per executed check
result_list = []

# Sequential counter that supplies the '#' value of each report row.
class CheckIdCounter:
    """Hand out consecutive integer ids, starting at ``initial_value``."""

    def __init__(self, initial_value=1):
        self.counter = initial_value

    def get_next(self):
        """Return the current id and advance the counter by one."""
        current_value = self.counter
        self.counter += 1
        return current_value


# Shared counter instance. The class keeps its own name so that re-running this
# cell works: binding the instance to the class's name would make the call below
# fail with TypeError on a second run.
check_id = CheckIdCounter()

In [2]:
# Check 1-3: Compare record counts of the source and raw copies of a table.
def check_record_counts(table, df_source, df_raw, check_id = check_id):
    """Compare the row counts of ``df_source`` and ``df_raw``.

    Returns a report row ``[id, table, "Completeness", "All", status, details]``.
    On a mismatch the details name the side that has more records and the
    (positive) size of the difference.
    """
    row_id = check_id.get_next()

    dq_check = "Completeness"
    column = "All"
    discrepancy = len(df_source) - len(df_raw)
    if discrepancy == 0:
        return [f"{row_id}", f"{table}", dq_check, column, "Pass", "Counts are the same"]
    if discrepancy > 0:
        details = f"Source have more records: {discrepancy}"
    else:
        # Raw is larger: say so explicitly instead of reporting a negative "Source" count.
        details = f"Raw have more records: {-discrepancy}"
    return [f"{row_id}", f"{table}", dq_check, column, "Fail", details]

In [3]:
# Check 4: Count NULL values in column 'Code' of the source table 'carriers'.
def check_null_values_for_source_code(table, df_source, check_id = check_id):
    """Report how many NULLs the source 'Code' column contains.

    Returns [id, "Source/<table>", "Completeness", "Code", status, details];
    on failure the details are the NULL count.
    """
    row_id = check_id.get_next()
    missing = df_source['Code'].isnull().sum()
    prefix = [f"{row_id}", f"Source/{table}", "Completeness", "Code"]
    if not missing > 0:
        return prefix + ["Pass", "There are no NULL values."]
    return prefix + ["Fail", f"{missing}"]


# Check 5: Count NULL values in column 'code' of the raw table 'carriers'.
def check_null_values_for_raw_code(table, df_raw, check_id = check_id):
    """Report how many NULLs the raw 'code' column contains.

    Returns [id, "Raw/<table>", "Completeness", "Code", status, details];
    on failure the details are the NULL count.
    """
    row_id = check_id.get_next()
    missing = df_raw['code'].isnull().sum()
    prefix = [f"{row_id}", f"Raw/{table}", "Completeness", "Code"]
    if not missing > 0:
        return prefix + ["Pass", "There are no NULL values."]
    return prefix + ["Fail", f"{missing}"]
    

# Check 6: Count NULL values in column 'Description' of the source table 'carriers'.
def check_null_value_for_source_desciption(table, df_source, check_id = check_id):
    """Report how many NULLs the source 'Description' column contains.

    Returns [id, "Source/<table>", "Completeness", "Description", status, details];
    on failure the details are the NULL count.
    """
    row_id = check_id.get_next()
    missing = df_source['Description'].isnull().sum()
    prefix = [f"{row_id}", f"Source/{table}", "Completeness", "Description"]
    if not missing > 0:
        return prefix + ["Pass", "There are no NULL values."]
    return prefix + ["Fail", f"{missing}"]

    
# Check 7: Count NULL values in column 'description' of the raw table 'carriers'.
def check_null_value_for_raw_desciption(table, df_raw, check_id = check_id):
    """Report how many NULLs the raw 'description' column contains.

    Returns [id, "Raw/<table>", "Completeness", "Description", status, details];
    on failure the details are the NULL count.
    """
    row_id = check_id.get_next()

    null_count_description_parquet = df_raw['description'].isnull().sum()
    dq_check = "Completeness"
    if null_count_description_parquet > 0:
        return [f"{row_id}", f"Raw/{table}", dq_check, "Description", "Fail", f"{null_count_description_parquet}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "Description", "Pass", "There are no NULL values."]

In [4]:
# Check 8: Every non-empty 'state' value must be exactly 2 characters long (source airports table).
def check_state_length_for_source(table, df_source, check_id = check_id):
    """Flag source rows whose non-null 'state' does not have length 2.

    NULL states are skipped; values are compared as strings.
    """
    row_id = check_id.get_next()
    populated = df_source.loc[df_source['state'].notna()]
    state_len = populated['state'].astype(str).str.len()
    bad_state_records = populated.loc[~state_len.eq(2)]
    if bad_state_records.empty:
        return [f"{row_id}", f"Source/{table}", "Consistency", "State", "Pass", 'All values are correct.']
    return [f"{row_id}", f"Source/{table}", "Consistency", "State", "Fail", f"Bad records found:\n{bad_state_records}"]
    

# Check 9: Every non-empty 'state' value must be exactly 2 characters long (raw airports table).
def check_state_length_for_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose non-null 'state' does not have length 2.

    NULL states are skipped; values are compared as strings.
    """
    row_id = check_id.get_next()
    populated = df_raw.loc[df_raw['state'].notna()]
    state_len = populated['state'].astype(str).str.len()
    bad_state_records = populated.loc[~state_len.eq(2)]
    if bad_state_records.empty:
        return [f"{row_id}", f"Raw/{table}", "Consistency", "State", "Pass", 'All values are correct.']
    return [f"{row_id}", f"Raw/{table}", "Consistency", "State", "Fail", f"Bad records found:\n{bad_state_records}"]

In [5]:
# Check 10: Every USA airport must have a state, i.e. not NULL and not the literal 'NA' (source airports table).
def check_state_na_source(table, df_source, check_id = check_id):
    """Flag source rows with country 'USA' whose 'state' is NULL or the string 'NA'.

    With default settings pd.read_csv already turns the literal 'NA' into NaN.
    The string is still tested explicitly, so the check does not depend on how
    the file was read.
    """
    row_id = check_id.get_next()

    state = df_source['state']
    missing_state = state.isna() | state.eq('NA')
    na_records = df_source[missing_state & df_source['country'].eq('USA')]
    dq_check = "Consistency"
    if not na_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "State, Country", "Fail", f"Bad records found:\n{na_records}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "State, Country", "Pass", 'All values are correct.']
    
# Check 11: Every USA airport must have a state, i.e. not NULL and not the literal 'NA' (raw airports table).
def check_state_na_raw(table, df_raw, check_id = check_id):
    """Flag raw rows with country 'USA' whose 'state' is NULL or the string 'NA'.

    Parquet keeps strings as stored. If the raw layer wrote 'NA' as text,
    isna() alone would miss it, so the literal is tested explicitly.
    """
    row_id = check_id.get_next()

    state = df_raw['state']
    missing_state = state.isna() | state.eq('NA')
    na_records = df_raw[missing_state & df_raw['country'].eq('USA')]
    dq_check = "Consistency"
    if not na_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "State, Country", "Fail", f"Bad records found:\n{na_records}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "State, Country", "Pass", "All values are correct."]

In [6]:
# Check 12: Every record must have 'Year' equal to 2008 (source flights table).
def check_year_values_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'Year' is not 2008 (NULL years fail the equality and are flagged)."""
    row_id = check_id.get_next()
    is_2008 = df_source['Year'].eq(2008)
    bad_records = df_source.loc[~is_2008]
    if bad_records.empty:
        outcome = ["Pass", "All values are equal to 2008."]
    else:
        outcome = ["Fail", f"Bad records found:\n{str(bad_records)}"]
    return [f"{row_id}", f"Source/{table}", "Validity", "Year"] + outcome

    
# Check 13: Every record must have 'Year' equal to 2008 (raw flights table).
def check_year_values_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'Year' is not 2008 (NULL years fail the equality and are flagged)."""
    row_id = check_id.get_next()
    is_2008 = df_raw['Year'].eq(2008)
    bad_records = df_raw.loc[~is_2008]
    if bad_records.empty:
        outcome = ["Pass", "All values are equal to 2008."]
    else:
        outcome = ["Fail", f"Bad records found:\n{str(bad_records)}"]
    return [f"{row_id}", f"Raw/{table}", "Validity", "Year"] + outcome

In [7]:
# Check 14: Every record must have 'Month' equal to 1 and not NULL (source flights table).
def check_month_values_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'Month' is not 1; NULL months fail the equality and are flagged."""
    row_id = check_id.get_next()
    is_january = df_source['Month'].eq(1)
    bad_records = df_source.loc[~is_january]
    if bad_records.empty:
        outcome = ["Pass", "All values are equal to 1"]
    else:
        outcome = ["Fail", f"Bad records found:\n{str(bad_records)}"]
    return [f"{row_id}", f"Source/{table}", "Validity", "Month"] + outcome
    
# Check 15: Every record must have 'Month' equal to 1 and not NULL (raw flights table).
def check_month_values_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'Month' is not 1; NULL months fail the equality and are flagged."""
    row_id = check_id.get_next()
    is_january = df_raw['Month'].eq(1)
    bad_records = df_raw.loc[~is_january]
    if bad_records.empty:
        outcome = ["Pass", "All values are equal to 1"]
    else:
        outcome = ["Fail", f"Bad records found:\n{str(bad_records)}"]
    return [f"{row_id}", f"Raw/{table}", "Validity", "Month"] + outcome

In [8]:
# Check 16: 'DayofMonth' must be in the range 1..31 and not NULL (source flights table).
def check_dayofmonth_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'DayofMonth' is NULL, non-numeric, or outside 1..31."""
    row_id = check_id.get_next()

    dq_check = "Validity"
    # between() is False for NaN, so NULLs (and non-numeric values coerced to NaN)
    # are reported as bad; a plain `< 1 | > 31` comparison would let them pass.
    day = pd.to_numeric(df_source['DayofMonth'], errors='coerce')
    bad_records = df_source[~day.between(1, 31)]
    if not bad_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "DayofMonth", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "DayofMonth", "Pass", "All values are in the range of 1 to 31."]
    
# Check 17: 'DayofMonth' must be in the range 1..31 and not NULL (raw flights table).
def check_dayofmonth_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'DayofMonth' is NULL, non-numeric, or outside 1..31.

    The numeric conversion is kept in a local Series, so df_raw is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    # between() is False for NaN, so NULLs (and values that fail numeric coercion)
    # are reported as bad; a plain `< 1 | > 31` comparison would let them pass.
    day = pd.to_numeric(df_raw['DayofMonth'], errors='coerce')
    bad_records = df_raw[~day.between(1, 31)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DayofMonth", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DayofMonth", "Pass", "All values are in the range of 1 to 31."]

In [9]:
# Check 18: 'DayOfWeek' must be in the range 1..7 and not NULL (source flights table).
def check_dayofweek_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'DayOfWeek' is NULL, non-numeric, or outside 1..7."""
    row_id = check_id.get_next()

    dq_check = "Validity"
    # between() is False for NaN, so NULLs are reported as bad; a plain
    # `< 1 | > 7` comparison would let them pass.
    weekday = pd.to_numeric(df_source['DayOfWeek'], errors='coerce')
    bad_records = df_source[~weekday.between(1, 7)]
    if not bad_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "DayOfWeek", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "DayOfWeek", "Pass", "All values are in the range of 1 to 7."]
    
# Check 19: 'DayOfWeek' must be in the range 1..7 and not NULL (raw flights table).
def check_dayofweek_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'DayOfWeek' is NULL, non-numeric, or outside 1..7.

    The numeric conversion is kept in a local Series, so df_raw is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    # between() is False for NaN, so NULLs (and values that fail numeric coercion)
    # are reported as bad; a plain `< 1 | > 7` comparison would let them pass.
    weekday = pd.to_numeric(df_raw['DayOfWeek'], errors='coerce')
    bad_records = df_raw[~weekday.between(1, 7)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DayOfWeek", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DayOfWeek", "Pass", "All values are in the range of 1 to 7."]

In [10]:
# Check 20: 'DepTime' must be in the range 0..2359 (source flights table).
def check_deptime_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'DepTime' is below 0 or above 2359; NULLs are not flagged."""
    row_id = check_id.get_next()
    dep_time = df_source['DepTime']
    out_of_range = dep_time.lt(0) | dep_time.gt(2359)
    bad_records = df_source[out_of_range]
    head = [f"{row_id}", f"Source/{table}", "Validity", "DepTime"]
    if bad_records.empty:
        return head + ["Pass", "All values are in the range of 0 to 2359."]
    return head + ["Fail", f"Bad records found:\n{str(bad_records)}"]
    
# Check 21: 'DepTime' must be in the range 0..2359 (raw flights table).
def check_deptime_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'DepTime' (coerced to numeric) is below 0 or above 2359.

    NULLs and unparseable values are not flagged. The conversion is kept in a
    local Series, so df_raw (shared with later checks) is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    dep_time = pd.to_numeric(df_raw['DepTime'], errors='coerce')
    bad_records = df_raw[(dep_time < 0) | (dep_time > 2359)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DepTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "DepTime", "Pass", "All values are in the range of 0 to 2359."]

In [11]:
# Check 22: 'CRSDepTime' must be in the range 0..2359 (source flights table).
def check_crsdeptime_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'CRSDepTime' is below 0 or above 2359; NULLs are not flagged."""
    row_id = check_id.get_next()
    sched_dep = df_source['CRSDepTime']
    out_of_range = sched_dep.lt(0) | sched_dep.gt(2359)
    bad_records = df_source[out_of_range]
    head = [f"{row_id}", f"Source/{table}", "Validity", "CRSDepTime"]
    if bad_records.empty:
        return head + ["Pass", "All values are in the range of 0 to 2359."]
    return head + ["Fail", f"Bad records found:\n{str(bad_records)}"]
    
# Check 23: 'CRSDepTime' must be in the range 0..2359 (raw flights table).
def check_crsdeptime_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'CRSDepTime' (coerced to numeric) is below 0 or above 2359.

    NULLs and unparseable values are not flagged. The conversion is kept in a
    local Series, so df_raw is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    sched_dep = pd.to_numeric(df_raw['CRSDepTime'], errors='coerce')
    bad_records = df_raw[(sched_dep < 0) | (sched_dep > 2359)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSDepTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSDepTime", "Pass", "All values are in the range of 0 to 2359."]

In [12]:
# Check 24: 'ArrTime' must be in the range 0..2359 (source flights table).
def check_arrtime_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'ArrTime' is below 0 or above 2359; NULLs are not flagged."""
    row_id = check_id.get_next()
    arr_time = df_source['ArrTime']
    out_of_range = arr_time.lt(0) | arr_time.gt(2359)
    bad_records = df_source[out_of_range]
    head = [f"{row_id}", f"Source/{table}", "Validity", "ArrTime"]
    if bad_records.empty:
        return head + ["Pass", "All values are in the range of 0 to 2359."]
    return head + ["Fail", f"Bad records found:\n{str(bad_records)}"]
    
# Check 25: 'ArrTime' must be in the range 0..2359 (raw flights table).
def check_arrtime_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'ArrTime' (coerced to numeric) is below 0 or above 2359.

    NULLs and unparseable values are not flagged. The conversion is kept in a
    local Series, so df_raw is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    arr_time = pd.to_numeric(df_raw['ArrTime'], errors='coerce')
    bad_records = df_raw[(arr_time < 0) | (arr_time > 2359)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "ArrTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "ArrTime", "Pass", "All values are in the range of 0 to 2359."]

In [13]:
# Check 26: 'CRSArrTime' must be in the range 0..2359 (source flights table).
def check_crsarrtime_range_source(table, df_source, check_id = check_id):
    """Flag source rows whose 'CRSArrTime' is below 0 or above 2359; NULLs are not flagged.

    The report's Column field names the column actually checked ('CRSArrTime').
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    sched_arr = df_source['CRSArrTime']
    bad_records = df_source[(sched_arr < 0) | (sched_arr > 2359)]
    if not bad_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "CRSArrTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "CRSArrTime", "Pass", "All values are in the range of 0 to 2359."]
    
# Check 27: 'CRSArrTime' must be in the range 0..2359 (raw flights table).
def check_crsarrtime_range_raw(table, df_raw, check_id = check_id):
    """Flag raw rows whose 'CRSArrTime' (coerced to numeric) is below 0 or above 2359.

    NULLs and unparseable values are not flagged. The conversion is kept in a
    local Series, so df_raw is not modified. The report's Column field names
    the column actually checked ('CRSArrTime').
    """
    row_id = check_id.get_next()

    dq_check = "Validity"
    sched_arr = pd.to_numeric(df_raw['CRSArrTime'], errors='coerce')
    bad_records = df_raw[(sched_arr < 0) | (sched_arr > 2359)]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSArrTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSArrTime", "Pass", "All values are in the range of 0 to 2359."]

In [14]:
# Check 28: 'ActualElapsedTime' must equal ArrTime - DepTime in minutes (source flights table).
def check_calculated_fields_source_1_1(table, df_source, check_id = check_id):
    """Compare 'ActualElapsedTime' with the gap between 'DepTime' and 'ArrTime'.

    The times appear to be hhmm clock values (the range checks use 0..2359).
    Subtracting them directly does not give minutes: 1610 - 1455 = 155 instead
    of 75. Both are therefore converted to minutes since midnight first, and a
    negative gap is wrapped by 24 h for arrivals after midnight. Rows where any
    value is NULL compare unequal and are reported as bad. Conversions are kept
    local, so df_source is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Consistency"

    arr = pd.to_numeric(df_source['ArrTime'], errors='coerce')
    dep = pd.to_numeric(df_source['DepTime'], errors='coerce')
    arr_minutes = (arr // 100) * 60 + arr % 100
    dep_minutes = (dep // 100) * 60 + dep % 100
    expected_values = (arr_minutes - dep_minutes) % (24 * 60)

    # NOTE(review): if ArrTime/DepTime are local clock times, flights that cross
    # time zones will still not match - confirm the intended rule.
    is_correct = df_source['ActualElapsedTime'] == expected_values

    bad_records = df_source[~is_correct]
    if not bad_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "ActualElapsedTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "ActualElapsedTime", "Pass", "All values calculated correctly."]

# Check 29: 'ActualElapsedTime' must equal ArrTime - DepTime in minutes (raw flights table).
def check_calculated_fields_raw_1_2(table, df_raw, check_id = check_id):
    """Compare 'ActualElapsedTime' with the gap between 'DepTime' and 'ArrTime'.

    The times appear to be hhmm clock values (the range checks use 0..2359), so
    they are converted to minutes since midnight before subtracting. A negative
    gap is wrapped by 24 h for arrivals after midnight. Rows where any value is
    NULL compare unequal and are reported as bad. Conversions are kept local,
    so df_raw is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Consistency"

    arr = pd.to_numeric(df_raw['ArrTime'], errors='coerce')
    dep = pd.to_numeric(df_raw['DepTime'], errors='coerce')
    arr_minutes = (arr // 100) * 60 + arr % 100
    dep_minutes = (dep // 100) * 60 + dep % 100
    expected_values = (arr_minutes - dep_minutes) % (24 * 60)

    # NOTE(review): if ArrTime/DepTime are local clock times, flights that cross
    # time zones will still not match - confirm the intended rule.
    is_correct = df_raw['ActualElapsedTime'] == expected_values

    bad_records = df_raw[~is_correct]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "ActualElapsedTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "ActualElapsedTime", "Pass", "All values calculated correctly."]

In [15]:
# Check 30: 'CRSElapsedTime' must equal CRSArrTime - CRSDepTime in minutes (source flights table).
def check_calculated_fields_source_2_1(table, df_source, check_id = check_id):
    """Compare 'CRSElapsedTime' with the gap between 'CRSDepTime' and 'CRSArrTime'.

    The scheduled times appear to be hhmm clock values (the range checks use
    0..2359), so they are converted to minutes since midnight before
    subtracting. A negative gap is wrapped by 24 h. Rows where any value is
    NULL compare unequal and are reported as bad. df_source is not modified.
    """
    row_id = check_id.get_next()

    dq_check = "Consistency"

    sched_arr = pd.to_numeric(df_source['CRSArrTime'], errors='coerce')
    sched_dep = pd.to_numeric(df_source['CRSDepTime'], errors='coerce')
    arr_minutes = (sched_arr // 100) * 60 + sched_arr % 100
    dep_minutes = (sched_dep // 100) * 60 + sched_dep % 100
    expected_values = (arr_minutes - dep_minutes) % (24 * 60)

    # NOTE(review): if the scheduled times are local clock times, flights that
    # cross time zones will still not match - confirm the intended rule.
    is_correct = df_source['CRSElapsedTime'] == expected_values

    bad_records = df_source[~is_correct]
    if not bad_records.empty:
        return [f"{row_id}", f"Source/{table}", dq_check, "CRSElapsedTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Source/{table}", dq_check, "CRSElapsedTime", "Pass", "All values calculated right."]
    
# Check 31: 'CRSElapsedTime' must equal CRSArrTime - CRSDepTime in minutes (raw flights table).
def check_calculated_fields_raw_2_2(table, df_raw, check_id = check_id):
    """Compare 'CRSElapsedTime' with the gap between 'CRSDepTime' and 'CRSArrTime'.

    The scheduled times appear to be hhmm clock values (the range checks use
    0..2359), so they are converted to minutes since midnight before
    subtracting. A negative gap is wrapped by 24 h. Rows where any value is
    NULL compare unequal and are reported as bad. Conversions are kept local,
    so df_raw is not modified. The DQ label is "Consistency", matching the
    other calculated-field checks.
    """
    row_id = check_id.get_next()

    dq_check = "Consistency"

    sched_arr = pd.to_numeric(df_raw['CRSArrTime'], errors='coerce')
    sched_dep = pd.to_numeric(df_raw['CRSDepTime'], errors='coerce')
    arr_minutes = (sched_arr // 100) * 60 + sched_arr % 100
    dep_minutes = (sched_dep // 100) * 60 + sched_dep % 100
    expected_values = (arr_minutes - dep_minutes) % (24 * 60)

    # NOTE(review): if the scheduled times are local clock times, flights that
    # cross time zones will still not match - confirm the intended rule.
    is_correct = df_raw['CRSElapsedTime'] == expected_values

    bad_records = df_raw[~is_correct]
    if not bad_records.empty:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSElapsedTime", "Fail", f"Bad records found:\n{str(bad_records)}"]
    else:
        return [f"{row_id}", f"Raw/{table}", dq_check, "CRSElapsedTime", "Pass", "All values calculated right."]

In [16]:
# Check 32: Primary-key uniqueness of (Code, Description) in the source table carriers.
def check_primary_key_uniqueness_carriers_source(table, df_source, check_id = check_id):
    """Report every source row whose (Code, Description) pair occurs more than once.

    NOTE(review): the key includes Description, so two rows that share a Code
    but differ in Description are not reported. Confirm whether Code alone is
    the intended key.
    """
    row_id = check_id.get_next()
    key_columns = ['Code', 'Description']
    is_dup = df_source.duplicated(subset=key_columns, keep=False)
    duplicate_records = df_source.loc[is_dup]
    head = [f"{row_id}", f"Source/{table}", "Uniqueness", "Code, Description"]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]
    
# Check 33: Primary-key uniqueness of (code, description) in the raw table carriers.
def check_primary_key_uniqueness_carriers_raw(table, df_raw, check_id = check_id):
    """Report every raw row whose (code, description) pair occurs more than once.

    NOTE(review): the key includes description, so two rows that share a code
    but differ in description are not reported. Confirm whether code alone is
    the intended key.
    """
    row_id = check_id.get_next()
    key_columns = ['code', 'description']
    is_dup = df_raw.duplicated(subset=key_columns, keep=False)
    duplicate_records = df_raw.loc[is_dup]
    head = [f"{row_id}", f"Raw/{table}", "Uniqueness", "Code, Description"]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]

In [17]:
# Check 34: Primary-key uniqueness of 'iata' in the source table airports.
def check_primary_key_uniqueness_airports_source(table, df_source, check_id = check_id):
    """Report every source row whose 'iata' code occurs more than once."""
    row_id = check_id.get_next()
    is_dup = df_source.duplicated(subset=['iata'], keep=False)
    duplicate_records = df_source.loc[is_dup]
    head = [f"{row_id}", f"Source/{table}", "Uniqueness", "Iata"]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]
    
# Check 35: Primary-key uniqueness of 'iata' in the raw table airports.
def check_primary_key_uniqueness_airports_raw(table, df_raw, check_id = check_id):
    """Report every raw row whose 'iata' code occurs more than once."""
    row_id = check_id.get_next()
    is_dup = df_raw.duplicated(subset=['iata'], keep=False)
    duplicate_records = df_raw.loc[is_dup]
    head = [f"{row_id}", f"Raw/{table}", "Uniqueness", "Iata"]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]

In [18]:
# Check 36: Primary-key uniqueness of (Year, Month, DayofMonth, DepTime, FlightNum) in the source table flights.
def check_primary_key_uniqueness_flights_source(table, df_source, check_id = check_id):
    """Report every source row whose key columns occur more than once.

    NOTE(review): if FlightNum is only unique per carrier, a carrier column may
    belong in the key. Confirm against the data model.
    """
    row_id = check_id.get_next()
    key_columns = ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']
    is_dup = df_source.duplicated(subset=key_columns, keep=False)
    duplicate_records = df_source.loc[is_dup]
    head = [f"{row_id}", f"Source/{table}", "Uniqueness", ", ".join(key_columns)]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]
    
# Check 37: Primary-key uniqueness of (Year, Month, DayofMonth, DepTime, FlightNum) in the raw table flights.
def check_primary_key_uniqueness_flights_raw(table, df_raw, check_id = check_id):
    """Report every raw row whose key columns occur more than once.

    NOTE(review): if FlightNum is only unique per carrier, a carrier column may
    belong in the key. Confirm against the data model.
    """
    row_id = check_id.get_next()
    key_columns = ['Year', 'Month', 'DayofMonth', 'DepTime', 'FlightNum']
    is_dup = df_raw.duplicated(subset=key_columns, keep=False)
    duplicate_records = df_raw.loc[is_dup]
    head = [f"{row_id}", f"Raw/{table}", "Uniqueness", ", ".join(key_columns)]
    if duplicate_records.empty:
        return head + ["Pass", "No duplicate records found."]
    return head + ["Fail", f"Bad records found:\n{str(duplicate_records)}"]

In [19]:
# Run all checks and collect their report rows.
# The '#' column comes from the shared counter, so the order of this list is the
# order of ids in the report. The counter is reset and the list rebuilt, so
# re-running this cell gives the same report instead of appending duplicate rows
# with ever-growing ids.
check_id.counter = 1
result_list = [
    check_record_counts(table_1, df_1_1, df_1_2),
    check_record_counts(table_2, df_2_1, df_2_2),
    check_record_counts(table_3, df_3_1, df_3_2),

    check_null_values_for_source_code(table_1, df_1_1),
    check_null_values_for_raw_code(table_1, df_1_2),
    check_null_value_for_source_desciption(table_1, df_1_1),
    check_null_value_for_raw_desciption(table_1, df_1_2),

    check_state_length_for_source(table_2, df_2_1),
    check_state_length_for_raw(table_2, df_2_2),

    check_state_na_source(table_2, df_2_1),
    check_state_na_raw(table_2, df_2_2),

    check_year_values_source(table_3, df_3_1),
    check_year_values_raw(table_3, df_3_2),

    check_month_values_source(table_3, df_3_1),
    check_month_values_raw(table_3, df_3_2),

    check_dayofmonth_range_source(table_3, df_3_1),
    check_dayofmonth_range_raw(table_3, df_3_2),

    check_dayofweek_range_source(table_3, df_3_1),
    check_dayofweek_range_raw(table_3, df_3_2),

    check_deptime_range_source(table_3, df_3_1),
    check_deptime_range_raw(table_3, df_3_2),

    check_crsdeptime_range_source(table_3, df_3_1),
    check_crsdeptime_range_raw(table_3, df_3_2),

    check_arrtime_range_source(table_3, df_3_1),
    check_arrtime_range_raw(table_3, df_3_2),

    check_crsarrtime_range_source(table_3, df_3_1),
    check_crsarrtime_range_raw(table_3, df_3_2),

    check_calculated_fields_source_1_1(table_3, df_3_1),
    check_calculated_fields_raw_1_2(table_3, df_3_2),

    check_calculated_fields_source_2_1(table_3, df_3_1),
    check_calculated_fields_raw_2_2(table_3, df_3_2),

    check_primary_key_uniqueness_carriers_source(table_1, df_1_1),
    check_primary_key_uniqueness_carriers_raw(table_1, df_1_2),

    check_primary_key_uniqueness_airports_source(table_2, df_2_1),
    check_primary_key_uniqueness_airports_raw(table_2, df_2_2),

    check_primary_key_uniqueness_flights_source(table_3, df_3_1),
    check_primary_key_uniqueness_flights_raw(table_3, df_3_2),
]

headers = ['#', 'Table', 'DQ Check', 'Column', 'Status', 'Counts or Bad Data']

# Write the report; multi-line "Bad records found" cells are quoted by to_csv.
df = pd.DataFrame(result_list, columns = headers)
df.to_csv(file_path, index=False)
