# MovieLens Data Enrichment - Web Crawling with bs4 and Pandas UDFs

In [1]:
# install Pyspark on Google Colab
from google.colab import drive 
drive.mount('/content/drive')
!apt-get -y install openjdk-8-jre-headless
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local").getOrCreate()
sc = SparkContext.getOrCreate()

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
Suggested packages:
  libnss-mdns fonts-dejavu-extra fonts-ipafont-gothic fonts-ipafont-mincho
  fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jre-headless
0 upgraded, 1 newly installed, 0 to remove and 19 not upgraded.
Need to get 28.2 MB of archives.
After this operation, 104 MB of additional disk space will be used.
Ign:1 http://archive.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u312-b07-0ubuntu1~18.04
Err:1 http://security.ubuntu.com/ubuntu bionic-updates/universe amd64 openjdk-8-jre-headless amd64 8u312-b07-0ubuntu1~18.04
  404  Not Found [IP: 91.189.91.38 80]
E: Failed to fetch http://security.ubuntu.com/ubuntu/pool/universe/o/openjdk-8/openjdk-8-jre-headless_

In [2]:
# make Google Drive available under /content/drive so the dataset can be read from it
from google.colab import drive
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [3]:
# import packages
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests as r
from bs4 import BeautifulSoup

In [4]:
# load the links table, keep only movieId/imdbId, and derive a grouping key
links_path = "/content/drive/My Drive/Colab Notebooks/Recommendation/links.csv"
df_links = (
    spark.read.csv(links_path, header=True, inferSchema=True)
    # tmdbId is not used anywhere downstream
    .drop("tmdbId")
    # partitionId (movieId modulo 100) is the key the pandas UDF is grouped by
    .withColumn("partitionId", col("movieId") % 100)
)
df_links.show(5)

+-------+------+-----------+
|movieId|imdbId|partitionId|
+-------+------+-----------+
|      1|114709|          1|
|      2|113497|          2|
|      3|113228|          3|
|      4|114885|          4|
|      5|113041|          5|
+-------+------+-----------+
only showing top 5 rows



In [5]:
# output schema of the scraping UDF: every field is a nullable string column
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ('imdbId', 'duration', 'n_rating', 'n_user_review', 'n_critic_review')
])

In [6]:
# define the Pandas UDF 
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def apply_model(sample_pd: pd.DataFrame)-> pd.DataFrame:
    """Scrape IMDb title pages for every movie in one group of the links table.

    Parameters
    ----------
    sample_pd : pd.DataFrame
        One group of rows; the IMDb id is read from the second column.

    Returns
    -------
    pd.DataFrame
        One row per input row with the string columns imdbId, duration,
        n_rating, n_user_review and n_critic_review. A field that could not be
        scraped (request failed, or the expected element is not on the page)
        is None instead of aborting the whole group.
    """
    columns = ['imdbId', 'duration', 'n_rating', 'n_user_review', 'n_critic_review']
    # seconds to wait for IMDb before giving up on a single page; without a
    # timeout one stalled connection would block the Spark task indefinitely
    request_timeout = 30

    # get all the IMDb ids (second column of the grouped frame)
    imdbid_list = list(sample_pd.iloc[:, 1])

    def tolink(imdb_id):
        # the numeric id is left-padded with zeros to at least 7 digits
        return "https://www.imdb.com/title/tt" + str(imdb_id).zfill(7)

    def safe_text(extract):
        # run one lookup; a missing element shows up as IndexError (empty
        # result list) or AttributeError (find() returned None)
        try:
            return extract()
        except (AttributeError, IndexError):
            return None

    content = []
    for imdb_id in imdbid_list:
        try:
            response = r.get(tolink(imdb_id), timeout=request_timeout)
            response.raise_for_status()
        except r.RequestException:
            # network error, timeout or HTTP error status: keep the id, leave the rest empty
            content.append([str(imdb_id), None, None, None, None])
            continue

        # Parse the content of the request with BeautifulSoup
        soup = BeautifulSoup(response.text, 'html.parser')
        # NOTE(review): these CSS class names look build-generated and presumably
        # change when the site is redeployed - verify against the live page
        dcontainers = soup.find_all('div', class_='sc-80d4314-2')
        rcontainers = soup.find_all('li', class_='ipc-inline-list__item sc-124be030-1 ghlYSH')

        # the duration is taken from the last entry of the first matching list
        duration = safe_text(
            lambda: dcontainers[0].find_all('li', class_='ipc-inline-list__item')[-1].text)
        n_rating = safe_text(lambda: soup.find('div', class_='sc-7ab21ed2-3').text)
        n_user_review = safe_text(lambda: rcontainers[0].find('span', class_='score').text)
        n_critic_review = safe_text(lambda: rcontainers[1].find('span', class_='score').text)

        # Append the info to the complete dataset
        content.append([str(imdb_id), duration, n_rating, n_user_review, n_critic_review])

    # dtype=object keeps None as a real missing value; astype(str) would turn it
    # into the literal string "None"
    movie_content = pd.DataFrame(content, columns=columns, dtype=object)
    return movie_content

