In [1]:
pip install tinydb

Collecting tinydb
  Downloading tinydb-4.8.2-py3-none-any.whl.metadata (6.7 kB)
Downloading tinydb-4.8.2-py3-none-any.whl (24 kB)
Installing collected packages: tinydb
Successfully installed tinydb-4.8.2
Note: you may need to restart the kernel to use updated packages.


In [2]:
import time
import json
import pandas as pd
import os
import glob
import queue
import threading
import ftfy
from tinydb import TinyDB, Query

# --- 1. Config Paths ---
# Adjust this path to match your local directory structure
BASE_PATH = r"C:\Users\jinna\Downloads\inside-airbnb-bangkok-data"

# Path for the NoSQL database file (simulating a MongoDB cluster)
NOSQL_DB_PATH = os.path.join(BASE_PATH, "database", "reviews_nosql.json")

# Ensure the database directory exists
os.makedirs(os.path.dirname(NOSQL_DB_PATH), exist_ok=True)

# --- 2. Initialize NoSQL Database (TinyDB) ---
nosql_db = TinyDB(NOSQL_DB_PATH)

# --- 3. Find the Latest Reviews File ---
# Search for reviews*.csv.gz in all subfolders
search_pattern = os.path.join(BASE_PATH, "raw_data", "*", "reviews*.csv.gz")
found_files = glob.glob(search_pattern)

if not found_files:
    raise Exception("No reviews files found! Please check your path.")

# Sort to get the latest file based on naming convention or modification time
RAW_FILE = sorted(found_files)[-1]

print(f"Streaming Source: {RAW_FILE}")
print(f"NoSQL Database Path: {NOSQL_DB_PATH}")

# --- 4. Helper Function: Fix Text Encoding ---
def fix_text(text):
    if not isinstance(text, str):
        return ""
    
    # Use ftfy to fix mojibake (broken encoding)
    fixed = ftfy.fix_text(text)
    
    # Remove HTML tags like <br/>
    fixed = fixed.replace("<br/>", "\n").replace("<br>", "\n")
    
    return fixed.strip()

# --- 5. Initialize Simulation Queue (Fake Kafka) ---
kafka_topic_simulation = queue.Queue()

Streaming Source: C:\Users\jinna\Downloads\inside-airbnb-bangkok-data\raw_data\26_September_2025\reviews26_September_2025.csv.gz
NoSQL Database Path: C:\Users\jinna\Downloads\inside-airbnb-bangkok-data\database\reviews_nosql.json


In [3]:
# PRODUCER
def run_producer(file_path, max_records=50, speed=0.1):
    print(f"[Producer] Streaming from {os.path.basename(file_path)}...")
    try:
        # Read CSV in chunks of 1 row to simulate a real-time stream
        for chunk in pd.read_csv(file_path, chunksize=1):
            record = chunk.iloc[0].to_dict()
            
            # Send record to the Queue (Simulating sending to Kafka Topic)
            kafka_topic_simulation.put(record)
            
            # Log every 5th record to avoid cluttering the output
            if int(record.get('id', 0)) % 5 == 0: 
                print(f"   -> [Producer] Sent review_id: {record.get('id')}")
            
            max_records -= 1
            if max_records <= 0: break
            
            # Simulate network latency
            time.sleep(speed)
            
    except Exception as e:
        print(f"[Producer] Error: {e}")
        
    # Send poison pill to stop the consumer
    kafka_topic_simulation.put(None)
    print("[Producer] Finished streaming.")

# CONSUMER
def run_consumer():
    print("[Consumer] Ready and waiting for messages...")
    processed_count = 0
    
    # Get reference to the 'reviews' collection/table in TinyDB
    reviews_table = nosql_db.table('reviews')
    
    while True:
        # 1. Read message from Queue (Kafka)
        record = kafka_topic_simulation.get()
        
        # Check for stop signal
        if record is None:
            print("[Consumer] Received stop signal. Exiting.")
            break
            
        try:
            # --- TRANSFORMATION LOGIC ---
            
            # 1. Date Cleaning: Parse mixed formats to YYYY-MM-DD
            try:
                clean_date = pd.to_datetime(record['date'], format='mixed', dayfirst=True).strftime('%Y-%m-%d')
            except:
                clean_date = None

            # 2. Construct Clean JSON Document
            clean_record = {
                "review_id": int(record.get('id')),       # Convert to int for easier querying
                "listing_id": int(record.get('listing_id')),
                "reviewer_name": fix_text(record.get('reviewer_name')),
                "date": clean_date,
                "comments": fix_text(record.get('comments')), # Fix text encoding here
                "ingested_at": pd.Timestamp.now().isoformat()
            }
            
            # 3. Load to NoSQL (Insert into TinyDB)
            # This mimics MongoDB's insert_one()
            reviews_table.insert(clean_record)
            
            processed_count += 1
            
        except Exception as e:
            print(f"[Consumer] Error processing record: {e}")
            
    print(f"[Consumer] Finished. Total Inserted: {processed_count}")

