# Imports

In [None]:
from src.config import conf

import pyspark.sql.functions as func
from src.utils.spark import spark, sqlContext
import pandas as pd

In [None]:
# Small pandas frame with one column per common dtype (int, float, bool,
# string, timestamp, date); it is the sample payload used by the cells that follow.
pdf = pd.DataFrame(
    {
        "int_col": [1, 2, 3],
        "float_col": [1.1, 2.2, 3.3],
        "bool_col": [True, False, True],
        "string_col": ["pippo", "pluto", "paperino"],
        # Lowercase "h" is the hourly alias; the uppercase "H" spelling is
        # deprecated since pandas 2.2.
        "time_col": pd.date_range(start="2024-01-01", periods=3, freq="12h"),
    }
)
# time_col is already datetime64 (built by date_range), so .dt.date applies directly.
pdf["date_col"] = pdf["time_col"].dt.date

In [None]:
# Build a Spark DataFrame from the in-memory pandas frame (no file is read here)
df = spark.createDataFrame(pdf)
# Register the frame as temp view "table_df" and ask Spark to cache that view
df.createOrReplaceTempView("table_df")
sqlContext.cacheTable("table_df")
# NOTE(review): count() presumably runs an action so the cache gets populated — confirm
df.count()

In [None]:
# Convert the Spark DataFrame back to pandas so the cell displays its rows
df.toPandas()

# Write to database

In [None]:
# JDBC connection URL for PostgreSQL, assembled from the project config as
# jdbc:postgresql://<server>:<port>/<db_name>.
db_url = (
    "jdbc:postgresql://"
    f"{conf.sql_connection.server}:{conf.sql_connection.port}"
    f"/{conf.sql_connection.db_name}"
)
# Echo the URL as the cell output (it carries no username or password).
db_url

In [None]:
# Write the Spark DataFrame to the "POSTGRES_TABLE" table over JDBC using the
# PostgreSQL driver. Credentials come from the project config; the password is
# unwrapped with get_secret_value() only at the point of use. Save mode is
# "overwrite".
df.write.format("jdbc").options(
    url=db_url,
    driver="org.postgresql.Driver",
    dbtable="POSTGRES_TABLE",
    user=conf.sql_login.username,
    password=conf.sql_login.password.get_secret_value(),
).mode("overwrite").save()

# Read from database

In [None]:
# Load the "POSTGRES_TABLE" table from PostgreSQL over JDBC into a Spark
# DataFrame, using the same URL, driver and config-provided credentials as the
# JDBC connection settings require.
loaded_df = spark.read.format("jdbc").options(
    url=db_url,
    driver="org.postgresql.Driver",
    dbtable="POSTGRES_TABLE",
    user=conf.sql_login.username,
    password=conf.sql_login.password.get_secret_value(),
).load()

In [None]:
# Convert the DataFrame loaded over JDBC to pandas so the cell displays its rows
loaded_df.toPandas()

# Write to S3
Note that the S3 credentials are defined in the Spark session config at `src/utils/spark.py`.

In [None]:
# Target object path on S3 (s3a:// scheme) inside the bucket named in the project config
file_path = f"s3a://{conf.s3_creds.bucket_name}/dummy_test.parquet"
# Echo the path as the cell output
file_path

In [None]:
# Write the Spark DataFrame as Parquet to file_path; save mode is "overwrite".
df.write.mode("overwrite").parquet(file_path)

In [None]:
# Read the Parquet data at file_path into a Spark DataFrame (rebinds loaded_df)
loaded_df = spark.read.parquet(file_path)

In [None]:
# Convert the DataFrame read from Parquet to pandas so the cell displays its rows
loaded_df.toPandas()