In [17]:
import sqlalchemy
import pandas as pd
from sqlalchemy import text

# Path to the SQLite database
sqlite_db_path = "Main_DB/zomato_DB.sqlite"
engine = sqlalchemy.create_engine(f'sqlite:///{sqlite_db_path}')

def count_rows_in_table(engine, table_name):
    with engine.connect() as conn:
        result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
        row_count = result.fetchone()[0]
        print(f"Total rows in {table_name}: {row_count}")

def save_table_to_csv(engine, table_name, output_file):
    with engine.connect() as conn:
        df = pd.read_sql(f"SELECT * FROM {table_name}", conn)
        df.to_csv(output_file, index=False)
        print(f"Table {table_name} saved to {output_file}")

# Count rows
count_rows_in_table(engine, 'restaurant_dimension_table')
count_rows_in_table(engine, 'location_dimension_table')
count_rows_in_table(engine, 'fact_table')

# Save tables to CSV files
save_table_to_csv(engine, 'restaurant_dimension_table', 'restaurant_dimension_table.csv')
save_table_to_csv(engine, 'fact_table', 'fact_table.csv')


Total rows in restaurant_dimension_table: 107380
Total rows in fact_table: 107380


In [3]:
def read_config(file_path):
    try:
        with open(file_path, 'r') as file:
            config = yaml.safe_load(file)
        return config
    except Exception as e:
        logging.error(f"An error occurred while reading the config file: {e}")

In [16]:
import pandas as pd
import sqlite3
import sqlalchemy
import logging
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score
import yaml
import numpy as np
from sklearn.preprocessing import LabelEncoder
import os

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
config = read_config('PA_config.yml')

def initialize_db(engine):
    """Check connection and create the database if necessary."""
    with engine.connect() as conn:
        logging.info("Database created and connected successfully.")

def train_linear_regression(X_train, y_train):
    """Train a Linear Regression model."""
    model = LinearRegression()
    model.fit(X_train, y_train)
    return model

def train_decision_tree(X_train, y_train, max_depth, criterion):
    """Train a Decision Tree model for classification."""
    model = DecisionTreeClassifier(max_depth=max_depth, criterion=criterion)
    model.fit(X_train, y_train)
    return model

def evaluate_regression_model(model, X_test, y_test):
    """Evaluate a regression model with Mean Squared Error and Mean Absolute Error."""
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
    return mse, mae


def label_encode_categorical_features(df, categorical_columns):
    """Apply Label Encoding to categorical columns."""
    for col in categorical_columns:
        le = LabelEncoder()
        df[col] = le.fit_transform(df[col].astype(str))  # Encode categorical column
    return df  # Only return the DataFrame



def evaluate_classification_model(model, X_test, y_test):
    """Evaluate a classification model with Accuracy."""
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    return accuracy

def clean_data(df):
    """Clean data by replacing non-numeric values like '-' with NaN and handle missing values."""
    df.replace('-', np.nan, inplace=True)  # Replace '-' with NaN
    df.fillna(df.median(numeric_only=True), inplace=True)  # Handle missing values using median
    return df

def load_and_prepare_data(engine, table_name, selected_columns, use_numerical_only=False):
    """Load and prepare data for modeling from the database."""
    query = f"SELECT * FROM {table_name}"
    df = pd.read_sql(query, engine)

    # Clean the data
    df = clean_data(df)
    print("Available columns in DataFrame:", df.columns)

    # If using numerical columns only (for Linear Regression)
    if use_numerical_only:
        numerical_columns = df.select_dtypes(include=[np.number]).columns
        selected_columns = [col for col in selected_columns if col in numerical_columns]

    # If not using numerical only, we will label encode the categorical columns for Decision Tree
    if not use_numerical_only:
        categorical_columns = ['CUSINETYPE', 'RATING_TYPE', 'CITY']
        df= label_encode_categorical_features(df, categorical_columns)

    # Split into features and target
    X = df[selected_columns]
    y = df['RATING']

    # For Decision Tree Classification: round ratings to nearest integer
    y_class = y.round().astype(int)

    return train_test_split(X, y, y_class, test_size=0.2, random_state=42)


