# Spark and MongoDB Installation

In [1]:
mongo_jarname = 'mongo-spark-connector_2.13-10.1.1-all.jar'
mongo_package_name = 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'

# download the spark mongo connector
!curl -o mongo-spark-connector_2.13-10.1.1-all.jar "https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.13/10.1.1/mongo-spark-connector_2.13-10.1.1-all.jar"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 2323k  100 2323k    0     0  20.5M      0 --:--:-- --:--:-- --:--:-- 20.6M


In [2]:
import os

current_dir = os.getcwd()
print(current_dir)
mongo_jar_location = current_dir + "/" + mongo_jarname
mongo_jar_location

/home/jovyan/Data Engineering_Indi


'/home/jovyan/Data Engineering_Indi/mongo-spark-connector_2.13-10.1.1-all.jar'

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Unified_Spark_Session") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/project_db.reddit_posts") \
    .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/spark-3.5.4-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f74c5167-7fd7-4b02-b9f4-a508484739f2;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 475ms :: artifacts dl 45ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

# Reddit Data

## 1.  Load Data to MongoDB and Creating Spark Dataframe

In [4]:
import pandas as pd
from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["project_db"]

# Load Reddit data
reddit_api_data = pd.read_csv("reddit_api_data.csv")
db["reddit_api_data"].insert_many(reddit_api_data.to_dict(orient="records"))

client.close()

In [5]:
from pymongo import MongoClient

# Connect to local MongoDB
client = MongoClient("localhost", 27017)
db = client["project_db"]

# Load raddit data
data_raddit = list(db["reddit_api_data"].find())

In [6]:
#Clean JSON and Load into Spark¶
#define a cleaning function (Reusable for Both)

def clean_records(data_json):
    for record in data_json:
        record.pop("_id", None)

        # Clean num_comments
        if isinstance(record.get("num_comments"), dict):
            record["num_comments"] = record["num_comments"].get("$numberDouble", "0")
        try:
            record["num_comments"] = int(float(record["num_comments"]))
        except:
            record["num_comments"] = 0

        # Clean score
        try:
            record["score"] = int(record.get("score", 0))
        except:
            record["score"] = 0

        # Clean sentiment fields
        for field in ["sentiment_neg", "sentiment_pos", "sentiment_compound"]:
            val = record.get(field, 0.0)
            if isinstance(val, dict):
                val = val.get("$numberDouble", "0.0")
            try:
                record[field] = float(val)
            except:
                record[field] = 0.0
    return data_json


In [7]:
# Apply cleaning function
data_reddit_cleaned = clean_records(data_raddit)

In [8]:
# Define schema and load into Spark
from pyspark.sql.types import *

schema = StructType([
    StructField("post_or_comment", StringType(), True),
    StructField("product_label", StringType(), True),
    StructField("content", StringType(), True),
    StructField("score", IntegerType(), True),
    StructField("num_comments", IntegerType(), True),
    StructField("created_date", StringType(), True),
    StructField("sentiment_neg", DoubleType(), True),
    StructField("sentiment_pos", DoubleType(), True),
    StructField("sentiment_compound", DoubleType(), True)
])

In [9]:
data_reddit_df = spark.createDataFrame(data_reddit_cleaned, schema=schema)

In [10]:
data_reddit_df.show(5)

25/04/01 19:26:17 WARN TaskSetManager: Stage 0 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:26:22 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+---------------+----------------+--------------------+-----+------------+------------+-------------+-------------+------------------+
|post_or_comment|   product_label|             content|score|num_comments|created_date|sentiment_neg|sentiment_pos|sentiment_compound|
+---------------+----------------+--------------------+-----+------------+------------+-------------+-------------+------------------+
|           Post|Tesla Cybertruck|Does anyone like ...|   94|         646|  2024-10-14|          0.0|        0.385|            0.3612|
|        Comment|Tesla Cybertruck|I appreciate the ...|  305|           0|  2024-10-14|        0.099|        0.131|            0.2516|
|        Comment|Tesla Cybertruck|My 3 year old doe...|  122|           0|  2024-10-14|          0.0|          0.0|               0.0|
|        Comment|Tesla Cybertruck|I had a 2 door st...|    9|           0|  2024-10-14|          0.0|        0.026|            0.2023|
|        Comment|Tesla Cybertruck|I actually really...|