In [4]:
# Clean the old table before starting a new run (Optional)
if 'reviews' in nosql_db.tables():
    nosql_db.drop_table('reviews')
    print("Old 'reviews' table dropped.")

# Initialize Threads
producer_thread = threading.Thread(target=run_producer, args=(RAW_FILE, 50, 0.05)) 
consumer_thread = threading.Thread(target=run_consumer)

print("STARTING STREAMING PIPELINE TO NoSQL...\n")

# Start threads
consumer_thread.start()
time.sleep(1) # Allow consumer to initialize
producer_thread.start()

# Wait for threads to complete
producer_thread.join()
consumer_thread.join()

print(f"\nDONE! Data has been ingested into {NOSQL_DB_PATH}")

STARTING STREAMING PIPELINE TO NoSQL...

[Consumer] Ready and waiting for messages...
[Producer] Streaming from reviews26_September_2025.csv.gz...
   -> [Producer] Sent review_id: 3434915
   -> [Producer] Sent review_id: 3603975
   -> [Producer] Sent review_id: 3656405
   -> [Producer] Sent review_id: 6790260
   -> [Producer] Sent review_id: 17526780
   -> [Producer] Sent review_id: 22012760
   -> [Producer] Sent review_id: 25522765
   -> [Producer] Sent review_id: 29351885
   -> [Producer] Sent review_id: 59402025
[Producer] Finished streaming.
[Consumer] Received stop signal. Exiting.
[Consumer] Finished. Total Inserted: 50

DONE! Data has been ingested into C:\Users\jinna\Downloads\inside-airbnb-bangkok-data\database\reviews_nosql.json


In [5]:
# --- Query Data from NoSQL to Verify ---
print("Querying data from TinyDB (NoSQL)...")

reviews_table = nosql_db.table('reviews')
ReviewQuery = Query()

# 1. Count total documents
count = len(reviews_table)
print(f"Total Documents in Database: {count}")

# 2. Example Query: Find reviews for a specific listing
# We grab the listing_id from the first record to ensure we find something
if count > 0:
    sample_doc = reviews_table.all()[0]
    target_listing = sample_doc['listing_id']

    print(f"\n--- Searching for reviews of Listing ID: {target_listing} ---")
    results = reviews_table.search(ReviewQuery.listing_id == target_listing)

    for r in results:
        print(f"Date: {r['date']}")
        print(f"User: {r['reviewer_name']}")
        # Print only the first 100 characters of the comment
        print(f"Comment: {r['comments'][:100]}...") 
        print("-" * 30)
else:
    print("Database is empty.")

Querying data from TinyDB (NoSQL)...
Total Documents in Database: 50

--- Searching for reviews of Listing ID: 27934 ---
Date: 2012-04-07
User: Michael
Comment: We stayed in the apartment for a week and we enjoyed it very much. Nuttee is a very nice host, and s...
------------------------------
Date: 2012-05-07
User: Scott
Comment: My girlfriend and I recently stayed in Nuttee's condo for a month.  It is a beautiful condo, with a ...
------------------------------
Date: 2012-06-20
User: Marc
Comment: I stayed for one month at the condo and was realy pleased. 

The condo is at the 19th floor, quiet, ...
------------------------------
Date: 2012-07-08
User: Leyla
Comment: Nuttee was a great host! I really enjoyed her apartment and she was absolutely lovely! She even had ...
------------------------------
Date: 2012-08-13
User: Rachel
Comment: Nuttee was an amazing host. She and her daughter waited for us at the lobby and gave us a quick rund...
------------------------------
Date: 2012-0