In [1]:
import os
import csv
import requests
import pandas as pd
from pyspark.sql import SparkSession, functions
from utilities.extract import get_today

In [2]:
# Create (or reuse) the Spark session that the ingestion functions in this
# notebook reference through the module-level name `spark`.
spark = (
    SparkSession.builder
    .appName("Spark Ingestion")
    .getOrCreate()
)

In [3]:
# Root directory of the raw CSV files. The ingest functions join it with the
# value returned by get_today() and a file name (e.g. 'movies.csv').
datasets_path = '/home/jovyan/raw_data'

In [4]:
def ingest_movies_data():
    """Append today's ``movies.csv`` rows to the MySQL ``movies`` table.

    The file is looked up under ``datasets_path/<get_today()>/movies.csv``.
    Each line is parsed with ``csv.reader`` so quoted fields are kept intact,
    the first row is treated as the header and removed, and the first three
    fields of every remaining row are written as ``movieId``, ``title`` and
    ``genres``.

    Returns:
        The parsed RDD of token lists (header row still included), or
        ``None`` when the file does not exist.
    """
    csv_path = os.path.join(datasets_path, get_today(), 'movies.csv')
    if not os.path.exists(csv_path):
        print("File does not exist")
        return

    def parse_line(line):
        # One csv.reader per line: handles quoted fields within a single line.
        return next(csv.reader([line]))

    movies_raw_data = spark.sparkContext.textFile(csv_path).map(parse_line)

    # The header is the first parsed row; rows equal to it are dropped.
    header_row = movies_raw_data.take(1)[0]
    body_rows = movies_raw_data.filter(lambda row: row != header_row)
    movie_tuples = body_rows.map(lambda row: (row[0], row[1], row[2]))
    movies_df = movie_tuples.toDF(["movieId", "title", "genres"])

    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": "jdbc:mysql://cap2-database/MovieLens",
        "dbtable": "movies",
        "user": "root",
        "password": "123",
    }
    movies_df.write.mode("append").format("jdbc").options(**jdbc_options).save()

    print("Ingest movies data successfully")
    return movies_raw_data

In [5]:
def ingest_links_data():
    """Append today's ``links.csv`` rows to the MySQL ``links`` table.

    The file is looked up under ``datasets_path/<get_today()>/links.csv``.
    The first line is treated as the header and removed; every other line is
    split on commas and its first three fields are written as ``movieId``,
    ``imdbId`` and ``tmdbId``.

    Returns ``None``; prints a message and does nothing else when the file
    does not exist.
    """
    csv_path = os.path.join(datasets_path, get_today(), 'links.csv')
    if not os.path.exists(csv_path):
        print("File does not exist")
        return

    raw_lines = spark.sparkContext.textFile(csv_path)
    header_line = raw_lines.take(1)[0]

    def to_link_tuple(line):
        # Plain comma split, then keep exactly the first three fields.
        fields = line.split(",")
        return (fields[0], fields[1], fields[2])

    link_tuples = raw_lines.filter(lambda line: line != header_line).map(to_link_tuple)
    links_df = link_tuples.toDF(["movieId", "imdbId", "tmdbId"])

    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": "jdbc:mysql://cap2-database/MovieLens",
        "dbtable": "links",
        "user": "root",
        "password": "123",
    }
    links_df.write.mode("append").format("jdbc").options(**jdbc_options).save()

    print("Ingest links data successfully")

