In [1]:
import pyspark
from delta import *

import os

# Connection settings for the S3-compatible object store (endpoint host `minio`).
# Credentials come from the environment so real secrets need not be hardcoded in
# the notebook. The fallbacks are the local-development defaults.
S3A_ACCESS_KEY = os.environ.get("S3A_ACCESS_KEY", "datalake")
S3A_SECRET_KEY = os.environ.get("S3A_SECRET_KEY", "datalake")
S3A_ENDPOINT = os.environ.get("S3A_ENDPOINT", "http://minio:9000")

# Session builder with these pieces:
#   - cluster master at spark://spark-master:7077
#   - Delta Lake SQL extension, with the Delta catalog registered as spark_catalog
#   - s3a filesystem using path-style access and simple key/secret authentication
builder = (
    pyspark.sql.SparkSession.builder
    .appName("delta")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.access.key", S3A_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3A_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.endpoint", S3A_ENDPOINT)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

# Pass the builder through Delta's pip helper (from the `delta` import above),
# then create (or reuse) the session with Hive support enabled.
spark = configure_spark_with_delta_pip(builder).enableHiveSupport().getOrCreate()

In [2]:
# Display the SparkSession created in the previous cell as a quick sanity check.
spark

In [15]:
# Import Libraries
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark import SQLContext
# Setup the Configuration

# Setup the Schema
schema = StructType([
    StructField("User_ID", IntegerType(),True),
    StructField("Username", StringType(),True),
    StructField("Browser", StringType(),True),
    StructField("OS", StringType(),True),
])
# Add Data
data = ([
    (1580, "Barry", "FireFox", "Windows" ),
    (5820, "Sam", "MS Edge", "Linux"),
    (2340, "Harry", "Vivaldi", "Windows"),
    (7860, "Albert", "Chrome", "Windows"),
    (1123, "May", "Safari", "macOS")
])
# Setup the Data Frame
user_data_df = spark.createDataFrame(data,schema=schema)

In [None]:
# Print up to 10 rows without truncating long column values.
user_data_df.show(n=10, truncate=False)

In [None]:
# Write the rows as plain Parquet, overwriting any existing data at the target path.
# NOTE(review): the directory is spelled "users_paquet" — probably meant
# "users_parquet". It is left unchanged so anything already reading this path
# keeps working.
user_data_df.write.parquet('s3a://delta-lake/users_paquet', mode='overwrite')

In [16]:
# Write the same rows in Delta format, overwriting any existing data at this path.
user_data_df.write.save(
    path='s3a://delta-lake/users_delta',
    format='delta',
    mode='overwrite',
)

In [17]:
from delta.tables import *
from pyspark.sql.functions import *

In [18]:
# Open the Delta table at this path as a DeltaTable handle. Despite the `_df`
# suffix, this is not a DataFrame: use .toDF() to read it and .update() to modify it.
user_df = DeltaTable.forPath(sparkSession=spark, path='s3a://delta-lake/users_delta')

In [19]:
# View the Delta table as a DataFrame and print up to 10 untruncated rows.
user_df.toDF().show(n=10, truncate=False)

+-------+--------+-------+-------+
|User_ID|Username|Browser|OS     |
+-------+--------+-------+-------+
|2340   |Harry   |Vivaldi|Windows|
|7860   |Albert  |Chrome |Windows|
|1123   |May     |Safari |macOS  |
|1580   |Barry   |FireFox|Windows|
|5820   |Sam     |MS Edge|Linux  |
+-------+--------+-------+-------+



In [22]:
# Demo update: rows whose OS is 'Windows' get OS = 'windows___'
# (the SQL concat of 'windows', '__' and '_').
# Plain strings are accepted here and parsed as SQL expressions.
user_df.update(
    condition="OS == 'Windows'",
    set={"OS": "concat('windows', '__', '_')"},
)


In [23]:
# Re-read the Delta table from storage and print up to 10 untruncated rows,
# showing the table's contents after the update above.
spark.read.load('s3a://delta-lake/users_delta', format='delta').show(n=10, truncate=False)

+-------+--------+-------+----------+
|User_ID|Username|Browser|OS        |
+-------+--------+-------+----------+
|2340   |Harry   |Vivaldi|windows___|
|7860   |Albert  |Chrome |windows___|
|1123   |May     |Safari |macOS     |
|1580   |Barry   |FireFox|windows___|
|5820   |Sam     |MS Edge|Linux     |
+-------+--------+-------+----------+



In [24]:
# Revert the demo update: rows whose OS is 'windows___' get OS = 'Windows'.
# Plain strings are accepted here and parsed as SQL expressions.
user_df.update(
    condition="OS == 'windows___'",
    set={"OS": "'Windows'"},
)

In [25]:
# Re-read the Delta table once more and print up to 10 untruncated rows to check
# the OS values after the revert.
restored_users = spark.read.load(path='s3a://delta-lake/users_delta', format='delta')
restored_users.show(n=10, truncate=False)

+-------+--------+-------+-------+
|User_ID|Username|Browser|OS     |
+-------+--------+-------+-------+
|2340   |Harry   |Vivaldi|Windows|
|7860   |Albert  |Chrome |Windows|
|1123   |May     |Safari |macOS  |
|1580   |Barry   |FireFox|Windows|
|5820   |Sam     |MS Edge|Linux  |
+-------+--------+-------+-------+

