### Spark session

In [1]:
import os

from pyspark.sql import SparkSession

# Create (or reuse) a local-mode Spark session named "load-postgres".
# The PostgreSQL JDBC jar is added to the driver classpath.
spark = (
    SparkSession.builder
    .appName("load-postgres")
    .master("local")
    .config(
        "spark.driver.extraClassPath",
        "/home/jovyan/work/jars/postgresql-42.6.0.jar",
    )
    .getOrCreate()
)

# SparkContext backing the session.
sc = spark.sparkContext

### Read CSV data

In [2]:
# Movies CSV; the first row is used as the header. No schema is supplied,
# so Spark reads every column as a string.
df_movies_csv = spark.read.csv(
    "/home/jovyan/work/data/movies.csv",
    header=True,
)

In [3]:
# Ratings CSV; the first row is used as the header. The raw `timestamp`
# column is renamed to `timestamp_epoch`.
df_rating_csv = (
    spark.read
    .csv("/home/jovyan/work/data/ratings.csv", header=True)
    .withColumnRenamed("timestamp", "timestamp_epoch")
)

In [4]:
# Convert epoch to timestamp and rating to DoubleType
from pyspark.sql.functions import from_unixtime, col, to_timestamp
from pyspark.sql.types import DoubleType

# Typed view of the ratings: `rating` is cast to double and a `timestamp`
# column is derived from the epoch-seconds column `timestamp_epoch`
# (which is still present in the result).
df_rating_csv_fmt = (
    df_rating_csv
    .withColumn('rating', col("rating").cast(DoubleType()))
    # Cast epoch seconds directly to a timestamp. Going through
    # from_unixtime() renders a string in the session time zone that
    # to_timestamp() then has to parse back; that round trip is ambiguous
    # for the repeated hour at a DST fall-back and adds a format/parse per row.
    # The column was read from CSV without a schema, hence the explicit
    # cast to long first.
    .withColumn('timestamp', col("timestamp_epoch").cast("long").cast("timestamp"))
)

### Load data to Postgres

In [5]:
# Write the movies DataFrame to the Postgres table `public.movies` over JDBC.
# mode("overwrite") replaces whatever the table currently holds.
(df_movies_csv.write
 .format("jdbc")
 .option("url", "jdbc:postgresql://postgres/test")
 .option("dbtable", "public.movies")
 # Credentials may be supplied via the environment so they do not have to
 # live in the notebook; the fallbacks keep the previously hard-coded values.
 .option("user", os.environ.get("POSTGRES_USER", "test"))
 .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
 .mode("overwrite")
 .save())

In [6]:
# Write the formatted ratings to the Postgres table `public.ratings` over JDBC.
# The raw `timestamp_epoch` column is dropped first, and mode("overwrite")
# replaces whatever the table currently holds.
(df_rating_csv_fmt
 .drop("timestamp_epoch")
 .write
 .format("jdbc")
 .option("url", "jdbc:postgresql://postgres/test")
 .option("dbtable", "public.ratings")
 # Credentials may be supplied via the environment so they do not have to
 # live in the notebook; the fallbacks keep the previously hard-coded values.
 .option("user", os.environ.get("POSTGRES_USER", "test"))
 .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
 .mode("overwrite")
 .save())