In [1]:
# Install psycopg3 to Connect to PostgreSQL, create a table, and insert data
# pip install psycopg2-binary

In [1]:
import psycopg2
import pandas as pd
import numpy as np
import random
from random import randrange
import json
import os
import ast
from pymongo import MongoClient
import time
import pprint

## Data Cleaning and Sampling (Not to run again since the CSVs have been creaed)

In [None]:
def reservoir_sample(data, n, k):
  # fill the reservoir array
  r = []
  for i in range(k):
    r.append(data[i])

  # replace elements w/gradually decreasing prob.
  for i in range(k, n):
    # generates a uniform integer between 0 and a-1
    j = randrange(i+1)
    if j < k:
        r[j] = data[i]

  return r

In [3]:
def fix_json_columns(df, columns):
    for col in columns:
        if col in df.columns:
            def safe_parse(x):
                if pd.isnull(x):
                    return None
                if isinstance(x, dict):
                    return json.dumps(x)
                if isinstance(x, str):
                    try:
                        parsed = ast.literal_eval(x)
                        return json.dumps(parsed)
                    except (ValueError, SyntaxError):
                        return None  # or keep as is: return x
                return None
            df[col] = df[col].apply(safe_parse)
    return df

In [4]:
# Create data folder
os.makedirs("data", exist_ok=True)

In [5]:
print("Sampling 15,000 businesses using reservoir sampling...")

# Load business dataset
business_file = pd.read_json("yelp_academic_dataset_business.json", lines=True)
business_dict = business_file.to_dict(orient='records')

# Sample business dataset with a size of 15000
n = len(business_file)
k = 15000

business_sample = reservoir_sample(business_dict, n, k)
business_df = pd.DataFrame(business_sample)

# Clean data
business_df.dropna(subset=['business_id', 'name', 'city', 'stars', 'latitude', 'longitude'], inplace=True)
business_df = business_df[business_df['stars'] > 0]
business_df = business_df[business_df['review_count'] > 0]
business_df = business_df.drop_duplicates(subset='business_id')
business_df = fix_json_columns(business_df, ['attributes', 'hours'])

# Save cleaned businesses
business_df.to_csv('data/business_sample.csv', index=False)
print(f"Saved {len(business_df)} business records.")

# Extract business_id from business_df
business_ids = set(business_df["business_id"])

Sampling 15,000 businesses using reservoir sampling...
Saved 15000 business records.


In [6]:
# Load and filter review dataset
print("Filtering reviews for sampled businesses...")
review_sample = []

with open('yelp_academic_dataset_review.json', 'r') as f:
    for line in f:
        review = json.loads(line)
        if review['business_id'] in business_ids:
            review_sample.append(review)

review_df = pd.DataFrame(review_sample)

# Clean data
review_df.dropna(subset=['review_id', 'user_id', 'business_id', 'text', 'stars', 'date'], inplace=True)
review_df = review_df[review_df['stars'] > 0]
review_df = review_df.drop_duplicates(subset='review_id')

# Save cleaned reviews
review_df.to_csv('data/review_sample.csv', index=False)
print(f"Saved {len(review_df)} review records.")

# Extract user_id from df_review_sample
user_ids = set(review_df["user_id"])

Filtering reviews for sampled businesses...
Saved 685279 review records.


In [7]:
# Load and filter user dataset
print("Filtering users who wrote sampled reviews...")
user_sample = []

with open('yelp_academic_dataset_user.json', 'r') as f:
    for line in f:
        user = json.loads(line)
        if user['user_id'] in user_ids:
            user_sample.append(user)

user_df = pd.DataFrame(user_sample)

# Clean data
user_df.dropna(subset=['user_id', 'name', 'review_count', 'yelping_since'], inplace=True)
user_df = user_df[user_df['review_count'] > 0]
user_df = user_df.drop_duplicates(subset='user_id')

# Save cleaned users
user_df.to_csv("data/user_sample.csv", index=False)
print(f"Saved {len(user_df)} users.")

Filtering users who wrote sampled reviews...
Saved 413059 users.


In [8]:
l=[business_df, review_df, user_df]
for df in l:
    print(f"Rows after cleaning: {len(df)}")
    print("Null check:")
    print(df.isnull().sum())
    print(df.columns)
    print('\n')

Rows after cleaning: 15000
Null check:
business_id        0
name               0
address            0
city               0
state              0
postal_code        0
latitude           0
longitude          0
stars              0
review_count       0
is_open            0
attributes      1398
categories        14
hours           2253
dtype: int64
Index(['business_id', 'name', 'address', 'city', 'state', 'postal_code',
       'latitude', 'longitude', 'stars', 'review_count', 'is_open',
       'attributes', 'categories', 'hours'],
      dtype='object')


Rows after cleaning: 685279
Null check:
review_id      0
user_id        0
business_id    0
stars          0
useful         0
funny          0
cool           0
text           0
date           0
dtype: int64
Index(['review_id', 'user_id', 'business_id', 'stars', 'useful', 'funny',
       'cool', 'text', 'date'],
      dtype='object')


Rows after cleaning: 413059
Null check:
user_id               0
name                  0
review_count        

## Create PostgreSQL Database

