In [None]:
import os

# Move the working directory up one level, to the parent of the directory the
# kernel started in. The starting directory is remembered on first execution so
# that re-running this cell does not climb one more level each time, which is
# what a bare relative os.chdir('../') does.
if "NOTEBOOK_DIR" not in globals():
    NOTEBOOK_DIR = os.getcwd()
os.chdir(os.path.join(NOTEBOOK_DIR, os.pardir))

In [None]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.core import Config
from databricks.connect import DatabricksSession

w = WorkspaceClient()

# Pick the first cluster that is currently running and whose name contains
# "Shared Compute". `cluster_name` can be None on a listed cluster, so fall back
# to an empty string instead of letting the `in` test raise TypeError.
shared_cluster_id = next(
    (
        c.cluster_id
        for c in w.clusters.list()
        if "RUNNING" in str(c.state) and "Shared Compute" in (c.cluster_name or "")
    ),
    None,
)
if shared_cluster_id is None:
    # Fail with an actionable message rather than a bare IndexError.
    raise RuntimeError(
        'No RUNNING cluster with "Shared Compute" in its name was found; '
        "start one and re-run this cell."
    )

# Build the Databricks Connect session from an SDK config pinned to that cluster.
config = Config()
config.cluster_id = shared_cluster_id
spark = DatabricksSession.builder.sdkConfig(config).getOrCreate()

In [14]:
from blast_gen_ai.arxiv import search_arxiv_papers
# Smoke test: run one small keyword search and preview the first rows.
df = search_arxiv_papers(
    spark,
    "lithium brine",
    max_results=10,
)
df.show(n=5)

search_query=all:lithium+brine&searchtype=all&start=0&max_results=10&source=header&orderby=relevance
+--------------------+--------------------+------------+
|               title|                 url|   unique_id|
+--------------------+--------------------+------------+
|Modeling the morp...|http://arxiv.org/...| 0903.2823v2|
|Energy optimizati...|http://arxiv.org/...| 1211.3685v2|
|Distribution and ...|http://arxiv.org/...|2012.00100v1|
|Feasibility of ch...|http://arxiv.org/...|2101.07754v1|
|Pore-Scale Visual...|http://arxiv.org/...|2207.04128v1|
+--------------------+--------------------+------------+
only showing top 5 rows



In [15]:
# Three-part name (catalog.schema.table) of the table that stores paper URLs.
catalog = "scottmckean_catalog"
schema = "blast"
research_table_name = "research_paper_urls"
research_table_path = ".".join((catalog, schema, research_table_name))

In [16]:
# SQL statement that creates the target schema when it does not exist yet.
# The leading and trailing newlines of the original statement are preserved.
create_schema_sql = (
    "\n"
    f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}\n"
)

# Run the statement; as the last expression of the cell, its (empty) result is displayed.
spark.sql(create_schema_sql)

DataFrame[]

In [17]:
# Seed the research table with the preview search results, replacing any
# existing contents of the table.
df.write.saveAsTable(research_table_path, format="delta", mode="overwrite")

In [19]:
# Batch job: run several keyword searches and insert only papers whose
# unique_id is not already present in the research table.
from delta.tables import DeltaTable

# Handle to the Delta table written by the seeding cell above.
existing_research = DeltaTable.forName(spark, research_table_path)

for keywords_search in [
    'geothermal energy',
    'lithium brine',
    'natural gas exploration'
  ]:
    # An insert-only MERGE does not deduplicate the source: if the same
    # unique_id appeared twice in one search result, both rows would be
    # inserted. Dedupe on the merge key first.
    new_research = search_arxiv_papers(
        spark, keywords_search, max_results=100
    ).dropDuplicates(["unique_id"])

    # Insert rows that have no match in the table; matched rows are left as-is.
    (
        existing_research.alias("existing")
        .merge(
            new_research.alias("updates"),
            "existing.unique_id = updates.unique_id",
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

search_query=all:geothermal+energy&searchtype=all&start=0&max_results=100&source=header&orderby=relevance
search_query=all:lithium+brine&searchtype=all&start=0&max_results=100&source=header&orderby=relevance
search_query=all:natural+gas+exploration&searchtype=all&start=0&max_results=100&source=header&orderby=relevance


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BinaryType
import requests

# Define UDF to download PDF
@udf(returnType=BinaryType())
def download_pdf(url):
    """Fetch the document at ``url`` and return its raw bytes.

    Registered as a Spark UDF with a binary return type, so it is applied
    row by row to a column of URLs.

    Args:
        url: Address to download. A null or empty value yields ``None``.

    Returns:
        The response body as ``bytes`` when the server answers with HTTP 200;
        ``None`` for any other status code or when the request fails.
    """
    # Null or empty cells cannot be fetched; skip them without a network call.
    if not url:
        return None
    try:
        # Without a timeout a single stalled server would block the task
        # indefinitely. The limit applies to connecting and to each read.
        response = requests.get(url, timeout=30)
    except requests.RequestException as e:
        # Best effort: report and return null so one bad URL does not fail the job.
        # NOTE(review): this print presumably lands in the worker's log, not the
        # notebook output - confirm.
        print(f"Error downloading {url}: {str(e)}")
        return None
    return response.content if response.status_code == 200 else None

In [30]:
# Load the research table (paper URLs) as a DataFrame.
source_table = spark.read.table(research_table_path)

In [38]:
# Take a 5-row sample and add a column holding the bytes downloaded from each URL.
result_df = (
    source_table
    .limit(5)
    .withColumn("pdf_content", download_pdf(col("url")))
)

In [39]:
# Trigger execution and preview the result (default row count and truncation spelled out).
result_df.show(n=20, truncate=True)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 1788, in main
    check_python_version(infile)
  File "/databricks/spark/python/pyspark/worker_util.py", line 81, in check_python_version
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version: 3.10 than that in driver: 3.12, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


In [None]:




# Write results to a new Delta table in the same catalog and schema as the
# source table. The previous target was the unfilled placeholder path
# "path/to/destination/delta/table".
# NOTE(review): the table name "research_paper_pdfs" is a suggestion - rename as needed.
(
    result_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{schema}.research_paper_pdfs")
)