In [7]:
# partition the data and run the UDF
# cache the scraped result: the crawl is slow and depends on the network, and without
# caching every later action on fullraw (or on frames derived from it) would scrape
# IMDb again; with the cache, partitions that were already scraped are reused
fullraw = df_links.groupby(df_links.partitionId).apply(apply_model).cache()
fullraw.show(5)



+------+--------+--------+-------------+---------------+
|imdbId|duration|n_rating|n_user_review|n_critic_review|
+------+--------+--------+-------------+---------------+
|115907|  1h 51m|     21K|           70|             50|
|110932|  2h 13m|     69K|          184|             67|
|107614|   2h 5m|    266K|          325|             60|
|110395|  1h 41m|    4.6K|           45|             18|
|112368|  1h 30m|    6.8K|           61|             21|
+------+--------+--------+-------------+---------------+
only showing top 5 rows



In [8]:
# convert the scraped duration string (e.g. "1h 51m") to total minutes
def trans_duration(df):
  """Replace the textual 'duration' column with the duration in whole minutes.

  The hour and minute parts are located by their 'h' / 'm' suffix rather than by
  character position, so "1h 51m", "2h 5m", "45m", "2h" and multi-digit hours
  are all handled. A part that is absent counts as 0; if neither part is
  present (or duration is null) the result is null.

  Parameters:
    df: Spark DataFrame with a string column named 'duration'.

  Returns:
    A DataFrame with the same columns, where 'duration' is an integer number
    of minutes.
  """
  # regexp_extract returns '' when the pattern does not match; map that to null
  # explicitly instead of relying on how an empty string is cast
  hour_txt = regexp_extract(col('duration'), r'(\d+)\s*h', 1)
  minute_txt = regexp_extract(col('duration'), r'(\d+)\s*m', 1)
  hour = when(hour_txt != '', hour_txt.cast('integer'))
  minute = when(minute_txt != '', minute_txt.cast('integer'))

  # duration in minutes; a missing component contributes 0
  total_minutes = coalesce(hour, lit(0)) * 60 + coalesce(minute, lit(0))

  # leave the value null when nothing could be parsed at all
  return df.withColumn('duration', when(hour.isNotNull() | minute.isNotNull(), total_minutes))

In [9]:
# numerical transformation of abbreviated counts
# K -> 1,000
# M -> 1,000,000
# B -> 1,000,000,000
def Kto1000(df, Numbers:str):
  """Expand abbreviated counts in column `Numbers` into doubles.

  Values ending in K, M or B have that letter removed, are cast to double and
  multiplied by 1e3, 1e6 or 1e9 respectively. Any other value has its spaces
  removed and is cast to double.

  Parameters:
    df: Spark DataFrame containing the column to convert.
    Numbers: name of the column to convert; it is overwritten in the result.

  Returns:
    A DataFrame in which column `Numbers` holds the converted values.
  """
  raw_value = col(Numbers)
  converted = None
  # build one when-branch per suffix, in the order K, M, B
  for suffix, factor in (('K', 1000), ('M', 1000000), ('B', 1000000000)):
    has_suffix = raw_value.like("%" + suffix)
    scaled = regexp_replace(Numbers, suffix, '').cast('double') * factor
    if converted is None:
      converted = when(has_suffix, scaled)
    else:
      converted = converted.when(has_suffix, scaled)
  # no recognised suffix: strip spaces and cast as-is
  converted = converted.otherwise(regexp_replace(Numbers, ' ', '').cast('double'))
  return df.withColumn(Numbers, converted)

In [11]:
def transfer(df):
  """Run all clean-up steps on the scraped frame.

  Converts 'duration' with trans_duration, then converts the three count
  columns n_rating, n_user_review and n_critic_review with Kto1000.

  Parameters:
    df: Spark DataFrame holding the scraped string columns.

  Returns:
    The transformed DataFrame.
  """
  df_tr = trans_duration(df)
  for count_column in ("n_rating", "n_user_review", "n_critic_review"):
    df_tr = Kto1000(df_tr, count_column)
  return df_tr

In [13]:
# apply the duration and count conversions to the scraped data, then preview five rows
fullraw_tr = transfer(fullraw)
fullraw_tr.show(n=5)  # original author's note: "2 minutes" (presumably the run time)

+------+--------+--------+-------------+---------------+
|imdbId|duration|n_rating|n_user_review|n_critic_review|
+------+--------+--------+-------------+---------------+
|115907|     111| 21000.0|         70.0|           50.0|
|110932|     133| 69000.0|        184.0|           67.0|
|107614|     125|266000.0|        325.0|           60.0|
|110395|     101|  4600.0|         45.0|           18.0|
|112368|      90|  6800.0|         61.0|           21.0|
+------+--------+--------+-------------+---------------+
only showing top 5 rows