In [9]:
# Connect to the default database
conn = psycopg2.connect("dbname=postgres user=kaiting host=localhost")
conn.autocommit = True
cur = conn.cursor()

# Create `yelp` database if it doesn't exist
cur.execute("SELECT 1 FROM pg_database WHERE datname = 'yelp'")
if not cur.fetchone():
    cur.execute("CREATE DATABASE yelp")
    print("Database 'yelp' created.")
else:
    print("Database 'yelp' already exists.")

Database 'yelp' already exists.


## Create Tables (Do not run multiple times if created)

In [10]:
cur.execute("""
DROP TABLE IF EXISTS business;
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS business (
    business_id TEXT PRIMARY KEY,
    name TEXT,
    address TEXT,
    city TEXT,
    state TEXT,
    postal_code TEXT,
    latitude FLOAT,
    longitude FLOAT,
    stars FLOAT,
    review_count INT,
    is_open INT,
    attributes JSONB,
    categories TEXT,
    hours JSONB
);
""")

cur.execute("""
DROP TABLE IF EXISTS review;
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS review (
    review_id TEXT PRIMARY KEY,
    user_id TEXT,
    business_id TEXT,
    stars FLOAT,
    useful INT,
    funny INT,
    cool INT,
    text TEXT,
    date DATE
);
""")

cur.execute("""
DROP TABLE IF EXISTS users;
""")

cur.execute("""
CREATE TABLE IF NOT EXISTS users (
    user_id TEXT PRIMARY KEY,
    name TEXT,
    review_count INT,
    yelping_since DATE,
    useful INT,
    funny INT,
    cool INT,
    elite TEXT,
    friends TEXT,
    fans INT,
    average_stars FLOAT,
    compliment_hot INT,
    compliment_more INT,
    compliment_profile INT,
    compliment_cute INT,
    compliment_list INT,
    compliment_note INT,
    compliment_plain INT,
    compliment_cool INT,
    compliment_funny INT,
    compliment_writer INT,
    compliment_photos INT
);
""")

conn.commit()
print("Tables created successfully.")

with open('data/business_sample.csv', 'r') as f:
    cur.copy_expert("COPY business FROM STDIN WITH CSV HEADER DELIMITER ','", f)
conn.commit()

with open('data/review_sample.csv', 'r') as f:
    cur.copy_expert("COPY review FROM STDIN WITH CSV HEADER DELIMITER ','", f)
conn.commit()

with open('data/user_sample.csv', 'r') as f:
    cur.copy_expert("COPY users FROM STDIN WITH CSV HEADER DELIMITER ','", f)
conn.commit()

print("Data loaded successfully.")

Tables created successfully.
Data loaded successfully.


In [11]:
# Sample query test
cur.execute("""
SELECT * 
FROM business
LIMIT 5;
""")

# Fetch and display results
results = cur.fetchall()
for row in results:
    print(row)

