In [1]:
import os
from pyspark.sql import SparkSession, functions
from utilities.extract import get_today
import csv

In [2]:
# Reuse the running SparkSession if there is one, otherwise start a new one
# named "Spark Ingestion"; every ingestion function below uses this session.
spark = (
    SparkSession.builder
    .appName("Spark Ingestion")
    .getOrCreate()
)

In [3]:
# Root directory of the raw CSV drops; each ingestion function reads
# <datasets_path>/<get_today()>/<name>.csv. Set RAW_DATA_PATH to point
# elsewhere; the default is the original Jupyter-container location.
datasets_path = os.environ.get('RAW_DATA_PATH', '/home/jovyan/raw_data')

In [4]:
def ingest_movies_data():
    """Append today's ``movies.csv`` to the MySQL ``movies`` table.

    Reads ``<datasets_path>/<get_today()>/movies.csv``. Each line is parsed with
    ``csv.reader``, so quoted fields that contain commas stay intact. The
    header row and blank rows are dropped, and the columns ``movieId``,
    ``title`` and ``genres`` are written through Spark's JDBC sink in
    ``append`` mode.

    Connection settings come from the ``MYSQL_JDBC_URL``, ``MYSQL_USER`` and
    ``MYSQL_PASSWORD`` environment variables. When a variable is unset, the
    previously hard-coded value is used so existing runs keep working.

    Returns:
        The RDD of parsed CSV rows (header included) after a successful write,
        or ``None`` when the file is missing or empty.
    """
    movies_file = os.path.join(datasets_path, get_today(), 'movies.csv')
    if not os.path.exists(movies_file):
        print(f"File does not exist: {movies_file}")
        return None

    # csv.reader rather than str.split: quoted fields may contain commas.
    movies_raw_data = spark.sparkContext.textFile(movies_file) \
        .map(lambda line: next(csv.reader([line])))

    header_rows = movies_raw_data.take(1)
    if not header_rows:
        # An empty file has no header; take(1)[0] would raise IndexError.
        print(f"File is empty: {movies_file}")
        return None
    movies_raw_data_header = header_rows[0]

    # `row and ...` skips blank lines, which csv.reader parses as [].
    movies_data = movies_raw_data \
        .filter(lambda row: row and row != movies_raw_data_header) \
        .map(lambda tokens: (tokens[0], tokens[1], tokens[2])) \
        .toDF(["movieId", "title", "genres"])

    # NOTE(review): the fallback credentials are legacy defaults kept for
    # compatibility; set MYSQL_USER / MYSQL_PASSWORD instead of relying on them.
    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://cap2-database/MovieLens"),
        "dbtable": "movies",
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", "123"),
    }
    movies_data.write \
        .mode("append") \
        .format("jdbc") \
        .options(**jdbc_options) \
        .save()
    print("Ingest movies data successfully")
    return movies_raw_data

In [5]:
def ingest_links_data():
    """Append today's ``links.csv`` to the MySQL ``links`` table.

    Reads ``<datasets_path>/<get_today()>/links.csv``. Each line is parsed with
    ``csv.reader``, the same way ``ingest_movies_data`` does it, so quoted
    fields are handled correctly. The header row and blank rows are dropped,
    and the columns ``movieId``, ``imdbId`` and ``tmdbId`` are written through
    Spark's JDBC sink in ``append`` mode.

    Connection settings come from the ``MYSQL_JDBC_URL``, ``MYSQL_USER`` and
    ``MYSQL_PASSWORD`` environment variables. When a variable is unset, the
    previously hard-coded value is used.

    Returns:
        ``None`` in every case. A message is printed when the file is missing
        or empty.
    """
    links_file = os.path.join(datasets_path, get_today(), 'links.csv')
    if not os.path.exists(links_file):
        print(f"File does not exist: {links_file}")
        return None

    # CSV-aware parsing instead of line.split(","), consistent with movies.
    links_raw_data = spark.sparkContext.textFile(links_file) \
        .map(lambda line: next(csv.reader([line])))

    header_rows = links_raw_data.take(1)
    if not header_rows:
        # An empty file has no header; take(1)[0] would raise IndexError.
        print(f"File is empty: {links_file}")
        return None
    links_raw_data_header = header_rows[0]

    # `row and ...` skips blank lines, which csv.reader parses as [].
    links_data = links_raw_data \
        .filter(lambda row: row and row != links_raw_data_header) \
        .map(lambda tokens: (tokens[0], tokens[1], tokens[2])) \
        .toDF(["movieId", "imdbId", "tmdbId"])

    # NOTE(review): the fallback credentials are legacy defaults kept for
    # compatibility; set MYSQL_USER / MYSQL_PASSWORD instead of relying on them.
    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://cap2-database/MovieLens"),
        "dbtable": "links",
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", "123"),
    }
    links_data.write \
        .mode("append") \
        .format("jdbc") \
        .options(**jdbc_options) \
        .save()
    print("Ingest links data successfully")

