In [1]:
import pymongo
import pandas as pd

# MongoDB connection details
MONGO_URI = "mongodb+srv://<username>:<password>.iinrc.mongodb.net/?retryWrites=true&w=majority&appName=makertdata"
MONGO_DB = "stock_data"
MONGO_COLLECTION = "aapl_news"

# Connect to MongoDB
client = pymongo.MongoClient(MONGO_URI)
db = client[MONGO_DB]
collection = db[MONGO_COLLECTION]

# Count documents in the collection
collection.count_documents({})

94

In [2]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Create Spark session with MongoDB connector - using version 3.0.1 which is more widely available
spark = SparkSession.builder \
    .appName("Apple Sentiment Analysis") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local[*]") \
    .getOrCreate()

# MongoDB connection details
mongodb_uri = "mongodb+srv://<username>:<password>@makertdata.iinrc.mongodb.net/?retryWrites=true&w=majority"

# Load data from MongoDB - using the older MongoDB connector syntax
news_df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", mongodb_uri) \
    .load()

# Register as a temp view for SQL queries
news_df.createOrReplaceTempView("aapl_news")

# Display the first few rows to verify data is loaded correctly
news_df.show(5, truncate=False)

# Get basic info about the dataframe
print("DataFrame Schema:")
news_df.printSchema()

print(f"Total number of records: {news_df.count()}")

25/03/07 15:11:42 WARN Utils: Your hostname, Daniels-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
25/03/07 15:11:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/danielmendoza/anaconda3/envs/DistributedComputing/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/danielmendoza/.ivy2/cache
The jars for the packages stored in: /Users/danielmendoza/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-74b2781c-61f7-4947-bd2b-b6a27966f62f;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
downloading https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/mongo-spark-connector_2.12-3.0.1.jar ...
	[SUCCESSFUL ] org.mongodb.spark#mongo-spark-connector_2.12;3.0.1!mongo-spark-connector_2.12.jar (70ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.0.5/mongodb-driver-sync-4.0.5.jar ...
	[SUCCESSFUL ] org.mongodb#mongodb-driver-sync;4.0.5!mongodb-driver-sync.jar (33ms)
downloading https://

