In [41]:
# Install Necessary Packages
!pip install dotenv pandas requests psycopg2-binary sqlalchemy

Defaulting to user installation because normal site-packages is not writeable


In [66]:

# Import the libraries needed by the notebook
import os
from datetime import datetime
from datetime import timezone

import dotenv
import pandas as pd
import psycopg2
import requests
from sqlalchemy import create_engine, Table, Column, Integer, String, Float, DateTime, MetaData, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import URL

In [None]:
# Extraction Phase: Extract sales and rental data from the Rentcast API

# Define API key and URLs
# The key is read from the RENTCAST_API_KEY environment variable (a local .env
# file is loaded first) so the real secret never has to be typed into the
# notebook; the original placeholder is kept as the fallback value.
dotenv.load_dotenv()
API_KEY = os.getenv("RENTCAST_API_KEY", "API_KEY")
HEADERS = {"X-Api-Key": API_KEY, "accept": "application/json"}
SALE_URL = "https://api.rentcast.io/v1/listings/sale?city=Austin&state=TX&status=Active&limit=500"
RENTAL_URL = "https://api.rentcast.io/v1/listings/rental/long-term?city=Austin&state=TX&status=Active&limit=500"

# === Extract Function ===
def extract_data(url, category, timeout=30):
    """Download one listing feed and return it as a DataFrame.

    Parameters
    ----------
    url : str
        Full request URL, query string included.
    category : str
        Label stored in the ``listing_category`` column of every row
        (the notebook uses ``"sale"`` and ``"rental"``).
    timeout : float, optional
        Seconds to wait for the server before ``requests`` gives up. Without a
        timeout a stalled connection would block the cell indefinitely.

    Returns
    -------
    pandas.DataFrame
        One row per record in the JSON payload plus ``listing_category``.

    Raises
    ------
    RuntimeError
        If the response status is not 200; the message carries the status
        code and the response body.
    """
    response = requests.get(url, headers=HEADERS, timeout=timeout)

    if response.status_code != 200:
        raise RuntimeError(f"API request failed: {response.status_code} - {response.text}")

    data = response.json()
    # A single JSON object must be wrapped in a list, otherwise pandas rejects
    # a dict of scalar values.
    records = [data] if isinstance(data, dict) else data
    df = pd.DataFrame(records)
    df["listing_category"] = category  # add category (sale or rental)
    print(f"Extracted {len(df)} records for {category}.")
    return df

# Fetch both feeds: active sale listings first, then long-term rentals
sales_df = extract_data(url=SALE_URL, category="sale")
rentals_df = extract_data(url=RENTAL_URL, category="rental")

# Stack the two feeds into one frame with a fresh 0..n-1 index
combined_df = pd.concat((sales_df, rentals_df), ignore_index=True)
print(f"Total records extracted: {len(combined_df)}")

# Write the snapshot to disk; the file is overwritten on every run
combined_df.to_csv(path_or_buf="austin_listings.csv", index=False)



Extracted 500 records for sale.
Extracted 500 records for rental.
Total records extracted: 1000


In [67]:
# Transformation Phase: Clean and transform data

# === Transform Function ===
def _nested_field(frame, column, key, default):
    """Return ``frame[column][key]`` per row, or ``default`` where the cell is not a dict."""
    if column not in frame.columns:
        return pd.Series([default] * len(frame), index=frame.index)
    return frame[column].apply(
        lambda value: value.get(key, default) if isinstance(value, dict) else default
    )


