In [8]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [9]:
# MinIO connection settings. Credentials are read from the environment so they are
# not committed with the notebook; the fallbacks are the values this notebook used
# to hard-code and are only suitable for a throwaway local stack.
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio1:9000")
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "brew")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "brew4321")

spark = (
    SparkSession.builder
    .master("spark://spark1:7077")
    .appName("silver_to_gold")
    ## Config Fields
    .config('spark.sql.debug.maxToStringFields', 5000)
    .config('spark.debug.maxToStringFields', 5000)
    ## Optimize
    # Session-level switches. The bare `delta.autoOptimize.*` names are Delta *table
    # properties* and have no effect when set as Spark session configs.
    .config("spark.databricks.delta.optimizeWrite.enabled", "true")
    .config("spark.databricks.delta.autoCompact.enabled", "true")
    ## Delta Table
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    ## MinIO
    # Point S3A directly at MinIO as its endpoint, rather than routing AWS-bound
    # requests through MinIO configured as an HTTP proxy.
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Socket connection-setup timeout in milliseconds. The key was previously
    # misspelled ("estabilish"), so Spark/Hadoop silently ignored it.
    .config("spark.hadoop.fs.s3a.connection.establish.timeout", "5000")
    ## Hive SQL
    .enableHiveSupport()
    .getOrCreate()
)

In [10]:
# Echo the SparkSession as this cell's output to confirm it was created.
spark

In [11]:
# Silver-layer input location, laid out as s3a://<bucket>/<prefix>/
source_bucket, prefix_bucket = "silver", "breweries"
source_path = "s3a://{}/{}/".format(source_bucket, prefix_bucket)

In [12]:
# Lazily open the silver Delta table at `source_path` as a DataFrame.
silver_data = spark.read.load(source_path, format="delta")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [6]:
# Peek at the first five silver rows to eyeball the schema before aggregating.
display(silver_data.take(5))

[Row(address_1='5725 Miller Ave', address_2=None, address_3=None, brewery_type='micro', city='Gary', country='United States', id='b51f3cdf-60ff-4ae1-94a7-76906c7d62eb', latitude='41.59928343', longitude='-87.26887786', name='18th Street Brewery', phone=None, postal_code='46403-2871', state='Indiana', state_province='Indiana', street='5725 Miller Ave', website_url='http://www.18thstreetbrewery.com'),
 Row(address_1='1411 NW Flanders St', address_2=None, address_3=None, brewery_type='large', city='Portland', country='United States', id='e432899b-7f58-455f-9c7b-9a6e2130a1e0', latitude='45.5259786', longitude='-122.6855056', name='10 Barrel Brewing Co', phone='5032241700', postal_code='97209-2620', state='Oregon', state_province='Oregon', street='1411 NW Flanders St', website_url='http://www.10barrel.com'),
 Row(address_1='6410 SE Milwaukie Ave', address_2=None, address_3=None, brewery_type='brewpub', city='Portland', country='United States', id='936c3d7e-5d54-4459-b72c-117cdda059b4', lati

In [None]:
# DataFrame-API aggregation: one row per (country, state, city, brewery_type)
# group, with `quantity` = number of silver rows in that group.
# `GroupedData` only exposes aggregations (`count`, `agg`, ...), not `withColumn`,
# so count per group and rename the resulting "count" column.
agg_breweries_data = (
    silver_data
    .groupBy("country", "state", "city", "brewery_type")
    .count()
    .withColumnRenamed("count", "quantity")
)

In [13]:
# Aggregate the silver Delta table (queried directly by path) into one row per
# country / state / city / brewery_type; `quantity` is the row count of each group.
# NOTE(review): this re-reads `source_path` instead of reusing `silver_data`.
agg_breweries_data = spark.sql(f'''
       SELECT
            country, state, city, brewery_type, count(1) as quantity
       FROM
          delta.`{source_path}`
       GROUP BY country, state, city, brewery_type
       ''')

In [16]:
# Spot-check: show up to five groups that contain more than one brewery.
display(agg_breweries_data.filter("quantity > 1").take(5))

[Row(country='United States', state='Oregon', city='Bend', brewery_type='large', quantity=3)]

In [15]:
# Gold-layer output location, laid out as s3a://<bucket>/<prefix>/
target_bucket, prefix_bucket = "gold", "breweries"
target_path = "s3a://%s/%s/" % (target_bucket, prefix_bucket)

In [18]:
# Overwrite the gold Delta table with the aggregate.
# Partition only by the low-cardinality `country` column. The partition columns used
# to be exactly the four GROUP BY keys, which put a single row in every partition
# directory (one tiny file per group), a small-files anti-pattern.
# `overwriteSchema` lets this overwrite replace the partition layout of a table that
# was already written with the old layout.
agg_breweries_data.write.mode('overwrite') \
    .format('delta') \
    .option("overwriteSchema", "true") \
    .partitionBy("country") \
    .save(target_path)

In [19]:
# Stop the SparkSession once the gold table has been written.
spark.stop()