In [None]:
!pip install delta-spark==2.3.0

In [None]:
import os

from delta.pip_utils import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

In [None]:
import requests

# Key identifying this Spark host; the same value ("local") is given to the
# ODD Spark listener through the "spark.odd.oddrn.key" session setting.
oddrn_key = "local"

# Register Spark as a data source in the ODD platform.
# NOTE(review): the last oddrn segment is assumed to be the oddrn key; the
# resulting string is identical to the previous literal "//spark/host/local".
request = {"name": "spark", "oddrn": f"//spark/host/{oddrn_key}"}
response = requests.post(
    "http://odd-platform:8080/api/datasources",
    json=request,
    # Without a timeout, requests waits indefinitely when the platform is
    # unreachable and the notebook appears to hang.
    timeout=30,
)
# Fail loudly on any 4xx/5xx answer instead of continuing unregistered.
response.raise_for_status()

In [None]:
# Maven coordinates passed to configure_spark_with_delta_pip as extra packages.
# NOTE(review): configure_spark_with_delta_pip appears to add the delta-core
# artifact on its own, so the last entry may be redundant — confirm before
# removing it.
_HADOOP_AWS = "org.apache.hadoop:hadoop-aws:3.3.2"
_AWS_SDK_BUNDLE = "com.amazonaws:aws-java-sdk-bundle:1.11.1026"
_DELTA_CORE = "io.delta:delta-core_2.12:2.3.0"

# Kept as a list (not a tuple) because it is handed over as `extra_packages`.
packages = [_HADOOP_AWS, _AWS_SDK_BUNDLE, _DELTA_CORE]

In [None]:
# Describe the session first; nothing is started until getOrCreate() below.
# The builder gets its own name so that `spark` only ever refers to the
# running SparkSession.
builder = (
    SparkSession.builder.master("local")
    .appName("DeltaLakeFundamentals")
    # Enable Delta Lake SQL extensions and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # S3A access to the MinIO endpoint. Credentials can be overridden through
    # the environment; the defaults are the values this notebook used before.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.access.key",
        os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
    )
    .config(
        "spark.hadoop.fs.s3a.secret.key",
        os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
    )
    .config("spark.hadoop.fs.s3a.region", "eu-central-1")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # ODD Spark adapter: where to report and under which key.
    .config("spark.odd.host.url", "http://odd-platform:8080")
    # Reuse the key defined in the registration cell instead of repeating it.
    .config("spark.odd.oddrn.key", oddrn_key)
    .config("spark.jars", "/jars/odd-spark-adapter-local.jar")
    .config(
        "spark.extraListeners", "org.opendatadiscovery.adapters.spark.ODDSparkListener"
    )
)

# Attach the Delta (and extra) Maven packages, then start the session.
spark = configure_spark_with_delta_pip(builder, extra_packages=packages).getOrCreate()
# To reduce log noise, call spark.sparkContext.setLogLevel("WARN") here.

In [None]:
# Column names of the accidents dataset, in schema order.
_ACCIDENT_COLUMNS = (
    "id",  # accident id
    "data_inversa",  # date
    "dia_semana",  # day of week
    "horario",  # hour
    "uf",  # Brazilian state
    "br",  # highway
    # remaining fields omitted to keep this cell small
)

# Every column is declared as a nullable string.
SCHEMA = StructType(
    [StructField(column, StringType(), True) for column in _ACCIDENT_COLUMNS]
)

# Reader for the semicolon-delimited, ISO-8859-1 encoded CSV with a header row,
# using the explicit SCHEMA rather than inferring column types.
csv_reader = (
    spark.read.format("csv")
    .options(delimiter=";", header="true", encoding="ISO-8859-1")
    .schema(SCHEMA)
)
df_acidentes = csv_reader.load("/data/datatran.csv")

# Preview the first five rows.
df_acidentes.show(5)

In [None]:
# Write the accidents frame as a Delta table, replacing any existing data and
# schema at the target location.
accidents_writer = (
    df_acidentes.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
accidents_writer.save("s3a://deltalake/accidents/")

In [None]:
# Shut down the Spark session created earlier in this notebook.
spark.stop()