# PySpark Usage with Delta Lake & Minio

This notebook shows how to write a CSV file directly to Minio, and how to write and read a Delta Lake table stored in Minio.

The version of the `org.apache.hadoop:hadoop-aws` jar must match the version included in the Bitnami Spark Docker image. If the notebook cannot find the jar, open a terminal at the root of the git repo and run `docker-compose exec spark-worker ls jars | grep hadoop-aws`. This prints the file name of the bundled hadoop-aws jar, which contains the version to use below.
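
If the jar listing is long, the version can be pulled out of the file names programmatically. The following is a minimal sketch, assuming the jar follows the usual `hadoop-aws-<version>.jar` naming; the helper name `hadoop_aws_version` and the sample listing are hypothetical and not part of this repo.

```python
import re
from typing import Iterable, Optional

# Matches file names such as "hadoop-aws-3.3.1.jar" and captures the version.
_HADOOP_AWS_JAR = re.compile(r"^hadoop-aws-(\d+(?:\.\d+)*)\.jar$")


def hadoop_aws_version(jar_names: Iterable[str]) -> Optional[str]:
    """Return the version of the first hadoop-aws jar found, or None."""
    for name in jar_names:
        match = _HADOOP_AWS_JAR.match(name.strip())
        if match:
            return match.group(1)
    return None


# Hypothetical listing, as it might be pasted from the `ls jars` output.
listing = ["hadoop-client-api-3.3.1.jar", "hadoop-aws-3.3.1.jar"]
print(f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version(listing)}")
```

The printed coordinate has the same shape as the one passed to `spark.jars.packages` later in this notebook.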

## Get Environment Variables for Minio (S3) Connection

In [None]:
import pyspark
import os

In [None]:
os.environ
# The output should include the S3_ENDPOINT, S3_ACCESS_KEY, and S3_SECRET_KEY environment variables.
# These variables are set in docker-compose.yml. The service account that PySpark uses
# to read from and write to Minio is created by the minio-init container defined in docker-compose.yml.

In [None]:
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT")
# S3_ACCESS_KEY = "sparkaccesskey"
# S3_SECRET_KEY = "sparksupersecretkey"
# S3_ENDPOINT = "http://minio:9000"
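
`os.environ.get` returns `None` for an unset variable, which would only surface later as a confusing Spark configuration error. A small check up front can make the failure obvious. This is a sketch only; the helper name `missing_s3_vars` is hypothetical.

```python
import os

REQUIRED_VARS = ("S3_ENDPOINT", "S3_ACCESS_KEY", "S3_SECRET_KEY")


def missing_s3_vars(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Report missing variables without printing any secret values.
missing = missing_s3_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```

Passing a plain dictionary as `env` makes the check easy to try outside the docker-compose environment.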

## Configure PySpark to Connect to Minio and Enable Delta Lake Format

In [None]:
conf = pyspark.SparkConf().setMaster("spark://spark:7077")
conf.set("spark.jars.packages", 'org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-core_2.12:2.1.0')
# conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
conf.set('spark.hadoop.fs.s3a.endpoint', S3_ENDPOINT)
conf.set('spark.hadoop.fs.s3a.access.key', S3_ACCESS_KEY)
conf.set('spark.hadoop.fs.s3a.secret.key', S3_SECRET_KEY)
conf.set('spark.hadoop.fs.s3a.path.style.access', "true")
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

sc = pyspark.SparkContext(conf=conf)

# sc.setLogLevel("INFO")

In [None]:
spark = pyspark.sql.SparkSession(sc)
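
The S3A settings above can also be collected in one place, which makes them easier to inspect and reuse. The sketch below is an assumption about how one might organise them, not how this repo does it: the helper `s3a_settings` is hypothetical, and it additionally sets Hadoop's `fs.s3a.connection.ssl.enabled` property based on the endpoint scheme, which the original configuration leaves at its default.

```python
from urllib.parse import urlparse


def s3a_settings(endpoint, access_key, secret_key):
    """Build the spark.hadoop.fs.s3a.* settings for an S3-compatible endpoint."""
    use_ssl = urlparse(endpoint).scheme == "https"
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # Address buckets as http://host/bucket rather than http://bucket.host.
        "spark.hadoop.fs.s3a.path.style.access": "true",
        # Assumption: turn SSL off only when the endpoint is plain http.
        "spark.hadoop.fs.s3a.connection.ssl.enabled": "true" if use_ssl else "false",
    }


# Example values taken from the commented-out defaults earlier in this notebook.
settings = s3a_settings("http://minio:9000", "sparkaccesskey", "sparksupersecretkey")
for key in sorted(settings):
    print(key)  # print the keys only, never the secret values
```

With PySpark available, the dictionary could be applied with `conf.setAll(settings.items())` before the `SparkContext` is created.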

## Read in Sample CSV Data from Local Filesystem

In [None]:
df = spark.read.option("header", "true").csv("/data/appl_stock.csv")

In [None]:
df.show()

## Write CSV Directly to Minio (Not as a Delta Table)

In [None]:
S3_BUCKET = "test"

In [None]:
df.write.csv(f"s3a://{S3_BUCKET}/appl_stock.csv")
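
Note that Spark writes a directory of part files at this path rather than a single CSV file. Since several cells in this notebook build `s3a://` paths by hand, a small helper can keep the bucket name in one place. This is a minimal sketch; `s3a_uri` is a hypothetical name, not an existing PySpark function.

```python
def s3a_uri(bucket, *parts):
    """Join a bucket name and key segments into an s3a:// URI."""
    # Drop empty segments and stray slashes so the segments join cleanly.
    segments = [part.strip("/") for part in parts if part.strip("/")]
    key = "/".join(segments)
    return f"s3a://{bucket}/{key}" if key else f"s3a://{bucket}"


print(s3a_uri("test", "appl_stock.csv"))
```

For example, `df.write.csv(s3a_uri(S3_BUCKET, "appl_stock.csv"))` would target the same location as the cell above.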

## Write a Delta Lake Table in Minio Using Spark

In [None]:
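# By default, Delta Lake rejects column names that contain spaces,
# so replace spaces with underscores before writing.
delta_df = df
for name in delta_df.columns:
    delta_df = delta_df.withColumnRenamed(name, name.replace(" ", "_"))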
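
Spaces are not the only problem characters: unless column mapping is enabled, Delta Lake also rejects column names containing any of `,;{}()=`, tabs, or newlines. The sketch below generalises the rename to that full set and guards against two columns collapsing to the same name. The helper names and the sample column names are hypothetical.

```python
import re

# Characters Delta Lake rejects in column names when column mapping is off.
_INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def sanitize_column(name, replacement="_"):
    """Replace every invalid character in a column name."""
    return _INVALID_CHARS.sub(replacement, name)


def sanitize_columns(names):
    """Sanitize all names, failing loudly if two columns would collide."""
    cleaned = [sanitize_column(name) for name in names]
    if len(set(cleaned)) != len(cleaned):
        raise ValueError(f"Sanitized column names collide: {cleaned}")
    return cleaned


# Hypothetical column names for illustration.
print(sanitize_columns(["Date", "Adj Close", "Volume (shares)"]))
```

With a DataFrame in hand, the renamed columns could be applied in one step with `df.toDF(*sanitize_columns(df.columns))`.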

In [None]:
delta_df.show()

In [None]:
delta_df.write.format("delta").save(f"s3a://{S3_BUCKET}/appl_stock_delta_table")

## Read the Delta Table Back into Spark

In [None]:
delta_df = spark.read.format("delta").load(f"s3a://{S3_BUCKET}/appl_stock_delta_table")

In [None]:
delta_df.show()