# Imports

In [None]:
import pyspark.sql.functions as func
from core.config import conf
from core.utils.spark import spark, sqlContext

In [None]:
# Display the SQL login username held by the `conf` object imported from core.config.
# Only the username is shown here; the password is not echoed by this cell.
conf.sql_login.username

In [None]:
# Display the `spark` object imported from core.utils.spark
# (presumably the active SparkSession — confirm in core.utils.spark).
spark

In [None]:
# Load the dummy CSV (its first row supplies the column names), expose it to
# Spark SQL under the name "table_df", and mark that table as cached.
csv_path = "../../data/test.csv.dummy"
df = spark.read.option("header", True).csv(csv_path)

df.createOrReplaceTempView("table_df")
sqlContext.cacheTable("table_df")

# count() is an action, so this is where the file actually gets read; being the
# last expression of the cell, the row count is what the notebook displays.
df.count()

In [None]:
# Convert the Spark DataFrame to a pandas DataFrame so the notebook renders it.
# NOTE(review): toPandas() collects every row onto the driver — confirm the
# input stays small before pointing this at anything other than the dummy file.
df.toPandas()

In [None]:
# Replace the two columns in place with typed versions: COLUMN_A is cast to
# float, and COLUMN_C is parsed as a date using the yyyy-MM-dd pattern.
column_a_as_float = func.col("COLUMN_A").cast("float")
column_c_as_date = func.to_date(func.col("COLUMN_C"), format="yyyy-MM-dd")

df = df.withColumn("COLUMN_A", column_a_as_float)
df = df.withColumn("COLUMN_C", column_c_as_date)

# Write to database

In [None]:
# Assemble the PostgreSQL JDBC URL (jdbc:postgresql://<server>:<port>/<db_name>)
# from the connection settings in conf.sql_connection.
connection = conf.sql_connection
db_url = f"jdbc:postgresql://{connection.server}:{connection.port}/{connection.db_name}"

In [None]:
# Write df to the PostgreSQL table POSTGRES_TABLE through the JDBC data source.
# The secret password is unwrapped inline so it is never bound to its own
# notebook variable.
login = conf.sql_login
jdbc_writer = df.write.format("jdbc").options(
    url=db_url,
    driver="org.postgresql.Driver",
    dbtable="POSTGRES_TABLE",
    user=login.username,
    password=login.password.get_secret_value(),
)

# "overwrite" save mode: existing data in the target table is replaced.
jdbc_writer.mode("overwrite").save()

# Read from database

In [None]:
# Read POSTGRES_TABLE back from PostgreSQL through the JDBC data source into
# loaded_df, using the same URL, driver and credentials as the write above it
# in the notebook.
login = conf.sql_login
jdbc_reader = spark.read.format("jdbc").options(
    url=db_url,
    driver="org.postgresql.Driver",
    dbtable="POSTGRES_TABLE",
    user=login.username,
    password=login.password.get_secret_value(),
)
loaded_df = jdbc_reader.load()

In [None]:
# Convert the DataFrame loaded from the database to pandas so the notebook
# renders it.
# NOTE(review): toPandas() collects the whole table onto the driver — confirm
# the table is small enough for that.
loaded_df.toPandas()