In [1]:
!pip install delta-spark

Collecting delta-spark
  Downloading delta_spark-3.0.0-py3-none-any.whl.metadata (2.0 kB)
Collecting py4j==0.10.9.7 (from pyspark<3.6.0,>=3.5.0->delta-spark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading delta_spark-3.0.0-py3-none-any.whl (21 kB)
Installing collected packages: py4j, delta-spark
Successfully installed delta-spark-3.0.0 py4j-0.10.9.7


In [1]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import random 
from random import randint
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from delta.tables import DeltaTable

In [2]:
spark_conf = (
    SparkConf()
        .set("spark.jars.packages", 'org.apache.hadoop:hadoop-client:3.3.4,org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-spark_2.12:3.0.0')
        .set("spark.driver.memory", "6g")
        .set("spark.hadoop.fs.s3a.endpoint", "minio:9000")
        .set("spark.hadoop.fs.s3a.access.key", "pPZWyzFK4auwPEHCrmuw")
        .set("spark.hadoop.fs.s3a.secret.key", "M8i0y8Ef9jDntigOXfTXhnqoCpZ5v7NVHwC7NhRd")
        .set("spark.hadoop.fs.s3a.path.style.access", "true")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")

        .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    )

sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession(sc)

In [4]:
print(f"Hadoop version = {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")
print(f"Spark version = {spark.version}")

Hadoop version = 3.3.4
Spark version = 3.5.0


### Data generation \\\

On the bronze level, generate three tables per day sails of some product. The sale consists of 3 columns: user, product, and time. Generate table for product: name and price. Generate table for user: name and location. In total bronze level will consist of 5 tables: sale_day_1, sale_day_2, sale_day_3, users, and products. 


In [7]:
import datetime

product = []  #product, price 
total_sales = []  #user_id, product, volume, amount, date
users = [] #user_id, location

for _ in range(50):
    product_name = f'product_{_}'
    product_price = random.randint(2, 50)
    product.append((product_name, product_price))

for _ in range(1000):
    user_id = _
    user_localization = f'localization_{random.randint(100,200)}'
    users.append((user_id, user_localization))
    
for _ in range(1, 4):
    sales = []
    date = datetime.datetime(2018, 6, _).strftime("%x")
    for i in range(random.randint(50_000, 100_000)):
        buy = random.choice(product)
        volume = random.randint(1,5)
        sales.append(
            (random.choice(users)[0],
             buy[0],
             volume,
             volume*buy[1],
             date
                        )
               )
    total_sales.append(sales)
    

In [8]:
users_df = spark.createDataFrame(users, ['user_id', 'localization'])
product_df = spark.createDataFrame(product, ['product_name', 'price'])
sales_one_df = spark.createDataFrame(total_sales[0], ['user_id', 'product_name', 'volume', 'amount', 'date'])
sales_two_df = spark.createDataFrame(total_sales[1], ['user_id', 'product_name', 'volume', 'amount', 'date'])
sales_three_df = spark.createDataFrame(total_sales[2], ['user_id', 'product_name', 'volume', 'amount', 'date'])

In [9]:
sales_one_df.show()

+-------+------------+------+------+--------+
|user_id|product_name|volume|amount|    date|
+-------+------------+------+------+--------+
|    599|  product_11|     4|    12|06/01/18|
|    328|  product_42|     4|   184|06/01/18|
|    471|   product_6|     1|    29|06/01/18|
|    224|  product_37|     1|    32|06/01/18|
|    157|   product_1|     3|    84|06/01/18|
|    891|  product_32|     5|    15|06/01/18|
|    464|   product_6|     3|    87|06/01/18|
|    105|  product_33|     1|    31|06/01/18|
|    970|  product_35|     2|    78|06/01/18|
|     37|  product_21|     5|    45|06/01/18|
|    911|  product_18|     1|    34|06/01/18|
|    172|  product_42|     1|    46|06/01/18|
|    603|  product_30|     4|    96|06/01/18|
|    979|  product_25|     4|    84|06/01/18|
|    590|  product_20|     2|    96|06/01/18|
|     12|  product_23|     4|    20|06/01/18|
|    163|  product_43|     3|    24|06/01/18|
|     27|  product_48|     5|    75|06/01/18|
|    108|  product_29|     5|   19

In [10]:
users_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/users')
product_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/products')
sales_one_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/day-one')
sales_two_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/day-two')
sales_three_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/day-three')

In [29]:
users_df = spark.read.format('delta').load('s3a://delta-bucket/users')
product_df = spark.read.format('delta').load('s3a://delta-bucket/products')
sales_one_df = spark.read.format('delta').load('s3a://delta-bucket/day-one')
sales_two_df = spark.read.format('delta').load('s3a://delta-bucket/day-two')
sales_three_df = spark.read.format('delta').load('s3a://delta-bucket/day-three')

In [4]:
product_df.printSchema()
sales_one_df.count()

root
 |-- product_name: string (nullable = true)
 |-- price: long (nullable = true)



80397

In [30]:
sales_one_df.createOrReplaceTempView('one')
sales_two_df.createOrReplaceTempView('two')
sales_three_df.createOrReplaceTempView('three')

In [31]:
total_sales_df = spark.sql("""
(SELECT * FROM one) UNION ALL (SELECT * FROM two) UNION ALL (SELECT * FROM three)
""")

In [31]:
total_sales_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/total-sales')

In [3]:
total_sales_df = spark.read.format('delta').load('s3a://delta-bucket/total-sales')

In [6]:
joined_df = total_sales_df.join(users_df, 'user_id', 'left').join(product_df, 'product_name', 'left')

In [7]:
joined_df.count()

In [35]:
joined_df.show()

+------------+-------+------+------+--------+----------------+-----+
|product_name|user_id|volume|amount|    date|    localization|price|
+------------+-------+------+------+--------+----------------+-----+
|   product_4|     50|     2|    68|06/01/18|localization_146|   34|
|  product_27|     72|     3|    21|06/01/18|localization_163|    7|
|  product_23|    420|     5|    25|06/01/18|localization_112|    5|
|  product_27|    524|     2|    14|06/01/18|localization_134|    7|
|  product_39|    698|     4|    44|06/01/18|localization_140|   11|
|  product_22|     98|     3|   120|06/01/18|localization_177|   40|
|  product_28|    431|     4|   156|06/01/18|localization_133|   39|
|  product_21|    848|     3|    27|06/01/18|localization_180|    9|
|  product_24|    675|     4|   112|06/01/18|localization_181|   28|
|  product_49|    115|     5|   105|06/01/18|localization_118|   21|
|  product_43|    154|     3|    24|06/01/18|localization_142|    8|
|  product_24|    404|     1|    2

In [36]:
joined_df.write.format('delta').mode('overwrite').save('s3a://delta-bucket/data-joined')

In [8]:
joined_df = spark.read.format('delta').load('s3a://delta-bucket/data-joined')

In [7]:
joined_df.show()

+------------+-------+------+------+--------+----------------+-----+
|product_name|user_id|volume|amount|    date|    localization|price|
+------------+-------+------+------+--------+----------------+-----+
|  product_21|    422|     2|    18|06/03/18|localization_172|    9|
|  product_24|    943|     4|   112|06/03/18|localization_125|   28|
|  product_45|     91|     3|   147|06/03/18|localization_145|   49|
|  product_30|    296|     4|    96|06/03/18|localization_119|   24|
|  product_47|    150|     5|   190|06/03/18|localization_179|   38|
|   product_8|    165|     1|    33|06/03/18|localization_195|   33|
|  product_26|    952|     3|    51|06/03/18|localization_122|   17|
|  product_23|    926|     4|    20|06/03/18|localization_129|    5|
|  product_26|    549|     3|    51|06/03/18|localization_128|   17|
|   product_3|    822|     2|    56|06/03/18|localization_121|   28|
|  product_20|    226|     3|   144|06/03/18|localization_128|   48|
|   product_2|    746|     5|    2

In [9]:
joined_df.createOrReplaceTempView('widetable')

In [10]:
sales_per_location = spark.sql("""
SELECT localization, SUM(amount) AS total_sales FROM widetable GROUP BY localization ORDER BY SUM(amount) DESC
""")

In [12]:
sales_per_location.write.format('delta').mode('overwrite').save('s3a://delta-bucket/sales-per-location')

In [11]:
sales_per_location.show()

+----------------+-----------+
|    localization|total_sales|
+----------------+-----------+
|localization_176|     325989|
|localization_140|     299326|
|localization_105|     287470|
|localization_164|     283557|
|localization_179|     279763|
|localization_178|     275498|
|localization_145|     274521|
|localization_146|     266097|
|localization_128|     264532|
|localization_129|     257876|
|localization_187|     253153|
|localization_200|     243427|
|localization_141|     233982|
|localization_112|     231792|
|localization_101|     225195|
|localization_192|     224865|
|localization_137|     222422|
|localization_170|     216669|
|localization_121|     207641|
|localization_120|     205086|
+----------------+-----------+
only showing top 20 rows



In [14]:
sales_per_day = spark.sql("""
SELECT date, SUM(amount) AS total_sales FROM widetable GROUP BY date ORDER BY SUM(amount) DESC
""")
sales_per_day.write.format('delta').mode('overwrite').save('s3a://delta-bucket/sales-per-day')

In [12]:
sales_per_day.show()

+--------+-----------+
|    date|total_sales|
+--------+-----------+
|06/01/18|    6232967|
|06/02/18|    5588355|
|06/03/18|    5504737|
+--------+-----------+



In [21]:
sales_per_product = spark.sql("""
SELECT product_name, price, SUM(amount)
FROM widetable
GROUP BY product_name, price
ORDER BY SUM(amount) DESC
""")


In [20]:
sales_per_product.write.format('delta').mode('overwrite').save('s3a://delta-bucket/sales-per-product')

In [23]:
sales_per_product.show()

+------------+-----+-----------+
|product_name|price|sum(amount)|
+------------+-----+-----------+
|  product_45|   49|     675171|
|  product_20|   48|     643680|
|  product_42|   46|     626980|
|  product_38|   46|     604716|
|   product_5|   43|     575297|
|  product_31|   42|     560280|
|  product_40|   41|     548867|
|  product_22|   40|     546600|
|  product_17|   41|     541159|
|  product_14|   39|     534495|
|  product_35|   39|     533208|
|  product_28|   39|     520806|
|  product_46|   38|     509314|
|  product_47|   38|     508326|
|  product_29|   38|     504488|
|  product_10|   36|     476568|
|  product_18|   34|     449514|
|   product_4|   34|     449106|
|   product_8|   33|     432432|
|  product_33|   31|     423181|
+------------+-----+-----------+
only showing top 20 rows



In [24]:
users_delta_table = DeltaTable.forPath(spark, 's3a://delta-bucket/users')


In [26]:
updated_data = [(1, 'Warszawa'), (2, 'Berlin')]
updated_df = spark.createDataFrame(updated_data, ['user_id', 'new_localization'])

users_delta_table.alias("t").merge(
    updated_df.alias("s"),
    "t.user_id % 13 = s.user_id"
).whenMatchedUpdate(
    set={"localization": "s.new_localization"}
).execute()


users_delta_table.toDF().filter(f.col('user_id').isin([1,2,3,4,14,15,16,27,28,29])).show()

+-------+----------------+
|user_id|    localization|
+-------+----------------+
|     27|        Warszawa|
|      1|        Warszawa|
|     14|        Warszawa|
|     29|localization_186|
|      3|localization_152|
|     16|localization_118|
|     28|          Berlin|
|      2|          Berlin|
|     15|          Berlin|
|      4|localization_178|
+-------+----------------+



In [28]:
users_delta_table.toDF().write.format('delta').mode('overwrite').save('s3a://delta-bucket/updated-users')

In [32]:
joined_df_updated = total_sales_df.join(users_delta_table.toDF(), 'user_id', 'left').join(product_df, 'product_name', 'left')

In [34]:
joined_df_updated.createOrReplaceTempView('widetableudated')

In [37]:
sales_per_location = spark.sql("""
SELECT localization, SUM(amount) AS total_sales FROM widetableudated WHERE localization = 'Berlin' OR localization = 'Warszawa' GROUP BY localization ORDER BY SUM(amount) DESC
""")

In [38]:
sales_per_location.show()

+------------+-----------+
|localization|total_sales|
+------------+-----------+
|    Warszawa|    1338182|
|      Berlin|    1325923|
+------------+-----------+



In [None]:
sales_in_Warsaw_Berlin.write.format('delta').mode('overwrite').save('s3a://delta-bucket/sales-per-product')