# ETL with Spark

- membuat ETL menggunakan spark ke database postgreSQL
- database yang digunakan adalah database movie rating

In [22]:
import time

from pyspark.sql import SparkSession 
from pyspark.sql import functions as fnc
from pyspark.sql.types import IntegerType, StringType, StructType, StructField 

In [23]:
now_ = time.time()
POSTGRESQL_JDBC_PATH = "postgresql-42.2.19.jar"
spark = SparkSession.builder.appName("Pyspark_film").config("spark.jars", POSTGRESQL_JDBC_PATH).getOrCreate()
print(f"Creating spark session : {time.time() - now_} s\n")

Creating spark session : 0.010308980941772461 s



In [24]:
ratingSchema = StructType([ \
    StructField("userID", IntegerType(), True), \
    StructField("movieID", IntegerType(), True), \
    StructField("rating", IntegerType(), True), \
    StructField("time", IntegerType(), True)]
)

moviesSchema = StructType([ \
    StructField("movieID", IntegerType(), True), \
    StructField("movieTitle", StringType(), True), \
    StructField("genres", StringType(), True)]
)

def getRatingPercent(rating):
    return rating/5 * 100
getRatingPercentUDF = fnc.udf(getRatingPercent)

start_time = time.time()

In [25]:
now_ = time.time()
ratingsDF = spark.read.option("sep", "::").schema(ratingSchema).csv("ml-1m/ratings.dat")
print(f"Loading data ratings: {time.time() - now_} s")
print(f"Ratings data length: {ratingsDF.count()}\n")

Loading data ratings: 0.12536382675170898 s
Ratings data length: 1000209



In [26]:
now_= time.time()
moviesDF = spark.read.option("sep", "::").schema(moviesSchema).csv("ml-1m/movies.dat")
print(f"Loading data films: {time.time() - now_} s")
print(f"Films data length: {moviesDF.count()}\n")

Loading data films: 0.025614023208618164 s
Films data length: 3883



In [27]:
now_ = time.time()
ratingsDF = ratingsDF.withColumn("ratingPercent", getRatingPercentUDF(fnc.col("rating")).cast(IntegerType()))
print(f"Create rating percentage: {time.time() - now_} s\n")

Create rating percentage: 0.06072711944580078 s



In [28]:
now_ = time.time()
ratingsDF = ratingsDF.alias('ratings')
moviesDF = moviesDF.alias('movies')
joinedDF = ratingsDF.join(moviesDF, fnc.col('ratings.movieID') == fnc.col('movies.movieID')) \
            .select('ratings.*', 'movies.movieTitle', 'movies.genres')
print(f"Joining ratings and films dataframes: {time.time() - now_} s\n")

Joining ratings and films dataframes: 0.06629085540771484 s



In [29]:
now_ = time.time()
ratingsAvgDF = ratingsDF["movieID","ratingPercent"].groupBy("movieID").avg()
print(f"Create average rating for each movieID dataframe: {time.time() - now_} s\n")

Create average rating for each movieID dataframe: 0.05833721160888672 s



In [30]:
now_ = time.time()
joinedDF.repartition(10) \
    .write.format('jdbc').options(
        url='jdbc:postgresql://localhost:5433/spark_try',
        driver='org.postgresql.Driver',
        dbtable='film_rating_spark',
        user='postgres',
        password='admin'
    ).save()
print(f"Writing joined dataframe to postgresql: {time.time() - now_} s\n")

print(f"Total time: {time.time() - start_time} s\n")

print("Stopping Spark session ...")
spark.stop()
print("Spark session stopped")

Writing joined dataframe to postgresql: 16.260953903198242 s

Total time: 42.30452036857605 s

Stopping Spark session ...
Spark session stopped


berdasarkan hasil percobaan diperoleh waktu eksekusi selama 42.30452036857605 detik, 
untuk membuat ETL dengan spark 