In [37]:
import pandas as pd
import pandasql as psql
import sqlite3
import re
import numpy as np
import glob
import os

In [19]:
# Folder the raw input files are read from.
raw_path = "../data"

# Transaction-data folders, one per layer (bronze, then silver).
bronze_path, silver_path = (
    "../Submission/bronze_data/transaction_data",
    "../Submission/silver_data/transaction_data",
)

# Bronze-layer folders for the holiday, weather and location datasets.
bronze_holiday_path, bronze_weather_path, bronze_location_path = (
    "../Submission/bronze_data/holiday_data",
    "../Submission/bronze_data/weather_data",
    "../Submission/bronze_data/location_data",
)

In [33]:
# Column name -> Python type for each dataset.
# NOTE(review): presumably these are passed as pandas dtypes further down the
# notebook -- confirm against the cells that consume them.

# Transaction columns.
transaction_schema = dict(
    location_id=str,
    transaction_id=str,
    profit=float,
)

# Location columns.
location_schema = dict(
    location_id=str,
    population=int,
    elevation=float,
)

# Weather columns.
weather_schema = dict(
    location_id=str,
    date=str,
    temperature=float,
    pressure=float,
    humidity=float,
    cloudy=bool,
    precipitation=bool,
)

In [28]:
def clean_profit(value):
    """Convert a raw profit value to a float, returning NaN when it cannot be parsed.

    Cleaning steps:
      1. Dollar signs are stripped from string inputs.
      2. If the textual form contains a '-', only the text after the LAST '-'
         is kept.
      3. The remainder is passed to ``float``.

    NOTE(review): step 2 discards a leading minus sign, so '-5.00' (or the
    number -5.0) comes back as 5.0. That matches the existing behaviour and is
    kept as-is; confirm that dropping the sign of negative profits is intended.

    Parameters
    ----------
    value : Any
        Raw profit cell (string, number, None, ...).

    Returns
    -------
    float
        The parsed profit, or ``np.nan`` if the value cannot be converted.
    """
    try:
        # Only strings can carry a '$'; leave other objects untouched.
        cleaned = value.replace('$', '') if isinstance(value, str) else value

        text = str(cleaned)
        if '-' in text:
            cleaned = text.split('-')[-1]

        return float(cleaned)
    except (TypeError, ValueError):
        # float(None) and other non-numeric objects raise TypeError rather than
        # ValueError; both mean the conversion failed.
        return np.nan

In [11]:
def invalid_location_check(df, column_name = 'location_id'):
    """Return the rows whose location id is not exactly three digits.

    A value is valid only when it is a string matching ``^\\d{3}$``. Missing
    values (None/NaN) and non-string entries are reported as invalid; comparing
    the match result with ``== False`` would let them slip through, because the
    match result for a missing value is NaN rather than False.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column to validate.
    column_name : str, default 'location_id'
        Name of the column holding location ids (must support the ``.str``
        accessor).

    Returns
    -------
    pandas.DataFrame
        The subset of ``df`` with invalid location ids.
    """
    # na=False makes missing / non-string entries count as "does not match".
    is_valid = df[column_name].str.match(r'^\d{3}$', na=False).astype(bool)

    return df[~is_valid]

In [17]:
def invalid_date_check(df, pattern, column_name = 'date'):
    """Return the rows whose date value does not match ``pattern``.

    Each cell is converted with ``str`` and tested with ``re.match``, which
    anchors only at the start of the string; any end anchoring must come from
    ``pattern`` itself.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column to validate.
    pattern : str
        Regular expression describing a well-formed date.
    column_name : str, default 'date'
        Name of the column to check.

    Returns
    -------
    pandas.DataFrame
        The subset of ``df`` whose values fail the pattern.
    """
    fails_pattern = df[column_name].apply(
        lambda cell: re.match(pattern, str(cell)) is None
    )
    return df[fails_pattern]

