In [None]:
import psycopg2
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, MetaData
from sqlalchemy.ext.declarative import declarative_base
import pandas as pd
import logging
import os
from urllib.request import urlretrieve
import gzip
from io import BytesIO

# Logging: emit INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# PostgreSQL connection settings. Every value can be overridden through an
# environment variable of the same name; the fallbacks target a local server.
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = int(os.getenv("DB_PORT", 5432))
DB_USER = os.getenv("DB_USER", "postgres")
DB_PASSWORD = os.getenv("DB_PASSWORD", "postgres")
DB_NAME = os.getenv("DB_NAME", "ny_taxi")

# Destination tables.
GREEN_TAXI_TABLE_NAME = "green_taxi_trips"
ZONE_LOOKUP_TABLE_NAME = "taxi_zones"

# Source files.
GREEN_TAXI_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-10.csv.gz"
ZONE_LOOKUP_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"

# Number of CSV rows read and written per batch.
CHUNK_SIZE = 100000

# CSV header -> database column name for the green taxi trip file.
GREEN_TAXI_COLUMN_MAPPINGS = {
    "VendorID": "vendor_id",
    "lpep_pickup_datetime": "pickup_datetime",
    "lpep_dropoff_datetime": "dropoff_datetime",
    "store_and_fwd_flag": "store_and_fwd_flag",
    "RatecodeID": "ratecode_id",
    "PULocationID": "pickup_location_id",
    "DOLocationID": "dropoff_location_id",
    "passenger_count": "passenger_count",
    "trip_distance": "trip_distance",
    "fare_amount": "fare_amount",
    "extra": "extra",
    "mta_tax": "mta_tax",
    "tip_amount": "tip_amount",
    "tolls_amount": "tolls_amount",
    "ehail_fee": "ehail_fee",
    "improvement_surcharge": "improvement_surcharge",
    "total_amount": "total_amount",
    "payment_type": "payment_type",
    "trip_type": "trip_type",
    "congestion_surcharge": "congestion_surcharge",
}

# CSV header -> database column name for the taxi zone lookup file.
ZONE_LOOKUP_COLUMN_MAPPINGS = {
    "LocationID": "location_id",
    "Borough": "borough",
    "Zone": "zone",
    "service_zone": "service_zone",
}


def download_data(url):
    """Download the resource at ``url`` and return its content in memory.

    The response body is read directly into a buffer; nothing is written to
    disk, so no temporary file is left behind after the call.

    Args:
        url: Location of the resource to fetch, passed as-is to
            ``urllib.request.urlopen``.

    Returns:
        A ``BytesIO`` positioned at the start of the downloaded bytes, or
        ``None`` if the download failed (the error is logged, not raised).
    """
    # Local import keeps this function self-contained. urlopen is used rather
    # than urlretrieve because the latter spools the body to a temporary file
    # that was previously never removed.
    from urllib.request import urlopen

    logging.info(f"Downloading data from {url}...")
    try:
        with urlopen(url) as response:
            data = BytesIO(response.read())
    except Exception as e:
        # Callers rely on a None return to detect failure, so every error is
        # reported here instead of propagating.
        logging.error(f"Error downloading data: {e}")
        return None
    logging.info("Download complete.")
    return data

def create_database_engine():
    """Create a SQLAlchemy engine for the configured PostgreSQL database.

    The connection URL is assembled from the module-level ``DB_*`` settings
    and the engine is verified by opening one connection, which is closed
    again before returning.

    Returns:
        The connected engine, or ``None`` if the engine could not be created
        or the test connection failed (the error is logged, not raised).
    """
    # Local import: only this function needs URL quoting.
    from urllib.parse import quote

    try:
        # Percent-encode the credentials so characters such as "@", ":" or "/"
        # inside a user name or password cannot break the URL structure.
        user = quote(DB_USER, safe="")
        password = quote(DB_PASSWORD, safe="")
        engine = create_engine(
            f"postgresql://{user}:{password}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
        )
        # Test the connection immediately, and release it straight away so
        # the probe does not linger as an open connection.
        with engine.connect():
            pass
        logging.info("Database connection successful.")
        return engine
    except Exception as e:
        logging.error(f"Error creating database engine: {e}")
        return None

# The declarative models are defined at module scope so that code outside
# create_tables (which refers to GreenTaxiTrip and ZoneLookup by name) can
# resolve them.
Base = declarative_base(metadata=MetaData())


class GreenTaxiTrip(Base):
    """ORM model for one row of the green taxi trip table."""

    __tablename__ = GREEN_TAXI_TABLE_NAME

    # Surrogate key; the source data carries no trip identifier of its own.
    trip_id = Column(Integer, primary_key=True, autoincrement=True)

    # NOTE(review): the column list below was reconstructed from the values of
    # GREEN_TAXI_COLUMN_MAPPINGS; the SQL types are a best guess from the
    # column names - confirm against the intended schema.
    vendor_id = Column(Integer)
    pickup_datetime = Column(DateTime)
    dropoff_datetime = Column(DateTime)
    store_and_fwd_flag = Column(String)
    ratecode_id = Column(Integer)
    pickup_location_id = Column(Integer)
    dropoff_location_id = Column(Integer)
    passenger_count = Column(Integer)
    trip_distance = Column(Float)
    fare_amount = Column(Float)
    extra = Column(Float)
    mta_tax = Column(Float)
    tip_amount = Column(Float)
    tolls_amount = Column(Float)
    ehail_fee = Column(Float)
    improvement_surcharge = Column(Float)
    total_amount = Column(Float)
    payment_type = Column(Integer)
    trip_type = Column(Integer)
    congestion_surcharge = Column(Float)


