In [7]:
!pip install couchdb pyspark


Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
     ---------------------------------------- 0.0/317.3 MB ? eta -:--:--
     --------------------------------------- 2.6/317.3 MB 18.9 MB/s eta 0:00:17
      -------------------------------------- 5.8/317.3 MB 15.3 MB/s eta 0:00:21
      -------------------------------------- 7.6/317.3 MB 13.4 MB/s eta 0:00:24
     - ------------------------------------ 10.0/317.3 MB 12.9 MB/s eta 0:00:24
     - ------------------------------------ 12.3/317.3 MB 12.7 MB/s eta 0:00:25
     - ------------------------------------ 14.9/317.3 MB 12.5 MB/s eta 0:00:25
     - ------------------------------------ 16.3/317.3 MB 11.8 MB/s eta 0:00:26
     -- ----------------------------------- 18.1/317.3 MB 11.3 MB/s eta 0:00:27
     -- ----------------------------------- 19.4/317.3 MB 10.8 MB/s eta 0:00:28
     -- ----------------------------------- 21.2/317.3 MB 10.6 MB/s eta 0:00:29
     -- ----------------------------------- 22.3/317.3 MB 10.2

In [10]:
import logging
import os
import time
from urllib.parse import quote

from couchdb import Server
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

# Logger configuration: INFO level on the root logger, plus a module-level
# logger used by the cells below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Spark session configuration: reuse an existing session if there is one,
# otherwise create it with 4g of executor memory and 2 executor cores.
spark = SparkSession.builder \
    .appName("CouchDBToSparkExample") \
    .config("spark.executor.memory", "4g").config("spark.executor.cores", "2") \
    .getOrCreate()

# CouchDB host is fixed here; the credentials are read from the environment
# so that they never appear in the notebook itself.
couchdb_url = "localhost:5984"
couchdb_user = os.getenv("COUCHDB_USER")
couchdb_password = os.getenv("COUCHDB_PASSWORD")

if not couchdb_user or not couchdb_password:
    missing_credentials_msg = "Credenziali CouchDB non trovate. Assicurati di impostare COUCHDB_USER e COUCHDB_PASSWORD."
    logger.error(missing_credentials_msg)
    # Raise instead of calling exit(): an exception is shown in the cell
    # output and stops a "Run All", whereas exit() ends the session.
    raise RuntimeError(missing_credentials_msg)

# Build the full URL. The credentials are percent-encoded so that reserved
# characters such as '@', ':' or '/' in the user name or password cannot
# corrupt the URL structure.
couchdb_url_with_credentials = (
    f"http://{quote(couchdb_user, safe='')}:{quote(couchdb_password, safe='')}@{couchdb_url}"
)

In [11]:
# Read the documents stored in a CouchDB database
def read_from_couchdb(db_name='paperllm'):
    """Fetch every row of the ``_all_docs`` view of a CouchDB database.

    Args:
        db_name: Name of the database to read. Defaults to ``'paperllm'``.

    Returns:
        A list with one dict per view row, holding the row's own entries
        plus an ``"id"`` entry taken from ``row.id``. An empty list is
        returned when anything goes wrong; the failure is logged together
        with its traceback instead of being raised.
    """
    try:
        couch_server = Server(couchdb_url_with_credentials)
        couch_db = couch_server[db_name]
        logger.info(f"Connesso al database CouchDB: {db_name}")

        # include_docs=True asks the view to return each document's body
        # alongside the row.
        documents = [
            {
                "id": row.id,
                **row
            }
            for row in couch_db.view('_all_docs', include_docs=True)
        ]
        logger.info(f"Numero di documenti recuperati: {len(documents)}")
        return documents

    except Exception as e:
        # Best-effort read: keep the notebook running, but log the full
        # traceback so the root cause is not lost.
        logger.exception(f"Errore durante la lettura dei documenti: {e}")
        return []

# Build a Spark DataFrame from the CouchDB documents
def create_dataframe_from_couchdb():
    """Load the CouchDB documents into Spark and register them as a view.

    Returns:
        The DataFrame created from the documents, also registered as the
        temporary view ``couchdb_documents``; ``None`` (after logging a
        warning) when ``read_from_couchdb`` returns no documents.
    """
    rows = read_from_couchdb()

    # Guard clause: nothing to load, so no DataFrame and no view.
    if not rows:
        logger.warning("Nessun documento trovato.")
        return None

    frame = spark.createDataFrame(rows)
    frame.createOrReplaceTempView("couchdb_documents")
    logger.info(f"Tabella 'couchdb_documents' registrata con {len(rows)} righe.")
    return frame

# Create the DataFrame (None if no documents were retrieved); this also
# registers the 'couchdb_documents' temporary view used by the cells below.
df = create_dataframe_from_couchdb()

INFO:__main__:Connesso al database CouchDB: paperllm
INFO:__main__:Numero di documenti recuperati: 7
INFO:__main__:Tabella 'couchdb_documents' registrata con 7 righe.


In [None]:
# Check whether the 'couchdb_documents' view was registered correctly.
# AnalysisException comes from the notebook's import cell; without that
# import a missing table would surface as a NameError instead of being
# handled here.
try:
    df = spark.table("couchdb_documents")
    print("Tabella 'couchdb_documents' presente.")
except AnalysisException:
    print("Tabella 'couchdb_documents' non trovata.")


Tabella 'couchdb_documents' presente.


In [None]:
# Print the schema of the DataFrame
df.printSchema()

# Show the first rows, without truncating cell values, to check the data
df.show(truncate=False)


In [None]:
# Make sure the view is registered before querying it. The catalog lookup
# returns a plain bool; spark.table() would raise on a missing table, so
# the else branch could never be reached with it.
if spark.catalog.tableExists("couchdb_documents"):
    # Run the query
    query = """
    SELECT * FROM couchdb_documents
    """
    result_df = spark.sql(query)
    result_df.show(truncate=False)
else:
    print("Tabella 'couchdb_documents' non trovata.")


In [None]:
# Select a single row to inspect the nested 'doc' and 'value' columns
query = """
SELECT doc, value FROM couchdb_documents LIMIT 1
"""
result_df = spark.sql(query)
result_df.show(truncate=False)