def transform_data(sales_df, rentals_df):
    """Combine the sale and rental extracts and flatten them for loading.

    The two inputs are concatenated into a new frame, so neither the inputs
    nor any notebook-level variable is modified.

    Parameters
    ----------
    sales_df, rentals_df : pandas.DataFrame
        Listing frames to combine. Nested columns (``hoa``, ``listingAgent``,
        ``listingOffice``, ``builder``, ``history``) are read as dicts where
        present; rows or frames without them get the defaults.

    Returns
    -------
    tuple[pandas.DataFrame, pandas.DataFrame]
        ``(combined_df, history_df)``. ``combined_df`` gains ``hoa_fee``,
        ``agent_*`` and ``office_*`` columns and has its date columns parsed.
        ``history_df`` holds one row per dict-valued entry of a listing's
        ``history`` mapping, tagged with ``listing_id`` and ``history_date``;
        it always has those two columns, even when no history exists.
    """
    combined = pd.concat([sales_df, rentals_df], ignore_index=True)

    # Scalar defaults for optional fields. `removedDate` is deliberately left
    # missing, and the nested columns are handled by `_nested_field` below
    # (a dict passed to fillna is treated as an index mapping, not as a value).
    scalar_defaults = {
        'addressLine2': '', 'county': '', 'lotSize': 0, 'yearBuilt': 0,
        'listingType': 'Standard', 'daysOnMarket': 0, 'mlsName': '', 'mlsNumber': '',
    }
    for column, default in scalar_defaults.items():
        if column in combined.columns:
            combined[column] = combined[column].fillna(default)

    # Extract nested fields
    combined['hoa_fee'] = _nested_field(combined, 'hoa', 'fee', 0)
    combined['agent_name'] = _nested_field(combined, 'listingAgent', 'name', '')
    combined['agent_phone'] = _nested_field(combined, 'listingAgent', 'phone', '')
    combined['agent_email'] = _nested_field(combined, 'listingAgent', 'email', '')
    combined['agent_website'] = _nested_field(combined, 'listingAgent', 'website', '')
    combined['office_name'] = _nested_field(combined, 'listingOffice', 'name', '')
    combined['office_phone'] = _nested_field(combined, 'listingOffice', 'phone', '')

    # For 'New Construction' listings the builder's name replaces the agent's,
    # but only when a builder name is actually present.
    if 'listingType' in combined.columns:
        builder_name = _nested_field(combined, 'builder', 'name', None)
        use_builder = (combined['listingType'] == 'New Construction') & builder_name.notna()
        combined['agent_name'] = combined['agent_name'].where(~use_builder, builder_name)

    # Convert dates; unparseable values become NaT
    for column in ('listedDate', 'removedDate', 'createdDate', 'lastSeenDate'):
        if column in combined.columns:
            combined[column] = pd.to_datetime(combined[column], errors='coerce')

    # Flatten the history mapping ({history_date: event_dict}) into one record
    # per event. Each event is copied so the source dicts stay untouched.
    history_records = []
    if 'history' in combined.columns:
        for listing_id, history in zip(combined['id'], combined['history']):
            if not isinstance(history, dict):
                continue  # listing without history (NaN / None)
            for history_date, event in history.items():
                if isinstance(event, dict):
                    history_records.append(
                        {**event, 'listing_id': listing_id, 'history_date': history_date}
                    )

    if history_records:
        history_df = pd.DataFrame(history_records)
    else:
        history_df = pd.DataFrame(columns=['listing_id', 'history_date'])
    for column in ('listedDate', 'removedDate'):
        if column in history_df.columns:
            history_df[column] = pd.to_datetime(history_df[column], errors='coerce')

    return combined, history_df

In [68]:
# Connect to the PostgreSQL database
dotenv.load_dotenv()  # load the .env file into the process environment
DB_USER = os.getenv('DB_USER')  # username from the environment
DB_PASSWORD = os.getenv('DB_PASSWORD')  # password from the environment
DB_HOST = 'localhost'  # database host
DB_NAME = 'real_estate_db'  # database name
DB_PORT = '5432'  # default PostgreSQL port

# Build the URL from its parts rather than an f-string: URL.create escapes
# characters such as '@' or ':' in the password, and the port is included.
engine = create_engine(
    URL.create(
        drivername='postgresql+psycopg2',
        username=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=int(DB_PORT),
        database=DB_NAME,
    )
)

# create_engine() is lazy, so open (and immediately release) one connection
# to make sure the success message below is only printed when it is true.
with engine.connect():
    pass

print(f"connected to {DB_NAME} database successfully.")


connected to real_estate_db database successfully.


