In [1]:
# Register the %%sparksql cell magic used by the SQL cells later in this notebook.
%load_ext sparksql_magic

In [2]:
import os
import socket
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [3]:
# --- PySpark runtime environment -------------------------------------------
# Interpreter name handed to PySpark for the driver and the Python workers.
# CPython installs its executables as `python3` / `python3.<minor>`; a
# patch-level name such as `python3.11.8` is not created by a standard
# install, so the minor-version executable name is used instead.
PYSPARK_PYTHON_EXECUTABLE = 'python3.11'
os.environ['PYSPARK_PYTHON'] = PYSPARK_PYTHON_EXECUTABLE
os.environ['PYSPARK_DRIVER_PYTHON'] = PYSPARK_PYTHON_EXECUTABLE

# Java installation exported through JAVA_HOME (OpenJDK 17 path).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"

# Port-retry limit; mirrors the `spark.port.maxRetries` value set on the
# SparkConf for the Kubernetes-based cluster manager.
SPARK_PORT_MAX_RETRIES = 2

In [5]:
def _require_env(*names):
    """Return ``{name: value}`` for the given environment variables.

    Args:
        *names: Names of environment variables that must be set.

    Returns:
        dict mapping each name to its (non-empty) string value.

    Raises:
        RuntimeError: If any variable is unset or empty. Every missing name
            is reported in a single message.
    """
    values = {name: os.environ.get(name) for name in names}
    missing = [name for name in names if not values[name]]
    if missing:
        raise RuntimeError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )
    return values


# SparkConf.set() stringifies its value, so an unset variable read through
# os.getenv() would be stored as the literal string "None" (namespace,
# endpoint, credentials, ...). Fail fast with a clear message instead.
required_env = _require_env(
    "POD_NAMESPACE",
    "SERVICE_ACCOUNT_NAME",
    "S3_ENDPOINT_URL",
    "ACCESS_KEY",
    "SECRET_KEY",
    "WAREHOUSE_DIR",
    "METASTORE_URI",
)

spark_settings = {
    # KUBERNETES (client deploy mode: this process is the driver)
    "spark.submit.deployMode": "client",
    "spark.kubernetes.namespace": required_env["POD_NAMESPACE"],
    "spark.kubernetes.container.image": "miguelmanuttupa/pyspark-k8s-python3.11:3.5.0",
    "spark.kubernetes.container.image.pullPolicy": "IfNotPresent",
    "spark.kubernetes.pyspark.pythonVersion": "3",
    "spark.kubernetes.authenticate.driver.serviceAccountName": required_env["SERVICE_ACCOUNT_NAME"],
    # Executors connect back to the driver at this host's resolved address.
    "spark.driver.host": socket.gethostbyname(socket.gethostname()),
    "spark.kubernetes.executor.deleteOnTermination": "true",
    # spark.driver.port / spark.blockManager.port are intentionally not pinned;
    # set them here (e.g. "2222" / "7777") if fixed ports are required.
    "spark.port.maxRetries": str(SPARK_PORT_MAX_RETRIES),
    # RESOURCES
    "spark.executor.instances": "1",
    "spark.executor.cores": "2",
    "spark.executor.memory": "4G",
    # MINIO / S3
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.endpoint": required_env["S3_ENDPOINT_URL"],
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.access.key": required_env["ACCESS_KEY"],
    "spark.hadoop.fs.s3a.secret.key": required_env["SECRET_KEY"],
    # DELTA LAKE
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    # HIVE
    "spark.sql.catalogImplementation": "hive",
    "spark.sql.warehouse.dir": required_env["WAREHOUSE_DIR"],
    "hive.metastore.uris": required_env["METASTORE_URI"],
}

# The notebook user is recorded as Delta commit metadata only when it is known,
# rather than tagging commits with the string "None".
nb_user = os.getenv("NB_USER")
if nb_user:
    spark_settings["spark.databricks.delta.commitInfo.userMetadata"] = nb_user

conf = SparkConf().setAppName("spark-app-miguel-20250701")
conf.setMaster("k8s://https://kubernetes.default:443")
for setting_key, setting_value in spark_settings.items():
    conf.set(setting_key, setting_value)

# Last expression: display the resulting SparkConf object.
conf

<pyspark.conf.SparkConf at 0x7efd4963ab50>

In [6]:
# Obtain the SparkContext through getOrCreate, passing the SparkConf built above.
sc = SparkContext.getOrCreate(conf)

In [7]:
# Build the SparkSession on top of the SparkContext created in the previous cell.
spark = SparkSession(sparkContext=sc)

In [8]:
%%sparksql
-- List the databases (namespaces) visible through the configured catalog.
SHOW DATABASES;

0
namespace
bronze
default


In [9]:
# Read the Parquet dataset stored under the `lhchdev` bucket via the s3a connector.
df = spark.read.parquet("s3a://lhchdev/STA_TIPO_CAMBIO")

In [10]:
# Mark the DataFrame for caching and run count() so the row count is displayed
# as the cell output.
df.cache().count()

12777

In [11]:
# Append the DataFrame's rows to the Delta table bronze.STA_TIPO_CAMBIO.
# NOTE(review): append mode is not idempotent - every run of this cell adds the
# rows again. The DESCRIBE HISTORY output in this notebook shows version 1
# appending 12777 rows on top of the 12777 rows written by version 0, so the
# table presumably holds the data twice. Confirm whether "overwrite" (or a
# MERGE) is the intended write mode.
df.write.format("delta").mode("append").saveAsTable("bronze.STA_TIPO_CAMBIO")

In [12]:
%%sparksql
-- Show the Delta commit history of the table written by the previous cell.
DESCRIBE HISTORY bronze.sta_tipo_cambio

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2025-08-24 05:56:41,,,WRITE,"{'mode': 'Append', 'partitionBy': '[]'}",,,,0,Serializable,True,"{'numOutputRows': '12777', 'numOutputBytes': '63488', 'numFiles': '1'}",usuario1,Apache-Spark/3.5.0 Delta-Lake/3.2.0
0,2025-08-24 05:51:24,,,CREATE OR REPLACE TABLE AS SELECT,"{'partitionBy': '[]', 'description': None, 'properties': '{}', 'clusterBy': '[]', 'isManaged': 'true'}",,,,,Serializable,False,"{'numOutputRows': '12777', 'numOutputBytes': '63488', 'numFiles': '1'}",,Apache-Spark/3.5.0 Delta-Lake/3.2.0