+--------------------------+-----------------------------------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|_id                       |ticker_sentiment                               |time_published |title                                                                                                                                                                                                                                     |
+--------------------------+-----------------------------------------------+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{67b8d4f5be7232

In [4]:
import time

# Time the Spark SQL query execution
start_time = time.time()

# Run the sentiment analysis with SparkSQL
sentiment_analysis = spark.sql("""
    WITH sentiment_data AS (
        SELECT 
            explode(ticker_sentiment) as ticker_info,
            time_published
        FROM aapl_news
    )
    
    SELECT 
        CONCAT(SUBSTRING(time_published, 1, 4), '-', 
               SUBSTRING(time_published, 5, 2), '-', 
               SUBSTRING(time_published, 7, 2)) as date,
        AVG(CAST(ticker_info.ticker_sentiment_score AS DOUBLE)) as avg_sentiment,
        COUNT(*) as article_count
    FROM sentiment_data
    WHERE ticker_info.ticker = 'AAPL'
    GROUP BY CONCAT(SUBSTRING(time_published, 1, 4), '-', 
                   SUBSTRING(time_published, 5, 2), '-', 
                   SUBSTRING(time_published, 7, 2))
    ORDER BY date
""")

# Calculate execution time
spark_execution_time = time.time() - start_time
print(f"Spark SQL execution time: {spark_execution_time:.4f} seconds")

# Show the results
print("Sentiment Analysis Results:")
sentiment_analysis.show(20, False)

# Convert to Pandas dataframe for easier manipulation (optional)
pandas_df = sentiment_analysis.toPandas()
print("Results as Pandas DataFrame:")
print(pandas_df)

Spark SQL execution time: 0.5489 seconds
Sentiment Analysis Results:
+----------+---------------------+-------------+
|date      |avg_sentiment        |article_count|
+----------+---------------------+-------------+
|2025-01-29|0.08027400000000001  |3            |
|2025-01-30|0.1187685            |2            |
|2025-01-31|-0.028153333333333325|3            |
|2025-02-03|-0.14467             |1            |
|2025-02-04|0.08738233333333334  |3            |
|2025-02-05|0.084617             |1            |
|2025-02-06|-0.051376            |1            |
|2025-02-10|0.17859125           |4            |
|2025-02-12|0.0457248            |5            |
|2025-02-13|0.221359             |1            |
|2025-02-14|0.030186199999999996 |5            |
|2025-02-15|0.152276             |3            |
|2025-02-16|0.18065033333333333  |3            |
|2025-02-17|0.10014              |2            |
|2025-02-18|0.12344              |4            |
|2025-02-19|0.135929             |13           |


In [5]:
from pymongo import MongoClient
import time

# Connect to MongoDB
client = MongoClient("mongodb+srv://<username>:<password>@makertdata.iinrc.mongodb.net/")
db = client["stock_data"]
collection = db["aapl_news"]

# Time the MongoDB aggregation query
start_time = time.time()

# MongoDB aggregation pipeline (same as the one you used)
pipeline = [
  { "$unwind": "$ticker_sentiment" },
  { "$match": { "ticker_sentiment.ticker": "AAPL" } },
  { "$addFields": {
      "date": {
        "$concat": [
          { "$substr": ["$time_published", 0, 4] }, "-",
          { "$substr": ["$time_published", 4, 2] }, "-",
          { "$substr": ["$time_published", 6, 2] }
        ]
      }
    }
  },
  { "$group": {
      "_id": "$date",
      "avgSentiment": { "$avg": { "$toDouble": "$ticker_sentiment.ticker_sentiment_score" } },
      "articleCount": { "$sum": 1 }
    }
  },
  { "$sort": { "_id": 1 } }
]

# Execute the query
result = list(collection.aggregate(pipeline))

# Calculate execution time
mongo_execution_time = time.time() - start_time
print(f"MongoDB execution time: {mongo_execution_time:.4f} seconds")

# Print results for comparison
for doc in result:
    print(doc)

MongoDB execution time: 0.8236 seconds
{'_id': '2025-01-29', 'avgSentiment': 0.08027400000000001, 'articleCount': 3}
{'_id': '2025-01-30', 'avgSentiment': 0.1187685, 'articleCount': 2}
{'_id': '2025-01-31', 'avgSentiment': -0.028153333333333325, 'articleCount': 3}
{'_id': '2025-02-03', 'avgSentiment': -0.14467, 'articleCount': 1}
{'_id': '2025-02-04', 'avgSentiment': 0.08738233333333334, 'articleCount': 3}
{'_id': '2025-02-05', 'avgSentiment': 0.084617, 'articleCount': 1}
{'_id': '2025-02-06', 'avgSentiment': -0.051376, 'articleCount': 1}
{'_id': '2025-02-10', 'avgSentiment': 0.17859125, 'articleCount': 4}
{'_id': '2025-02-12', 'avgSentiment': 0.0457248, 'articleCount': 5}
{'_id': '2025-02-13', 'avgSentiment': 0.221359, 'articleCount': 1}
{'_id': '2025-02-14', 'avgSentiment': 0.030186199999999996, 'articleCount': 5}
{'_id': '2025-02-15', 'avgSentiment': 0.152276, 'articleCount': 3}
{'_id': '2025-02-16', 'avgSentiment': 0.18065033333333333, 'articleCount': 3}
{'_id': '2025-02-17', 'avgS

In [7]:
from pymongo import MongoClient
import time

# Connect to MongoDB
client = MongoClient("mongodb+srv://<username>:<password>@makertdata.iinrc.mongodb.net/")
db = client["stock_data"]
collection = db["market_prices_master"]  # Use your actual collection name

# Time the MongoDB aggregation query
start_time = time.time()

# MongoDB aggregation pipeline for price statistics by symbol
pipeline = [
  # Group by symbol
  { "$group": {
      "_id": "$symbol",
      "count": { "$sum": 1 },
      "avg_close": { "$avg": "$close" },
      "min_close": { "$min": "$close" },
      "max_close": { "$max": "$close" },
      "avg_volume": { "$avg": "$volume" },
      "total_volume": { "$sum": "$volume" },
      # Calculate price volatility stats
      "avg_price_range": { 
        "$avg": { 
          "$subtract": ["$high", "$low"] 
        }
      },
      "max_price_range": {
        "$max": { 
          "$subtract": ["$high", "$low"] 
        }
      }
    }
  },
  
  # Calculate price range as percentage of average closing price
  { "$addFields": {
      "avg_volatility_pct": {
        "$multiply": [
          { "$divide": ["$avg_price_range", "$avg_close"] },
          100
        ]
      }
    }
  },
  
  # Sort by trading volume (most active stocks first)
  { "$sort": { "total_volume": -1 } },
  
  # Limit to top 10 stocks by volume
  { "$limit": 10 }
]

# Execute the query
result = list(collection.aggregate(pipeline))

# Calculate execution time
mongo_execution_time = time.time() - start_time
print(f"MongoDB execution time: {mongo_execution_time:.4f} seconds")

# Print results for comparison
print("MongoDB Results:")
for doc in result:
    print(f"Symbol: {doc['_id']}, Avg Close: ${doc['avg_close']:.2f}, Avg Volatility: {doc['avg_volatility_pct']:.2f}%, Total Volume: {doc['total_volume']:,}")

MongoDB execution time: 0.8502 seconds
MongoDB Results:
Symbol: TSLA, Avg Close: $338.47, Avg Volatility: 4.75%, Total Volume: 8,689,641,720.0
Symbol: AAPL, Avg Close: $237.72, Avg Volatility: 2.25%, Total Volume: 5,427,330,811.0
Symbol: GOOGL, Avg Close: $193.84, Avg Volatility: 2.13%, Total Volume: 2,660,553,958.0
Symbol: MSFT, Avg Close: $424.63, Avg Volatility: 1.72%, Total Volume: 2,138,416,953.0
Symbol: BA, Avg Close: $173.01, Avg Volatility: 3.32%, Total Volume: 175,229,284.0
Symbol: GBTC, Avg Close: $79.65, Avg Volatility: 3.29%, Total Volume: 64,187,835.0


In [8]:
from pyspark.sql import SparkSession
import time

# Create Spark session with MongoDB connector
spark = SparkSession.builder \
    .appName("Stock Price Analysis") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local[*]") \
    .getOrCreate()

# MongoDB connection details
mongodb_uri = "mongodb+srv://<username>:<password>.iinrc.mongodb.net/stock_data.market_prices_master"

# Load data from MongoDB
prices_df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", mongodb_uri) \
    .load()

# Register as a temp view for SQL queries
prices_df.createOrReplaceTempView("market_prices")

# Time the Spark SQL query execution
start_time = time.time()

# Run the price statistics analysis with SparkSQL
price_analysis = spark.sql("""
    SELECT 
        symbol,
        COUNT(*) as trading_days,
        AVG(close) as avg_close,
        MIN(close) as min_close,
        MAX(close) as max_close,
        AVG(volume) as avg_volume,
        SUM(volume) as total_volume,
        AVG(high - low) as avg_price_range,
        MAX(high - low) as max_price_range,
        (AVG(high - low) / AVG(close)) * 100 as avg_volatility_pct
    FROM market_prices
    GROUP BY symbol
    ORDER BY total_volume DESC
    LIMIT 10
""")

# Calculate execution time
spark_execution_time = time.time() - start_time
print(f"Spark SQL execution time: {spark_execution_time:.4f} seconds")

# Show the results
print("Spark SQL Results:")
price_analysis.show(truncate=False)

25/03/07 15:33:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Spark SQL execution time: 0.0950 seconds
Spark SQL Results:
+------+------------+------------------+---------+---------+--------------------+-------------+------------------+------------------+------------------+
|symbol|trading_days|avg_close         |min_close|max_close|avg_volume          |total_volume |avg_price_range   |max_price_range   |avg_volatility_pct|
+------+------------+------------------+---------+---------+--------------------+-------------+------------------+------------------+------------------+
|TSLA  |100         |338.4703          |213.65   |479.86   |8.68964172E7        |8.68964172E9 |16.067099000000002|61.5299           |4.746974549908812 |
|AAPL  |102         |237.72049019607837|222.64   |259.02   |5.320912559803922E7 |5.427330811E9|5.347441176470589 |13.75             |2.2494658209983807|
|GOOGL |99          |193.84404040404038|183.61   |206.38   |2.6874282404040404E7|2.660553958E9|4.124191919191918 |9.259999999999991 |2.127582519738871 |
|MSFT  |99          |4