In [15]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext, functions as F
from pyspark.sql.functions import *

mongo_uri = "mongodb://hadoop-vm.internal.cloudapp.net:27017/ca2"

# Spark version 3.2.3
# MongoDB version 6.0.5
# Java Version 11

# create a spark session
# Jars dependencies available in maven repository
# https://mvnrepository.com/search?q=mongodb-driver-sync
spark = SparkSession.builder \
    .appName('Tweets') \
    .config("spark.mongodb.read.connection.uri", mongo_uri) \
    .config("spark.mongodb.write.connection.uri", mongo_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .config("spark.jars.packages", "org.mongodb:mongodb-driver-core:4.9.1") \
    .config("spark.jars.packages", "org.mongodb:mongodb-driver-sync:4.9.1") \
    .config("spark.jars.packages", "org.mongodb:bson:4.9.1") \
    .getOrCreate()

# read data from mongodb collection "tweets" into a dataframe "df"
df = spark.read \
    .format("mongodb") \
    .option("connection.uri", mongo_uri) \
    .option("database", "ca2") \
    .option("collection", "tweets") \
    .load()

df.printSchema()

root
 |-- _id: long (nullable = true)
 |-- coordinates: void (nullable = true)
 |-- full_text: string (nullable = true)
 |-- geo: void (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestamp_ms: string (nullable = true)



# Create a timeseries collection in Mongo DB


## Step 1: Mongo shell create timeseries_tweets collection
```javascript
use ca2;

db.createCollection("timeseries_tweets", {
  "timeseries": {
    "timeField": "timestamp",
    "metaField": "laboratory", 
    "granularity": "minutes"
  }
});
```


## Step 2: On existing tweets collection create a new timestamp field 

```javascript
db.tweets.find().forEach(function (doc) {
  var timestamp = new Date(Number(doc.timestamp_ms));
  var updateDoc = { $set: { "timestamp": timestamp } };
  db.tweets.updateOne({ "_id": doc._id }, updateDoc);
});
```



## Step 3: Migrate tweets collection to timeseries_tweets

```javascript
db.tweets.find().forEach(function (doc) {
  db.timeseries_tweets.insertOne(doc);
});

```

## Tweets Statistics

### Total count tweets by laboratory

In [8]:
# No need to compare with lower case as we are looking for exact match on hastags
def contains_hashtags(column, hashtags):
    condition = None
    for hashtag in hashtags:
        if condition is None:
            condition = column.contains(hashtag)
        else:
            condition = condition | column.contains(hashtag)
    return condition


In [9]:
laboratories = ['#Moderna', '#Pfizer', '#BioNTech', '#AstraZeneca', '#JohnsonAndJohnson', '#Janssen', '#Novavax', '#Sinovac', '#Sinopharm', '#Sanofi', '#GSK']

In [10]:
# Filtering tweets containing laboratories
df_lab = df.filter(contains_hashtags(col("text"), laboratories))

In [13]:
df_lab_monthly = df_lab.groupBy(   
    when(col("text").contains('#AstraZeneca'), "AstraZeneca").alias("laboratory"),
    date_trunc("month", "timestamp").alias("month")
).count() \
.groupBy("laboratory").agg(avg("count").alias("avg_tweets_per_month"))

In [14]:
df_lab_monthly.show()

+-----------+--------------------+
| laboratory|avg_tweets_per_month|
+-----------+--------------------+
|AstraZeneca|   636.9166666666666|
|       null|   778.5833333333334|
+-----------+--------------------+



# Create AstraZeneca collection

In [16]:
df_astrazeneca = df.filter(col("text").contains('#AstraZeneca'))
df_astrazeneca.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("connection.uri", mongo_uri) \
    .option("database", "ca2") \
    .option("collection", "astrazeneca_tweets") \
    .save()