# Pyspark Usage with Delta Lake & Minio

This notebook shows how to write a CSV file directly to Minio, and also how to write and read a managed Delta Lake table in Minio.

Click the Table of Contents button in the left JupyterLab sidebar (the button on the far left of this browser window that looks like a bulleted list) to see the types of examples provided. **Make sure to run all the cells above a given section, since most examples in this notebook depend on those above them**

## Get Environment Variables for Minio (S3) Connection

In [1]:
import pyspark
import os

In [2]:
os.environ 
## Should see AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY environment varibles.
# These environment variables are set in the docker-compose.yml, and the service account used by PySpark
#> to read from and write to Minio are created by the minio-init container defined in docker-compose.yml

environ{'PATH': '/usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',
        'HOSTNAME': 'a7e8edb448e6',
        'AWS_ACCESS_KEY_ID': 'jupyteraccesskey',
        'AWS_SECRET_ACCESS_KEY': 'jupytersupersecretkey',
        'AWS_ENDPOINT_URL': 'http://minio:9000',
        'S3_BUCKET': 'test',
        'LANG': 'C.UTF-8',
        'GPG_KEY': 'A035C8C19219BA821ECEA86B64E628F8D684696D',
        'PYTHON_VERSION': '3.11.6',
        'PYTHON_PIP_VERSION': '23.2.1',
        'PYTHON_SETUPTOOLS_VERSION': '65.5.1',
        'PYTHON_GET_PIP_URL': 'https://github.com/pypa/get-pip/raw/9af82b715db434abb94a0a6f3569f43e72157346/public/get-pip.py',
        'PYTHON_GET_PIP_SHA256': '45a2bb8bf2bb5eff16fdd00faef6f29731831c7c59bd9fc2bf1f3bed511ff1fe',
        'HOME': '/root',
        'PYDEVD_USE_FRAME_EVAL': 'NO',
        'JPY_SESSION_NAME': '/notebooks/pyspark_delta_example.ipynb',
        'JPY_PARENT_PID': '1',
        'TERM': 'xterm-color',
        'CLICOLOR': '1',
        'FORCE_COLOR'

In [3]:
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
S3_BUCKET = os.environ.get("S3_BUCKET")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_ENDPOINT_URL = os.environ.get("AWS_ENDPOINT_URL")
# AWS_ACCESS_KEY_ID = "sparkaccesskey"
# S3_BUCKET = "test"
# AWS_SECRET_ACCESS_KEY = "sparksupersecretkey"
# AWS_ENDPOINT_URL = "http://minio:9000"

## Configure Pyspark to Connect to Minio and Enable Delta-Lake Format

In [4]:
# This cell may take some time to run the first time, as it must download the necessary spark jars
conf = pyspark.SparkConf()

## IF YOU ARE USING THE SPARK CONTAINERS, UNCOMMENT THE LINE BELOW TO OFFLOAD EXECUTION OF SPARK TASKS TO SPARK CONTAINERS
#conf.setMaster("spark://spark:7077")

conf.set("spark.jars.packages", 'org.apache.hadoop:hadoop-aws:3.3.3,io.delta:delta-core_2.12:2.1.0')
# conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')
conf.set('spark.hadoop.fs.s3a.endpoint', AWS_ENDPOINT_URL)
conf.set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
conf.set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
conf.set('spark.hadoop.fs.s3a.path.style.access', "true")
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

sc = pyspark.SparkContext(conf=conf)

# sc.setLogLevel("INFO")

:: loading settings :: url = jar:file:/usr/local/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-df536035-8985-4a6f-a615-52193b00baaf;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.3 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.3/hadoop-aws-3.3.3.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.3!hadoop-aws.jar (74ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.1.0/delta-core_2.12-2.1.0.jar ...
	[SUCCESSFUL 

23/10/14 14:45:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
spark = pyspark.sql.SparkSession(sc)

## Read in Sample CSV Data from Local Filesystem

In [None]:
df = spark.read.option("header", "true").csv("/data/appl_stock.csv")

In [None]:
df.show()

In [None]:
df.printSchema()

## Modify Column Types

In [None]:
for col in ["Open", "High", "Low", "Close", "AdjClose"]:
    df = df.withColumn(col,df[col].cast('double'))
for col in ["Volume"]:
    df = df.withColumn(col, df[col].cast('int'))

In [None]:
df.printSchema()

## Write CSV Directly to Minio (Not as a Delta Table)

In [None]:
df.write.csv(f"s3a://{S3_BUCKET}/appl_stock.csv", mode="overwrite")

**Navigate to http://localhost:9090 and login to the Minio Console to see the CSV file**

(username and password for minio can be found in the environment variables section of the minio service definition in the docker-compose.yml)

# Write a Delta Lake Table in Minio using Spark

In [None]:
# Have to replace spaces in column names with underscores for Delta
delta_df = df
for col in delta_df.columns:
    delta_df = delta_df.withColumnRenamed(col, col.replace(" ","_"))

In [None]:
delta_df.show()

In [None]:
delta_df.printSchema()

## Create Month and Year columns for partitioning

In [None]:
from pyspark.sql.functions import month, year

In [None]:
delta_df = delta_df.withColumn("Month", month(delta_df.Date))
delta_df = delta_df.withColumn("Year", year(delta_df.Date))

In [None]:
delta_table_name = "appl_stock_delta_table"

In [None]:
delta_df.write.format("delta").partitionBy('Year','Month').option("overwriteSchema", "true").save(f"s3a://{S3_BUCKET}/{delta_table_name}", mode="overwrite")

**Navigate to http://localhost:9090 and login to the Minio Console to see the Delta Lake Table**

**Note that the Delta Lake Table includes both the data partitions and the metadata log**

(username and password for minio can be found in the environment variables section of the minio service definition in the docker-compose.yml)

# Read the Delta Table Back into Spark

In [None]:
new_delta_df = spark.read.format("delta").load(f"s3a://{S3_BUCKET}/{delta_table_name}")

In [None]:
new_delta_df.show()

## Delete Data From Delta Table

In [None]:
from delta.tables import *

In [None]:
delta_table = DeltaTable.forPath(spark, f"s3a://{S3_BUCKET}/{delta_table_name}")

In [None]:
delta_table.delete("Date < '2010-02-01'")

In [None]:
# delta_table.vacuum()

# .vacuum() is not really necessary for this example. For more info, see https://docs.delta.io/latest/delta-utility.html#remove-files-no-longer-referenced-by-a-delta-table

In [None]:
updated_df = delta_table.toDF()

In [None]:
updated_df.describe().show()
# Notice the min date due to the delete above

## Use Time Travel to READ a Previous Version of the Delta Table

In [None]:
previous_df = spark.read.format("delta").option("versionAsOf", 0).load(f"s3a://{S3_BUCKET}/{delta_table_name}")
previous_df.describe().show()
# Notice the min date, showing that we are reading from a previous version

In [None]:
delta_table.history().show()

## Use Time Travel to RESTORE a Previous Version of the Delta Table

In [None]:
from datetime import datetime

In [None]:
# Capture a timestamp before we restore the delta table so we can see how to use a timestamp to do restore later on
pre_restore_time = datetime.now().strftime("%Y-%m-%d %X")

In [None]:
# Restore to a numbered version, and show the result summary of the restore operation
delta_table.restoreToVersion(0).show()

In [None]:
delta_table.history().show()

In [None]:
## We can always un-restore (or restore a more recent version) because restoring is a metadata-only operation
##  (i.e. the data files themselves are not modified)

In [None]:
#delta_table.restoreToVersion(1)

# Instead of using restoreToVersion, we can use a timestamp to revert to the table as it was at a specific time
delta_table.restoreToTimestamp(pre_restore_time).show()

In [None]:
delta_table.history().show()

# Trigger Trino to Automatically Infer Schema from Delta Table and Make Data Available for End User Querying / Dashboarding

In [None]:
import requests
import json
from time import sleep

In [None]:
delta_table_name = "appl_stock_delta_table"
delta_schema_name = "my_schema"

In [None]:
# Utility function to simplify query execution against Trino REST API
def execute_trino_query(query, statement_endpoint = "http://trino:8080/v1/statement", user = "admin", password = ""):
    
    print(f"Executing query:\n{query}")
    res = requests.post(statement_endpoint,data = query.encode("UTF8"), auth=requests.auth.HTTPBasicAuth(user,password))
    
    data = []
    cols = None
    while True:
        json_res = res.json()
        state = json_res.get("stats").get("state")
        print(f"State: {state}")

        res_data = json_res.get("data")
        if res_data:
            data.extend(res_data)
        
        res_cols = json_res.get("columns")
        if res_cols:
            cols = [i["name"] for i in res_cols]
            
        next_uri = json_res.get("nextUri")
        if next_uri:
            sleep(.5)
            res = requests.get(next_uri)
        else:
            if state == "FAILED":
                raise Exception(res.content)
            return [dict(zip(cols, d)) for d in data]
                
            


## Trigger Trino to Read Delta Table Schema

In [None]:
create_schema_statement = f"""
CREATE SCHEMA IF NOT EXISTS delta.my_schema
WITH (location = 's3a://{S3_BUCKET}/')
"""

register_table_statement = f"""CALL delta.system.register_table(schema_name => '{delta_schema_name}', table_name => '{delta_table_name}', table_location => 's3a://{S3_BUCKET}/{delta_table_name}')"""


In [None]:
for query in [create_schema_statement, register_table_statement]:
    print(execute_trino_query(query))

## Query Data from Table 

In [None]:
LIMIT = 10
select_statement = f"SELECT * FROM delta.{delta_schema_name}.{delta_table_name}"
if LIMIT and type(LIMIT) == int:
    select_statement += f" LIMIT {LIMIT}"

In [None]:
data = execute_trino_query(select_statement)

In [None]:
print(data)

## Create a New Delta Lake Table Using Trino 'CREATE TABLE AS'

In [None]:
statement = f"CREATE TABLE delta.{delta_schema_name}.{delta_table_name}_copy AS (SELECT * FROM delta.{delta_schema_name}.{delta_table_name} LIMIT 10)"

In [None]:
data = execute_trino_query(statement)

In [None]:
statement = f"SELECT * FROM delta.{delta_schema_name}.{delta_table_name}_copy LIMIT 10"

In [None]:
data = execute_trino_query(statement)

In [None]:
data

# Use Superset API To Add Connection to Trino Delta Lake Database

### NOTE: THE STEPS BELOW WILL ONLY WORK IF YOU ARE ALSO USING THE SUPERSET CONTAINERS

In [None]:
SUPERSET_BASE_URL = "http://superset:8088"
TOKEN_ENDPOINT = f"{SUPERSET_BASE_URL}/api/v1/security/login"

In [None]:
data = {
  "password": "admin",
  "provider": "db",
  "refresh": True,
  "username": "admin"
}
headers = {
    "Content-Type":"application/json",
    "Accept":"application/json"
}
res = requests.post(TOKEN_ENDPOINT, data=json.dumps(data), headers=headers)

In [None]:
auth_token = res.json()["access_token"]
headers["Authorization"] = f"Bearer {auth_token}"

In [None]:
### Disabled CSRF Token in Superset config.py

# CSRF_ENDPOINT = f"{SUPERSET_BASE_URL}/api/v1/security/csrf_token/"

# res = requests.get(CSRF_ENDPOINT,headers=headers)

# csrf_token = res.json()["result"]
# headers["X-CSRFToken"] = csrf_token

In [None]:
DATABASE_ENDPOINT = f"{SUPERSET_BASE_URL}/api/v1/database/"

In [None]:
data = {
    "database_name": "Delta",
    "engine": "trino",
    "configuration_method": "sqlalchemy_form",
    "catalog": [
        {
            "name": "",
            "value": ""
        }
    ],
    "sqlalchemy_uri": "trino://trino@trino:8080/delta",
    "expose_in_sqllab": True,
    "allow_ctas": True,
    "allow_cvas": True,
    "allow_dml": True,
    "extra_json": {
        "allows_virtual_table_explore": True
    },
    "extra": '{"allows_virtual_table_explore":true,"metadata_params":{},"engine_params":{},"schemas_allowed_for_file_upload":[]}'
}

In [None]:
res = requests.post(DATABASE_ENDPOINT, data=json.dumps(data), headers=headers)

In [None]:
headers

In [None]:
res.content