## 2. Data Cleaning

### Data Cleaning Process
1. Drop unnecessary columns to merge with Youtube data
2. Add source to the data
3. Drop nulls
4. Trim whitespace
5. Convert string to data type
6. Drop duplicates

In [11]:
from pyspark.sql.functions import lit

# Step 1: Drop unnecessary columns
reddit_clean_df = data_reddit_df.drop(
    "post_or_comment", 
    "score", 
    "num_comments", 
    "sentiment_neg", 
    "sentiment_pos", 
    "sentiment_compound"
)

# Step 2: Add 'source' column with value 'reddit'
reddit_clean_df = reddit_clean_df.withColumn("source", lit("reddit"))

# Preview cleaned DataFrame
reddit_clean_df.show(5, truncate=False)


25/04/01 19:26:25 WARN TaskSetManager: Stage 1 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 1:>                                                          (0 + 1) / 1]

+----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------+
|product_label   |content                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

25/04/01 19:26:29 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 1): Attempting to kill Python Worker
                                                                                

In [12]:
from pyspark.sql.functions import col, trim, length, to_date

# Step 3: Drop rows with nulls in important fields
reddit_clean_df = reddit_clean_df.dropna(subset=["content", "product_label"])

# Step 4: Trim whitespace from strings
reddit_clean_df = reddit_clean_df.withColumn("content", trim(col("content")))
reddit_clean_df = reddit_clean_df.withColumn("product_label", trim(col("product_label")))

# Step 5: Convert string date to DateType
reddit_clean_df = reddit_clean_df.withColumn("created_date", to_date(col("created_date"), "yyyy-MM-dd"))

# Step 6: Drop duplicates (based on content and date)
reddit_clean_df = reddit_clean_df.dropDuplicates(subset=["content", "created_date"])

# View cleaned data
reddit_clean_df.show(5, truncate=False)

25/04/01 19:26:30 WARN TaskSetManager: Stage 2 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 2:>                                                          (0 + 1) / 1]

+------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+------+
|product_label     |content                                                                                                                                                                                                                                                                            

                                                                                

In [13]:
reddit_clean_df.select("content").distinct().show()

25/04/01 19:26:35 WARN TaskSetManager: Stage 5 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+
|             content|
+--------------------+
|If only they give...|
|Tesla sales are p...|
|   Tesla Chumpmobile|
|Q1 deliveries are...|
|Wouldn't get one ...|
|Just trying to of...|
|Oh, I see it now....|
|It's weird that w...|
|They have terribl...|
|It's not a good t...|
|Because it was a ...|
|They've been musked!|
|I put a deposit d...|
|Richard Branson w...|
|The lightning doe...|
|Seen a ton of the...|
|What a fugly car....|
|The next issue is...|
|The broken glass ...|
|I’ve seen a singl...|
+--------------------+
only showing top 20 rows



                                                                                

# Youtube Data

## 1.  Load Data to MongoDB and Creating Spark Dataframe

In [14]:
# Load from MongoDB 
import pandas as pd
from pymongo import MongoClient

# Load the correct final CSV
youtube_api_df = pd.read_csv("youtube_api_data.csv")

# Connect to MongoDB
client = MongoClient("localhost", 27017)
db = client["project_db"]

# Insert data
db["youtube_data"].insert_many(youtube_api_df.to_dict(orient="records"))

# Confirm insertion count
print("New YouTube Comments inserted:", db["youtube_comments"].count_documents({}))

client.close()

New YouTube Comments inserted: 4793


In [15]:
# Connect to MongoDB
client = MongoClient("localhost", 27017)
db = client["project_db"]

print(db["youtube_data"].find_one())

client.close()

{'_id': ObjectId('67ec2a51408604f2c9d96716'), 'Product': 'Tesla Cybertruck', 'Review Text': 'Screw the truck, I never been through neighborhoods like that 😂', 'Review Date': '2025-03-29T20:03:24Z', 'source': 'YouTube', 'features': 'none', 'like_count': 0}


In [16]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pymongo import MongoClient

# Connect to MongoDB and load fresh valid data
client = MongoClient("localhost", 27017)
db = client["project_db"]
youtube_comments_data = list(db["youtube_data"].find())

