In [8]:
import os
import pandas as pd
import logging
from pyspark.sql import SparkSession
from tabulate import tabulate

In [9]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("CagliostroGutenberg") \
    .getOrCreate()

# Set Spark log level to INFO
spark.sparkContext.setLogLevel("INFO")


def load_data_spark(jdbc_url, table_name):
    """
    Load data from PostgreSQL into a Pandas DataFrame using PySpark.

    Parameters:
    - jdbc_url (str): The JDBC URL for the PostgreSQL database.
    - table_name (str): The name of the table to load data from.

    Returns:
    - pd.DataFrame: Data loaded into a Pandas DataFrame.
    """
    db_user = os.getenv('DB_USER')
    db_password = os.getenv('DB_PASSWORD')

    if not db_user or not db_password:
        logger.error("Database credentials not found in environment variables.")
        return None

    try:
        # Initialize Spark session with JDBC configuration
        spark = SparkSession.builder \
            .appName("Load Data from PostgreSQL") \
            .config("spark.jars", "/home/tron/git/project_gemma/jdbc/postgresql-42.7.4.jar") \
            .getOrCreate()

        properties = {
            'user': db_user,
            'password': db_password,
            'driver': 'org.postgresql.Driver'
        }

        df_spark = spark.read.jdbc(url=jdbc_url, table=table_name, properties=properties)
        logger.info(f"Data from {table_name} imported successfully!")
        logger.info(f"There are {df_spark.count()} rows and {len(df_spark.columns)} columns.")

        df_pandas = df_spark.toPandas()

        # Export schema
        schema = pd.DataFrame({
            'Column Name': df_spark.columns,
            'Data Type': [str(dtype) for dtype in df_spark.dtypes]
        })

        unique_values = {col: df_spark.select(col).distinct().count() for col in df_spark.columns}
        schema['n_unique'] = schema['Column Name'].map(unique_values)

        logger.info("Schema of the loaded dataset:")
        logger.info(f"{tabulate(schema, headers='keys', tablefmt='psql')}")

        return df_pandas

    except Exception as e:
        logger.error(f"Error loading data: {e}")
        return None

# Usage:
df = load_data_spark('jdbc:postgresql://localhost:5432/project_gemma', 'cagliostro_gutenberg')
#df_preview = df.head(1)

24/10/27 20:42:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/10/27 20:42:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
2024-10-27 20:42:37,672 - ERROR - Error loading data: An error occurred while calling o69.jdbc.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:103)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:103)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql

In [27]:
print(tabulate(df, headers='keys', tablefmt='psql'))

+------+------+-----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------------------------------------------------------------+--------------+------------+-----------+
|      |   id | chapter_title   | paragraph                                                                                                                                                                  | quote   | source_url                                                     | created_at   | title      | content   |
|------+------+-----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+----------------------------------------------------------------+--------------+------------+-----------|
|    0 |   24 | CHAPTER I       | 

In [2]:
df.shape

AttributeError: 'NoneType' object has no attribute 'shape'

In [28]:
logger = logging.getLogger("load_data_spark_logger")

def export_to_csv(df, file_path='exported_data.csv'):
    """
    Export a Pandas DataFrame to a CSV file.

    Parameters:
    - df (pd.DataFrame): The DataFrame to export.
    - file_path (str): The path where the CSV file will be saved.

    Returns:
    - bool: True if export is successful, False otherwise.
    """
    if df is None or df.empty:
        logger.error("DataFrame is empty or None. Cannot export to CSV.")
        return False

    try:
        df.to_csv(file_path, index=False)
        logger.info(f"Data successfully exported to {file_path}.")
        return True

    except Exception as e:
        logger.error(f"Error exporting DataFrame to CSV: {e}")
        return False

# Usage with load_data_spark output:
file_name = "cagliostro_gutenberg.csv"
file_path = os.path.join("..", "csv", file_name)
df = load_data_spark('jdbc:postgresql://localhost:5432/project_gemma', 'cagliostro_gutenberg')
if df is not None:
    export_success = export_to_csv(df, file_path)


2024-10-27 12:13:05,117 - INFO - Data from cagliostro_gutenberg imported successfully!
INFO:load_data_spark_logger:Data from cagliostro_gutenberg imported successfully!
24/10/27 12:13:05 INFO DAGScheduler: Registering RDD 650 (count at <unknown>:0) as input to shuffle 136
24/10/27 12:13:05 INFO DAGScheduler: Got map stage job 216 (count at <unknown>:0) with 1 output partitions
24/10/27 12:13:05 INFO DAGScheduler: Final stage: ShuffleMapStage 416 (count at <unknown>:0)
24/10/27 12:13:05 INFO DAGScheduler: Parents of final stage: List()
24/10/27 12:13:05 INFO DAGScheduler: Missing parents: List()
24/10/27 12:13:05 INFO DAGScheduler: Submitting ShuffleMapStage 416 (MapPartitionsRDD[650] at count at <unknown>:0), which has no missing parents
24/10/27 12:13:05 INFO MemoryStore: Block broadcast_216 stored as values in memory (estimated size 13.8 KiB, free 434.3 MiB)
24/10/27 12:13:05 INFO MemoryStore: Block broadcast_216_piece0 stored as bytes in memory (estimated size 7.2 KiB, free 434.3 Mi

24/10/27 12:26:01 INFO BlockManagerInfo: Removed broadcast_241_piece0 on tron-01.lan:45307 in memory (size: 20.0 KiB, free: 434.4 MiB)
24/10/27 12:26:01 INFO BlockManagerInfo: Removed broadcast_242_piece0 on tron-01.lan:45307 in memory (size: 5.9 KiB, free: 434.4 MiB)
