In [65]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, regexp_extract, split
import os

# spark = SparkSession.builder.appName(
#     "Ch02 - Analyzing the vocabulary of Pride and Prejudice."
# ).setMaster.getOrCreate()

# Attach to the Spark Connect endpoint used throughout this notebook.
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .getOrCreate()
)

# Text source: every row carries one line of the file in the `value` column.
book = spark.read.text("/opt/spark/data/pride-and-prejudice.txt")

# Tokenise each line on single spaces into an array column named `line`.
lines = book.select(split("value", " ").alias("line"))

# Flatten the arrays so that every token gets its own row.
words = lines.select(explode("line").alias("word"))

# Normalise case, then keep only the leading run of a-z characters of a token.
words_lower = words.select(lower("word").alias("word_lower"))
words_clean = words_lower.select(
    regexp_extract("word_lower", "[a-z]*", 0).alias("word")
)

# Tokens that do not start with a letter were reduced to "" above; drop them.
words_nonull = words_clean.filter(col("word") != "")

# Occurrences per distinct word.
results = words_nonull.groupBy("word").count()

# Display the ten most frequent words.
results.sort(col("count").desc()).show(10)


+----+-----+
|word|count|
+----+-----+
| the| 4306|
|  to| 4139|
|  of| 3593|
| and| 3432|
| her| 2220|
|   a| 1926|
|  in| 1852|
| was| 1839|
|   i| 1754|
| she| 1682|
+----+-----+
only showing top 10 rows



In [66]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Client-side configuration: only the application name is set at the moment.
conf = SparkConf()
conf.setAppName("Apache Iceberg with PySpark")

# Disabled Iceberg catalog settings, kept for reference only.
# NOTE(review): later cells write to `local.db.*`; with this block disabled the
# `local` catalog presumably has to be defined on the Spark server -- confirm.
# NOTE(review): the JDBC user/password are inline literals; read them from the
# environment if this block is ever re-enabled.
# conf.setAll([
#     ("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog"),
#     ("spark.sql.catalog.local.warehouse", "/warehouse"),
#     ("spark.sql.catalog.local.catalog-impl", "org.apache.iceberg.jdbc.JdbcCatalog"),
#     ("spark.sql.catalog.local.uri", "jdbc:postgresql://postgres-db:5432/iceberg_db"),
#     ("spark.sql.catalog.local.jdbc.user", "postgres"),
#     ("spark.sql.catalog.local.jdbc.password", "postgres"),
#     ("spark.sql.defaultCatalog", "local"),
# ])

# Build (or reuse) the Spark Connect session with the configuration above.
spark = (
    SparkSession.builder
    .remote("sc://localhost:15002")
    .config(conf=conf)
    .getOrCreate()
)


# spark.master                           spark://spark-iceberg:7077
# spark.eventLog.enabled                 true
# spark.eventLog.dir                     /opt/spark/spark-events
# spark.history.fs.logDirectory          /opt/spark/spark-events
# spark.sql.extensions                   org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
# spark.sql.catalog.data                 org.apache.iceberg.spark.SparkCatalog
# spark.sql.catalog.data.warehouse       /home/iceberg/warehouse
# spark.sql.catalog.data.catalog-impl    org.apache.iceberg.jdbc.JdbcCatalog
# spark.sql.catalog.data.uri             jdbc:postgresql://pg-catalog:5432/iceberg
# spark.sql.catalog.data.jdbc.user       iceberg
# spark.sql.catalog.data.jdbc.password   iceberg
# spark.sql.defaultCatalog               data
# spark.sql.catalogImplementation        in-memory

In [67]:
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
# Location of the source CSV file.
path = "/opt/spark/data/Belgium.csv"

# Read the CSV, taking column names from the first row and letting Spark
# infer the column types.
vaccinations: DataFrame = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(path)
)

# Convert the `date` column with to_date() and persist the frame as the table
# `local.db.vaccinations`.
# createOrReplace() is used instead of create() so that re-running this cell
# does not fail once the table exists.
# NOTE(review): a re-run replaces the existing table with the freshly loaded
# CSV data -- confirm nothing else writes to this table.
(
    vaccinations
    .withColumn("date", F.to_date(F.col("date")))
    .writeTo("local.db.vaccinations")
    .createOrReplace()
)

In [68]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
# Sample rows: (firstname, middlename, lastname, id, gender, salary).
# Some string fields are left empty and one salary is -1.
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

# Explicit schema: every column is nullable; `id` is kept as a string and
# `salary` is an integer.
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)

# Persist the frame as the table `local.db.test`.
# createOrReplace() is used instead of create() so that re-running this cell
# does not fail once the table exists; a re-run replaces the table contents.
df.writeTo("local.db.test").createOrReplace()