In [1]:
!which python

/opt/conda/bin/python


In [1]:
import pandas as pd
import pyarrow.parquet as pq
import s3fs

# Initialize S3 filesystem
fs = s3fs.S3FileSystem(anon=False)

# Define the base path and years/months
base_path = "jkim27-etl-5b9d2da3-5f5d-4ab5-bda1-80307b8dc702/taxi/glue-transformed"
years = ['2023', '2024']
months = [f"{i:02d}" for i in range(1, 13)]

# Gather all matching file paths
parquet_files = []
for year in years:
    for month in months:
        path = f"{base_path}/year={year}/month={month}/"
        try:
            files = fs.ls(path)
            parquet_files.extend(["s3://" + f for f in files if f.endswith(".parquet") or f.endswith(".snappy.parquet")])
        except FileNotFoundError:
            # Skip months that don't exist
            continue

# Load all Parquet files into a single DataFrame
df_list = []
for file in parquet_files:
    df = pq.ParquetDataset(file, filesystem=fs).read().to_pandas()
    df_list.append(df)

# Concatenate all dataframes
full_df = pd.concat(df_list, ignore_index=True)

# Show the resulting DataFrame
print(full_df.head())

           pickup_hour  pickup_location_id  rides  year  month
0  2023-01-01 00:00:00                   2      0  2023      1
1  2023-01-01 01:00:00                   2      0  2023      1
2  2023-01-01 02:00:00                   2      0  2023      1
3  2023-01-01 03:00:00                   2      0  2023      1
4  2023-01-01 04:00:00                   2      0  2023      1


In [2]:
full_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4220784 entries, 0 to 4220783
Data columns (total 5 columns):
 #   Column              Dtype 
---  ------              ----- 
 0   pickup_hour         object
 1   pickup_location_id  int16 
 2   rides               int16 
 3   year                int32 
 4   month               int32 
dtypes: int16(2), int32(2), object(1)
memory usage: 80.5+ MB


In [5]:
pip install --upgrade psycopg2-binary SQLAlchemy

Collecting psycopg2-binary
  Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting SQLAlchemy
  Using cached sqlalchemy-2.0.40-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Using cached psycopg2_binary-2.9.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
Using cached sqlalchemy-2.0.40-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
Installing collected packages: SQLAlchemy, psycopg2-binary
  Attempting uninstall: SQLAlchemy
    Found existing installation: SQLAlchemy 2.0.38
    Uninstalling SQLAlchemy-2.0.38:
      Successfully uninstalled SQLAlchemy-2.0.38
Successfully installed SQLAlchemy-2.0.40 psycopg2-binary-2.9.10
Note: you may need to restart the kernel to use updated packages.


In [6]:
# check connection
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://taxiuser:Occupier-Dismount-Unmovable-Fading-Defender@taxi-db.cyhsik28ubia.us-east-1.rds.amazonaws.com:5432/postgres")

with engine.connect() as conn:
    result = conn.execute(text("SELECT version();"))
    print(result.fetchone())

('PostgreSQL 17.2 on aarch64-unknown-linux-gnu, compiled by gcc (GCC) 12.4.0, 64-bit',)


In [7]:
# check connection
import socket

host = "taxi-db.cyhsik28ubia.us-east-1.rds.amazonaws.com"
port = 5432

try:
    socket.create_connection((host, port), timeout=5)
    print("✅ Able to reach the database on port 5432")
except Exception as e:
    print(f"❌ Could not connect: {e}")

✅ Able to reach the database on port 5432


In [8]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# Connect to the default 'postgres' database (not the one you're trying to create)
conn = psycopg2.connect(
    dbname='postgres',
    user='taxiuser',
    password='Occupier-Dismount-Unmovable-Fading-Defender',
    host='taxi-db.cyhsik28ubia.us-east-1.rds.amazonaws.com',
    port=5432
)

conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

cur = conn.cursor()
cur.execute("CREATE DATABASE taxidata;")
cur.close()
conn.close()



DuplicateDatabase: database "taxidata" already exists


In [11]:
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://taxiuser:Occupier-Dismount-Unmovable-Fading-Defender@taxi-db.cyhsik28ubia.us-east-1.rds.amazonaws.com:5432/postgres"
)

In [12]:
full_df.to_sql('taxi_rides', con=engine, index=False, if_exists='replace')

784

In [13]:
import pandas as pd

pd.read_sql("SELECT count(*) FROM taxi_rides LIMIT 5;", con=engine)

Unnamed: 0,count
0,4220784


In [14]:
import pandas as pd
import s3fs
import re

fs = s3fs.S3FileSystem()

bucket_base = 'jkim27-etl-5b9d2da3-5f5d-4ab5-bda1-80307b8dc702/taxi/test-predicted-values'
years = ['2023', '2024']

