In [2]:
# --- Setup: imports, engine, paths ---
import os
from getpass import getpass
from pathlib import Path
import pandas as pd
from sqlalchemy import create_engine, text, bindparam
from sqlalchemy.engine import URL

# Project paths
PROJECT_ROOT = Path.cwd().resolve()
DATA_DIR     = PROJECT_ROOT / "data"
SAMPLES_DIR  = DATA_DIR / "samples"
SAMPLES_DIR.mkdir(parents=True, exist_ok=True)

# DB connection (prompts user; no secrets saved in file)
pg_password = getpass("Postgres password: ")
engine = create_engine(URL.create(
    "postgresql+psycopg2",
    username=os.getenv("PGUSER", "postgres"),
    password=pg_password,
    host=os.getenv("PGHOST", "localhost"),
    port=int(os.getenv("PGPORT", 5432)),
    database=os.getenv("PGDATABASE", "yelpdb"),
), future=True)

print("Project root:", PROJECT_ROOT)
print("Sample output dir:", SAMPLES_DIR)


Postgres password:  ········


Project root: C:\Users\syksh\Coursera\SQL-Capstone-Yelp\notebooks
Sample output dir: C:\Users\syksh\Coursera\SQL-Capstone-Yelp\notebooks\data\samples


In [1]:
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from getpass import getpass

connection_url = URL.create(
    "postgresql+psycopg2",
    username="postgres",
    password=getpass("Postgres password: "),  # you'll be prompted
    host="localhost",
    port=5432,
    database="yelpdb",
)
engine = create_engine(connection_url, future=True)

# sanity check
with engine.connect() as conn:
    print(conn.execute(text("select current_database(), count(*) from yelp.business_coffee")).one())


Postgres password:  ········


('yelpdb', 6704)


In [2]:
from sqlalchemy import text

with engine.begin() as conn:
    conn.execute(text("""
        CREATE OR REPLACE VIEW yelp.business_coffee_sanitized AS
        SELECT
          business_id,
          name,
          city,
          state,
          stars::float          AS stars,
          review_count::int     AS review_count,

          -- price_level as int only when digits; else NULL
          CASE
            WHEN attributes ? 'RestaurantsPriceRange2'
             AND (attributes->>'RestaurantsPriceRange2') ~ '^[0-9]+$'
            THEN (attributes->>'RestaurantsPriceRange2')::int
            ELSE NULL
          END AS price_level,

          -- WiFi: free/paid -> 1, no -> 0, others/None -> NULL
          CASE lower(coalesce(attributes->>'WiFi',''))
            WHEN 'free' THEN 1
            WHEN 'paid' THEN 1
            WHEN 'no'   THEN 0
            ELSE NULL
          END AS has_wifi,

          -- OutdoorSeating: 'true'/'false' -> 1/0; else NULL
          CASE lower(coalesce(attributes->>'OutdoorSeating',''))
            WHEN 'true'  THEN 1
            WHEN 'false' THEN 0
            ELSE NULL
          END AS outdoor_seating

        FROM yelp.business_coffee;
    """))
print("✅ View yelp.business_coffee_sanitized created.")


✅ View yelp.business_coffee_sanitized created.


In [3]:
import pandas as pd

pd.read_sql("""
SELECT
  corr(stars, price_level::float)       AS r_price,
  corr(stars, has_wifi::float)          AS r_wifi,
  corr(stars, outdoor_seating::float)   AS r_outdoor
FROM yelp.business_coffee_sanitized;
""", engine)


Unnamed: 0,r_price,r_wifi,r_outdoor
0,0.234419,,0.261505


In [4]:
pd.read_sql("""
WITH a AS (
  SELECT stars, has_wifi, outdoor_seating
  FROM yelp.business_coffee_sanitized
)
SELECT
  'wifi' AS feature,
  AVG(stars) FILTER (WHERE has_wifi = 1)
  - AVG(stars) FILTER (WHERE has_wifi IS DISTINCT FROM 1) AS delta_stars
FROM a
UNION ALL
SELECT
  'outdoor' AS feature,
  AVG(stars) FILTER (WHERE outdoor_seating = 1)
  - AVG(stars) FILTER (WHERE outdoor_seating IS DISTINCT FROM 1)
FROM a;
""", engine)


Unnamed: 0,feature,delta_stars
0,wifi,
1,outdoor,0.476084


In [19]:
!pip install sqlalchemy psycopg2-binary tqdm --quiet