In [15]:
def invalid_transaction_id_check(df,column_name = 'transaction_id'):
    """Return the rows whose transaction id is not made up solely of digits.

    Every cell is converted with ``str`` before the ``isdigit`` test, so
    missing values (None/NaN) and empty strings are reported as invalid. The
    conversion is done per element rather than through ``astype(str)`` so the
    result does not depend on whether the pandas version in use keeps missing
    values as NaN during that cast.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column to validate.
    column_name : str, default 'transaction_id'
        Name of the column holding transaction ids.

    Returns
    -------
    pandas.DataFrame
        The subset of ``df`` with non-numeric transaction ids.
    """
    is_non_numeric = df[column_name].apply(lambda cell: not str(cell).isdigit())
    return df[is_non_numeric]


In [29]:
def invalid_profit(df,column_name = 'profit'):
    """Return the rows whose profit value cannot be interpreted as a number.

    The literal string "nan" is treated as a missing value. Rows are flagged
    when ``pd.to_numeric(..., errors='coerce')`` yields NaN, i.e. for missing
    values and for text that does not parse as a number.

    The input frame is NOT modified: the "nan" replacement is applied to a
    copy, so running this check does not silently change the caller's data.
    In the returned rows, "nan" strings appear as NaN.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column to validate.
    column_name : str, default 'profit'
        Name of the column holding profit values.

    Returns
    -------
    pandas.DataFrame
        A new frame with the rows whose profit is missing or non-numeric.
    """
    normalised = df[column_name].replace("nan", np.nan)
    is_not_numeric = pd.to_numeric(normalised, errors='coerce').isna()

    # Work on a copy so the caller's column is left exactly as it was passed in.
    checked = df.copy()
    checked[column_name] = normalised

    return checked[is_not_numeric]

In [None]:
def write_to_path_bronze(df, file_path, file_name):
    """Write ``df`` as a CSV file (without the index) to ``file_path/file_name``.

    The target directory is created if it does not exist yet, so the first
    write into a fresh output folder does not fail.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to persist.
    file_path : str
        Directory to write into.
    file_name : str
        Name of the CSV file inside ``file_path``.

    Returns
    -------
    None
        A confirmation message is printed once the file has been written.
    """
    if file_path:
        os.makedirs(file_path, exist_ok=True)

    full_file_path = f"{file_path}/{file_name}"
    df.to_csv(full_file_path, index=False)

    print(f"DataFrame saved to {full_file_path}")

In [25]:
def read_transactions_json(path):
    """Load every ``transactions*.json`` file under ``path`` into one DataFrame.

    Files are processed in sorted name order so that the row order of the
    combined frame and the order of the per-file counts are reproducible
    (``glob`` itself gives no ordering guarantee). A file that raises
    ``ValueError`` while being read is reported and skipped.

    Parameters
    ----------
    path : str
        Directory to search (not recursive).

    Returns
    -------
    tuple
        ``(combined_df, df_counts)`` where ``df_counts`` lists the row count of
        each successfully read file, in processing order. ``(None, None)`` if
        no file could be loaded.
    """
    json_files = sorted(glob.glob(os.path.join(path, 'transactions*.json')))

    # Every column is requested as text.
    # NOTE(review): read_json's default date conversion may still act on a
    # column named 'date' -- confirm the raw date strings survive as expected.
    schema = {
        'location_id': str, 
        'date': str, 
        'transaction_id': str, 
        'profit': str
    }

    df_list = []
    df_counts = []

    for json_file in json_files:
        try:
            file_df = pd.read_json(json_file, dtype=schema)
        except ValueError as e:
            print(f"Error reading {json_file}: {e}")
            continue
        df_list.append(file_df)
        df_counts.append(len(file_df))

    if not df_list:  # Nothing matched, or every file failed to load
        print("No valid JSON files found or all files failed to load.")
        return None, None

    combined_df = pd.concat(df_list, ignore_index=True)

    return combined_df, df_counts