In [6]:
def ingest_tags_data():
    """Append today's ``tags.csv`` to the MySQL ``tags`` table.

    Reads ``<datasets_path>/<get_today()>/tags.csv``. Each line is parsed with
    ``csv.reader`` because a quoted tag value can itself contain commas. A
    plain ``split(",")`` would break such a tag into several tokens and shift
    the ``tagTime`` column.

    The header row and blank rows are dropped. ``tagTime`` is cast to long and
    converted from Unix seconds with ``from_unixtime``. The columns ``userId``,
    ``movieId``, ``tag`` and ``tagTime`` are then written through Spark's JDBC
    sink in ``append`` mode.

    Connection settings come from the ``MYSQL_JDBC_URL``, ``MYSQL_USER`` and
    ``MYSQL_PASSWORD`` environment variables. When a variable is unset, the
    previously hard-coded value is used.

    Returns:
        ``None`` in every case. A message is printed when the file is missing
        or empty.
    """
    tags_file = os.path.join(datasets_path, get_today(), 'tags.csv')
    if not os.path.exists(tags_file):
        print(f"File does not exist: {tags_file}")
        return None

    # csv.reader keeps quoted tags containing commas as a single field.
    tags_raw_data = spark.sparkContext.textFile(tags_file) \
        .map(lambda line: next(csv.reader([line])))

    header_rows = tags_raw_data.take(1)
    if not header_rows:
        # An empty file has no header; take(1)[0] would raise IndexError.
        print(f"File is empty: {tags_file}")
        return None
    tags_raw_data_header = header_rows[0]

    # `row and ...` skips blank lines, which csv.reader parses as [].
    tags_data = tags_raw_data \
        .filter(lambda row: row and row != tags_raw_data_header) \
        .map(lambda tokens: (tokens[0], tokens[1], tokens[2], tokens[3])) \
        .toDF(["userId", "movieId", "tag", "tagTime"])
    tags_data = tags_data.select(
        functions.col("userId"),
        functions.col("movieId"),
        functions.col("tag"),
        # Parsed fields are strings; cast explicitly instead of relying on
        # implicit string->long coercion.
        functions.from_unixtime(functions.col("tagTime").cast("long")).alias("tagTime"),
    )

    # NOTE(review): the fallback credentials are legacy defaults kept for
    # compatibility; set MYSQL_USER / MYSQL_PASSWORD instead of relying on them.
    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://cap2-database/MovieLens"),
        "dbtable": "tags",
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", "123"),
    }
    tags_data.write \
        .mode("append") \
        .format("jdbc") \
        .options(**jdbc_options) \
        .save()
    print("Ingest tags data successfully")

In [7]:
def ingest_ratings_data():
    """Append today's ``ratings.csv`` to the MySQL ``ratings`` table.

    Reads ``<datasets_path>/<get_today()>/ratings.csv``. Each line is parsed
    with ``csv.reader``, consistent with the other ingestion functions. The
    header row and blank rows are dropped, and ``ratingTime`` is cast to long
    and converted from Unix seconds with ``from_unixtime``. The columns
    ``userId``, ``movieId``, ``ratingScore`` and ``ratingTime`` are then
    written through Spark's JDBC sink in ``append`` mode.

    Connection settings come from the ``MYSQL_JDBC_URL``, ``MYSQL_USER`` and
    ``MYSQL_PASSWORD`` environment variables. When a variable is unset, the
    previously hard-coded value is used.

    Returns:
        ``None`` in every case. A message is printed when the file is missing
        or empty.
    """
    ratings_file = os.path.join(datasets_path, get_today(), 'ratings.csv')
    if not os.path.exists(ratings_file):
        print(f"File does not exist: {ratings_file}")
        return None

    # CSV-aware parsing instead of line.split(","), consistent with movies.
    ratings_raw_data = spark.sparkContext.textFile(ratings_file) \
        .map(lambda line: next(csv.reader([line])))

    header_rows = ratings_raw_data.take(1)
    if not header_rows:
        # An empty file has no header; take(1)[0] would raise IndexError.
        print(f"File is empty: {ratings_file}")
        return None
    ratings_raw_data_header = header_rows[0]

    # `row and ...` skips blank lines, which csv.reader parses as [].
    ratings_data = ratings_raw_data \
        .filter(lambda row: row and row != ratings_raw_data_header) \
        .map(lambda tokens: (tokens[0], tokens[1], tokens[2], tokens[3])) \
        .toDF(["userId", "movieId", "ratingScore", "ratingTime"])
    ratings_data = ratings_data.select(
        functions.col("userId"),
        functions.col("movieId"),
        functions.col("ratingScore"),
        # Parsed fields are strings; cast explicitly instead of relying on
        # implicit string->long coercion.
        functions.from_unixtime(functions.col("ratingTime").cast("long")).alias("ratingTime"),
    )

    # NOTE(review): the fallback credentials are legacy defaults kept for
    # compatibility; set MYSQL_USER / MYSQL_PASSWORD instead of relying on them.
    jdbc_options = {
        "driver": "com.mysql.cj.jdbc.Driver",
        "url": os.environ.get("MYSQL_JDBC_URL", "jdbc:mysql://cap2-database/MovieLens"),
        "dbtable": "ratings",
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", "123"),
    }
    ratings_data.write \
        .mode("append") \
        .format("jdbc") \
        .options(**jdbc_options) \
        .save()
    print("Ingest ratings data successfully")

In [None]:
if __name__ == '__main__':
    # Run every ingestion step sequentially, in the same fixed order;
    # return values are not used.
    ingestion_steps = (
        ingest_movies_data,
        ingest_links_data,
        ingest_tags_data,
        ingest_ratings_data,
    )
    for run_step in ingestion_steps:
        run_step()

Ingest movies data successfully
Ingest links data successfully
Ingest tags data successfully
