In [16]:
import os
from datetime import date

import requests

HOME = os.path.expanduser('~')
DATALAKE_ROOT_FOLDER = HOME + "/datalake/"


def fetch_data_from_imdb1(**kwargs):
   current_day = date.today().strftime("%Y%m%d")
   TARGET_PATH = DATALAKE_ROOT_FOLDER + "raw/imdb/MovieRating/" + current_day + "/"
   if not os.path.exists(TARGET_PATH):
       os.makedirs(TARGET_PATH)

   url = 'https://datasets.imdbws.com/title.ratings.tsv.gz'
   r = requests.get(url, allow_redirects=True)
   open(TARGET_PATH + 'title.ratings.tsv.gz', 'wb').write(r.content)


In [17]:
fetch_data_from_imdb1()

In [18]:
HOME = os.path.expanduser('~')
DATALAKE_ROOT_FOLDER = HOME + "/datalake/"

def fetch_data_from_imdb2(url, data_entity_name, **kwargs):
    current_day = date.today().strftime("%Y%m%d")
    TARGET_PATH = DATALAKE_ROOT_FOLDER + "raw/imdb/" + data_entity_name + "/" + current_day + "/"
    if not os.path.exists(TARGET_PATH):
        os.makedirs(TARGET_PATH)
        
    file_name = url.split("/")[-1]
    file_path = os.path.join(TARGET_PATH, file_name)
    
    r = requests.get(url, allow_redirects=True)
    open(file_path, 'wb').write(r.content)


In [19]:
fetch_data_from_imdb2('https://datasets.imdbws.com/title.ratings.tsv.gz', 'MovieRating')
fetch_data_from_imdb2('https://datasets.imdbws.com/name.basics.tsv.gz', 'MovieName')

In [20]:

import pandas as pd

HOME = os.path.expanduser('~')
DATALAKE_ROOT_FOLDER = HOME + "/datalake/"


def convert_raw_to_formatted(file_name, current_day):
   RATING_PATH = DATALAKE_ROOT_FOLDER + "raw/imdb/MovieRating/" + current_day + "/" + file_name
   FORMATTED_RATING_FOLDER = DATALAKE_ROOT_FOLDER + "formatted/imdb/MovieRating/" + current_day + "/"
   if not os.path.exists(FORMATTED_RATING_FOLDER):
       os.makedirs(FORMATTED_RATING_FOLDER)
   df = pd.read_csv(RATING_PATH, sep='\t')
   parquet_file_name = file_name.replace(".tsv.gz", ".snappy.parquet")
   df.to_parquet(FORMATTED_RATING_FOLDER + parquet_file_name)


In [24]:
# Specify the file name and current day
file_name = "title.ratings.tsv.gz"
current_day = "20230606"

# Call the function
convert_raw_to_formatted(file_name, current_day)


In [22]:
import os
from pyspark.sql import SQLContext

HOME = os.path.expanduser('~')
DATALAKE_ROOT_FOLDER = HOME + "/datalake/"


def combine_data(current_day):
   RATING_PATH = DATALAKE_ROOT_FOLDER + "formatted/imdb/MovieRating/" + current_day + "/"
   USAGE_OUTPUT_FOLDER_STATS = DATALAKE_ROOT_FOLDER + "usage/movieAnalysis/MovieStatistics/" + current_day + "/"
   USAGE_OUTPUT_FOLDER_BEST = DATALAKE_ROOT_FOLDER + "usage/movieAnalysis/MovieTop10/" + current_day + "/"
   if not os.path.exists(USAGE_OUTPUT_FOLDER_STATS):
       os.makedirs(USAGE_OUTPUT_FOLDER_STATS)
   if not os.path.exists(USAGE_OUTPUT_FOLDER_BEST):
       os.makedirs(USAGE_OUTPUT_FOLDER_BEST)

   from pyspark import SparkContext

   sc = SparkContext(appName="CombineData")
   sqlContext = SQLContext(sc)
   df_ratings = sqlContext.read.parquet(RATING_PATH)
   df_ratings.registerTempTable("ratings")

   # Check content of the DataFrame df_ratings:
   print(df_ratings.show())

   stats_df = sqlContext.sql("SELECT AVG(averageRating) AS avg_rating,"
                             "       MAX(averageRating) AS max_rating,"
                             "       MIN(averageRating) AS min_rating,"
                             "       COUNT(averageRating) AS count_rating"
                             "    FROM ratings LIMIT 10")
   top10_df = sqlContext.sql("SELECT tconst, averageRating"
                             "    FROM ratings"
                             "    WHERE numVotes > 50000 "
                             "    ORDER BY averageRating DESC"
                             "    LIMIT 10")

   # Check content of the DataFrame stats_df and save it:
   print(stats_df.show())
   stats_df.write.save(USAGE_OUTPUT_FOLDER_STATS + "res.snappy.parquet", mode="overwrite")

   # Check content of the DataFrame top10_df  and save it:
   print(top10_df.show())
   stats_df.write.save(USAGE_OUTPUT_FOLDER_BEST + "res.snappy.parquet", mode="overwrite")


In [25]:
# Specify the current day
current_day = "20230606"

# Call the function
combine_data(current_day)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=CombineData, master=local[*]) created by __init__ at C:\Users\qcxl9\AppData\Local\Temp\ipykernel_44300\3611609488.py:19 