class ZoneLookup(Base):
    """ORM model for one row of the taxi zone lookup table."""

    __tablename__ = ZONE_LOOKUP_TABLE_NAME

    # NOTE(review): columns reconstructed from the values of
    # ZONE_LOOKUP_COLUMN_MAPPINGS; confirm types against the intended schema.
    # The id is supplied by the source file, so it is not auto-generated.
    location_id = Column(Integer, primary_key=True, autoincrement=False)
    borough = Column(String)
    zone = Column(String)
    service_zone = Column(String)


def create_tables(engine):
    """Create the tables for every model registered on ``Base``.

    Args:
        engine: SQLAlchemy engine bound to the target database.

    Returns:
        None. A failure is logged rather than raised.
    """
    try:
        Base.metadata.create_all(engine)
        logging.info("Tables created successfully.")
    except Exception as e:
        logging.error(f"Error creating tables: {e}")

def ingest_data(engine, data, table_name, column_mappings, table_class):
    """Load CSV data from an in-memory stream into a database table.

    The stream is read in batches of ``CHUNK_SIZE`` rows. Each batch has its
    columns renamed, is coerced to the types declared on ``table_class``, and
    is appended to ``table_name``.

    Args:
        engine: Database connectable handed to ``DataFrame.to_sql`` as ``con``.
        data: Seekable binary stream (e.g. ``BytesIO``) holding CSV content,
            either plain or gzip-compressed.
        table_name: Name of the destination table.
        column_mappings: Mapping of CSV header name to table column name.
        table_class: Model whose ``__table__.columns`` supplies the column
            types used for coercion.

    Returns:
        None. Any failure is logged rather than raised; batches written before
        the failure remain in the table.
    """
    logging.info(f"Ingesting data into {table_name}...")
    try:
        # Decide on decompression from the content itself (gzip streams start
        # with the bytes 1f 8b) rather than from which table is being loaded.
        start = data.tell()
        is_gzip = data.read(2) == b"\x1f\x8b"
        data.seek(start)

        # The table's column types do not change between batches, so work out
        # once which columns need which conversion. isinstance also matches
        # subclasses of the generic types.
        datetime_cols = []
        numeric_cols = []
        for col_name, column in table_class.__table__.columns.items():
            if isinstance(column.type, DateTime):
                datetime_cols.append(col_name)
            elif isinstance(column.type, (Integer, Float)):
                numeric_cols.append(col_name)

        # Everything is read as text first; conversion happens explicitly
        # below so unparseable values become NaT/NaN instead of raising.
        df_chunks = pd.read_csv(
            data,
            chunksize=CHUNK_SIZE,
            dtype='str',
            compression='gzip' if is_gzip else None,
        )
        for chunk_no, chunk in enumerate(df_chunks, start=1):
            logging.info(f"Processing chunk {chunk_no}...")
            chunk = chunk.rename(columns=column_mappings)
            for col_name in datetime_cols:
                if col_name in chunk.columns:
                    chunk[col_name] = pd.to_datetime(chunk[col_name], errors='coerce')
            for col_name in numeric_cols:
                if col_name in chunk.columns:
                    chunk[col_name] = pd.to_numeric(chunk[col_name], errors='coerce')
            chunk.to_sql(name=table_name, con=engine, if_exists='append', index=False)
            logging.info(f"Chunk {chunk_no} ingested.")
        logging.info(f"Data ingestion into {table_name} complete.")
    except Exception as e:
        logging.error(f"Error ingesting data: {e}")


def main():
    """Run the whole load: connect, create tables, then fetch and ingest each dataset.

    The green taxi trips are processed first, followed by the zone lookup.
    Any failure is logged as a critical error and stops the remaining steps;
    nothing is raised to the caller. The engine's connection pool is released
    before returning, whether or not the run succeeded.
    """
    engine = None
    try:
        engine = create_database_engine()
        if engine is None:
            raise RuntimeError("Engine not created")
        create_tables(engine)

        # NOTE(review): GreenTaxiTrip and ZoneLookup are looked up as
        # module-level names here - confirm the models are defined at that
        # scope.
        datasets = (
            ("Green Taxi", GREEN_TAXI_URL, GREEN_TAXI_TABLE_NAME,
             GREEN_TAXI_COLUMN_MAPPINGS, GreenTaxiTrip),
            ("Zone Lookup", ZONE_LOOKUP_URL, ZONE_LOOKUP_TABLE_NAME,
             ZONE_LOOKUP_COLUMN_MAPPINGS, ZoneLookup),
        )
        for label, url, table_name, column_mappings, table_class in datasets:
            payload = download_data(url)
            if payload is None:
                raise RuntimeError(f"{label} data not downloaded")
            ingest_data(engine, payload, table_name, column_mappings, table_class)

        logging.info("All operations completed successfully.")

    except Exception as e:
        logging.error(f"A critical error occurred: {e}")
    finally:
        # Close pooled connections so none stay open after the run, which
        # matters when this is executed inside a long-lived notebook kernel.
        if engine is not None:
            engine.dispose()


if __name__ == "__main__":
    main()