# Keep only valid rows (correct structure)
valid_rows = [row for row in youtube_comments_data if 'Product' in row]

# Remove MongoDB `_id` field
for row in valid_rows:
    row.pop('_id', None)

# Define schema for Spark
youtube_schema = StructType([
    StructField("Product", StringType(), True),
    StructField("Review Text", StringType(), True),
    StructField("Review Date", StringType(), True),
    StructField("source", StringType(), True),
    StructField("features", StringType(), True),
    StructField("like_count", IntegerType(), True)
])

# Create Spark DataFrame
youtube_data_df = spark.createDataFrame(valid_rows, schema=youtube_schema)

# Show final clean result
youtube_data_df.show(10, truncate=False)



25/04/01 19:26:37 WARN TaskSetManager: Stage 8 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.


+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-------+--------+----------+
|Product         |Review Text                                                                                                                                                                     |Review Date         |source |features|like_count|
+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-------+--------+----------+
|Tesla Cybertruck|Screw the truck, I never been through neighborhoods like that 😂                                                                                                                |2025-03-29T20:03:24Z|YouTube|none    |0         |
|Tesla Cybertruck|Ico

In [17]:
df_rows = youtube_data_df.count()
df_cols = len(youtube_data_df.columns)
print(f"youtube_comments_df shape: ({df_rows}, {df_cols})")

25/04/01 19:26:38 WARN TaskSetManager: Stage 9 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.


youtube_comments_df shape: (14379, 6)


In [18]:
youtube_data_df.select("Product").distinct().show()

25/04/01 19:26:39 WARN TaskSetManager: Stage 12 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+
|             Product|
+--------------------+
|  Chevy Silverado EV|
|Ford F-150 Lightning|
|       GMC Hummer EV|
|          Rivian R1T|
|    Tesla Cybertruck|
+--------------------+



In [19]:
from pyspark.sql.functions import col, lit, when, concat_ws

youtube_clean_df = youtube_data_df \
    .withColumnRenamed("Product", "product_label") \
    .withColumn("source", lit("YouTube")) \
    .withColumnRenamed("Review Text", "content") \
    .withColumnRenamed("Review Date", "created_date") \
    .select("product_label", "content", "created_date", "source")



youtube_clean_df.show(5, truncate=100)

25/04/01 19:26:40 WARN TaskSetManager: Stage 15 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.


+----------------+----------------------------------------------------------------------------------------------------+--------------------+-------+
|   product_label|                                                                                             content|        created_date| source|
+----------------+----------------------------------------------------------------------------------------------------+--------------------+-------+
|Tesla Cybertruck|                                    Screw the truck, I never been through neighborhoods like that 😂|2025-03-29T20:03:24Z|YouTube|
|Tesla Cybertruck|Iconic...in the sense that it&#39;s one of the worst cars (if you can call it a car) that I&#39;v...|2025-03-28T17:35:58Z|YouTube|
|Tesla Cybertruck|                                                                      this is literally a tv episode|2025-03-27T19:58:54Z|YouTube|
|Tesla Cybertruck|                                                             that intro monologue actuall

## 2. Data Cleaning

In [20]:
from pyspark.sql.functions import col, to_date, length

# Step 1: Keep only rows where 'created_date' is long enough
youtube_clean_df = youtube_clean_df.filter(length(col("created_date")) >= 10)

# Step 2: Truncate to 'yyyy-MM-dd'
youtube_clean_df = youtube_clean_df.withColumn("created_date", col("created_date").substr(1, 10))

# Step 3: Convert to DateType safely
youtube_clean_df = youtube_clean_df.withColumn("created_date", to_date(col("created_date"), "yyyy-MM-dd"))

# Step 4: Show the result
youtube_clean_df.show(5, truncate=False)


25/04/01 19:26:40 WARN TaskSetManager: Stage 16 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.


+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------+
|product_label   |content                                                                                                                                                      |created_date|source |
+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------+
|Tesla Cybertruck|Screw the truck, I never been through neighborhoods like that 😂                                                                                             |2025-03-29  |YouTube|
|Tesla Cybertruck|Iconic...in the sense that it&#39;s one of the worst cars (if you can call it a car) that I&#39;ve ever seen. Thank god you can&#39;t buy them in my country.|2025-03-28  |YouTube|
|Tesla Cybe

