# Part 0: Setting up

## Install and start MongoDB

In [None]:
# Install the MongoDB server package, then start it as a system service
# so the cells below can connect to it.
!apt install -qq mongodb
!service mongodb start

## Download the datasets and push them to our MongoDB

In [None]:
# The datasets are published Brotli-compressed (*.json.br), so the `brotli`
# command-line tool is needed to unpack them.
# Reference: https://github.com/google/brotli
!apt-get install -qq brotli

In [None]:
!wget -q https://csc14118.github.io/thuoc_raw.json.br
!wget -q https://csc14118.github.io/gia_ke_khai_raw.json.br
!wget -q https://csc14118.github.io/movies_lang.json.br 

In [None]:
!brotli -d *.br

In [None]:
!pip install -q pymongo

In [None]:
import json
from pymongo import MongoClient

client = MongoClient()

# Creation of the new database
db = client['input_data']

collection_name = ["gia_ke_khai_raw", "movies_lang", "thuoc_raw"]

# Push our data to MongoDB: one collection per decompressed JSON file
for data in collection_name:
    collection = db[data]
    # Drop first so that re-running this cell does not duplicate every document
    collection.drop()
    # Context manager closes the file; explicit UTF-8 because the data contains Vietnamese text
    with open(f'{data}.json', encoding='utf-8') as json_file:
        chunks = json.load(json_file)
    collection.insert_many(chunks)

# Create a dummy database to test (dropped first for the same re-run safety)
db = client['dummy']
db['chunks'].drop()
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'},{'Banh bao': 'Cung ngon'}])

# Displayed as the cell output: should list both 'input_data' and 'dummy'
client.list_database_names()

## Install pyspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q "https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Absolute path, so Spark is still found if the working directory changes later
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.1.1-bin-hadoop2.7")

# Make the pyspark bundled with SPARK_HOME importable, then display the location that was found
findspark.init()
findspark.find()

In [None]:
# Sanity check: pyspark is importable; print which version was picked up
import pyspark
print(pyspark.__version__)

## Dirty trick to connect spark to our mongodb

In a production environment, please read the documentation carefully before setting up these components.

In [None]:
!rm $SPARK_HOME/jars/mongo*.jar
!rm $SPARK_HOME/jars/bson*.jar

In [None]:
# Download the MongoDB Spark connector plus the MongoDB Java driver and BSON jars
# straight into Spark's jars directory.
# NOTE(review): connector 10.1.1 is combined here with 3.12.12 drivers and bson 4.6.0 —
# confirm this version mix is compatible before reusing it elsewhere.
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.12/mongodb-driver-3.12.12.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.12/mongo-java-driver-3.12.12.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.6.0/bson-4.6.0.jar

In [None]:
# NOTE(review): importing pyspark.shell appears to start a Spark session as a
# side effect, which getOrCreate() below would then reuse — confirm.
from pyspark.shell import spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Default MongoDB connection used for both reads and writes
uri = "mongodb://localhost:27017/input_data"

# Maven coordinates of the connector (kept for reference; not passed to the builder)
spark_jb = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"

my_spark = (
    SparkSession.builder
    .config("spark.executor.memory", "1g")
    .appName("csc14112")
    .config("spark.mongodb.read.connection.uri", uri)
    .config("spark.mongodb.write.connection.uri", uri)
    .getOrCreate()
)

In [None]:
# Test reading data from our MongoDB.
# The dummy documents were inserted into the 'chunks' collection, so read that
# exact name (reading 'chunk' would silently return an empty DataFrame).
p = my_spark.read.format("mongodb").option("database","dummy").option("collection", "chunks").load()
p.printSchema()

In [None]:
# Display the rows that were read back from the dummy database
p.show()

# Part 1: Introduction to PySpark


In this lab assignment, we will work with a movie dataset loaded into our MongoDB at `input_data.movies_lang`. We will use PySpark RDD and DataFrame to perform the following tasks:

In [None]:
# Load input_data.movies_lang from MongoDB as a DataFrame
movies_df = (
    my_spark.read.format("mongodb")
    .option("database", "input_data")
    .option("collection", "movies_lang")
    .load()
)
# RDD view of the same rows, used by the exercises below
movies_rdd = movies_df.rdd
# Show the inferred schema
movies_df.printSchema()

### (a) Count the number of movies by country. Sort by count in decreasing order.