In [69]:
# === Create Schema Function ===
def create_postgres_schema(engine):
    """Create the warehouse tables with raw SQL if they do not exist yet.

    Issues ``CREATE TABLE IF NOT EXISTS`` for the dimension tables
    (``dim_state`` → ``dim_city`` → ``dim_zip`` → ``dim_address`` →
    ``dim_location``, ``dim_hoa``, ``dim_property``, ``dim_date``,
    ``dim_office``, ``dim_agent``), the ``fact_listings`` fact table and the
    ``listing_history`` table, all in one transaction.

    Parameters
    ----------
    engine : sqlalchemy.engine.Engine
        Engine connected to the target PostgreSQL database.
    """
    # engine.begin() commits when the block exits normally and rolls back on
    # error, so no explicit commit is needed inside it.
    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS dim_state (
                state_id SERIAL PRIMARY KEY NOT NULL,
                state VARCHAR(255) UNIQUE NOT NULL
            );

            CREATE TABLE IF NOT EXISTS dim_city (
                city_id SERIAL PRIMARY KEY NOT NULL,
                city VARCHAR(255) NOT NULL,
                county VARCHAR(255),
                state_id INTEGER NOT NULL REFERENCES dim_state(state_id)
            );

            CREATE TABLE IF NOT EXISTS dim_zip (
                zip_id SERIAL PRIMARY KEY NOT NULL,
                zip_code VARCHAR(255) UNIQUE NOT NULL,
                city_id INTEGER NOT NULL REFERENCES dim_city(city_id)
            );

            CREATE TABLE IF NOT EXISTS dim_address (
                address_id SERIAL PRIMARY KEY NOT NULL,
                address_line1 VARCHAR(255) NOT NULL,
                address_line2 VARCHAR(255),
                formatted_address VARCHAR(255),
                zip_id INTEGER NOT NULL REFERENCES dim_zip(zip_id)
            );

            CREATE TABLE IF NOT EXISTS dim_location (
                location_id SERIAL PRIMARY KEY NOT NULL,
                latitude FLOAT NOT NULL,
                longitude FLOAT NOT NULL,
                address_id INTEGER NOT NULL REFERENCES dim_address(address_id)
            );

            CREATE TABLE IF NOT EXISTS dim_hoa (
                hoa_id SERIAL PRIMARY KEY NOT NULL,
                fee FLOAT
            );

            CREATE TABLE IF NOT EXISTS dim_property (
                property_id SERIAL PRIMARY KEY NOT NULL,
                property_type VARCHAR(255) NOT NULL,
                bedrooms FLOAT,
                bathrooms FLOAT,
                square_footage FLOAT,
                lot_size FLOAT,
                year_built INTEGER,
                hoa_id INTEGER REFERENCES dim_hoa(hoa_id)
            );

            CREATE TABLE IF NOT EXISTS dim_date (
                date_id SERIAL PRIMARY KEY NOT NULL,
                full_date TIMESTAMP NOT NULL,
                year INTEGER NOT NULL,
                month INTEGER NOT NULL,
                day INTEGER NOT NULL
            );

            CREATE TABLE IF NOT EXISTS dim_office (
                office_id SERIAL PRIMARY KEY NOT NULL,
                name VARCHAR(255) NOT NULL,
                phone VARCHAR(255)
            );

            CREATE TABLE IF NOT EXISTS dim_agent (
                agent_id SERIAL PRIMARY KEY NOT NULL,
                name VARCHAR(255) NOT NULL,
                phone VARCHAR(255),
                email VARCHAR(255),
                website VARCHAR(255),
                type VARCHAR(255) NOT NULL,  -- 'agent' or 'builder'
                office_id INTEGER NOT NULL REFERENCES dim_office(office_id)
            );

            CREATE TABLE IF NOT EXISTS fact_listings (
                listing_id VARCHAR(255) PRIMARY KEY NOT NULL,
                listing_category VARCHAR(255) NOT NULL,  -- 'sale' or 'rental'
                price FLOAT NOT NULL,
                status VARCHAR(255) NOT NULL,
                listing_type VARCHAR(255) NOT NULL,
                days_on_market INTEGER,
                mls_name VARCHAR(255),
                mls_number VARCHAR(255),
                created_date TIMESTAMP NOT NULL,
                last_seen_date TIMESTAMP,
                location_id INTEGER NOT NULL REFERENCES dim_location(location_id),
                property_id INTEGER NOT NULL REFERENCES dim_property(property_id),
                listed_date_id INTEGER NOT NULL REFERENCES dim_date(date_id),
                agent_id INTEGER NOT NULL REFERENCES dim_agent(agent_id)
            );

            CREATE TABLE IF NOT EXISTS listing_history (
                history_id SERIAL PRIMARY KEY NOT NULL,
                listing_id VARCHAR(255) NOT NULL REFERENCES fact_listings(listing_id),
                history_date VARCHAR(255) NOT NULL,
                event VARCHAR(255) NOT NULL,
                price FLOAT,
                listing_type VARCHAR(255),
                days_on_market INTEGER,
                listed_date TIMESTAMP,
                removed_date TIMESTAMP
            );
        """))

    # Reported only after the transaction above has been committed.
    print("PostgreSQL schema table created successfully.")

In [70]:
# Create snowflake schema in PostgreSQL (using SQLAlchemy)
from sqlalchemy import (
    Table, Column, Integer, String, Float, DateTime, MetaData, ForeignKey
)

def create_snowflake_schema(engine):
    """Declare the snowflake-schema tables with SQLAlchemy Core and create them.

    Every table is registered on one ``MetaData`` collection, and
    ``create_all`` is then run against ``engine``.

    Parameters
    ----------
    engine : sqlalchemy.engine.Engine
        Engine for the database in which the tables are created.
    """
    catalog = MetaData()

    def pk(name):
        # Auto-incrementing integer surrogate key.
        return Column(name, Integer, primary_key=True, autoincrement=True, nullable=False)

    def fk(name, target, required=True):
        # Integer reference to `target`; NOT NULL only when `required`.
        if required:
            return Column(name, Integer, ForeignKey(target), nullable=False)
        return Column(name, Integer, ForeignKey(target))

    def varchar(name, **options):
        # 255-character string column.
        return Column(name, String(255), **options)

    # --- geography chain: state -> city -> zip -> address -> location ---
    Table(
        'dim_state', catalog,
        pk('state_id'),
        varchar('state', unique=True, nullable=False),
    )
    Table(
        'dim_city', catalog,
        pk('city_id'),
        varchar('city', nullable=False),
        varchar('county'),
        fk('state_id', 'dim_state.state_id'),
    )
    Table(
        'dim_zip', catalog,
        pk('zip_id'),
        varchar('zip_code', unique=True, nullable=False),
        fk('city_id', 'dim_city.city_id'),
    )
    Table(
        'dim_address', catalog,
        pk('address_id'),
        varchar('address_line1', nullable=False),
        varchar('address_line2'),
        varchar('formatted_address'),
        fk('zip_id', 'dim_zip.zip_id'),
    )
    Table(
        'dim_location', catalog,
        pk('location_id'),
        Column('latitude', Float, nullable=False),
        Column('longitude', Float, nullable=False),
        fk('address_id', 'dim_address.address_id'),
    )

    # --- property attributes ---
    Table(
        'dim_hoa', catalog,
        pk('hoa_id'),
        Column('fee', Float),
    )
    Table(
        'dim_property', catalog,
        pk('property_id'),
        varchar('property_type', nullable=False),
        Column('bedrooms', Float),
        Column('bathrooms', Float),
        Column('square_footage', Float),
        Column('lot_size', Float),
        Column('year_built', Integer),
        fk('hoa_id', 'dim_hoa.hoa_id', required=False),
    )

    # --- calendar ---
    Table(
        'dim_date', catalog,
        pk('date_id'),
        Column('full_date', DateTime, nullable=False),
        Column('year', Integer, nullable=False),
        Column('month', Integer, nullable=False),
        Column('day', Integer, nullable=False),
    )

    # --- people: office -> agent ---
    Table(
        'dim_office', catalog,
        pk('office_id'),
        varchar('name', nullable=False),
        varchar('phone'),
    )
    Table(
        'dim_agent', catalog,
        pk('agent_id'),
        varchar('name', nullable=False),
        varchar('phone'),
        varchar('email'),
        varchar('website'),
        varchar('type', nullable=False),  # 'agent' or 'builder'
        fk('office_id', 'dim_office.office_id'),
    )

    # --- fact table and its history ---
    Table(
        'fact_listings', catalog,
        varchar('listing_id', primary_key=True, nullable=False),
        varchar('listing_category', nullable=False),  # 'sale' or 'rental'
        Column('price', Float, nullable=False),
        varchar('status', nullable=False),
        varchar('listing_type', nullable=False),
        Column('days_on_market', Integer),
        varchar('mls_name'),
        varchar('mls_number'),
        Column('created_date', DateTime, nullable=False),
        Column('last_seen_date', DateTime),
        fk('location_id', 'dim_location.location_id'),
        fk('property_id', 'dim_property.property_id'),
        fk('listed_date_id', 'dim_date.date_id'),
        fk('agent_id', 'dim_agent.agent_id'),
    )
    Table(
        'listing_history', catalog,
        pk('history_id'),
        Column('listing_id', String(255), ForeignKey('fact_listings.listing_id'), nullable=False),
        varchar('history_date', nullable=False),
        varchar('event', nullable=False),
        Column('price', Float),
        varchar('listing_type'),
        Column('days_on_market', Integer),
        Column('listed_date', DateTime),
        Column('removed_date', DateTime),
    )

    catalog.create_all(engine)

    print("Snowflake schema created successfully.")
                           

In [71]:
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime

# Load Phase: Loading data into PostgreSQL with Snowflake schema

# Connect to PostgreSQL using the settings loaded from .env in the connection
# cell (no credentials are written into the notebook) and report the server version.
conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
)

try:
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    print("Connected to:", cursor.fetchone())
    cursor.close()
finally:
    # Release the connection even if the version query fails.
    conn.close()


# === API Setup ===
# The key comes from the RENTCAST_API_KEY environment variable so the secret
# is never stored in the notebook; the placeholder is only a fallback.
API_KEY = os.getenv("RENTCAST_API_KEY", 'API_KEY_PLACEHOLDER')
HEADERS = {"X-Api-Key": API_KEY, "accept": "application/json"}
SALE_URL = "https://api.rentcast.io/v1/listings/sale?city=Austin&state=TX&status=Active&limit=500"
RENTAL_URL = "https://api.rentcast.io/v1/listings/rental/long-term?city=Austin&state=TX&status=Active&limit=500"


# Load function to load data into PostgreSQL
def _db_value(value):
    """Convert a pandas/numpy scalar into a value psycopg2 can adapt (missing -> None)."""
    if value is None or isinstance(value, (dict, list, tuple, set)):
        return value
    if pd.isna(value):
        return None  # NaN / NaT / pd.NA become SQL NULL
    if isinstance(value, pd.Timestamp):
        if value.tzinfo is not None:
            # The schema uses TIMESTAMP (no time zone): store naive UTC.
            value = value.tz_convert(None)
        return value.to_pydatetime()
    if hasattr(value, "item") and not isinstance(value, (str, bytes)):
        return value.item()  # numpy scalar -> plain Python scalar
    return value


def _get_or_create(cur, table, id_column, keys, extras=None, update_existing=False):
    """Return the id of the row matching ``keys``, inserting it when absent.

    ``keys`` and ``extras`` map column names to values. Matching uses
    ``IS NOT DISTINCT FROM`` so NULL keys compare equal. When a row exists and
    ``update_existing`` is true, the ``extras`` columns are refreshed.
    Table and column names are internal constants supplied by ``load_data``;
    only the values are passed as query parameters.
    """
    extras = extras or {}
    match_clause = " AND ".join(f"{column} IS NOT DISTINCT FROM %s" for column in keys)
    cur.execute(
        f"SELECT {id_column} FROM {table} WHERE {match_clause} ORDER BY {id_column} LIMIT 1",
        list(keys.values()),
    )
    existing = cur.fetchone()
    if existing is not None:
        if update_existing and extras:
            assignments = ", ".join(f"{column} = %s" for column in extras)
            cur.execute(
                f"UPDATE {table} SET {assignments} WHERE {id_column} = %s",
                [*extras.values(), existing[0]],
            )
        return existing[0]

    record = {**keys, **extras}
    placeholders = ", ".join(["%s"] * len(record))
    cur.execute(
        f"INSERT INTO {table} ({', '.join(record)}) VALUES ({placeholders}) RETURNING {id_column}",
        list(record.values()),
    )
    return cur.fetchone()[0]


# Load function to load data into PostgreSQL
def load_data(combined_df, history_df):
    """Load transformed listings and their history into the snowflake schema.

    For each listing the dimension rows are looked up or created, the fact row
    is upserted on ``listing_id``, and the listing's history rows are written
    afterwards, so their foreign key to ``fact_listings`` can be satisfied.
    Everything runs in one transaction: any error rolls the whole load back.

    Dimension lookups use select-then-insert rather than ``ON CONFLICT``
    because most dimension tables have no unique constraint for it to target.
    Re-running the load reuses existing dimension and history rows.

    Listings without a ``listedDate`` are skipped and counted, because
    ``fact_listings.listed_date_id`` is NOT NULL.

    Parameters
    ----------
    combined_df : pandas.DataFrame
        Output of ``transform_data``: one row per listing, with the flattened
        ``hoa_fee``, ``agent_*`` and ``office_*`` columns.
    history_df : pandas.DataFrame
        One row per history event with ``listing_id`` and ``history_date``.
    """
    # Group history once up front instead of filtering the frame per listing.
    history_by_listing = {}
    if history_df is not None and not history_df.empty and 'listing_id' in history_df.columns:
        history_by_listing = {
            listing_id: group.to_dict('records')
            for listing_id, group in history_df.groupby('listing_id')
        }

    conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD)
    skipped = 0
    try:
        with conn.cursor() as cur:
            for _, row in combined_df.iterrows():
                def field(name):
                    # DB-safe value of one column of the current listing.
                    return _db_value(row.get(name))

                listed_date = field('listedDate')
                if listed_date is None:
                    skipped += 1
                    continue

                listing_id = field('id')
                zip_code = field('zipCode')
                if zip_code is not None:
                    zip_code = str(zip_code)  # zip_code is a VARCHAR column

                # Geography chain: state -> city -> zip -> address -> location
                state_id = _get_or_create(cur, 'dim_state', 'state_id', {'state': field('state')})
                city_id = _get_or_create(
                    cur, 'dim_city', 'city_id',
                    {'city': field('city'), 'state_id': state_id},
                    {'county': field('county')},
                    update_existing=True,
                )
                zip_id = _get_or_create(
                    cur, 'dim_zip', 'zip_id',
                    {'zip_code': zip_code},
                    {'city_id': city_id},
                    update_existing=True,
                )
                address_id = _get_or_create(
                    cur, 'dim_address', 'address_id',
                    {'formatted_address': field('formattedAddress')},
                    {'address_line1': field('addressLine1'),
                     'address_line2': field('addressLine2'),
                     'zip_id': zip_id},
                )
                location_id = _get_or_create(
                    cur, 'dim_location', 'location_id',
                    {'latitude': field('latitude'), 'longitude': field('longitude')},
                    {'address_id': address_id},
                )

                # HOA and property
                hoa_id = _get_or_create(cur, 'dim_hoa', 'hoa_id', {'fee': field('hoa_fee')})
                property_id = _get_or_create(
                    cur, 'dim_property', 'property_id',
                    {'property_type': field('propertyType'),
                     'bedrooms': field('bedrooms'),
                     'bathrooms': field('bathrooms'),
                     'square_footage': field('squareFootage'),
                     'lot_size': field('lotSize'),
                     'year_built': field('yearBuilt'),
                     'hoa_id': hoa_id},
                )

                # Date (for listedDate)
                date_id = _get_or_create(
                    cur, 'dim_date', 'date_id',
                    {'full_date': listed_date},
                    {'year': listed_date.year, 'month': listed_date.month, 'day': listed_date.day},
                )

                # Office and agent
                office_id = _get_or_create(
                    cur, 'dim_office', 'office_id',
                    {'name': field('office_name')},
                    {'phone': field('office_phone')},
                    update_existing=True,
                )
                agent_type = 'builder' if field('listingType') == 'New Construction' else 'agent'
                agent_id = _get_or_create(
                    cur, 'dim_agent', 'agent_id',
                    {'name': field('agent_name'),
                     'phone': field('agent_phone'),
                     'email': field('agent_email'),
                     'website': field('agent_website'),
                     'type': agent_type,
                     'office_id': office_id},
                )

                # Fact Listings (upsert)
                cur.execute("""
                    INSERT INTO fact_listings (listing_id, listing_category, price, status, listing_type, days_on_market,
                                               mls_name, mls_number, created_date, last_seen_date, location_id, property_id,
                                               listed_date_id, agent_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (listing_id) DO UPDATE SET price = EXCLUDED.price, status = EXCLUDED.status  -- Update key fields
                """, (listing_id, field('listing_category'), field('price'), field('status'),
                      field('listingType'), field('daysOnMarket'), field('mlsName'), field('mlsNumber'),
                      field('createdDate'), field('lastSeenDate'), location_id, property_id, date_id, agent_id))

                # History: written after the fact row; an existing
                # (listing_id, history_date) row is refreshed, not duplicated.
                for event in history_by_listing.get(listing_id, []):
                    _get_or_create(
                        cur, 'listing_history', 'history_id',
                        {'listing_id': listing_id,
                         'history_date': _db_value(event.get('history_date'))},
                        {'event': _db_value(event.get('event')),
                         'price': _db_value(event.get('price')),
                         'listing_type': _db_value(event.get('listingType')),
                         'days_on_market': _db_value(event.get('daysOnMarket')),
                         'listed_date': _db_value(event.get('listedDate')),
                         'removed_date': _db_value(event.get('removedDate'))},
                        update_existing=True,
                    )

        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    if skipped:
        print(f"Skipped {skipped} listing(s) without a listedDate.")
    print(f"✅ Data loaded into {DB_NAME} successfully.")

Connected to: ('PostgreSQL 17.2 on x86_64-windows, compiled by msvc-19.42.34435, 64-bit',)


In [72]:
# Run complete ETL pipeline
# Extract
def extract_data(url, category, timeout=30):
    """Download one listing feed and return it as a tagged DataFrame.

    Parameters
    ----------
    url : str
        Full request URL, query string included.
    category : str
        Label written to the ``listing_category`` column.
    timeout : float, optional
        Seconds to wait for the server before ``requests`` gives up.

    Returns
    -------
    pandas.DataFrame
        One row per record in the JSON payload (a single JSON object yields
        one row). Non-empty results also get ``listing_category`` and
        ``extraction_timestamp`` (naive UTC) columns.

    Raises
    ------
    RuntimeError
        If the response status is not 200. Previously this case fell through
        and returned ``None``, which only failed later in the pipeline.
    """
    response = requests.get(url, headers=HEADERS, timeout=timeout)

    if response.status_code != 200:
        raise RuntimeError(f"API request failed: {response.status_code} - {response.text}")

    data = response.json()

    if isinstance(data, dict):
        df = pd.DataFrame([data])   # wrap dict in list
    else:
        df = pd.DataFrame(data)     # already list of dicts

    if not df.empty:
        df["listing_category"] = category
        # Same naive-UTC value as the deprecated datetime.utcnow().
        df["extraction_timestamp"] = datetime.now(timezone.utc).replace(tzinfo=None)

    print(f"Extracted {len(df)} records for {category}")
    return df

if __name__ == "__main__":
    # Fetch the active sale listings, then the long-term rental listings.
    sales_df = extract_data(url=SALE_URL, category="sale")
    rentals_df = extract_data(url=RENTAL_URL, category="rental")
    
# Transform -> schema setup -> load
# `transform_data` and `load_data` are NOT redefined in this cell: the earlier
# cells already hold the real implementations, and redefining them here made
# `transform_data` call itself forever and replaced `load_data` with a stub
# that opened a connection, never closed it and loaded nothing.
def run_etl_pipeline(sales_df, rentals_df):
    """Run the transform, schema-setup and load stages on extracted listings.

    Parameters
    ----------
    sales_df, rentals_df : pandas.DataFrame
        Frames returned by ``extract_data`` for the sale and rental feeds.

    Returns
    -------
    tuple[pandas.DataFrame, pandas.DataFrame]
        The ``(combined_df, history_df)`` pair produced by ``transform_data``,
        returned so the caller can inspect what was loaded.
    """
    # Transform
    combined_df, history_df = transform_data(sales_df, rentals_df)

    # Create schema
    create_postgres_schema(engine)  # Create Postgres schema if not exists
    create_snowflake_schema(engine)  # Create snowflake schema if not exists
    print("Schema setup completed.")

    # Load
    load_data(combined_df, history_df)

    print("ETL Pipeline completed successfully.")
    return combined_df, history_df


if __name__ == "__main__":
    combined_df, history_df = run_etl_pipeline(sales_df, rentals_df)

In [None]:
# pip install apache-airflow

