In [0]:
import requests
import json
from pyspark.sql import SparkSession
import re

In [0]:
def _build_url(url: str, query: str) -> str:
    """Append ``query`` to ``url``; a leading '?' on ``query`` and an empty/None query are tolerated."""
    query = (query or "").lstrip("?")
    if not query:
        return url
    if url.endswith(("?", "&")):
        separator = ""
    elif "?" in url:
        # url already carries parameters: extend them instead of adding a second '?'.
        separator = "&"
    else:
        separator = "?"
    return url + separator + query


def _records_to_df(spark, records):
    """Build a DataFrame from JSON-compatible Python objects via Spark's JSON reader."""
    # spark.read.json takes an RDD of JSON *strings*. Raw dicts would be str()-ed into
    # Python reprs (True/None, single quotes), which the JSON parser rejects or turns
    # into _corrupt_record, so serialise each record properly first.
    json_lines = [json.dumps(record) for record in records]
    return spark.read.json(spark.sparkContext.parallelize(json_lines))


def connect_rest_api(spark, url: str, query: str, timeout=60):
    """Fetch JSON from a REST endpoint and load it into a Spark DataFrame.

    Args:
        spark: active SparkSession.
        url: endpoint URL; it may already contain a query string.
        query: query string to append, with or without a leading '?'. Empty or None
            means no extra parameters.
        timeout: seconds passed to ``requests.get`` as its timeout. ``None`` disables
            it and waits indefinitely, as earlier versions of this function did.

    Returns:
        A DataFrame built from:
          * the first list-valued field, if the payload is a JSON object that has one
            (e.g. ``{"count": 2, "items": [...]}`` -> one row per element of ``items``);
          * the payload itself as a single row, if it is an object without list fields;
          * the elements, if the payload is a top-level JSON array.

    Raises:
        requests.HTTPError: the endpoint returned an error status.
        ValueError: the payload is neither a JSON object nor an array.
    """
    full_url = _build_url(url, query)

    response = requests.get(full_url, timeout=timeout)
    response.raise_for_status()
    payload = response.json()

    if isinstance(payload, dict):
        # Heuristic: treat the first list-valued field (in response key order) as the
        # record collection of a common "envelope" response shape.
        for value in payload.values():
            if isinstance(value, list):
                return _records_to_df(spark, value)
        return _records_to_df(spark, [payload])

    if isinstance(payload, list):
        return _records_to_df(spark, payload)

    raise ValueError(f"Unsupported API JSON structure: {type(payload).__name__}")

In [0]:
def connect_jdbc(spark, connectorstring: str, query: str, driver: str = None):
    """Load the result of ``query`` through Spark's "jdbc" data source.

    Args:
        spark: active SparkSession.
        connectorstring: JDBC URL, passed as the ``url`` option.
        query: SQL passed as the ``query`` option.
        driver: optional JDBC driver class name; only set when truthy.

    Returns:
        The DataFrame produced by ``load()``.
    """
    settings = [("url", connectorstring), ("query", query)]
    if driver:
        settings.append(("driver", driver))

    jdbc_reader = spark.read.format("jdbc")
    for key, val in settings:
        jdbc_reader = jdbc_reader.option(key, val)
    return jdbc_reader.load()

In [0]:
def connect_olap(spark, sql_server_url, openquery_mdx, username, password, linked_server="SSAS"):
    """Run an MDX query through a SQL Server linked server and return a Spark DataFrame.

    The MDX is wrapped in ``SELECT * FROM OPENQUERY(<linked_server>, '<mdx>')`` and read
    as a derived table through the Microsoft SQL Server JDBC driver.

    Args:
        spark: active SparkSession.
        sql_server_url: JDBC URL of the SQL Server instance that hosts the linked server.
        openquery_mdx: MDX text; surrounding whitespace is stripped.
        username: SQL Server login (pull credentials from a secret store, not literals).
        password: password for ``username``.
        linked_server: name of the linked server on that instance (default "SSAS").

    Returns:
        The DataFrame returned by ``spark.read.jdbc`` for the OPENQUERY result.

    Raises:
        ValueError: if ``openquery_mdx`` is None, empty, or whitespace only.
    """
    mdx = (openquery_mdx or "").strip()
    if not mdx:
        raise ValueError("openquery_mdx must be a non-empty MDX query")

    # OPENQUERY does not accept variables, so the MDX must be embedded as a T-SQL
    # string literal; doubling single quotes is the escape for that context.
    mdx_literal = mdx.replace("'", "''")
    # Bracket-quote the linked-server identifier; ']' is escaped as ']]' inside brackets.
    server_ident = "[" + linked_server.replace("]", "]]") + "]"

    # Parenthesised and aliased so it can be used in the `table` (dbtable) slot of
    # spark.read.jdbc as a derived table.
    openquery_sql = (
        f"(SELECT * FROM OPENQUERY({server_ident}, '{mdx_literal}')) AS mdx_result"
    )

    connection_properties = {
        "user": username,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }

    return spark.read.jdbc(
        url=sql_server_url,
        table=openquery_sql,
        properties=connection_properties,
    )