In [32]:
def read_transactions_txt(path):
    """Load every tab-delimited ``transactions*.txt`` file under ``path``.

    Files are processed in sorted name order so that the row order of the
    combined frame and the order of the per-file counts are reproducible
    (``glob`` itself gives no ordering guarantee). A file that fails to load
    for any reason is reported and skipped.

    Parameters
    ----------
    path : str
        Directory to search (not recursive).

    Returns
    -------
    tuple
        ``(combined_df, df_counts)`` where ``df_counts`` lists the row count of
        each successfully read file, in processing order. ``(None, None)`` if
        no file could be loaded.
    """
    txt_files = sorted(glob.glob(os.path.join(path, 'transactions*.txt')))

    # Every column is read as text so the raw values reach the validation steps unchanged.
    schema = {
        'location_id': str, 
        'date': str, 
        'transaction_id': str, 
        'profit': str
    }

    df_list = []
    df_counts = []

    for txt_file in txt_files:
        try:
            file_df = pd.read_csv(txt_file, delimiter='\t', dtype=schema)
        except Exception as e:  # Deliberately broad: one bad file must not stop the load
            print(f"Error reading {txt_file}: {e}")
            continue
        df_list.append(file_df)
        df_counts.append(len(file_df))

    if not df_list:  # Nothing matched, or every file failed to load
        print("No valid TXT files found or all files failed to load.")
        return None, None

    combined_df = pd.concat(df_list, ignore_index=True)

    return combined_df, df_counts

In [None]:
def read_transactions_csv(path):
    """Load every comma-delimited ``transactions*.csv`` file under ``path``.

    Files are processed in sorted name order so that the row order of the
    combined frame and the order of the per-file counts are reproducible
    (``glob`` itself gives no ordering guarantee). A file that fails to load
    for any reason is reported and skipped.

    Parameters
    ----------
    path : str
        Directory to search (not recursive).

    Returns
    -------
    tuple
        ``(combined_df, df_counts)`` where ``df_counts`` lists the row count of
        each successfully read file, in processing order. ``(None, None)`` if
        no file could be loaded.
    """
    csv_files = sorted(glob.glob(os.path.join(path, 'transactions*.csv')))

    # Every column is read as text so the raw values reach the validation steps unchanged.
    schema = {
        'location_id': str, 
        'date': str, 
        'transaction_id': str, 
        'profit': str
    }

    df_list = []
    df_counts = []

    for csv_file in csv_files:
        try:
            # read_csv uses a comma as its default delimiter.
            file_df = pd.read_csv(csv_file, dtype=schema)
        except Exception as e:  # Deliberately broad: one bad file must not stop the load
            print(f"Error reading {csv_file}: {e}")
            continue
        df_list.append(file_df)
        df_counts.append(len(file_df))

    if not df_list:  # Nothing matched, or every file failed to load
        print("No valid CSV files found or all files failed to load.")
        return None, None

    combined_df = pd.concat(df_list, ignore_index=True)

    return combined_df, df_counts

In [34]:
def check_bool(df, column_name = 'holiday'):
    """Report columns that contain non-missing values which are not booleans.

    The ``column_name`` argument selects what is checked (it used to be
    overwritten by the loop variable, so every column was always checked):

    * a single label, or a list/tuple/set of labels -> only those columns;
    * ``None``, or no requested label present in ``df`` -> every column
      (this keeps calls on frames without a 'holiday' column working as before).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to validate.
    column_name : label, list of labels or None, default 'holiday'
        Column(s) expected to contain only True/False (missing values are
        ignored for the check).

    Returns
    -------
    pandas.DataFrame
        One row per failing column with the fields 'Column' and
        'Dropped Row Indices' (index labels of the rows where that column is
        missing). Empty, but with both columns present, when nothing fails.
    """
    if column_name is None:
        requested = []
    elif isinstance(column_name, (list, tuple, set)):
        requested = list(column_name)
    else:
        requested = [column_name]

    columns_to_check = [col for col in requested if col in df.columns] or list(df.columns)

    failed_columns = []
    for col in columns_to_check:
        # Missing values are excluded from the type check; remember where they were.
        missing_indices = df.index[df[col].isna()].tolist()
        present_values = df[col].dropna()

        # Anything other than a real boolean (e.g. 0/1 or 'True') fails the column.
        if not all(isinstance(item, (bool, np.bool_)) for item in present_values):
            failed_columns.append({'Column': col, 'Dropped Row Indices': missing_indices})

    return pd.DataFrame(failed_columns, columns=['Column', 'Dropped Row Indices'])