In [0]:
%pip install Faker
%pip install --upgrade typing-extensions
# Notebook-scoped installs: Faker is the library imported further down to
# generate the synthetic person records; typing-extensions is upgraded next to it.
# NOTE(review): neither package is version-pinned, so a re-run may resolve a
# different release — pin once the working versions are known.


In [0]:
# Restart the notebook's Python process so the packages installed by the
# preceding %pip cell are the ones picked up by the imports that follow.
dbutils.library.restartPython()

In [0]:
from pyspark.sql import SparkSession
import datetime
import re
import os
import time
import pyspark
from pyspark.sql.types import StructType, StructField, BooleanType, DoubleType, LongType
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.types import IntegerType
from pyspark.sql import Row
from datetime import date
from faker import Faker
import random
import numpy as np


In [0]:
# Start from the named, local-mode builder and layer the driver/worker options
# on top of it one at a time, in the order listed.
session_builder = (
    SparkSession.builder
    .appName("Spark Streamer Example")
    .master("local[*]")
)
for conf_key, conf_value in (
    ("spark.driver.memory", "4g"),
    ("spark.driver.host", "127.0.0.1"),
    ("spark.driver.bindAddress", "127.0.0.1"),
    ("spark.python.worker.reuse", "false"),
):
    session_builder = session_builder.config(conf_key, conf_value)

# Disabled extras, kept for reference. To enable one, add the matching
# .config(key, value) call to the builder before getOrCreate():
#   spark.jars.packages  = "org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.5.6,"
#                          "org.postgresql:postgresql:42.7.7"
#   spark.sql.extensions = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
#                          "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"

# NOTE(review): getOrCreate() hands back an already-active session when one
# exists; confirm on the target runtime whether the builder options above are
# honoured in that case.
spark = session_builder.getOrCreate()

# Session-level SQL settings applied after the session exists.
for sql_conf_key, sql_conf_value in (
    ("spark.sql.legacy.parquet.nanosAsLong", "true"),
    ("spark.sql.session.timeZone", "UTC"),
    ("spark.sql.execution.arrow.pyspark.enabled", "true"),
):
    spark.conf.set(sql_conf_key, sql_conf_value)

print("Spark Session Started")

In [0]:
def createFakePersonRecords(n=1000, seed=0):
    """Generate ``n`` synthetic person records as plain dictionaries.

    Every record has the keys ``first_name``, ``last_name``, ``email``,
    ``phone``, ``address``, ``city``, ``state``, ``zip``, ``country``,
    ``date_of_birth``, ``salary``, ``department``, ``hired_date``, ``id`` and
    ``manager``, in that order. ``date_of_birth`` is a ``datetime.datetime``
    parsed from Faker's ``%Y-%m-%d`` date string, ``salary`` is drawn from
    30000..100000, and ``manager`` is either ``None`` or a generated first
    name (each with probability one half).

    Args:
        n: Number of records to build.
        seed: Value passed to ``Faker.seed`` before generating. Defaults to 0,
            the value that used to be hard-coded. Pass ``None`` to skip
            reseeding, so that successive calls continue Faker's stream
            instead of restarting the same sequence (useful when several
            batches must not repeat each other's values).

    Returns:
        A list of ``n`` record dictionaries.

    Note:
        Whether a record gets a manager is decided with the global ``random``
        module, which this function does not seed. Because a manager consumes
        one extra Faker draw, later records can differ between runs even with
        a fixed ``seed``.
    """
    faker = Faker()
    if seed is not None:
        Faker.seed(seed)

    fake_records = []
    for _ in range(n):
        # Odd draw -> the record gets a manager; even draw -> manager stays None.
        has_manager = random.randint(1, 100) % 2 != 0

        # The Faker calls below run in the same order as before, so the values
        # produced for a given seed are unchanged.
        record = {
            'first_name': faker.first_name(),
            'last_name': faker.last_name(),
            'email': faker.ascii_company_email(),
            'phone': faker.phone_number(),
            'address': faker.address(),
            'city': faker.city(),
            'state': faker.state_abbr(),
            'zip': faker.zipcode(),
            'country': faker.country(),
            'date_of_birth': datetime.datetime.strptime(faker.date(), "%Y-%m-%d"),
            'salary': faker.random_int(min=30000, max=100000),
            'department': faker.job(),
            'hired_date': faker.date_this_year(),
            'id': faker.uuid4(),
            'manager': None,
        }
        if has_manager:
            # Drawn last, after 'id', exactly where the old else-branch drew it.
            record['manager'] = faker.first_name()

        fake_records.append(record)
    return fake_records


