In [1]:
import pandas as pd
from pymongo import MongoClient
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
MONGODB_URI = os.environ.get('MONGODB_URI')
DATABASE_NAME = "msds697_project"

In [3]:
client = MongoClient(MONGODB_URI)
db = client[DATABASE_NAME]
gnews_collection = db['gnews']
twitter_collection = db['twitter']

In [4]:
ss = SparkSession.builder.appName('read_from_mongo').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/28 14:51:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
def collection_to_dataframe(collection):
    cursor = collection.find()
    list_of_dictionaries = list(cursor)
    for doc in list_of_dictionaries:
        if "_id" in doc:
            doc["_id"] = str(doc["_id"])
    spark_df = ss.createDataFrame(list_of_dictionaries)
    if "_id" in spark_df.columns:
        spark_df = spark_df.drop("_id")
    return spark_df

In [6]:
gnews = collection_to_dataframe(gnews_collection)
twitter = collection_to_dataframe(twitter_collection)

In [7]:
gnews.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- source: string (nullable = true)
 |-- title: string (nullable = true)



In [8]:
twitter.printSchema()

root
 |-- Source: string (nullable = true)
 |-- label: long (nullable = true)
 |-- text: string (nullable = true)



In [9]:
gnews.groupBy('source').agg(F.count('title').alias('count_by_source')).sort('count_by_source',ascending=0).show()

[Stage 0:>                                                        (0 + 10) / 10]

+--------------------+---------------+
|              source|count_by_source|
+--------------------+---------------+
|            NBC News|           3111|
|The Associated Press|           1460|
|            CBS News|           1154|
|            ABC News|            829|
|        PBS NewsHour|            388|
|                 CNN|            180|
|                 NPR|            119|
|  The New York Times|             64|
|            Fox News|             60|
|             Reuters|             22|
|           USA TODAY|             19|
| The Washington Post|             11|
|The Wall Street J...|              7|
|           Bloomberg|              6|
|                 PBS|              4|
|   CNN International|              2|
|   Los Angeles Times|              2|
|      CNN Press Room|              2|
|   Reuters Institute|              2|
|The New York Time...|              2|
+--------------------+---------------+



                                                                                

In [10]:
gnews.createOrReplaceTempView("gnews_table")

ss.sql("""
    SELECT source, COUNT(title) AS count_by_source
    FROM gnews_table
    GROUP BY source
    ORDER BY count_by_source DESC
""").show()

+--------------------+---------------+
|              source|count_by_source|
+--------------------+---------------+
|            NBC News|           3111|
|The Associated Press|           1460|
|            CBS News|           1154|
|            ABC News|            829|
|        PBS NewsHour|            388|
|                 CNN|            180|
|                 NPR|            119|
|  The New York Times|             64|
|            Fox News|             60|
|             Reuters|             22|
|           USA TODAY|             19|
| The Washington Post|             11|
|The Wall Street J...|              7|
|           Bloomberg|              6|
|                 PBS|              4|
|   CNN International|              2|
|   Los Angeles Times|              2|
|      CNN Press Room|              2|
|   Reuters Institute|              2|
|The New York Time...|              2|
+--------------------+---------------+



In [11]:
twitter.groupBy('Source').agg(F.count('text').alias('count_by_source')).sort('count_by_source',ascending=0).show()

+-------+---------------+
| Source|count_by_source|
+-------+---------------+
|FoxNews|           1002|
|   ESPN|            426|
|Twitter|            398|
+-------+---------------+



In [12]:
twitter.createOrReplaceTempView("twitter_table")

ss.sql("""
    SELECT Source, COUNT(text) AS count_by_source
    FROM twitter_table
    GROUP BY Source
    ORDER BY count_by_source DESC
""").show()

+-------+---------------+
| Source|count_by_source|
+-------+---------------+
|FoxNews|           1002|
|   ESPN|            426|
|Twitter|            398|
+-------+---------------+

