In [1]:
import os

import boto3
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

In [2]:
# Credentials are read from the MinIO root-user environment variables;
# .get() yields None for an unset variable instead of raising.
session = boto3.Session(
    aws_access_key_id=os.environ.get("MINIO_ROOT_USER"),
    aws_secret_access_key=os.environ.get("MINIO_ROOT_PASSWORD"),
)

# S3 resource handle bound to the MinIO endpoint, signing requests with SigV4.
s3 = session.resource(
    service_name='s3',
    endpoint_url="http://minio:9000",
    config=boto3.session.Config(signature_version='s3v4'),
)

In [None]:
# Create the bucket that the Delta tables are written to. Re-running this cell
# (or the whole notebook after an interrupted run that skipped the clean-up)
# must not fail just because the bucket is already there, so that specific
# error is tolerated; any other failure still propagates.
try:
    s3.create_bucket(Bucket='spark-data')
except s3.meta.client.exceptions.BucketAlreadyOwnedByYou:
    pass

In [None]:
# Build (or reuse) a session on the master at spark://spark-master:7077, with
# the S3A filesystem pointed at MinIO and the Delta extension/catalog enabled.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    # S3A connector: endpoint, credentials from the environment, path-style access.
    .config('spark.hadoop.fs.s3a.endpoint', 'http://minio:9000')
    .config('spark.hadoop.fs.s3a.access.key', os.environ.get("MINIO_ROOT_USER"))
    .config('spark.hadoop.fs.s3a.secret.key', os.environ.get("MINIO_ROOT_PASSWORD"))
    .config('spark.hadoop.fs.s3a.path.style.access', "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Jars: local copies under /opt/spark_jars plus the matching Maven coordinates.
    .config('spark.jars', "/opt/spark_jars/delta-spark_2.12-3.2.0.jar,/opt/spark_jars/hadoop-aws-3.3.4.jar,/opt/spark_jars/aws-java-sdk-bundle-1.12.262.jar")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0,org.apache.hadoop:hadoop-aws:3.3.4")
    # Delta Lake SQL extension and catalog implementation.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

In [29]:
# Small hand-written sample: ten transactions across four customers and four
# products, four dated 2024-01-01 and six dated 2024-02-01. "id" becomes the index.
transactions = pd.DataFrame(
    {
        "id": list(range(10)),
        "date": 4 * ["2024-01-01"] + 6 * ["2024-02-01"],
        "cust_id": [0, 0, 0, 1, 1, 2, 2, 2, 3, 3],
        "prod_id": [0, 1, 2, 0, 1, 2, 2, 3, 0, 3],
        "volume": [1, 1, 1, 1, 2, 2, 3, 3, 4, 4],
        "revenue": [10, 15, 40, 10, 30, 80, 120, 150, 40, 160],
    }
)
transactions = transactions.set_index("id")

In [30]:
# Convert the pandas frame to a Spark DataFrame. "id" lives in the pandas index,
# and createDataFrame does not carry the index over, so it is moved back into a
# regular column first; otherwise the transaction id would be silently dropped
# from the Spark data and from the Delta table written from it.
transactions_spark = spark.createDataFrame(transactions.reset_index())

In [None]:
# Print the Spark DataFrame as a quick visual check of the converted data.
transactions_spark.show()

In [None]:
# Persist the transactions as a Delta table in the spark-data bucket,
# replacing whatever a previous run left at that path.
(
    transactions_spark.write
    .format('delta')
    .mode('overwrite')
    .save('s3a://spark-data/transactions')
)

In [36]:
# Total revenue per customer, largest first. `sum` here is the
# pyspark.sql.functions aggregate imported at the top, not the Python builtin.
customer_revenue = (
    transactions_spark
    .groupBy("cust_id")
    .agg(sum("revenue").alias("total_revenue"))
    .sort("total_revenue", ascending=False)
)

In [None]:
# Run the aggregation and bring the result back to the notebook; as the cell's
# last expression, the returned value is echoed as the cell output.
customer_revenue.collect()

In [None]:
# Print the per-customer revenue totals in tabular form.
customer_revenue.show()

In [39]:
# Persist the per-customer totals as a second Delta table in the same bucket,
# overwriting any earlier output at that path.
(
    customer_revenue.write
    .format('delta')
    .mode('overwrite')
    .save('s3a://spark-data/customer_revenue')
)

### Clean up

In [40]:
# Shut down the Spark session created earlier in the notebook.
spark.stop()

In [None]:
# Remove every object from the bucket first, then delete the bucket itself.
spark_data_bucket = s3.Bucket("spark-data")
spark_data_bucket.objects.all().delete()
spark_data_bucket.delete()