In [0]:
# Explicit schema for the dictionaries built by createFakePersonRecords.
# Each spec is (column name, Spark type, nullable); 'manager' is the only
# column allowed to be null.
fake_records_schema = StructType([
    StructField(column_name, column_type, is_nullable)
    for column_name, column_type, is_nullable in (
        ('first_name', StringType(), False),
        ('last_name', StringType(), False),
        ('email', StringType(), False),
        ('phone', StringType(), False),
        ('address', StringType(), False),
        ('city', StringType(), False),
        ('state', StringType(), False),
        ('zip', StringType(), False),
        ('country', StringType(), False),
        ('date_of_birth', DateType(), False),
        ('salary', IntegerType(), False),
        ('department', StringType(), False),
        ('hired_date', DateType(), False),
        ('id', StringType(), False),
        ('manager', StringType(), True),
    )
])

In [0]:
# Initial batch: 10,000 generated person records.
records = createFakePersonRecords(n=10_000)

# Show one raw dictionary as a quick sanity check of the generated shape.
print(records[0])

# Apply the explicit schema and register the result so %sql cells can read it.
df = spark.createDataFrame(records, schema=fake_records_schema)
df.createOrReplaceTempView('temp_records')

In [0]:
%sql

-- Preview the generated rows through the temp_records view.
SELECT *
FROM temp_records

In [0]:
%sql

-- (Re)create the table from whatever temp_records currently holds.
CREATE OR REPLACE TABLE default.records_example AS
SELECT *
FROM temp_records;

In [0]:
%sql

-- Read back the table to confirm the load.
SELECT *
FROM default.records_example

In [0]:
%sql




In [0]:
# Open the table as a streaming source and register it under a temp view name
# so that later SQL can query the stream.
records_stream = spark.readStream.table('default.records_example')
records_stream.createOrReplaceTempView('records_example_stream_tmp')


In [0]:
# Running row count over the streaming temp view; display() is given its own
# checkpoint directory for this query.
record_count_stream = spark.sql("SELECT COUNT(*) FROM records_example_stream_tmp")

display(
    record_count_stream,
    checkpointLocation="/Volumes/mlb_demo/default/records_example/checkpointA/",
)

In [0]:
%sql

-- Temp view exposing the row count of the streaming view as a single column.
CREATE OR REPLACE TEMPORARY VIEW record_counts_tmp_vw AS
SELECT COUNT(*) AS record_count
FROM records_example_stream_tmp

In [0]:
# Continuously write the aggregated count to a table: every 3 seconds the full
# result is rewritten ("complete" output mode), with progress tracked in the
# checkpointB directory.
#
# toTable() is the documented DataStreamWriter method for starting a query
# that writes to a table. The target is schema-qualified so it is the same
# table the later `SELECT * FROM default.record_count_tb` cell reads, whatever
# the session's current schema is.
(
    spark.table('record_counts_tmp_vw')
    .writeStream
    .trigger(processingTime="3 seconds")
    .outputMode("complete")
    .option("checkpointLocation", "/Volumes/mlb_demo/default/records_example/checkpointB/")
    .toTable("default.record_count_tb")
)

In [0]:
%sql

-- Inspect the count written by the streaming query.
SELECT *
FROM default.record_count_tb

In [0]:
# Second, smaller batch (1,000 records) to append while the stream is running.
# createFakePersonRecords reseeds Faker with 0 on every call, so this batch
# restarts the same Faker sequence as the first one and may repeat values
# (ids included) that were already loaded.
records = createFakePersonRecords(n=1_000)

# Re-point temp_records at the new batch for the INSERT that follows.
df = spark.createDataFrame(records, schema=fake_records_schema)
df.createOrReplaceTempView('temp_records')

In [0]:
%sql

-- Append the current contents of temp_records to the table.
INSERT INTO default.records_example
SELECT *
FROM temp_records;