# 1.0 Spark & Mongo Work

## 1.1 Assessment Plan

This notebook follows this procedure:

- Read in the data from Hadoop/HDFS

- Create a Spark session and read in the CSVs

- Perform some Spark processing

- Store the source dataset in a NoSQL DB (MongoDB chosen)

- Extract the dataset from MongoDB to CSV


## 1.2 Reading from Hadoop

This section involves reading in our Twitter data, which is stored in HDFS.

My data is stored in HDFS as shown below:

In [1]:
#hduser@Vm:~/$ hadoop fs -ls /
#Found 2 items
#-rw-r--r--   1 hduser supergroup 1813056825 2023-05-23 22:11 /tweets.csv
#-rw-r--r--   1 hduser supergroup   85123366 2023-05-24 10:39 /users.csv


## 1.3 Spark Work

In [2]:
# Import SparkSession, the entry point for DataFrame work
from pyspark.sql import SparkSession
import logging

Creating a Spark session

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
# Minimise Spark's log output: only show messages at ERROR level or above
spark.sparkContext.setLogLevel("ERROR")

In [5]:
# Now that my data is stored in HDFS, let's read it into Spark
# The data is stored in /, the HDFS root directory

On my first attempt at reading in the data, every column came back as type string.

Let's first create a schema that we can use going forward.

This dataset came with two CSV files: one for tweets and one for user details. Later I will merge them.
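The reads in this notebook do not pass a schema, so the printed schemas are still all strings. Here is a minimal sketch of what an explicit schema for the tweets file could look like. The column names are the ones in the tweets DataFrame. The types are assumptions inferred from the column names and sample rows, and `read_tweets_with_schema` is a hypothetical helper.

```python
# Column names come from tweets.csv; the types are assumptions, not documented facts.
TWEET_COLUMN_TYPES = {
    "tweetUrl": "STRING",
    "date": "TIMESTAMP",
    "renderedContent": "STRING",
    "tweetId": "BIGINT",
    "userId": "BIGINT",
    "replyCount": "INT",
    "retweetCount": "INT",
    "likeCount": "INT",
    "quoteCount": "INT",
    "source": "STRING",
    "media": "STRING",
    "retweetedTweet": "STRING",
    "quotedTweet": "STRING",
    "mentionedUsers": "STRING",
}

# Spark accepts a DDL-formatted string as a schema; backticks quote the column names.
TWEETS_DDL = ", ".join(
    f"`{name}` {dtype}" for name, dtype in TWEET_COLUMN_TYPES.items()
)


def read_tweets_with_schema(spark, path="/tweets.csv"):
    """Read the tweets CSV with explicit column types instead of all strings."""
    return spark.read.option("header", "true").schema(TWEETS_DDL).csv(path)
```

With Spark's default permissive parsing, a value that does not fit its declared type is read as null. Comparing null counts per column against the all-string read is a quick way to check whether the assumed types suit the data.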

In [6]:
# Now read in data from hdfs


tweets = spark.read.option("header","true").option("delimiter", ",").csv('/tweets.csv')
users = spark.read.option("header","true").option("delimiter", ",").csv('/users.csv')


tweets.show(5)
users.show(5)

                                                                                

+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+--------------------+----------+--------------------+--------------------+--------------+-----------+--------------+
|            tweetUrl|                date|     renderedContent|             tweetId|              userId|replyCount|retweetCount|           likeCount|quoteCount|              source|               media|retweetedTweet|quotedTweet|mentionedUsers|
+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+--------------------+----------+--------------------+--------------------+--------------+-----------+--------------+
|https://twitter.c...|2021-03-30 03:33:...|          Support 👇|                null|                null|      null|        null|                null|      null|                null|                null|          null|       null|          null|
|    #Farmers
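In the output above, the first row ends after "Support 👇" with every later column null, and the next row starts with "#Farmers". This pattern usually means a tweet contains a line break inside its quoted text and the reader has split it into two records. The sketch below reproduces the effect on a hypothetical two-tweet sample using Python's csv module, then shows reader options that ask Spark to handle quoted multi-line fields. Two things here are assumptions: `read_multiline_csv` is a hypothetical helper, and the quote and escape characters are guesses at how the file was written.

```python
import csv
import io

# Hypothetical sample: the first tweet has a newline inside its quoted text.
sample = 'tweetId,renderedContent\n1,"Support 👇\n#FarmersProtest"\n2,"Second tweet"\n'

# Naive line splitting sees three data lines; a quote-aware parser sees two records.
naive_rows = sample.strip().split("\n")[1:]
parsed_rows = list(csv.reader(io.StringIO(sample)))[1:]

# Reader options that ask Spark to parse quoted multi-line fields as one record.
# Assumption: fields are wrapped in double quotes and inner quotes are doubled.
MULTILINE_CSV_OPTIONS = {
    "header": "true",
    "multiLine": "true",
    "quote": '"',
    "escape": '"',
}


def read_multiline_csv(spark, path):
    """Read a CSV whose quoted fields may span several lines."""
    return spark.read.options(**MULTILINE_CSV_OPTIONS).csv(path)
```

If this is what is happening here, the row counts reported for the tweets DataFrame would include fragments of split tweets, so they are worth re-checking after a multi-line read.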

In [7]:
tweets.printSchema()

root
 |-- tweetUrl: string (nullable = true)
 |-- date: string (nullable = true)
 |-- renderedContent: string (nullable = true)
 |-- tweetId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- replyCount: string (nullable = true)
 |-- retweetCount: string (nullable = true)
 |-- likeCount: string (nullable = true)
 |-- quoteCount: string (nullable = true)
 |-- source: string (nullable = true)
 |-- media: string (nullable = true)
 |-- retweetedTweet: string (nullable = true)
 |-- quotedTweet: string (nullable = true)
 |-- mentionedUsers: string (nullable = true)



In [8]:
users.printSchema()

root
 |-- username: string (nullable = true)
 |-- displayname: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rawDescription: string (nullable = true)
 |-- descriptionUrls: string (nullable = true)
 |-- verified: string (nullable = true)
 |-- created: string (nullable = true)
 |-- followersCount: string (nullable = true)
 |-- friendsCount: string (nullable = true)
 |-- statusesCount: string (nullable = true)
 |-- favouritesCount: string (nullable = true)
 |-- listedCount: string (nullable = true)
 |-- mediaCount: string (nullable = true)
 |-- location: string (nullable = true)
 |-- protected: string (nullable = true)
 |-- linkUrl: string (nullable = true)
 |-- profileImageUrl: string (nullable = true)
 |-- profileBannerUrl: string (nullable = true)
 |-- profileUrl: string (nullable = true)



In [9]:
row_count = tweets.count()
column_count = len(tweets.columns)

print("Tweets DF Rows: {}".format(row_count))
print("Tweets DF Columns: {}".format(column_count))


Tweets DF Rows: 3113384
Tweets DF Columns: 14


                                                                                

In [10]:
row_count2 = users.count()
column_count2 = len(users.columns)

print("Users DF Rows: {}".format(row_count2))
print("Users DF Columns: {}".format(column_count2))



Users DF Rows: 345992
Users DF Columns: 19


                                                                                

Now let's check for tweets where the hashtag isn't present, and create a new dataframe with only the tweets that do contain FarmersProtest.

In [11]:
hashtag = 'FarmersProtest'

In [12]:
from pyspark.sql import functions as F
correct_tweets = tweets.filter(F.lower(tweets.renderedContent).rlike(fr"(?i){hashtag}"))

In [13]:
count = correct_tweets.count()

                                                                                

In [14]:
print("Number of tweets with the hashtag", hashtag, ":", count)

Number of tweets with the hashtag FarmersProtest : 439004
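The filter above lower-cases the text and also uses the case-insensitive flag `(?i)`, so one of the two is redundant. As a sketch, the same match can be written with the flag alone, with the hashtag escaped in case it ever contains regex metacharacters. `filter_by_hashtag` is a hypothetical helper. Like the original filter, the pattern matches the text "FarmersProtest" with or without a leading "#".

```python
import re

hashtag = "FarmersProtest"

# (?i) makes the match case-insensitive; re.escape guards against regex metacharacters.
# Assumption: the hashtag is plain text, so Python and Java regex treat it the same way.
pattern = "(?i)" + re.escape(hashtag)


def filter_by_hashtag(df, column="renderedContent"):
    """Keep rows whose text column contains the hashtag, ignoring case."""
    # Imported lazily so the pattern can be checked without a Spark install.
    from pyspark.sql import functions as F

    return df.filter(F.col(column).rlike(pattern))
```

The pattern can be checked locally with `re.search(pattern, "some text")` before running it over the full DataFrame.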


Now let's merge the dataframes, using userId as the join key.

Pandas uses merge, but Spark uses a 'join' method

In [15]:
test_df = correct_tweets.join(users, on='userId', how='inner')

In [16]:
test_df.show(3)



+--------------------+--------------------+--------------------+--------------------+--------------------+----------+------------+---------+----------+--------------------+--------------------+--------------+-----------+--------------------+--------------+-------------+--------------------+---------------+--------+--------------------+--------------+------------+-------------+---------------+-----------+----------+----------------+---------+-------+--------------------+--------------------+--------------------+
|              userId|            tweetUrl|                date|     renderedContent|             tweetId|replyCount|retweetCount|likeCount|quoteCount|              source|               media|retweetedTweet|quotedTweet|      mentionedUsers|      username|  displayname|      rawDescription|descriptionUrls|verified|             created|followersCount|friendsCount|statusesCount|favouritesCount|listedCount|mediaCount|        location|protected|linkUrl|     profileImageUrl|    profi

                                                                                

Successfully joined both datasets.

Finally, for now, let's just keep the columns of interest to us (renderedContent, date and username).

In [17]:
keep_columns = ["date","username","renderedContent"]

In [18]:
final_df = test_df.select(keep_columns)

In [19]:
final_df.show()



+--------------------+---------------+--------------------+
|                date|       username|     renderedContent|
+--------------------+---------------+--------------------+
|2021-06-14 05:21:...|      Kush_2308|@noconversion It ...|
|2021-06-04 14:07:...|      Kush_2308|@Shivam_h9 This w...|
|2021-02-03 18:21:...| Mohit__yadav19|We love our farme...|
|2021-02-02 21:02:...| Mohit__yadav19|Thanks for standi...|
|2021-02-02 20:56:...| Mohit__yadav19|whole Bollywood i...|
|2021-02-02 20:47:...| Mohit__yadav19|The whole world i...|
|2021-02-02 20:45:...| Mohit__yadav19|You bjp agent how...|
|2021-02-02 20:32:...| Mohit__yadav19|International peo...|
|2021-11-19 06:15:...| Mohit__yadav19|Today I remembere...|
|2021-11-19 05:40:...|    Amritsohana|Finally We Won🙏❤...|
|2021-04-16 03:39:...|  curioussinghs|@pacifistrebel Ab...|
|2021-10-31 13:22:...|  curioussinghs|#FarmersProtest h...|
|2021-02-05 17:27:...|  DrGauravGarg4|#UNHRC issues adv...|
|2021-08-29 05:53:...|Rajwant79905048|I t

                                                                                

In [20]:
row_count3 = final_df.count()
column_count3 = len(final_df.columns)

print("Final Spark Work DF Rows: {}".format(row_count3))
print("Final Spark Work DF Columns: {}".format(column_count3))



Final Spark Work DF Rows: 818753
Final Spark Work DF Columns: 3
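The joined DataFrame has 818753 rows, more than the 439004 filtered tweets that went into the join. An inner join can only return more rows than its left side when some join keys appear more than once on the right side. This suggests that users.csv holds several rows for some userId values. The sketch below shows the effect on hypothetical keys and one possible fix, which is to keep a single row per userId before joining. `join_with_unique_users` is a hypothetical helper. Which duplicate row to keep is a choice this sketch does not make: `dropDuplicates` keeps an arbitrary one.

```python
from collections import Counter


def inner_join_row_count(left_keys, right_keys):
    """Count the rows an inner join on a single key column would produce."""
    right_counts = Counter(right_keys)
    return sum(right_counts[key] for key in left_keys)


# Hypothetical keys: user "a" appears twice in the users table.
tweet_user_ids = ["a", "a", "b"]
user_ids = ["a", "a", "b"]

rows_with_duplicates = inner_join_row_count(tweet_user_ids, user_ids)
rows_after_dedup = inner_join_row_count(tweet_user_ids, set(user_ids))


def join_with_unique_users(tweets_df, users_df):
    """Inner join tweets to users after keeping one row per userId."""
    unique_users = users_df.dropDuplicates(["userId"])
    return tweets_df.join(unique_users, on="userId", how="inner")
```

In the toy example the three tweets become five rows with the duplicated user and stay at three once the users are de-duplicated.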


                                                                                

## 1.4 MongoDB Work

Here the plan is to store the above dataframe into the NoSQL DB - MongoDB


In [None]:
pip install pymongo==3.11.0


I got stuck with recurring errors here and ended up having to downgrade PyMongo to version 3.11.0.

In [None]:
from pymongo import MongoClient

In [None]:
#client = MongoClient('mongodb://localhost:27017')

Here I needed to download the MongoDB Spark connector.

This was downloaded from:

https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/3.0.1/

Then I moved it to the Spark jars directory with:

 cp /home/hduser/Downloads/mongo-spark-connector_2.12-3.0.1.jar /usr/local/spark-3.1.3-bin-hadoop3.2/jars/
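With the connector on Spark's classpath, a DataFrame can in principle be written to MongoDB by Spark itself, without first collecting it in the notebook process. The sketch below shows roughly what that looks like with the 3.x connector. It assumes that the `mongo` format name and the `uri` option behave as I understand them for connector 3.0.1, and that the connector's MongoDB Java driver dependencies are also available to Spark. Both should be checked against the connector documentation. `mongo_uri` and `write_with_connector` are hypothetical helpers.

```python
def mongo_uri(database, collection, host="localhost", port=27017):
    """Build a MongoDB URI of the form mongodb://host:port/database.collection."""
    return f"mongodb://{host}:{port}/{database}.{collection}"


def write_with_connector(df, database, collection):
    """Append a Spark DataFrame to a MongoDB collection through the connector."""
    (
        df.write.format("mongo")  # short format name assumed for the 3.x connector
        .mode("append")
        .option("uri", mongo_uri(database, collection))
        .save()
    )
```

A call such as `write_with_connector(final_df, "local", "my_collection")` would target the same database and collection names used in the PyMongo cell.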


In [27]:
import pyspark
from pyspark.sql import SparkSession
from pymongo import MongoClient

In [28]:
print(pyspark.__version__)

3.1.3


In [24]:
from pyspark.sql import SparkSession
from pymongo import MongoClient




In [25]:
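# Convert the DataFrame to a list of dictionaries
# json.loads is used instead of eval: eval runs each row as Python code and
# fails on JSON literals such as true, false and null
import json
data = final_df.toJSON().map(json.loads).collect()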

# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017')
db = client['local']

# Create a new collection and insert the data
collection_name = 'my_collection'
db[collection_name].insert_many(data)

# Verify the collection and inserted documents
collection_names = db.list_collection_names()
if collection_name in collection_names:
    print(f"Collection '{collection_name}' created successfully.")
    print(f"Inserted {len(data)} documents into '{collection_name}'.")
else:
    print(f"Failed to create collection '{collection_name}'.")

# Close the MongoDB connection
client.close()

# Stop the SparkSession
spark.stop()


                                                                                

Collection 'my_collection' created successfully.
Inserted 818753 documents into 'my_collection'.
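The cell above collects all 818753 rows into one Python list before inserting them, which works here but keeps the whole dataset in the driver's memory. A minimal sketch of a batched alternative follows. It assumes any object with an `insert_many` method (such as a PyMongo collection) and any iterable of dictionaries. `insert_in_batches` and the batch size are hypothetical choices.

```python
from itertools import islice


def insert_in_batches(rows, collection, batch_size=10_000):
    """Insert an iterable of dicts into a collection, batch_size documents at a time."""
    iterator = iter(rows)
    inserted = 0
    while True:
        # Pull at most batch_size documents from the iterator.
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        collection.insert_many(batch)
        inserted += len(batch)
    return inserted
```

With Spark, the rows could be streamed to the driver one partition at a time, for example `insert_in_batches((row.asDict() for row in final_df.toLocalIterator()), db[collection_name])`. This has to run before `spark.stop()` and `client.close()`.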