df_list = []

# List all pickup_location_id folders
location_paths = fs.ls(bucket_base)
for loc_path in location_paths:
    # Extract location ID from the folder name
    loc_match = re.search(r'pickup_location_id=(\d+)', loc_path)
    if not loc_match:
        continue
    location_id = int(loc_match.group(1))

    for year in years:
        try:
            months = fs.ls(f"{loc_path}/year={year}")
            for month_path in months:
                month_match = re.search(r'month=(\d+)', month_path)
                if not month_match:
                    continue
                month = int(month_match.group(1))
                days = fs.ls(month_path)
                for day_path in days:
                    day_match = re.search(r'day=(\d+)', day_path)
                    if not day_match:
                        continue
                    day = int(day_match.group(1))
                    hours = fs.ls(day_path)
                    for hour_path in hours:
                        hour_match = re.search(r'hour=(\d+)', hour_path)
                        if not hour_match:
                            continue
                        hour = int(hour_match.group(1))
                        file_path = f"{hour_path}/prediction.csv"
                        if fs.exists(file_path):
                            df = pd.read_csv(f"s3://{file_path}")
                            df['pickup_location_id'] = location_id
                            df['year'] = int(year)
                            df['month'] = month
                            df['day'] = day
                            df['hour'] = hour
                            df_list.append(df)
        except FileNotFoundError:
            continue

# Combine all data
predictions_df = pd.concat(df_list, ignore_index=True)

# Preview
print(predictions_df.head())

   prediction_datetime  predicted_rides  pickup_location_id  year  month  day  \
0  2023-12-18 00:00:00                9                  43  2023     12   18   
1  2023-12-18 01:00:00                5                  43  2023     12   18   
2  2023-12-18 10:00:00               96                  43  2023     12   18   
3  2023-12-18 11:00:00              125                  43  2023     12   18   
4  2023-12-18 12:00:00              195                  43  2023     12   18   

   hour  
0     0  
1     1  
2    10  
3    11  
4    12  


In [15]:
predictions_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8424 entries, 0 to 8423
Data columns (total 7 columns):
 #   Column               Non-Null Count  Dtype 
---  ------               --------------  ----- 
 0   prediction_datetime  8424 non-null   object
 1   predicted_rides      8424 non-null   int64 
 2   pickup_location_id   8424 non-null   int64 
 3   year                 8424 non-null   int64 
 4   month                8424 non-null   int64 
 5   day                  8424 non-null   int64 
 6   hour                 8424 non-null   int64 
dtypes: int64(6), object(1)
memory usage: 460.8+ KB


In [16]:
from sqlalchemy import create_engine

predictions_df['pickup_location_id'] = 43

engine = create_engine(
    "postgresql+psycopg2://taxiuser:Occupier-Dismount-Unmovable-Fading-Defender@taxi-db.cyhsik28ubia.us-east-1.rds.amazonaws.com:5432/taxidata"
)

# Load into a new table
predictions_df.to_sql('predicted_rides', con=engine, index=False, if_exists='replace')

424

In [17]:
predictions_df

Unnamed: 0,prediction_datetime,predicted_rides,pickup_location_id,year,month,day,hour
0,2023-12-18 00:00:00,9,43,2023,12,18,0
1,2023-12-18 01:00:00,5,43,2023,12,18,1
2,2023-12-18 10:00:00,96,43,2023,12,18,10
3,2023-12-18 11:00:00,125,43,2023,12,18,11
4,2023-12-18 12:00:00,195,43,2023,12,18,12
...,...,...,...,...,...,...,...
8419,2024-12-31 05:00:00,3,43,2024,12,31,5
8420,2024-12-31 06:00:00,7,43,2024,12,31,6
8421,2024-12-31 07:00:00,19,43,2024,12,31,7
8422,2024-12-31 08:00:00,28,43,2024,12,31,8


In [18]:
pd.read_sql("SELECT * FROM predicted_rides LIMIT 5;", con=engine)

Unnamed: 0,prediction_datetime,predicted_rides,pickup_location_id,year,month,day,hour
0,2023-12-18 00:00:00,9,43,2023,12,18,0
1,2023-12-18 01:00:00,5,43,2023,12,18,1
2,2023-12-18 10:00:00,96,43,2023,12,18,10
3,2023-12-18 11:00:00,125,43,2023,12,18,11
4,2023-12-18 12:00:00,195,43,2023,12,18,12


In [19]:
from sqlalchemy import text

with engine.connect() as conn:
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_taxi_rides_loc_hour
        ON taxi_rides (pickup_location_id, pickup_hour);
    """))

    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_predicted_rides_loc_datetime
        ON predicted_rides (pickup_location_id, prediction_datetime);
    """))