In [6]:
def ingest_tags_data():
    """Append today's ``tags.csv`` rows to the MySQL ``tags`` table.

    The file is looked up under ``datasets_path/<get_today()>/tags.csv``.
    Lines are parsed with ``csv.reader`` (the same approach used by
    ``ingest_movies_data``) rather than ``str.split(",")``: a quoted field
    that contains a comma would otherwise be split into extra tokens, shifting
    the timestamp out of position so that ``from_unixtime`` produces NULL.

    The first parsed row is treated as the header and removed. Rows with fewer
    than four fields (for example blank lines) are skipped instead of failing
    the whole Spark job with an ``IndexError``. The fourth field is converted
    from a Unix timestamp with ``from_unixtime`` before writing.

    Returns ``None``; prints a message and writes nothing when the file is
    missing or empty.
    """
    tags_file = os.path.join(datasets_path, get_today(), 'tags.csv')
    if not os.path.exists(tags_file):
        print("File does not exist")
        return

    # Parse every line as CSV so quoted commas stay inside their field.
    tags_raw_data = spark.sparkContext.textFile(tags_file) \
        .map(lambda line: next(csv.reader([line])))

    # take(1) returns [] for an empty file; indexing it directly would raise.
    first_rows = tags_raw_data.take(1)
    if not first_rows:
        print("File is empty")
        return
    tags_raw_data_header = first_rows[0]

    tags_data = tags_raw_data \
        .filter(lambda tokens: len(tokens) >= 4 and tokens != tags_raw_data_header) \
        .map(lambda tokens: (tokens[0], tokens[1], tokens[2], tokens[3]))
    tags_data = tags_data.toDF(["userId", "movieId", "tag", "tagTime"])
    tags_data = tags_data.select(functions.col("userId"),
                                 functions.col("movieId"),
                                 functions.col("tag"),
                                 functions.from_unixtime(functions.col("tagTime")).alias("tagTime"))
    tags_data.write \
              .mode("append") \
              .format("jdbc") \
              .option("driver", "com.mysql.cj.jdbc.Driver") \
              .option("url", "jdbc:mysql://cap2-database/MovieLens") \
              .option("dbtable", "tags") \
              .option("user", "root") \
              .option("password", "123") \
              .save()
    print("Ingest tags data successfully")

In [7]:
def ingest_ratings_data():
    """Append today's ``ratings.csv`` rows to the MySQL ``ratings`` table.

    The file is looked up under ``datasets_path/<get_today()>/ratings.csv``.
    The first line is treated as the header and removed; every other line is
    split on commas and its first four fields become ``userId``, ``movieId``,
    ``ratingScore`` and ``ratingTime``. ``ratingTime`` is converted from a
    Unix timestamp with ``from_unixtime`` before writing.

    Returns ``None``; prints a message and does nothing else when the file
    does not exist.
    """
    csv_path = os.path.join(datasets_path, get_today(), 'ratings.csv')
    if not os.path.exists(csv_path):
        print("File does not exist")
        return

    raw_lines = spark.sparkContext.textFile(csv_path)
    header_line = raw_lines.take(1)[0]

    def to_rating_tuple(line):
        # Plain comma split, then keep exactly the first four fields.
        fields = line.split(",")
        return (fields[0], fields[1], fields[2], fields[3])

    rating_tuples = raw_lines.filter(lambda line: line != header_line).map(to_rating_tuple)
    ratings_df = rating_tuples.toDF(["userId", "movieId", "ratingScore", "ratingTime"])

    # Same four columns, with the timestamp rendered by from_unixtime.
    rating_time = functions.from_unixtime(functions.col("ratingTime")).alias("ratingTime")
    ratings_df = ratings_df.select(
        functions.col("userId"),
        functions.col("movieId"),
        functions.col("ratingScore"),
        rating_time,
    )

    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": "jdbc:mysql://cap2-database/MovieLens",
        "dbtable": "ratings",
        "user": "root",
        "password": "123",
    }
    ratings_df.write.mode("append").format("jdbc").options(**jdbc_options).save()

    print("Ingest ratings data successfully")

In [None]:
def _read_movielens_table(table_name):
    """Load one table of the MovieLens MySQL database as an RDD of Rows."""
    return spark.read \
                .format("jdbc") \
                .option("driver", "com.mysql.cj.jdbc.Driver") \
                .option("url", "jdbc:mysql://cap2-database/MovieLens") \
                .option("dbtable", table_name) \
                .option("user", "root") \
                .option("password", "123") \
                .load() \
                .rdd


def _build_movie_detail(movie_id, response):
    """Turn a decoded TMDB movie payload into one ``movies_detail`` record.

    Returns ``None`` when the payload is not a JSON object or carries neither
    ``original_title`` nor ``title`` (e.g. an API error body), so the caller
    can skip the movie.
    """
    if not isinstance(response, dict):
        return None
    title = response.get('original_title') or response.get('title')
    if title is None:
        return None
    genre_names = [genre['name'] for genre in (response.get('genres') or [])]
    return {
        'movieId': movie_id,
        'title': title,
        # Empty release dates are stored as NULL rather than ''.
        'releaseDate': response.get('release_date') or None,
        'genres': ', '.join(genre_names),
        'runtime': response.get('runtime'),
        'overview': response.get('overview'),
        'backgroundPath': response.get('backdrop_path'),
        'posterPath': response.get('poster_path'),
    }