In [22]:
from pyspark.sql.functions import col, to_date, trim, length, regexp_replace

# Step 1: Remove rows with null or short essential fields
youtube_clean_df = youtube_clean_df.filter(
    (col("content").isNotNull()) &
    (col("product_label").isNotNull()) &
    (col("created_date").isNotNull()) &
    (length(col("content")) > 5)
)


# Step 2: Trim whitespace
youtube_clean_df = youtube_clean_df.withColumn("product_label", trim(col("product_label")))
youtube_clean_df = youtube_clean_df.withColumn("content", trim(col("content")))

# Step 3: Decode common HTML entities (e.g., &#39; → ')
youtube_clean_df = youtube_clean_df.withColumn("content", regexp_replace("content", "&#39;", "'"))
youtube_clean_df = youtube_clean_df.withColumn("content", regexp_replace("content", "&quot;", '"'))
youtube_clean_df = youtube_clean_df.withColumn("content", regexp_replace("content", "&amp;", "&"))

# Step 4: Drop duplicate comments (optional)
youtube_clean_df = youtube_clean_df.dropDuplicates(["product_label", "content", "created_date"])

# ✅ Final Preview
youtube_clean_df.show(5, truncate=100)


25/04/01 19:28:08 WARN TaskSetManager: Stage 20 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
[Stage 20:>                                                         (0 + 1) / 1]

+------------------+----------------------------------------------------------------------------------------------------+------------+-------+
|     product_label|                                                                                             content|created_date| source|
+------------------+----------------------------------------------------------------------------------------------------+------------+-------+
|Chevy Silverado EV|                      "I have a Cyber Truck"<br>and we're done listening to this guy talk about cars|  2024-11-27|YouTube|
|Chevy Silverado EV|"Like an avalanche or whatever" Chevrolet invented the midgate, subaru copied and now Chevrolet d...|  2023-09-04|YouTube|
|Chevy Silverado EV|   "Rip gas trucks." That fancy toy costs nearly double what the equivalent gas truck does. Wake up.|  2023-08-24|YouTube|
|Chevy Silverado EV|                                                               $ Almost 100 grand, nine tons? Yikes!|  2025-02-05|YouTube|

                                                                                

# Combining Dataset

In [24]:
# Combine all sources: BestBuy, YouTube, Reddit
from functools import reduce

combined_df = youtube_clean_df.unionByName(reddit_clean_df)

# Optional: Preview the final combined dataset
combined_df.show(20, truncate=100)


25/04/01 19:29:49 WARN TaskSetManager: Stage 26 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:29:50 WARN TaskSetManager: Stage 27 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 29:>                                                         (0 + 1) / 1]

+------------------+----------------------------------------------------------------------------------------------------+------------+-------+
|     product_label|                                                                                             content|created_date| source|
+------------------+----------------------------------------------------------------------------------------------------+------------+-------+
|Chevy Silverado EV|                      "I have a Cyber Truck"<br>and we're done listening to this guy talk about cars|  2024-11-27|YouTube|
|Chevy Silverado EV|"Like an avalanche or whatever" Chevrolet invented the midgate, subaru copied and now Chevrolet d...|  2023-09-04|YouTube|
|Chevy Silverado EV|   "Rip gas trucks." That fancy toy costs nearly double what the equivalent gas truck does. Wake up.|  2023-08-24|YouTube|
|Chevy Silverado EV|                                                               $ Almost 100 grand, nine tons? Yikes!|  2025-02-05|YouTube|

                                                                                

In [25]:
print(f"Shape: ({combined_df.count()} rows, {len(combined_df.columns)} columns)")

25/04/01 19:30:16 WARN TaskSetManager: Stage 34 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:30:17 WARN TaskSetManager: Stage 35 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 35:>                                                         (0 + 1) / 1]

Shape: (15189 rows, 4 columns)


                                                                                

In [27]:
combined_df.select("source").distinct().show(truncate=False)
combined_df.select("product_label").distinct().show()

25/04/01 19:30:50 WARN TaskSetManager: Stage 43 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:30:51 WARN TaskSetManager: Stage 44 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+
|source |
+-------+
|YouTube|
|reddit |
+-------+



25/04/01 19:30:53 WARN TaskSetManager: Stage 56 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:30:54 WARN TaskSetManager: Stage 57 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
[Stage 57:>                                                         (0 + 1) / 1]