import json, os
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from psycopg2.extras import Json, execute_values
from tqdm import tqdm

from getpass import getpass

pg_password = getpass("Postgres password: ")

connection_url = URL.create(
    "postgresql+psycopg2",
    username="postgres",
    password=pg_password,
    host="localhost",
    port=5432,
    database="yelpdb",
)

engine = create_engine(connection_url, future=True)

# Quick test
with engine.connect() as conn:
    who = conn.execute(text("select current_user, current_database()")).one()
print("Connected as:", who)


Postgres password:  ········


Connected as: ('postgres', 'yelpdb')


In [7]:
from contextlib import contextmanager

@contextmanager
def pg_cursor(engine):
    conn = engine.raw_connection()
    try:
        yield conn.cursor()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

def load_jsonl_to_pg(json_path, table_fullname, batch_size=5000):
    """
    Streams a .json (NDJSON) file line-by-line, inserts valid JSON rows into
    table_fullname (which must have a single column 'doc jsonb').
    Skips malformed lines and reports counts.
    """
    total = good = bad = 0
    buf = []

    if not os.path.exists(json_path):
        raise FileNotFoundError(json_path)

    with open(json_path, "r", encoding="utf-8") as f, pg_cursor(engine) as cur:
        # truncate target first (idempotent loads)
        cur.execute(f"TRUNCATE {table_fullname};")

        for line in tqdm(f, desc=f"Loading {os.path.basename(json_path)}"):
            total += 1
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)          # validate JSON
                buf.append((Json(obj),))        # psycopg2 will adapt to jsonb
                good += 1
            except Exception:
                bad += 1
                continue

            if len(buf) >= batch_size:
                execute_values(cur,
                               f"INSERT INTO {table_fullname}(doc) VALUES %s",
                               buf, page_size=batch_size)
                buf.clear()

        if buf:
            execute_values(cur,
                           f"INSERT INTO {table_fullname}(doc) VALUES %s",
                           buf, page_size=batch_size)

    return {"total_lines": total, "inserted": good, "skipped_bad": bad}


In [8]:
business_path = r"C:\Users\syksh\Coursera\SQL-Capstone-Yelp\data\raw data\yelp_academic_dataset_business.json"

stats = load_jsonl_to_pg(business_path, "yelp.business_raw", batch_size=5000)
stats


Loading yelp_academic_dataset_business.json: 150346it [00:12, 11862.77it/s]


{'total_lines': 150346, 'inserted': 150346, 'skipped_bad': 0}

In [18]:
from sqlalchemy import text

with engine.begin() as conn:
    # Drop dependent view(s) first
    conn.execute(text("DROP VIEW IF EXISTS yelp.business_coffee_sanitized;"))

    # Now drop the tables
    conn.execute(text("DROP TABLE IF EXISTS yelp.business_coffee;"))
    conn.execute(text("DROP TABLE IF EXISTS yelp.business;"))

    # Recreate yelp.business
    conn.execute(text("""
        CREATE TABLE yelp.business AS
        SELECT
          doc->>'business_id'         AS business_id,
          doc->>'name'                AS name,
          doc->>'city'                AS city,
          doc->>'state'               AS state,
          doc->>'categories'          AS categories,
          (doc->>'stars')::float      AS stars,
          (doc->>'review_count')::int AS review_count,
          doc->'attributes'           AS attributes,
          doc->'hours'                AS hours
        FROM yelp.business_raw;
    """))
    conn.execute(text("ALTER TABLE yelp.business ADD PRIMARY KEY (business_id);"))

    # Recreate coffee table
    conn.execute(text("""
        CREATE TABLE yelp.business_coffee AS
        SELECT *
        FROM yelp.business
        WHERE categories ILIKE '%Coffee & Tea%';
    """))

# sanity counts
with engine.begin() as conn:
    all_biz = conn.execute(text("SELECT COUNT(*) FROM yelp.business;")).scalar_one()
    coffee  = conn.execute(text("SELECT COUNT(*) FROM yelp.business_coffee;")).scalar_one()
all_biz, coffee


(150346, 6704)

In [9]:
# Path to the reviews file (adjust if your folder/name differs)
review_path = r"C:\Users\syksh\Coursera\SQL-Capstone-Yelp\data\raw data\yelp_academic_dataset_review.json"