def _write_movies_detail(records, columns):
    """Append buffered detail records to ``movies_detail``; no-op when empty."""
    if not records:
        # spark.createDataFrame cannot infer a schema from an empty frame.
        return
    # dtype=object keeps None as None instead of coercing numeric columns to NaN.
    batch_df = pd.DataFrame(records, columns=columns, dtype=object)
    spark.createDataFrame(batch_df).write \
        .mode("append") \
        .format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://cap2-database/MovieLens") \
        .option("dbtable", "movies_detail") \
        .option("user", "root") \
        .option("password", "123") \
        .save()


def get_movie_information(batch_size=200, request_timeout=10):
    """Fetch TMDB details for movies that have no ``movies_detail`` row yet.

    Reads the ``movies``, ``movies_detail`` and ``links`` tables, keeps the
    movies whose id (first column) is not already in ``movies_detail``, and
    for each of them requests ``/3/movie/<tmdbId>`` from the TMDB API using
    the third column of ``links`` as the TMDB id. Results are appended to
    ``movies_detail`` in batches.

    Each movie id travels together with its TMDB id, so a skipped or failed
    lookup can no longer shift the ids of the following movies, and whatever
    is still buffered after the loop is always written.

    Args:
        batch_size: Number of buffered records that triggers a write.
        request_timeout: Timeout in seconds passed to ``requests.get``.

    Movies with no TMDB id, a failed/undecodable HTTP response, or a payload
    without a title are skipped.
    """
    detail_columns = ['movieId', 'title', 'releaseDate', 'genres', 'runtime',
                      'overview', 'backgroundPath', 'posterPath']

    # Sets give constant-time membership tests inside the Spark filters.
    known_ids = set(_read_movielens_table("movies_detail").map(lambda row: row[0]).collect())
    new_movie_ids = set(
        _read_movielens_table("movies")
        .map(lambda row: row[0])
        .filter(lambda movie_id: movie_id not in known_ids)
        .collect()
    )
    # Keep (movieId, tmdbId) pairs so each response is stored under the right id.
    movie_links = _read_movielens_table("links") \
        .filter(lambda row: row[0] in new_movie_ids) \
        .map(lambda row: (row[0], row[2])) \
        .collect()

    # NOTE(review): the fallback key is kept only for backward compatibility;
    # prefer supplying TMDB_API_KEY via the environment and rotating this one.
    api_key = os.environ.get('TMDB_API_KEY', '59bdc898ca49489bafe29fb36f895728')

    pending = []
    for movieId, tmdb_id in movie_links:
        if tmdb_id is None or tmdb_id == '':
            continue
        detail_url = 'https://api.themoviedb.org/3/movie/%s?api_key=%s&language=en-US' % (tmdb_id, api_key)
        try:
            response = requests.get(detail_url, timeout=request_timeout).json()
        except (requests.RequestException, ValueError) as error:
            # Only the exception type is printed: the message may embed the
            # request URL, which contains the API key.
            print("Skip movie %s (%s)" % (movieId, type(error).__name__))
            continue
        record = _build_movie_detail(movieId, response)
        if record is None:
            continue
        pending.append(record)
        if len(pending) >= batch_size:
            _write_movies_detail(pending, detail_columns)
            pending = []
        print(movieId)

    # Flush the final partial batch.
    _write_movies_detail(pending, detail_columns)
    print("Ingest movies detail data successfully")

In [12]:
if __name__ == '__main__':
    # Run the ingestion steps in order: the detail lookup reads the tables
    # that the earlier steps write to.
    pipeline_steps = (
        ingest_movies_data,
        ingest_links_data,
        ingest_tags_data,
        ingest_ratings_data,
        get_movie_information,
    )
    for run_step in pipeline_steps:
        run_step()

Ingest movies data successfully
Ingest links data successfully
Ingest tags data successfully
Ingest ratings data successfully
Ingest movies detail data successfully