In [None]:
# Emit (country, 1) per movie, add the ones up per country,
# then order the pairs by count, largest first.
n_movies_by_country_rdd = (
    movies_rdd
    .map(lambda movie: (movie.country, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)
n_movies_by_country_rdd.collect()

### (b) Return the titles of the movies produced in France.

In [None]:
# Keep only movies whose country code is "FR" and project their titles
movies_in_fr_rdd = (
    movies_rdd
    .filter(lambda movie: movie.country == "FR")
    .map(lambda movie: movie.title)
)
movies_in_fr_rdd.collect()

### (c) Return the titles of the movies in which Sofia Coppola is one of the actresses.

In [None]:
def is_has_sofia(actors):
    """Return True if Sofia Coppola appears in ``actors``.

    Args:
        actors: iterable of actor records exposing ``first_name`` and
            ``last_name`` attributes; may be None when a movie carries
            no cast list.

    Returns:
        bool: True when at least one actor is named Sofia Coppola,
        False otherwise (including for an empty or missing cast).
    """
    # `actors or ()` treats a missing (None) cast as empty instead of raising TypeError
    for actor in actors or ():
        if actor.first_name == "Sofia" and actor.last_name == "Coppola":
            return True
    return False

# Keep the movies whose cast includes Sofia Coppola, then project their titles
sofia_movies_rdd = (
    movies_rdd
    .filter(lambda movie: is_has_sofia(movie.actors))
    .map(lambda movie: movie.title)
)
sofia_movies_rdd.collect()

### (d) Return the names and birth dates of the directors of movies produced in France.


In [None]:
# For every movie produced in France, build ("<first> <last>", birth_date) for its director
directors_rdd = (
    movies_rdd
    .filter(lambda movie: movie.country == "FR")
    .map(lambda movie: (
        movie.director.first_name + " " + movie.director.last_name,
        movie.director.birth_date,
    ))
)
directors_rdd.collect()

### (e) Return the average number of actors in a film.


In [None]:
# number of films in the collection
total_films = movies_rdd.count()
# cast sizes summed over all films
total_actors = movies_rdd.map(lambda movie: len(movie['actors'])).sum()

# average cast size per film
avg_actors = total_actors / total_films
avg_actors

### (f) Return the name of the actor that acted in the most movies.

In [None]:
# One record per (movie, actor) appearance, keyed by actor id and carrying
# (full name, 1); the ones are summed per actor and the result is ordered
# by number of appearances, largest first.
actors_rdd = (
    movies_rdd
    .flatMap(lambda movie: movie.actors)
    .map(lambda actor: (actor._id, (actor.first_name + " " + actor.last_name, 1)))
    .reduceByKey(lambda acc, nxt: (acc[0], acc[1] + nxt[1]))
    .sortBy(lambda entry: entry[1][1], ascending=False)
)

# First element is (actor id, (name, movie count)) of the most frequent actor
actors_rdd.first()


# Part 2: Real-world Data Manipulation

In this part of the lab, we will work with two collections in our MongoDB: `gia_ke_khai_raw` and `thuoc_raw` loaded at `input_data.gia_ke_khai_raw` and `input_data.thuoc_raw` respectively. We will use PySpark RDD and DataFrame to perform the following tasks:

### (a)  Read the datasets into a DataFrame and print out the schema and the number of records.

In [None]:
# Load input_data.gia_ke_khai_raw from MongoDB as a DataFrame
gia_ke_khai_df = (
    my_spark.read
    .format("mongodb")
    .option("database", "input_data")
    .option("collection", "gia_ke_khai_raw")
    .load()
)
# RDD view of the same rows
gia_ke_khai_rdd = gia_ke_khai_df.rdd
# Show the inferred schema
gia_ke_khai_df.printSchema()

In [None]:
# Number of records in gia_ke_khai_raw
gia_ke_khai_rdd.count()

In [None]:
# Load input_data.thuoc_raw from MongoDB as a DataFrame
thuoc_df = (
    my_spark.read
    .format("mongodb")
    .option("database", "input_data")
    .option("collection", "thuoc_raw")
    .load()
)
# RDD view of the same rows
thuoc_rdd = thuoc_df.rdd
# Show the inferred schema
thuoc_df.printSchema()

In [None]:
# Number of records in thuoc_raw
thuoc_rdd.count()

### (b) Show all records in the `thuoc_raw` collection that have the same active pharmaceutical ingredient (API) in their `hoatChat` field as their medicine name.


Notes: In the context of medication, API stands for Active Pharmaceutical Ingredient, which is the biologically active component in a drug that produces the intended therapeutic effect. In other words, it is the chemical substance that gives a medicine its medicinal properties.

In [None]:
# Keep only the medicines whose ingredient string (hoatChat) is exactly
# equal to their name (tenThuoc)
result_rdd = thuoc_rdd.filter(
    lambda thuoc: thuoc["hoatChat"] == thuoc["tenThuoc"]
)

# Bring the matching rows back to the driver for display
result_rdd.collect()

### (c) Create a new DataFrame from the `thuoc_raw` collection that splits the API in the `hoatChat` field into multiple rows. For example, "paracetamol" is the API in "Paracetamol 500 mg," and "amoxicillin" is the API in various medications such as "Amogentine 500mg/125mg," "Augbactam 1g/200mg," and "Viamomentin." The resulting DataFrame should have two columns: `hoatChat` and `thuocTuongUng` as a list. After processing the data, write it back to our MongoDB at `output_data.thuocthaythe`.

In [None]:
import re
class APIRegexHelper:
    """Chainable cleaner for one fragment of a raw ``hoatChat`` string.

    Every ``remove_*`` / ``to_*`` method rewrites ``self.string`` in place and
    returns ``self`` so that calls can be chained; read the final text from
    the ``string`` attribute.
    """

    # A number (optionally with ',' or '.' decimal parts), optional spaces and a unit.
    # Letter units must NOT be followed by another letter; otherwise the first
    # letters of an ordinary word would be eaten ("5 mmol" -> "ol", "2 lần" -> "ần").
    _MASS_AND_UNIT = re.compile(
        r'\d+((,|\.)\d+)*\s*'
        r'((mg|mcg|IU|UI|g|ml|l|kg|mm|đơn vị USP)(?![^\W\d_])|\%)'
    )
    # From the first '(' or ')' up to the end of the line
    _PARENTHESIS_TAIL = re.compile(r'(\(|\)).*')
    # A slash together with the whitespace in front of it
    _SLASH = re.compile(r'\s*/')
    # Greedy: everything up to and including the LAST colon on the line
    _UP_TO_COLON = re.compile(r'.*:')
    # Ratios such as "1:2"
    _RATIO = re.compile(r'\d+:\d+')
    _LINE_BREAK = re.compile(r'\n')
    # "equivalent to" / "in the form of" phrases and everything after them
    _EQUIVALENCE_TAIL = re.compile(r'(tương đương|tương ứng|dưới dạng|dạng).*')

    def __init__(self, string):
        """Store the text fragment that the chained methods will rewrite."""
        self.string = string

    def remove_mass_and_unit(self):
        """Delete quantities such as ``500 mg``, ``0,5g`` or ``5%``."""
        self.string = self._MASS_AND_UNIT.sub('', self.string)
        return self

    def remove_space(self):
        """Strip leading and trailing whitespace."""
        self.string = self.string.strip()
        return self

    def remove_parenthesis(self):
        """Delete everything from the first parenthesis to the end of the line."""
        self.string = self._PARENTHESIS_TAIL.sub('', self.string)
        return self

    def remove_slash(self):
        """Delete every slash together with the whitespace preceding it."""
        self.string = self._SLASH.sub('', self.string)
        return self

    def remove_colon(self):
        """Delete everything up to and including the last colon on a line."""
        self.string = self._UP_TO_COLON.sub('', self.string)
        return self

    def remove_ratio(self):
        """Delete numeric ratios such as ``1:2``."""
        self.string = self._RATIO.sub('', self.string)
        return self

    def remove_line_break(self):
        """Delete newline characters."""
        self.string = self._LINE_BREAK.sub('', self.string)
        return self

    def to_lower_case(self):
        """Lower-case the whole string."""
        self.string = self.string.lower()
        return self

    def remove_similar_string(self):
        """Delete an equivalence phrase and everything that follows it.

        Matching is case-sensitive and covers "tương đương", "tương ứng",
        "dưới dạng" and "dạng".
        """
        self.string = self._EQUIVALENCE_TAIL.sub('', self.string)
        return self

class UltimateAPIRegex:
    """Split a raw ``hoatChat`` string into a list of cleaned ingredient names."""

    # Fragments that are only punctuation or whitespace left-overs, not ingredient names
    _INCORRECT_STRINGS = (';', '(', ')', ':', '', ' ', ',', '...', '--', '…')

    def __init__(self, string):
        """Store the raw ingredient string; ``apis`` is filled by ``get_apis``."""
        self.string = string
        self.apis = []

    def get_apis(self):
        """Return the cleaned ingredient names found in ``self.string``.

        The result is also stored in ``self.apis``. Calling the method again
        recomputes the list from scratch instead of appending to it. A missing
        (None) or empty input yields an empty list.
        """
        # Start from a clean list so that repeated calls do not accumulate duplicates
        self.apis = []
        if not self.string:
            return self.apis
        # Split on ',' or ';' unless a digit follows immediately (keeps decimal
        # commas such as "0,5" intact). The group is non-capturing so the
        # separators themselves are not returned as fragments.
        fragments = re.split(r'(?:,|;)\s*(?![0-9])', self.string)
        for fragment in fragments:
            cleaned = (
                APIRegexHelper(fragment)
                .remove_line_break()
                .remove_mass_and_unit()
                .remove_parenthesis()
                .remove_ratio()
                .remove_colon()
                .remove_slash()
                .remove_similar_string()
                .remove_space()
                .to_lower_case()
            )
            self.apis.append(cleaned.string)
        self.remove_incorrect_string().add_vitamin()
        return self.apis

    def remove_incorrect_string(self):
        """Drop entries of ``self.apis`` that are only punctuation or blank."""
        self.apis = [api for api in self.apis if api not in self._INCORRECT_STRINGS]
        return self

    def add_vitamin(self):
        """Prefix every entry of at most two characters with ``'vitamin '``."""
        self.apis = [
            'vitamin ' + api if len(api) <= 2 else api
            for api in self.apis
        ]
        return self

In [None]:
# Emit one (ingredient, medicine name) pair per ingredient parsed out of each
# medicine, then gather the medicine names of every ingredient into a list.
thuoc_thay_the_rdd = (
    thuoc_rdd
    .flatMap(lambda thuoc: [
        (api, thuoc.tenThuoc)
        for api in UltimateAPIRegex(thuoc.hoatChat).get_apis()
    ])
    .groupByKey()
    .mapValues(list)
)

thuoc_thay_the_rdd.top(10)

In [None]:
# Turn the (ingredient, [medicine names]) pairs into a two-column DataFrame
# and display the first rows
thuoc_thay_the_df = thuoc_thay_the_rdd.toDF(["hoatChat", "thuocTuongUng"])
thuoc_thay_the_df.show()

In [None]:
# Persist the result to output_data.thuocthaythe, replacing any previous content
(
    thuoc_thay_the_df.write
    .format("mongodb")
    .option("database", "output_data")
    .option("collection", "thuocthaythe")
    .mode("overwrite")
    .save()
)

In [None]:
# Read output_data.thuocthaythe back to verify the write
thuoc = (
    my_spark.read
    .format("mongodb")
    .option("database", "output_data")
    .option("collection", "thuocthaythe")
    .load()
)
thuoc.show()

### (d) Create a new DataFrame from the two collections mentioned above that contains `tenThuoc`, `hoatChat`, `dongGoi`, `dvt` and `giaBan`. After processing the data, write it back to our MongoDB at `output_data.giathuoc`.

In [None]:
gia_ke_khai_rdd = gia_ke_khai_df.rdd
thuoc_rdd = thuoc_df.rdd

# Key the price declarations by registration number ("sdk").
# The keyed RDDs get their own names, so gia_ke_khai_rdd / thuoc_rdd keep
# holding the raw rows that the other cells of this notebook expect.
gia_ke_khai_by_sdk_rdd = gia_ke_khai_rdd \
                  .map(lambda x: (
                      x["sdk"],
                      {
                          "dvt": x["dvt"],
                          "giaBan": x["giaBan"]
                       }
                  ))
# Key the medicines by registration number ("soDangKy")
thuoc_by_sdk_rdd = thuoc_rdd \
            .map(lambda x: (
                x["soDangKy"],
                {
                    "tenThuoc": x["tenThuoc"],
                    "hoatChat": x["hoatChat"],
                    "dongGoi": x["dongGoi"],
                }
            ))
# Inner join on the registration number:
# each element is (sdk, (price fields, medicine fields))
joined_rdd = gia_ke_khai_by_sdk_rdd.join(thuoc_by_sdk_rdd)
# Flatten each joined record into a tuple in the requested column order
giathuoc_rdd = joined_rdd \
            .map(lambda x: (
                x[1][1]["tenThuoc"],
                x[1][1]["hoatChat"],
                x[1][1]["dongGoi"],
                x[1][0]["dvt"],
                x[1][0]["giaBan"]
            ))

# Tuples plus explicit column names (same approach as thuoc_thay_the_df) avoid the
# deprecated schema inference from dicts and keep the columns in the order listed here
giathuoc_df = giathuoc_rdd.toDF(["tenThuoc", "hoatChat", "dongGoi", "dvt", "giaBan"])
giathuoc_df.show()

In [None]:
# Persist the result to output_data.giathuoc, replacing any previous content
(
    giathuoc_df.write
    .format("mongodb")
    .option("database", "output_data")
    .option("collection", "giathuoc")
    .mode("overwrite")
    .save()
)

In [None]:
# Read output_data.giathuoc back to verify the write
giathuoc = (
    my_spark.read
    .format("mongodb")
    .option("database", "output_data")
    .option("collection", "giathuoc")
    .load()
)

giathuoc.show()