# This uses the load_jsonl_to_pg(...) function you already defined
stats_rev = load_jsonl_to_pg(review_path, "yelp.review_raw", batch_size=10000)
stats_rev  # shows {'total_lines': ..., 'inserted': ..., 'skipped_bad': ...}


Loading yelp_academic_dataset_review.json: 6990280it [07:09, 16280.59it/s]


{'total_lines': 6990280, 'inserted': 6990280, 'skipped_bad': 0}

In [18]:
from sqlalchemy import text

with engine.begin() as conn:
    # start clean
    conn.execute(text("DROP TABLE IF EXISTS yelp.review_coffee;"))
    conn.execute(text("DROP TABLE IF EXISTS yelp.review;"))

    # build a robust typed table:
    # - stars as FLOAT (handles "3.0")
    # - date parsed only if it looks like YYYY-MM-DD...
    # - useful cast to INT only if it's digits, else 0
    conn.execute(text("""
        CREATE TABLE yelp.review AS
        SELECT
          doc->>'review_id'                 AS review_id,
          doc->>'business_id'               AS business_id,
          (doc->>'stars')::float            AS stars,
          CASE
            WHEN (doc->>'date') ~ '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
              THEN (doc->>'date')::timestamp
            ELSE NULL
          END                               AS review_date,
          doc->>'text'                      AS text,
          CASE
            WHEN (doc->>'useful') ~ '^[0-9]+$'
              THEN (doc->>'useful')::int
            ELSE 0
          END                               AS useful
        FROM yelp.review_raw
        WHERE (doc->>'stars') ~ '^[0-9]+(\\.[0-9]+)?$'
          AND doc ? 'review_id'
          AND doc ? 'business_id';
    """))

    conn.execute(text("ALTER TABLE yelp.review ADD PRIMARY KEY (review_id);"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_review_business_id ON yelp.review(business_id);"))

    # coffee/tea-only subset
    conn.execute(text("""
        CREATE TABLE yelp.review_coffee AS
        SELECT r.*
        FROM yelp.review r
        JOIN yelp.business_coffee b USING (business_id);
    """))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_review_coffee_bid ON yelp.review_coffee(business_id);"))

# sanity counts
with engine.begin() as conn:
    n_all = conn.execute(text("SELECT COUNT(*) FROM yelp.review;")).scalar_one()
    n_cof = conn.execute(text("SELECT COUNT(*) FROM yelp.review_coffee;")).scalar_one()

n_all, n_cof


(6990280, 442356)

In [10]:
from sqlalchemy import text
import pandas as pd

# counts
with engine.begin() as conn:
    n_all = conn.execute(text("SELECT COUNT(*) FROM yelp.review;")).scalar_one()
    n_cof = conn.execute(text("SELECT COUNT(*) FROM yelp.review_coffee;")).scalar_one()
print("reviews:", n_all, " | coffee-only reviews:", n_cof)

# peek a few rows to confirm schema
display(pd.read_sql("SELECT * FROM yelp.review LIMIT 3;", engine))
display(pd.read_sql("SELECT * FROM yelp.review_coffee LIMIT 3;", engine))


reviews: 6990280  | coffee-only reviews: 442356


Unnamed: 0,review_id,business_id,stars,review_date,text,useful
0,zTmYV7KFlEQSKu7POc5PUg,0ijmmbTvd8fcLclFVdpIsQ,3.0,2020-08-14 01:41:03,Food was quite good and the place has a cool l...,3
1,rzNt4vMn2ROFISZoXhWQvg,MhmZuFRxLt-54jaRBZ8X2w,5.0,2020-10-30 13:13:57,Went from an old pair of wooden french doors t...,0
2,NDkqReZB29BPSWkcf_td9g,kfpwO_cdVQdTh2_oMDZ51Q,5.0,2020-09-03 14:17:48,Stopped by here twice. Once for lunch and the ...,0


Unnamed: 0,review_id,business_id,stars,review_date,text,useful
0,yX4bm3CbcU6JSjulBeEhOw,C8qLAGhE2UP3NgdShFuOPg,5.0,2021-04-11 17:07:26,I'm an avid Tiger Sugar fan and I can say that...,4
1,a8dkRAIhXGu2P5FxqY0XmQ,MdcgaLvlmhYwpb5l3NT19g,5.0,2021-05-31 03:44:10,I love cooking with all the delicious olive oi...,0
2,WVEylTcUIDRqlUUNRQyhMw,picPFi6JAQ6VDQUCOqftmQ,1.0,2020-10-07 19:36:56,How is this place even open? It's barely a bus...,0


In [11]:
from sqlalchemy import text
import pandas as pd

with engine.begin() as conn:
    n_biz  = conn.execute(text("SELECT COUNT(*) FROM yelp.business;")).scalar_one()
    n_cofB = conn.execute(text("SELECT COUNT(*) FROM yelp.business_coffee;")).scalar_one()
    n_rev  = conn.execute(text("SELECT COUNT(*) FROM yelp.review;")).scalar_one()
    n_cofR = conn.execute(text("SELECT COUNT(*) FROM yelp.review_coffee;")).scalar_one()
print("businesses:", n_biz, "| coffee businesses:", n_cofB,
      "| reviews:", n_rev, "| coffee reviews:", n_cofR)


businesses: 150346 | coffee businesses: 6704 | reviews: 6990280 | coffee reviews: 442356


In [12]:
pd.read_sql("""
SELECT corr(stars::float, ln(review_count + 1)::float) AS r
FROM yelp.business_coffee
WHERE stars IS NOT NULL AND review_count IS NOT NULL;
""", engine)


Unnamed: 0,r
0,0.239948


In [13]:
pd.read_sql("""
WITH city_counts AS (
  SELECT city, COUNT(*) AS biz FROM yelp.business_coffee GROUP BY city
),
ranked AS (
  SELECT city, biz, NTILE(100) OVER (ORDER BY biz) AS pctile FROM city_counts
),
biz_city AS (
  SELECT b.*, (r.pctile >= 85)::int AS urban
  FROM yelp.business_coffee b JOIN ranked r USING (city)
),
agg AS (
  SELECT urban,
         AVG(stars::float) AS avg_stars,
         STDDEV_POP(stars::float) AS sd_stars,
         AVG(review_count::float) AS avg_reviews,
         STDDEV_POP(review_count::float) AS sd_reviews,
         COUNT(*) AS n
  FROM biz_city
  GROUP BY urban
)
SELECT
  (SELECT avg_stars FROM agg WHERE urban=1) - (SELECT avg_stars FROM agg WHERE urban=0) AS diff_stars,
  (
    (SELECT avg_stars FROM agg WHERE urban=1) - (SELECT avg_stars FROM agg WHERE urban=0)
  ) / SQRT((
    (SELECT sd_stars^2*(n-1) FROM agg WHERE urban=1) +
    (SELECT sd_stars^2*(n-1) FROM agg WHERE urban=0)
  )::float / (
    (SELECT n FROM agg WHERE urban=1) + (SELECT n FROM agg WHERE urban=0) - 2
  )) AS cohen_d_stars,
  (SELECT avg_reviews FROM agg WHERE urban=1) - (SELECT avg_reviews FROM agg WHERE urban=0) AS diff_reviews
""", engine)


Unnamed: 0,diff_stars,cohen_d_stars,diff_reviews
0,0.263998,0.262665,39.89379


In [14]:
# install once per kernel
!pip install scikit-learn --quiet

import pandas as pd
from sqlalchemy import text
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

# grab a manageable slice (you can remove LIMIT once it runs fine)
q = """
SELECT text, stars
FROM yelp.review_coffee
WHERE text IS NOT NULL
LIMIT 200000
"""
df = pd.read_sql(q, engine)

# buckets
pos = df[df["stars"]>=4]["text"].fillna("")
neg = df[df["stars"]<=2]["text"].fillna("")

# vectorize
vec = TfidfVectorizer(lowercase=True, stop_words="english", min_df=10, max_features=20000)
X_pos = vec.fit_transform(pos)
feat = np.array(vec.get_feature_names_out())
top_pos = feat[np.argsort((X_pos.mean(axis=0).A1))[-15:][::-1]]

# re-fit on negatives to get their distinctive terms
vec2 = TfidfVectorizer(lowercase=True, stop_words="english", min_df=10, max_features=20000)
X_neg = vec2.fit_transform(neg)
feat2 = np.array(vec2.get_feature_names_out())
top_neg = feat2[np.argsort((X_neg.mean(axis=0).A1))[-15:][::-1]]

print("Top positive terms:", ", ".join(top_pos))
print("Top negative terms:", ", ".join(top_neg))


Top positive terms: coffee, great, place, good, food, love, delicious, service, friendly, best, staff, breakfast, like, amazing, really
Top negative terms: coffee, food, order, place, service, just, time, like, good, don, minutes, ordered, got, drive, ve


In [15]:
# H1 (reviews ↔ stars)
pd.read_sql("""
SELECT corr(stars::float, ln(review_count + 1)::float) AS r
FROM yelp.business_coffee
WHERE stars IS NOT NULL AND review_count IS NOT NULL;
""", engine)


Unnamed: 0,r
0,0.239948


In [17]:
#H2 (urban vs non-urban)

pd.read_sql("""
WITH city_counts AS (
  SELECT city, COUNT(*) AS biz FROM yelp.business_coffee GROUP BY city
),
ranked AS (
  SELECT city, biz, NTILE(100) OVER (ORDER BY biz) AS pctile FROM city_counts
),
biz_city AS (
  SELECT b.*, (r.pctile >= 85)::int AS urban
  FROM yelp.business_coffee b JOIN ranked r USING (city)
),
agg AS (
  SELECT urban,
         AVG(stars::float) AS avg_stars,
         STDDEV_POP(stars::float) AS sd_stars,
         AVG(review_count::float) AS avg_reviews,
         STDDEV_POP(review_count::float) AS sd_reviews,
         COUNT(*) AS n
  FROM biz_city
  GROUP BY urban
)
SELECT
  (SELECT avg_stars FROM agg WHERE urban=1) - (SELECT avg_stars FROM agg WHERE urban=0) AS diff_stars,
  (
    (SELECT avg_stars FROM agg WHERE urban=1) - (SELECT avg_stars FROM agg WHERE urban=0)
  ) / SQRT((
    (SELECT sd_stars^2*(n-1) FROM agg WHERE urban=1) +
    (SELECT sd_stars^2*(n-1) FROM agg WHERE urban=0)
  )::float / (
    (SELECT n FROM agg WHERE urban=1) + (SELECT n FROM agg WHERE urban=0) - 2
  )) AS cohen_d_stars,
  (SELECT avg_reviews FROM agg WHERE urban=1) - (SELECT avg_reviews FROM agg WHERE urban=0) AS diff_reviews
""", engine)


Unnamed: 0,diff_stars,cohen_d_stars,diff_reviews
0,0.263998,0.262665,39.89379


In [6]:
from pathlib import Path

# Project root = wherever your notebook lives
PROJECT_DIR = Path.cwd().parent  # ..\SQL-Capstone-Yelp\notebooks\ → goes up one level
DATA_DIR = PROJECT_DIR / "data" / "raw_data"   # safer to avoid space in folder name!
SAMPLES_DIR = DATA_DIR   # write samples directly into raw_data


# --- Generate small, matching samples for GitHub users ---
N_BUSINESS = 100
N_REVIEWS  = 300

OUT_BIZ = SAMPLES_DIR / "sample_business.csv"
OUT_REV = SAMPLES_DIR / "sample_review.csv"

# Deterministic sample (seed + sample in the SAME connection)
with engine.begin() as conn:
    conn.execute(text("SELECT setseed(0.42);"))
    sample_biz = pd.read_sql(
        text("""
            SELECT *
            FROM yelp.business
            WHERE categories ILIKE '%Coffee & Tea%'
            ORDER BY random()
            LIMIT :n
        """),
        conn,
        params={"n": N_BUSINESS}
    )

biz_ids = sample_biz["business_id"].tolist() or ["__none__"]

qry_reviews = text("""
    SELECT *
    FROM yelp.review
    WHERE business_id IN :ids
    LIMIT :cap
""").bindparams(bindparam("ids", expanding=True), bindparam("cap"))

with engine.begin() as conn:
    sample_reviews = pd.read_sql(qry_reviews, conn, params={"ids": biz_ids, "cap": N_REVIEWS})

# Write files (→ data/samples/)
sample_biz.to_csv(OUT_BIZ, index=False)
sample_reviews.to_csv(OUT_REV, index=False)

print("Saved business sample to:", OUT_BIZ.resolve(), f"({len(sample_biz)} rows)")
print("Saved review sample to:", OUT_REV.resolve(), f"({len(sample_reviews)} rows)")



Saved business sample to: C:\Users\syksh\Coursera\SQL-Capstone-Yelp\data\raw_data\sample_business.csv (100 rows)
Saved review sample to: C:\Users\syksh\Coursera\SQL-Capstone-Yelp\data\raw_data\sample_review.csv (300 rows)