+--------------------+
|       product_label|
+--------------------+
|  Chevy Silverado EV|
|Ford F-150 Lightning|
|       GMC Hummer EV|
|          Rivian R1T|
|    Tesla Cybertruck|
+--------------------+



                                                                                

# Loading the Cleaned Dataset to PostGres

In [28]:
import os

# Set environment variables for your database here for convenience
os.environ["POSTGRES_HOST"] = "uclba-de25v2.cluster-cowglvndjvxv.eu-west-2.rds.amazonaws.com"
os.environ["POSTGRES_DB"] = "postgres"
os.environ["POSTGRES_PASSWORD"] = "a7sMkM"
os.environ["POSTGRES_USER"] = "jaewon.lee.24@ucl.ac.uk"
os.environ["POSTGRES_SCHEMA"] = "schema_jaewonlee24uclacuk"

In [29]:
print(os.getenv("POSTGRES_HOST"))  # Should print: your_postgres_host
print(os.getenv("POSTGRES_DB"))    # Should print: your_database_name
# print(os.getenv("POSTGRES_PASSWORD"))    # You Should never print: passwords or sensitive data

uclba-de25v2.cluster-cowglvndjvxv.eu-west-2.rds.amazonaws.com
postgres


In [30]:
# Environment variables for PostgreSQL connection
POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost")
POSTGRES_DB = os.getenv("POSTGRES_DB", "postgres")
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", 5432) # leave at the default port
POSTGRES_SCHEMA = os.getenv("POSTGRES_SCHEMA", "public")

In [31]:
import os
import sqlalchemy
from sqlalchemy import create_engine

# Create PostgreSQL connection string
pg_conn_string = f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

# Create the SQLAlchemy engine
pg_engine = create_engine(pg_conn_string, connect_args={"options": f"-c search_path={POSTGRES_SCHEMA}"})

In [32]:
# saving df as csv

# Convert the full final Spark DataFrame to Pandas again
combined_pd = combined_df.toPandas()

# Export the combined DataFrame to a CSV file
combined_pd.to_csv('combined_clean.csv', index=False)

