# Spark Session

In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session with a single local worker thread ("local").
# The PostgreSQL JDBC driver jar is placed on the driver classpath so that the
# "jdbc" reads further down can load the driver.
spark = SparkSession.builder.master("local").appName("read-postgres").config(
    "spark.driver.extraClassPath",
    "/home/jovyan/work/jars/postgresql-9.4.1207.jar",
).getOrCreate()

# Low-level SparkContext handle, kept available for RDD-style APIs.
sc = spark.sparkContext

# Read Postgres

In [6]:
import os

# Load the `public.movies` table over JDBC.
# Connection settings are read from environment variables when present, so
# credentials do not have to live in the notebook. The fallback values
# reproduce the original hard-coded settings.
# TODO: drop the password default once the environment always provides it.
df_movies = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://postgres/test"))
    # Name the driver class explicitly instead of relying on JDBC DriverManager
    # auto-discovery, which fails when the jar is not on the system classpath.
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "public.movies")
    .option("user", os.environ.get("POSTGRES_USER", "test"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
    .load()
)

In [8]:
# Preview the first five movies (cell values longer than 20 characters are truncated).
df_movies.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [7]:
import os

# Load the `public.ratings` table over JDBC.
# Connection settings are read from environment variables when present, so
# credentials do not have to live in the notebook. The fallback values
# reproduce the original hard-coded settings.
# TODO: drop the password default once the environment always provides it.
df_ratings = (
    spark.read
    .format("jdbc")
    .option("url", os.environ.get("POSTGRES_URL", "jdbc:postgresql://postgres/test"))
    # Name the driver class explicitly instead of relying on JDBC DriverManager
    # auto-discovery, which fails when the jar is not on the system classpath.
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "public.ratings")
    .option("user", os.environ.get("POSTGRES_USER", "test"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
    .load()
)

In [10]:
# Preview the first five ratings rows.
df_ratings.show(n=5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



# Top 10 movies by number of ratings

In [10]:
# Alias both inputs so that the select below can use qualified column
# names ("r.*" for every ratings column, "m.title" for the movie title).
df_ratings = df_ratings.alias("r")
df_movies = df_movies.alias("m")

# Inner join each rating to its movie on movieId, keeping all rating columns plus the title.
df_join = (
    df_ratings
    .join(df_movies, df_ratings["movieId"] == df_movies["movieId"], how="inner")
    .select("r.*", "m.title")
)

In [22]:
from pyspark.sql import functions as F

# The 10 movies with the most ratings, plus each movie's average rating.
# Details:
# - Grouping is by movieId (with title carried along), so that distinct movies
#   sharing a title are not merged into one row.
# - qty_ratings counts non-null ratings, which is the same set of rows that
#   avg_rating averages over.
# - title is a secondary sort key, so ties at the top-10 cut-off resolve
#   deterministically.
# - Output columns stay (title, qty_ratings, avg_rating).
df_result = (
    df_join
    .groupBy("movieId", "title")
    .agg(
        F.count("rating").alias("qty_ratings"),
        F.mean("rating").alias("avg_rating"),
    )
    .select("title", "qty_ratings", "avg_rating")
    .sort(F.desc("qty_ratings"), F.asc("title"))
    .limit(10)
)

In [24]:
# Write the result as CSV with a header row, replacing any previous output.
# coalesce(1) puts everything in one partition, so the output directory holds a
# single part file.
(
    df_result
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/home/jovyan/work/data/output_postgres")
)