('DBVINWAA-BJdS_-g0vlfGg', 'Zazen', '832 W Lancaster Ave', 'Bryn Mawr', 'PA', '19010', 40.0218819, -75.3192112, 4.5, 24, 0, {'GoodForKids': 'True', 'BusinessParking': "{'garage': False, 'street': True, 'validated': False, 'lot': False, 'valet': False}", 'ByAppointmentOnly': 'False', 'RestaurantsPriceRange2': '2', 'BusinessAcceptsCreditCards': 'True'}, 'Hair Removal, Makeup Artists, Hair Salons, Waxing, Nail Salons, Blow Dry/Out Services, Beauty & Spas', {'Friday': '10:0-19:0', 'Monday': '11:30-18:0', 'Sunday': '11:30-16:30', 'Tuesday': '10:0-18:0', 'Saturday': '9:30-17:30', 'Thursday': '10:0-19:0', 'Wednesday': '10:0-18:0'})
('NCwvtN1oZvLamyOhZ2OzAQ', 'LA Fitness', '1435 Walnut Street', 'Philadelphia', 'PA', '19102', 39.9497264263, -75.1657220984, 2.0, 39, 0, {'GoodForKids': 'False', 'BusinessAcceptsCreditCards': 'False'}, 'Sports Clubs, Gyms, Trainers, Fitness & Instruction, Active Life', {'Friday': '6:0-21:0', 'Monday': '5:0-23:0', 'Sunday': '8:0-18:0', 'Tuesday': '5:0-23:0', 'Saturd

## Benchmark Set of Tasks

### Query 1: Top 5 Highest Rated Businesses

In [4]:
cur.execute("""
SELECT name, stars, review_count
FROM business
ORDER BY stars DESC, review_count DESC
LIMIT 5;
""")

results = cur.fetchall()
print("\n Benchmark Query 1: Top 5 Highest Rated Businesses:")
for row in results:
    print(row)


 Benchmark Query 1: Top 5 Highest Rated Businesses:
('Blues City Deli', 5.0, 991)
('Carlillos Cocina', 5.0, 799)
('Free Tours By Foot', 5.0, 769)
('Tumerico', 5.0, 705)
('Yats', 5.0, 623)


### Query 2: Cities with the Most Businesses

In [5]:
cur.execute("""
SELECT city, COUNT(*) as num_businesses
FROM business
GROUP BY city
ORDER BY num_businesses DESC
LIMIT 5;
""")

results = cur.fetchall()
print("\n Benchmark Query 2: Cities with the Most Businesses:")
for row in results:
    print(row)


 Benchmark Query 2: Cities with the Most Businesses:
('Philadelphia', 14569)
('Tucson', 9250)
('Tampa', 9050)
('Indianapolis', 7540)
('Nashville', 6971)


### Query 3: Average Review Stars by City

In [6]:
cur.execute("""
SELECT city, ROUND(AVG(stars)::numeric, 2) as avg_stars
FROM business
GROUP BY city
ORDER BY avg_stars DESC
LIMIT 10;
""")

results = cur.fetchall()
print("\n Benchmark Query 3: Average Review Stars by City:")
for row in results:
    print(row)


 Benchmark Query 3: Average Review Stars by City:
('Chicago', Decimal('5.00'))
('Center City Philadelphia', Decimal('5.00'))
('Chattanooga', Decimal('5.00'))
('Bridgeton ', Decimal('5.00'))
('Arizona', Decimal('5.00'))
('Catalina Foothills', Decimal('5.00'))
('Belleville ', Decimal('5.00'))
('Carpinteria ', Decimal('5.00'))
('Aliso Viejo', Decimal('5.00'))
('COLMAR', Decimal('5.00'))


### Query 4: Users Who Wrote More Than 50 Reviews

In [7]:
cur.execute("""
SELECT name, review_count
FROM users
WHERE review_count > 50
ORDER BY review_count DESC
LIMIT 10;
""")

results = cur.fetchall()
print("\n Benchmark Query 4: Users Who Wrote More Than 50 Reviews:")
for row in results:
    print(row)


 Benchmark Query 4: Users Who Wrote More Than 50 Reviews:
('Fox', 17473)
('Victor', 16978)
('Bruce', 16567)
('Shila', 12868)
('Kim', 9941)
('Nijole', 8363)
('Vincent', 8354)
('George', 7738)
('Kenneth', 6766)
('Jennifer', 6679)


### Query 5: Most Common Categories (simple)

In [8]:
cur.execute("""
SELECT categories, COUNT(*) as category_count
FROM business
GROUP BY categories
ORDER BY category_count DESC
LIMIT 10;
""")

results = cur.fetchall()
print("\n Benchmark Query 5: Most Common Categories:")
for row in results:
    print(row)


 Benchmark Query 5: Most Common Categories:
('Beauty & Spas, Nail Salons', 1012)
('Restaurants, Pizza', 935)
('Nail Salons, Beauty & Spas', 934)
('Pizza, Restaurants', 823)
('Restaurants, Mexican', 728)
('Restaurants, Chinese', 708)
('Mexican, Restaurants', 672)
('Chinese, Restaurants', 651)
('Food, Coffee & Tea', 508)
('Beauty & Spas, Hair Salons', 493)


## Load MongoDB

In [38]:
# Connect to MongoDB server 
client = MongoClient('localhost', 27017)
db = client['yelp']

# Load data
def load_csv_to_mongo(csv_path, collection_name):
    print(f"Loading {collection_name}...")
    df = pd.read_csv(csv_path)
    records = df.to_dict(orient='records')
    db.collection_name.drop()
    db.collection_name.insert_many(records)
    print(f"Inserted {len(records)} records into {collection_name}.")

load_csv_to_mongo('data/business_sample.csv', 'business')
load_csv_to_mongo('data/review_sample.csv', 'review')
load_csv_to_mongo('data/user_sample.csv', 'users')

print("All data loaded into MongoDB successfully!")

Loading business...
Inserted 15000 records into business.
Loading review...
Inserted 685279 records into review.
Loading users...
Inserted 413059 records into users.
All data loaded into MongoDB successfully!


## Task Selection

### 1. Number of 5-Star Reviews per Business

In [13]:
cur.execute("""
SELECT business_id, COUNT(*) AS five_star_reviews
FROM review
WHERE stars = 5
GROUP BY business_id
ORDER BY five_star_reviews DESC
LIMIT 10;
""")

results = cur.fetchall()
print("Number of 5-Star Reviews per Business:")
for row in results:
    print(row)

Number of 5-Star Reviews per Business:
('ac1AeYqs8Z4_e2X5M3if2A', 4012)
('_C7QiQQc47AOEv4PE3Kong', 3095)
('yPSejq3_erxo9zdVYTBnZA', 2693)
('I_3LMZ_1m2mzR0oLIOePIg', 2675)
('6ajnOk0GcY9xbb5Ocaw8Gw', 1574)
('GuzbBFraIq-fbkjfvaTRvg', 1523)
('3YqUe2FTCQr0pPVK8oCv6Q', 1367)
('vN6v8m4DO45Z4pp8yxxF_w', 1338)
('8uF-bhJFgT4Tn6DTb27viA', 1309)
('PY9GRfzr4nTZeINf346QOw', 1131)


In [14]:
cur.execute("""
EXPLAIN ANALYZE
SELECT business_id, COUNT(*) AS five_star_reviews
FROM review
WHERE stars = 5
GROUP BY business_id
ORDER BY five_star_reviews DESC
LIMIT 10;
""")

plan = cur.fetchall()
print("Execution Plan for usinesses Active in 2018 But Inactive 2019–2020:")
for line in plan:
    print(line[0])

Execution Plan for usinesses Active in 2018 But Inactive 2019–2020:
ac1AeYqs8Z4_e2X5M3if2A
_C7QiQQc47AOEv4PE3Kong
yPSejq3_erxo9zdVYTBnZA
I_3LMZ_1m2mzR0oLIOePIg
6ajnOk0GcY9xbb5Ocaw8Gw
GuzbBFraIq-fbkjfvaTRvg
3YqUe2FTCQr0pPVK8oCv6Q
vN6v8m4DO45Z4pp8yxxF_w
8uF-bhJFgT4Tn6DTb27viA
PY9GRfzr4nTZeINf346QOw


### 2. Avg rating given by each user

#### SQL

In [22]:
cur.execute("""
SELECT r.user_id, COUNT(r.review_id) AS review_count
FROM review r
GROUP BY r.user_id
ORDER BY review_count DESC
LIMIT 5;
""")

results = cur.fetchall()
print("UAvg rating given by each user:")
for row in results:
    print(row)

UAvg rating given by each user:
('_BcWyKQL16ndpBdggh2kNA', 287)
('Xw7ZjaGfr0WNVt6s_5KZfA', 187)
('0Igx-a1wAstiBDerGxXk2A', 181)
('-G7Zkl1wIWBBmD0KRy_sCw', 168)
('ET8n-r7glWYqZhuR6GcdNw', 162)


In [23]:
cur.execute("""
EXPLAIN ANALYZE
SELECT r.user_id, COUNT(r.review_id) AS review_count
FROM review r
GROUP BY r.user_id
ORDER BY review_count DESC
LIMIT 5;
""")

plan = cur.fetchall()
print("Execution Plan for Users Who Reviewed in 3+ Different Cities:")
for line in plan:
    print(line[0])

Execution Plan for Users Who Reviewed in 3+ Different Cities:
Limit  (cost=143282.70..143282.71 rows=5 width=31) (actual time=450.844..454.823 rows=5 loops=1)
  ->  Sort  (cost=143282.70..143701.04 rows=167337 width=31) (actual time=450.842..454.821 rows=5 loops=1)
        Sort Key: (count(review_id)) DESC
        Sort Method: top-N heapsort  Memory: 25kB
        ->  Finalize GroupAggregate  (cost=94711.63..140503.29 rows=167337 width=31) (actual time=198.364..429.305 rows=413069 loops=1)
              Group Key: user_id
              ->  Gather Merge  (cost=94711.63..137156.55 rows=334674 width=31) (actual time=198.357..370.180 rows=512146 loops=1)
                    Workers Planned: 2
                    Workers Launched: 2
                    ->  Partial GroupAggregate  (cost=93711.61..97526.84 rows=167337 width=31) (actual time=190.510..268.327 rows=170715 loops=3)
                          Group Key: user_id
                          ->  Sort  (cost=93711.61..94425.56 rows=285582

#### MQL

In [19]:
pipeline = [
    {"$group": {"_id": "$user_id", "review_count": {"$sum": 1}}},
    {"$sort": {"review_count": -1}},
    {"$limit": 5}
]

results = list(db.review.aggregate(pipeline))
# pd.DataFrame(results)
print("Avg rating given by each user (MongoDB):")
pprint.pprint(results)
for row in results:
    print(f"Number of results: {len(results)}")
    print(row)

Avg rating given by each user (MongoDB):
[{'_id': '_BcWyKQL16ndpBdggh2kNA', 'review_count': 1431},
 {'_id': 'Xw7ZjaGfr0WNVt6s_5KZfA', 'review_count': 845},
 {'_id': '0Igx-a1wAstiBDerGxXk2A', 'review_count': 827},
 {'_id': '-G7Zkl1wIWBBmD0KRy_sCw', 'review_count': 779},
 {'_id': 'bYENop4BuQepBjM1-BI3fA', 'review_count': 742}]
Number of results: 5
{'_id': '_BcWyKQL16ndpBdggh2kNA', 'review_count': 1431}
Number of results: 5
{'_id': 'Xw7ZjaGfr0WNVt6s_5KZfA', 'review_count': 845}
Number of results: 5
{'_id': '0Igx-a1wAstiBDerGxXk2A', 'review_count': 827}
Number of results: 5
{'_id': '-G7Zkl1wIWBBmD0KRy_sCw', 'review_count': 779}
Number of results: 5
{'_id': 'bYENop4BuQepBjM1-BI3fA', 'review_count': 742}


In [40]:
# Enable profiling for all operations
db.command("profile", 2)

# Run aggregation query
pipeline = [
    {"$group": {"_id": "$user_id", "avg_rating": {"$avg": "$stars"}}},
    {"$sort": {"avg_rating": -1}},
    {"$limit": 10}
]
result = list(db.review.aggregate(pipeline))

# Ensure profiling is logged
time.sleep(1)

# Fetch latest profiling info
profile = db["system.profile"].find(
    {"ns": "yelp.review", "op": "command", "command.aggregate": "review"}
).sort("ts", -1).limit(1)

# Show profiling result
for doc in profile:
    pprint.pprint(doc)

# Turn off profiling to avoid overhead
db.command("profile", 0)

{'allUsers': [],
 'client': '127.0.0.1',
 'command': {'$db': 'yelp',
             'aggregate': 'review',
             'cursor': {},
             'lsid': {'id': Binary(b'<\xe6ve[\xf7F\xd7\xba\x97\x1d\x1b\x8fA</', 4)},
             'pipeline': [{'$group': {'_id': '$user_id',
                                      'avg_rating': {'$avg': '$stars'}}},
                          {'$sort': {'avg_rating': -1}},
                          {'$limit': 10}]},
 'cursorExhausted': True,
 'docsExamined': 685279,
 'flowControl': {},
 'hasSortStage': True,
 'keysExamined': 0,
 'locks': {'Global': {'acquireCount': {'r': 73}}},
 'millis': 920,
 'nreturned': 10,
 'ns': 'yelp.review',
 'numYield': 34,
 'op': 'command',
 'planCacheKey': '4714AAE1',
 'planCacheShapeHash': '1BB56745',
 'planSummary': 'COLLSCAN',
 'planningTimeMicros': 3269,
 'protocol': 'op_msg',
 'queryFramework': 'sbe',
 'queryHash': '1BB56745',
 'queryShapeHash': 'B93C95016FFAF0176AFE39DF49A5130593C64A3E253F6539A71AD809A56A725C',
 'responseLe

{'was': 2, 'slowms': 100, 'sampleRate': 1.0, 'ok': 1.0}

### 3. Users who reviewed businesses in 3+ different cities

#### SQL

In [15]:
cur.execute("""
SELECT r.user_id, COUNT(DISTINCT b.city) AS num_cities
FROM review r
JOIN business b ON r.business_id = b.business_id
GROUP BY r.user_id
HAVING COUNT(DISTINCT b.city) >= 3
ORDER BY num_cities DESC
LIMIT 10;
""")

results = cur.fetchall()
print("Users Who Reviewed in 3+ Different Cities:")
for row in results:
    print(row)

Users Who Reviewed in 3+ Different Cities:
('_BcWyKQL16ndpBdggh2kNA', 55)
('-G7Zkl1wIWBBmD0KRy_sCw', 50)
('vmUqcqMjlWoBM6qfmUXgyQ', 37)
('RCZ5M9o2-fxgFuurpmEs3w', 36)
('GcdYgbaF75vj7RO6EZhPOQ', 36)
('ppsm8EDKjA1fp1yTCP3RrQ', 36)
('ouODopBKF3AqfCkuQEnrDg', 34)
('XzpJ4uHkxARCFQiZ9bffyg', 33)
('nxNO7_6U2Nu90v0IdnEhIg', 32)
('zUk_Ww2q1At1QSyRbUjIGQ', 31)


In [16]:
cur.execute("""
EXPLAIN ANALYZE
SELECT r.user_id, COUNT(DISTINCT b.city) AS num_cities
FROM review r
JOIN business b ON r.business_id = b.business_id
GROUP BY r.user_id
HAVING COUNT(DISTINCT b.city) >= 3
ORDER BY num_cities DESC;
""")

plan = cur.fetchall()
print("Execution Plan for Users Who Reviewed in 3+ Different Cities:")
for line in plan:
    print(line[0])

Execution Plan for Users Who Reviewed in 3+ Different Cities:
Sort  (cost=178246.86..178396.12 rows=59705 width=31) (actual time=900.507..901.018 rows=18267 loops=1)
  Sort Key: (count(DISTINCT b.city)) DESC
  Sort Method: quicksort  Memory: 1625kB
  ->  GroupAggregate  (cost=165666.34..173510.59 rows=59705 width=31) (actual time=546.057..899.005 rows=18267 loops=1)
        Group Key: r.user_id
        Filter: (count(DISTINCT b.city) >= 3)
        Rows Removed by Filter: 426534
        ->  Sort  (cost=165666.34..167534.78 rows=747377 width=33) (actual time=545.988..798.991 rows=746656 loops=1)
              Sort Key: r.user_id, b.city
              Sort Method: external merge  Disk: 31824kB
              ->  Hash Join  (cost=1689.50..72317.69 rows=747377 width=33) (actual time=4.234..239.311 rows=746656 loops=1)
                    Hash Cond: (r.business_id = b.business_id)
                    ->  Seq Scan on review r  (cost=0.00..68665.77 rows=747377 width=46) (actual time=0.011..140.

#### MQL

In [41]:
pipeline = [
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    { "$unwind": "$business_info" },
    {
        "$group": {
            "_id": "$user_id",
            "cities_reviewed": { "$addToSet": "$business_info.city" }
        }
    },
    {
        "$project": {
            "user_id": "$_id",
            "num_cities": { "$size": "$cities_reviewed" }
        }
    },
    { "$match": { "num_cities": { "$gte": 3 } } },
    { "$sort": { "num_cities": -1 } },
    { "$limit": 10 }
]

results = list(db.review.aggregate(pipeline))
print("Users Who Reviewed in 3+ Cities (MongoDB):")
for row in results:
    print(f"Number of results: {len(results)}")
    print(row)

KeyboardInterrupt: 

In [None]:
pipeline = [
    # 1. Join with the business collection to get each review’s city
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    # 2. Unwind the joined array
    { "$unwind": "$business_info" },

    # 3. Group by user, collecting the set of cities they’ve reviewed in
    {
        "$group": {
            "_id": "$user_id",
            "uniqueCities": { "$addToSet": "$business_info.city" }
        }
    },
    # 4. Compute the number of distinct cities per user
    {
        "$addFields": {
            "cityCount": { "$size": "$uniqueCities" }
        }
    },
    # 5. Keep only those with three or more distinct cities
    {
        "$match": {
            "cityCount": { "$gte": 3 }
        }
    },
    # 6. Order by how many cities, descending
    { "$sort": { "cityCount": -1 } },
    # 7. Limit to the top 10 users
    { "$limit": 10 }
]

results = list(db.review.aggregate(pipeline))
print("Users Who Reviewed in 3+ Cities (MongoDB):")
pprint.pprint(results)
for row in results:
    print(f"Number of results: {len(results)}")
    print(row)

In [42]:
# Enable profiling for all operations
db.command("profile", 2)

# Run aggregation query
pipeline = [
    # 1. Join with the business collection to get each review’s city
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    # 2. Unwind the joined array
    { "$unwind": "$business_info" },

    # 3. Group by user, collecting the set of cities they’ve reviewed in
    {
        "$group": {
            "_id": "$user_id",
            "uniqueCities": { "$addToSet": "$business_info.city" }
        }
    },
    # 4. Compute the number of distinct cities per user
    {
        "$addFields": {
            "cityCount": { "$size": "$uniqueCities" }
        }
    },
    # 5. Keep only those with three or more distinct cities
    {
        "$match": {
            "cityCount": { "$gte": 3 }
        }
    },
    # 6. Order by how many cities, descending
    { "$sort": { "cityCount": -1 } },
    # 7. Limit to the top 10 users
    { "$limit": 10 }
]

results = list(db.review.aggregate(pipeline))

# Optional delay to ensure profiling is logged
time.sleep(1)

# Fetch latest profiling info
profile = db["system.profile"].find(
    {"ns": "yelp.review", "op": "command", "command.aggregate": "review"}
).sort("ts", -1).limit(1)

# Show profiling result
for doc in profile:
    pprint.pprint(doc)

# Turn off profiling to avoid overhead
db.command("profile", 0)

KeyboardInterrupt: 

### 4. Businesses reviewed in 2018 but NOT reviewed in 2019–2020

#### SQL

In [13]:
cur.execute("""
SELECT DISTINCT b.business_id, b.name
FROM business b
JOIN review r1 ON b.business_id = r1.business_id
WHERE EXTRACT(YEAR FROM r1.date) = 2018
AND b.business_id NOT IN (
    SELECT DISTINCT r2.business_id
    FROM review r2
    WHERE EXTRACT(YEAR FROM r2.date) IN (2019, 2020)
)
LIMIT 10;
""")

results = cur.fetchall()
print("Businesses Active in 2018 But Inactive 2019–2020:")
for row in results:
    print(row)

Businesses Active in 2018 But Inactive 2019–2020:
('_2FrIA5unwbbweldUnpROQ', "Sip N' Cycle")
('_jC2Qwq9yH4Lv5lyfyqBXA', 'Avenue of the Arts Dental')
('_jcvEAM75TohIesGW6iLWw', 'Viking Pastries')
('_L48f6NzAWnJKOBoxs2KOw', 'Bronco Elite')
('_Ld026NfDcxSh8NEQPkxpw', 'St  Louis Bread')
('_mwfGrkuU1Fwemt5m-lrEA', "Soprano's Meat Market")
('_O-FP2AkGgzvndJ6UqzkxA', 'Kingston Martinez & Hogan')
('_OB65sIprNs4eUm1SHjhxw', 'Lasaters Coffee and Tea - Nashville')
('_OxtWf7PXv3O_IbveYk9oQ', 'La Garrafa')
('_tdjQfnrT1k4dNjSbSXotg', 'Nu-Temp Heating & Cooling')


In [14]:
cur.execute("""
EXPLAIN ANALYZE
SELECT DISTINCT b.business_id, b.name
FROM business b
JOIN review r1 ON b.business_id = r1.business_id
WHERE EXTRACT(YEAR FROM r1.date) = 2018
AND b.business_id NOT IN (
    SELECT DISTINCT r2.business_id
    FROM review r2
    WHERE EXTRACT(YEAR FROM r2.date) IN (2019, 2020)
);
""")

plan = cur.fetchall()
print("Execution Plan for usinesses Active in 2018 But Inactive 2019–2020:")
for line in plan:
    print(line[0])

Execution Plan for usinesses Active in 2018 But Inactive 2019–2020:
Unique  (cost=125252.67..125265.52 rows=1713 width=42) (actual time=271.362..272.205 rows=1047 loops=1)
  ->  Sort  (cost=125252.67..125256.96 rows=1713 width=42) (actual time=271.361..271.995 rows=3043 loops=1)
        Sort Key: b.business_id, b.name
        Sort Method: quicksort  Memory: 281kB
        ->  Hash Join  (cost=64334.08..125160.67 rows=1713 width=42) (actual time=199.306..270.560 rows=3043 loops=1)
              Hash Cond: (r1.business_id = b.business_id)
              ->  Gather  (cost=1000.00..61817.59 rows=3426 width=32) (actual time=0.102..63.830 rows=89367 loops=1)
                    Workers Planned: 2
                    Workers Launched: 2
                    ->  Parallel Seq Scan on review r1  (cost=0.00..60474.99 rows=1428 width=32) (actual time=0.336..82.554 rows=29789 loops=3)
                          Filter: (EXTRACT(year FROM date) = '2018'::numeric)
                          Rows Removed b

#### MQL

In [16]:
pipeline = [
    # 1. Join with the business collection to get business details
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    # 2. Unwind the joined array
    { "$unwind": "$business_info" },

    # 3. Project the business ID, name, and extract review year
    {
        "$project": {
            "business_id": 1,
            "business_name": "$business_info.name",
            "reviewYear": { "$year": { "$toDate": "$date" } }
        }
    },
    # 4. Group by business, collecting all years the business was reviewed
    {
        "$group": {
            "_id": {
                "business_id": "$business_id",
                "business_name": "$business_name"
            },
            "yearsReviewed": { "$addToSet": "$reviewYear" }
        }
    },
    # 5. Keep only businesses reviewed in 2018 but NOT in 2019 or 2020
    {
        "$match": {
            "yearsReviewed": {
                "$all": [2018],
                "$nin": [2019, 2020]
            }
        }
    },
    # 6. Sort by business_id ascending
    { "$sort": { "_id.business_id": 1 } },

    # 7. Limit to the top 10 businesses
    { "$limit": 10 }
]

results = list(db.review.aggregate(pipeline))
print("Businesses reviewed in 2018 but NOT reviewed in 2019–2020 (MongoDB):")
pprint.pprint(results)
print(f"Number of results: {len(results)}")
for row in results:
    print(row)

KeyboardInterrupt: 

In [17]:
# Enable profiling for all operations
db.command("profile", 2)

# Run aggregation query
pipeline = [
    # 1. Join with the business collection to get business details
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    # 2. Unwind the joined array
    { "$unwind": "$business_info" },

    # 3. Project the business ID, name, and extract review year
    {
        "$project": {
            "business_id": 1,
            "business_name": "$business_info.name",
            "reviewYear": { "$year": { "$toDate": "$date" } }
        }
    },
    # 4. Group by business, collecting all years the business was reviewed
    {
        "$group": {
            "_id": {
                "business_id": "$business_id",
                "business_name": "$business_name"
            },
            "yearsReviewed": { "$addToSet": "$reviewYear" }
        }
    },
    # 5. Keep only businesses reviewed in 2018 but NOT in 2019 or 2020
    {
        "$match": {
            "yearsReviewed": {
                "$all": [2018],
                "$nin": [2019, 2020]
            }
        }
    },
    # 6. Sort by business_id ascending
    { "$sort": { "_id.business_id": 1 } },

    # 7. Limit to the top 10 businesses
    { "$limit": 10 }
]

results = list(db.review.aggregate(pipeline))

# Optional delay to ensure profiling is logged
time.sleep(1)

# Fetch latest profiling info
profile = db["system.profile"].find(
    {"ns": "yelp.review", "op": "command", "command.aggregate": "review"}
).sort("ts", -1).limit(1)

# Show profiling result
for doc in profile:
    pprint.pprint(doc)

# Turn off profiling to avoid overhead
db.command("profile", 0)

KeyboardInterrupt: 

### 5. Most reviewed business in each city

In [27]:
cur.execute("""

SELECT DISTINCT ON (b.city) b.city, b.name, b.review_count
FROM business b
ORDER BY b.city, b.review_count DESC
LIMIT 10;
""")

results = cur.fetchall()
print("Most Reviewed Business Per City:")
for row in results:
    print(row)

Most Reviewed Business Per City:
('AB Edmonton', 'Ricci Hair Company', 6)
('abington', 'D D Daughters Lace Wig Beautique', 5)
('Abington', 'Kitchen Bar', 377)
('Abington Township', 'Iron Hill Brewery & Restaurant', 223)
('Affton', 'Sushi Hana', 141)
('Afton', '9 Mile Garden', 50)
('Alberta Park Industrial', 'Crosstown Autobody', 5)
('Aldan', 'Penn Pines Diner', 48)
('Algiers', "Domino's Pizza", 28)
('Aliso Viejo', 'Carolyn Rife Photography', 9)


In [30]:
cur.execute("""
EXPLAIN ANALYZE
SELECT DISTINCT ON (b.city) b.city, b.name, b.review_count
FROM business b
ORDER BY b.city, b.review_count DESC;
""")

plan = cur.fetchall()
print("Execution Plan for Most Reviewed Business Per City:")
for line in plan:
    print(line[0])

Execution Plan for Most Reviewed Business Per City:
Unique  (cost=31928.16..32679.89 rows=780 width=34) (actual time=226.329..251.199 rows=1416 loops=1)
  ->  Sort  (cost=31928.16..32304.02 rows=150346 width=34) (actual time=226.328..245.726 rows=150346 loops=1)
        Sort Key: city, review_count DESC
        Sort Method: external merge  Disk: 6744kB
        ->  Seq Scan on business b  (cost=0.00..14887.46 rows=150346 width=34) (actual time=0.060..65.896 rows=150346 loops=1)
Planning Time: 0.163 ms
Execution Time: 257.989 ms


#### MQL

In [None]:
pipeline = [
    # 1. Join with the business collection to get city info
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    # 2. Unwind the joined array
    { "$unwind": "$business_info" },
    # 3. Group by city and business_id to count number of reviews per business per city
    {
        "$group": {
            "_id": {
                "city": "$business_info.city",
                "business_id": "$business_id"
            },
            "review_count": { "$sum": 1 }
        }
    },
    # 4. Sort by city and review count descending
    {
        "$sort": {
            "_id.city": 1,
            "review_count": -1
        }
    },
    # 5. For each city, pick the business with the highest review count
    {
        "$group": {
            "_id": "$_id.city",
            "business_id": { "$first": "$_id.business_id" },
            "review_count": { "$first": "$review_count" }
        }
    },
    # 6. Optional: Rename fields for clarity
    {
        "$project": {
            "city": "$_id",
            "business_id": 1,
            "review_count": 1,
            "_id": 0
        }
    }
]

results = list(db.review.aggregate(pipeline))
print("Most Reviewed Business in Each City (MongoDB):")
print(f"Number of results: {len(results)}")
for row in results:
    print(row)

### (6. Top 5 Restaurants with >1000 reviews and >=4 stars in each city/state)

In [12]:
cur.execute("""
SELECT b.city, b.state, b.name, b.stars, b.review_count
FROM business b
WHERE b.categories ILIKE '%Restaurants%'
AND b.review_count > 1000
AND b.stars >= 4
ORDER BY b.city, b.state, b.stars DESC, b.review_count DESC
LIMIT 5;
""")

results = cur.fetchall()
print("Top 5 Restaurants with >1000 Reviews and 4+ Stars Per City/State:")
for row in results:
    print(row)


 Top 5 Restaurants with >1000 Reviews and 4+ Stars Per City/State:
('Boise', 'ID', 'Fork', 4.0, 1810)
('Boise', 'ID', 'Barbacoa Grill', 4.0, 1099)
('Clearwater', 'FL', 'Clear Sky Cafe', 4.5, 1824)
('Clearwater Beach', 'FL', "Frenchy's Rockaway Grill", 4.0, 2301)
('Clearwater Beach', 'FL', 'Columbia Restaurant', 4.0, 1017)


In [31]:
cur.execute("""
EXPLAIN ANALYZE
SELECT DISTINCT ON (b.city) b.city, b.name, b.review_count
FROM business b
ORDER BY b.city, b.review_count DESC;
""")

plan = cur.fetchall()
print("Execution Plan for Top 5 Restaurants with >1000 Reviews and 4+ Stars Per City/State")
for line in plan:
    print(line[0])

Execution Plan for Top 5 Restaurants with >1000 Reviews and 4+ Stars Per City/State
Unique  (cost=31928.16..32679.89 rows=780 width=34) (actual time=174.860..198.058 rows=1416 loops=1)
  ->  Sort  (cost=31928.16..32304.02 rows=150346 width=34) (actual time=174.859..192.989 rows=150346 loops=1)
        Sort Key: city, review_count DESC
        Sort Method: external merge  Disk: 6744kB
        ->  Seq Scan on business b  (cost=0.00..14887.46 rows=150346 width=34) (actual time=0.015..49.971 rows=150346 loops=1)
Planning Time: 0.106 ms
Execution Time: 201.375 ms


## MongoDB version our Benchmark

In [39]:
# pipeline = [
#     {
#         "$lookup": {
#             "from": "business",
#             "localField": "business_id",
#             "foreignField": "business_id",
#             "as": "business_info"
#         }
#     },
#     { "$unwind": "$business_info" },
#     {
#         "$group": {
#             "_id": "$user_id",
#             "cities_reviewed": { "$addToSet": "$business_info.city" }
#         }
#     },
#     {
#         "$addFields": {
#             "num_cities": { "$size": "$cities_reviewed" }
#         }
#     },
#     {
#         "$match": {
#             "num_cities": { "$gte": 3 }
#         }
#     },
#     { "$sort": { "num_cities": -1 } },
#     { "$limit": 10 }
# ]


pipeline = [
    {
        "$lookup": {
            "from": "business",
            "localField": "business_id",
            "foreignField": "business_id",
            "as": "business_info"
        }
    },
    { "$unwind": "$business_info" },
    {
        "$group": {
            "_id": "$user_id",
            "unique_cities": { "$addToSet": "$business_info.city" }
        }
    },
    {
        "$match": {
            "$expr": { "$gte": [{ "$size": "$unique_cities" }, 3] }
        }
    },
    { "$sort": { "city_count": -1 } },
    { "$limit": 10 }
]



results = list(db.review.aggregate(pipeline))
print("Users Who Reviewed in 3+ Cities (MongoDB):")
pprint.pprint(result)
for row in results:
    print(f"Number of results: {len(results)}")
    print(row)

ServerSelectionTimeoutError: localhost:27017: [Errno 61] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 30s, Topology Description: <TopologyDescription id: 68192e8ae7f19cf8ad39cdbd, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 61] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>

In [36]:
# cur.close()
# conn.close()