25/04/01 19:38:01 WARN TaskSetManager: Stage 65 contains a task of very large size (2517 KiB). The maximum recommended task size is 1000 KiB.
25/04/01 19:38:02 WARN TaskSetManager: Stage 66 contains a task of very large size (17171 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [34]:
# Write the DataFrame to PostgreSQL

# Convert the Spark DataFrame to a Pandas DataFrame
combined_pd = combined_df.toPandas()


combined_pd.to_sql(
    name="combined_data",
    con=pg_engine,
    if_exists="replace",
    index=False,
    chunksize=1000   # Upload 1000 rows at a time
)


15189

In [35]:
#check if all row uploaded
combined_pd.shape


(15189, 4)

In [36]:
#verified the upload by querying the PostgreSQL database's system catalog:
import pandas as pd
query = """
SELECT 
    column_name,
    data_type,
    character_maximum_length,
    is_nullable,
    ordinal_position
FROM information_schema.columns
WHERE table_name = 'combined_data'
  AND table_schema = 'schema_jaewonlee24uclacuk'
ORDER BY ordinal_position;
"""

pd.read_sql(query, con=pg_engine)


Unnamed: 0,column_name,data_type,character_maximum_length,is_nullable,ordinal_position
0,product_label,text,,YES,1
1,content,text,,YES,2
2,created_date,date,,YES,3
3,source,text,,YES,4


In [37]:
#check if the combined_reviews is in my schema

import pandas as pd

query = """
SELECT table_name 
FROM information_schema.tables 
WHERE table_schema = 'schema_jaewonlee24uclacuk';
"""

table_list_df = pd.read_sql(query, con=pg_engine)
table_list_df




Unnamed: 0,table_name
0,example_table
1,amazon_reviews
2,combined_data


In [39]:
# analysis

query = """
SELECT "product_label", COUNT(*) AS count
FROM combined_data
GROUP BY "product_label"
ORDER BY count DESC;
"""


pd.read_sql(query, con=pg_engine)


Unnamed: 0,product_label,count
0,Tesla Cybertruck,5274
1,Ford F-150 Lightning,2955
2,Rivian R1T,2486
3,GMC Hummer EV,2404
4,Chevy Silverado EV,2070


In [43]:
query = """
SELECT "product_label", "source", COUNT(*) AS review_count
FROM combined_data
GROUP BY "product_label", "source"
ORDER BY review_count DESC;
"""

pd.read_sql(query, con=pg_engine)


Unnamed: 0,product_label,source,review_count
0,Tesla Cybertruck,reddit,4294
1,Ford F-150 Lightning,reddit,1964
2,Rivian R1T,reddit,1595
3,GMC Hummer EV,reddit,1413
4,Chevy Silverado EV,reddit,1199
5,GMC Hummer EV,YouTube,991
6,Ford F-150 Lightning,YouTube,991
7,Tesla Cybertruck,YouTube,980
8,Rivian R1T,YouTube,891
9,Chevy Silverado EV,YouTube,871


# Set up the embedding and FAISS step

In [45]:
import pandas as pd
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer

# Load data
df = pd.read_csv("combined_clean.csv")
df = df.dropna(subset=['content'])

# Embed content
model = SentenceTransformer('all-MiniLM-L6-v2')
texts = df['content'].tolist()
embeddings = model.encode(texts, convert_to_numpy=True)

# Build FAISS index
dim = embeddings.shape[1]
index = faiss.IndexFlatL2(dim)
index.add(embeddings)

# Save index and metadata
faiss.write_index(index, "cybertruck_faiss.index")
df[['product_label', 'content', 'created_date', 'source']].to_csv("cybertruck_metadata.csv", index=False)


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [47]:
import faiss
import pandas as pd
from sentence_transformers import SentenceTransformer

# Load index and metadata
index = faiss.read_index("cybertruck_faiss.index")
metadata = pd.read_csv("cybertruck_metadata.csv")
model = SentenceTransformer('all-MiniLM-L6-v2')

def search(query, top_k=5):
    query_vector = model.encode([query])
    D, I = index.search(query_vector, top_k)
    results = metadata.iloc[I[0]]
    return results

# Example use
query = "What do users say about Cybertruck design?"
results = search(query)
print(results[['content', 'source', 'created_date']])


                                                content   source created_date
4396  The Cybertruck's design is bold and futuristic...  YouTube   2025-03-07
4229  Love the Cybertruck's bold design, but curious...  YouTube   2025-03-07
4228  Love the Cybertruck's bold design, but curious...  YouTube   2025-03-11
4412  The cybertruck feels like it wouldve made a be...  YouTube   2025-03-26
6141  Cybertruck is done, might as well scrap the de...   reddit   2021-05-20


# RAG using Gemini

In [58]:
import google.generativeai as genai
from sentence_transformers import SentenceTransformer
import pandas as pd
import faiss

# Load your model and FAISS index
model = SentenceTransformer('all-MiniLM-L6-v2')
index = faiss.read_index("cybertruck_faiss.index")
metadata = pd.read_csv("cybertruck_metadata.csv")

# Configure Gemini
genai.configure(api_key="AIzaSyAcdL28C5c2havVkOOTMusG8Cyxf-DEXzU")
gemini_model = genai.GenerativeModel("models/gemini-1.5-flash-latest")

# Function to search + generate
def answer_with_gemini(query, top_k=5):
    # Embed the query
    query_vec = model.encode([query])
    D, I = index.search(query_vec, top_k)

    # Retrieve the top_k most relevant comments
    results = metadata.iloc[I[0]]
    context = "\n".join(f"- {row['content']}" for _, row in results.iterrows())

    # Construct prompt
    prompt = f"""
    You are a customer sentiment analyst using comments from Reddit and YouTube.

    Context:
    {context}

    Question:
    {query}
    """

    # Generate answer using Gemini
    response = gemini_model.generate_content(prompt)
    return response.text


In [60]:
answer = answer_with_gemini("What do people think about the Cybertruck's price?")
print(answer)


Sentiment regarding the Cybertruck's price is mixed and largely depends on the context of its current price relative to competitors.

**Negative Sentiment:**  A significant portion of the comments express concern that the *new* price point, coupled with the emergence of competing vehicles, makes the Cybertruck a poor value proposition.  There's a feeling it will "bomb" at the current pricing because alternatives now exist at a similar price.  One commenter even states they wouldn't buy it regardless of price due to its aesthetics.

**Positive Sentiment:** Some remain optimistic, hoping the price stays the same (presumably the *original* price) and that its success would disrupt the truck market.  This suggests a belief that the original pricing was competitive and desirable.

**Neutral/Uncertain Sentiment:**  One comment highlights a lack of understanding of the Cybertruck's value compared to competitors, requesting a side-by-side comparison, indicating uncertainty about its current pr

# MCP Sentiment Pipeline

In [62]:
import google.generativeai as genai
import pandas as pd
import faiss
from sentence_transformers import SentenceTransformer

# Configure Gemini
GENAI_API_KEY = "AIzaSyAcdL28C5c2havVkOOTMusG8Cyxf-DEXzU"
genai.configure(api_key=GENAI_API_KEY)
gemini = genai.GenerativeModel("models/gemini-1.5-flash-latest")

# Load embedding model, FAISS index, and metadata
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
index = faiss.read_index("cybertruck_faiss.index")
metadata = pd.read_csv("cybertruck_metadata.csv")

def retrieve_comments(product, top_k=5):
    """Retrieves top_k comments for a given product using FAISS."""
    comments = metadata[metadata['product_label'].str.lower() == product.lower()]['content'].tolist()
    if not comments:
        return "No comments found for this product."
    vectors = embedding_model.encode(comments, convert_to_numpy=True)
    index_local = faiss.IndexFlatL2(vectors.shape[1])
    index_local.add(vectors)
    D, I = index_local.search(vectors[:1], top_k)
    selected = [comments[i] for i in I[0]]
    return "\n".join(f"- {c}" for c in selected)

def cybertruck_analyst(comments):
    prompt = f"""
    You are a sentiment analyst.
    Summarize what users say about the Tesla Cybertruck based on the following comments:

    {comments}

    Focus on design, build quality, and performance.
    """
    return gemini.generate_content(prompt).text

def competitor_analyst(product, comments):
    prompt = f"""
    You are a sentiment analyst.
    Summarize what users say about the {product} based on the following comments:

    {comments}

    Focus on design, build quality, and performance.
    """
    return gemini.generate_content(prompt).text

def comparator(cyber_summary, rival_summary):
    prompt = f"""
    Compare public sentiment between the Tesla Cybertruck and its competitor.

    Cybertruck Summary:
    {cyber_summary}

    Competitor Summary:
    {rival_summary}

    Focus on differences and similarities in user opinions.
    """
    return gemini.generate_content(prompt).text

def strategist(comparison_text):
    prompt = f"""
    You are a Tesla product strategist.
    Based on the following comparison, suggest improvements or strategies for the Cybertruck:

    {comparison_text}
    """
    return gemini.generate_content(prompt).text

# Example end-to-end for all competitors
if __name__ == "__main__":
    competitors = ["Rivian R1T", "Ford F-150 Lightning", "Chevy Silverado EV", "GMC Hummer EV"]
    cyber_comments = retrieve_comments("Tesla Cybertruck")
    cyber_summary = cybertruck_analyst(cyber_comments)

    for rival in competitors:
        print(f"\n=== Comparing Cybertruck with {rival} ===")
        rival_comments = retrieve_comments(rival)
        rival_summary = competitor_analyst(rival, rival_comments)
        comparison = comparator(cyber_summary, rival_summary)
        recommendation = strategist(comparison)

        print(recommendation)
        print("\n" + "="*60 + "\n")



=== Comparing Cybertruck with Rivian R1T ===
The data highlights a critical marketing and product strategy gap for the Cybertruck. While the high volume of polarized feedback might seem negative, it actually presents an opportunity. The R1T's lack of engagement indicates a potential problem with its market penetration and brand awareness.  Here's a multi-pronged strategy to address the Cybertruck's issues and capitalize on the R1T's relative silence:

**I. Addressing Cybertruck Concerns:**

* **Design Refinements (Mitigate Polarization):**  The data clearly shows design is a major point of contention.  While maintaining the iconic Cybertruck aesthetic, we should explore subtle design modifications to soften the harsh edges and increase perceived practicality. This could involve minor adjustments to proportions, integrating more traditional design cues subtly, or exploring different trim and color options to broaden appeal.

* **Build Quality Assurance & Transparency:**  The perceived 