def predictive_analysis_pipeline(engine, config):
    logging.info("Starting Predictive Analysis...")

    # Columns to be used for prediction
    selected_columns = config['columns']['selected_features']

    # Load and split data for Linear Regression (numerical columns only)
    X_train_lr, X_test_lr, y_train_lr, y_test_lr, _, _ = load_and_prepare_data(engine, "staging_data", selected_columns, use_numerical_only=True)

    # Train Linear Regression model
    lr_model = train_linear_regression(X_train_lr, y_train_lr)
    # Evaluate Linear Regression model
    mse_lr, mae_lr = evaluate_regression_model(lr_model, X_test_lr, y_test_lr)
    logging.info(f"Linear Regression - MSE: {mse_lr}, MAE: {mae_lr}")

    # Load and split data for Decision Tree (including categorical columns)
    X_train_dt, X_test_dt, y_train_dt, y_test_dt, y_train_class, y_test_class = load_and_prepare_data(engine, "staging_data", selected_columns, use_numerical_only=False)

    # Train Decision Tree model for classification
    dt_model = train_decision_tree(X_train_dt, y_train_class, config['DT']['max_depth'], config['DT']['criterion'])
    # Evaluate Decision Tree model for classification
    accuracy_dt = evaluate_classification_model(dt_model, X_test_dt, y_test_class)
    logging.info(f"Decision Tree - Accuracy: {accuracy_dt}")

    return lr_model, dt_model

def etl_predictive_pipeline():
    logging.info("Starting ETL and Predictive Analysis Pipeline...")
    
    # Configuration
    config = read_config('PA_config.yml')

    # Set up database connection
    sqlite_db_path = config['database']['sqlite_db_path']
    os.makedirs(os.path.dirname(sqlite_db_path), exist_ok=True)
    engine = sqlalchemy.create_engine(f'sqlite:///{sqlite_db_path}')
    initialize_db(engine)

    # Predictive Analysis
    lr_model, dt_model = predictive_analysis_pipeline(engine, config)
    logging.info("Predictive analysis completed successfully.")

if __name__ == "__main__":
    etl_predictive_pipeline()


2024-09-18 15:39:52,666 - INFO - Starting ETL and Predictive Analysis Pipeline...
2024-09-18 15:39:52,673 - INFO - Database created and connected successfully.
2024-09-18 15:39:52,674 - INFO - Starting Predictive Analysis...
2024-09-18 15:39:53,662 - INFO - Linear Regression - MSE: 0.11314048336203299, MAE: 0.2424909704035927


Available columns in DataFrame: Index(['NAME', 'PRICE', 'CUSINE_CATEGORY', 'CITY', 'REGION', 'URL', 'PAGENO',
       'CUSINETYPE', 'TIMING', 'RATING_TYPE', 'RATING', 'VOTES', 'Latitude',
       'Longitude'],
      dtype='object')
Available columns in DataFrame: Index(['NAME', 'PRICE', 'CUSINE_CATEGORY', 'CITY', 'REGION', 'URL', 'PAGENO',
       'CUSINETYPE', 'TIMING', 'RATING_TYPE', 'RATING', 'VOTES', 'Latitude',
       'Longitude'],
      dtype='object')


TypeError: tuple indices must be integers or slices, not list

In [8]:
def load_and_prepare_data(engine, table_name, selected_columns):
    """Load and prepare data for modeling from the database."""
    query = f"SELECT * FROM {table_name}"
    df = pd.read_sql(query, engine)

    # Print available columns for debugging
    print("Available columns in the DataFrame:", df.columns)

    # Clean the data
    df = clean_data(df)

    # List of categorical columns to encode
    categorical_columns = ['CUSINETYPE', 'RATING_TYPE', 'CITY']

    # Apply One-Hot Encoding
    df = encode_categorical_features(df, categorical_columns)

    # Split into features and target
    X = df[selected_columns]
    y = df['RATING']

    # For Decision Tree Classification: round ratings to nearest integer
    y_class = y.round().astype(int)

    return train_test_split(X, y, y_class, test_size=0.2, random_state=42)
load_and_prepare_data(engine,"staging_data",selected_columns=)


SyntaxError: unterminated string literal (detected at line 26) (570727082.py, line 26)

In [9]:
import sqlalchemy
import pandas as pd

# Set up database connection
sqlite_db_path = "../Main_DB/zomato_DB.sqlite"
engine = sqlalchemy.create_engine(f'sqlite:///{sqlite_db_path}')

# Query to retrieve the table schema
def check_table_columns(engine, table_name):
    """Retrieve and print the column names of a given table."""
    query = f"PRAGMA table_info({table_name})"
    df = pd.read_sql(query, engine)
    print(f"Column names in the '{table_name}' table:")
    print(df[['name']])  # 'name' column contains the column names in the table

# Call the function to check the columns in the 'staging_data' table
check_table_columns(engine, 'staging_data')


Column names in the 'staging_data' table:
               name
0              NAME
1             PRICE
2   CUSINE_CATEGORY
3              CITY
4            REGION
5               URL
6            PAGENO
7        CUSINETYPE
8            TIMING
9       RATING_TYPE
10           RATING
11            VOTES
12         Latitude
13        Longitude
