# Do this once - Spark initialization 


In [12]:
!pip install pyspark==3.5.0 ipython-sql sqlalchemy delta-spark==3.0.0 pandas

Collecting ipython-sql
  Downloading ipython_sql-0.5.0-py3-none-any.whl.metadata (17 kB)
Collecting delta-spark==3.0.0
  Downloading delta_spark-3.0.0-py3-none-any.whl.metadata (2.0 kB)
Collecting py4j==0.10.9.7 (from pyspark==3.5.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting prettytable (from ipython-sql)
  Downloading prettytable-3.10.0-py3-none-any.whl.metadata (30 kB)
Collecting sqlparse (from ipython-sql)
  Downloading sqlparse-0.4.4-py3-none-any.whl.metadata (4.0 kB)
Downloading delta_spark-3.0.0-py3-none-any.whl (21 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hDownloading ipython_sql-0.5.0-py3-none-any.whl (20 kB)
Downloading prettytable-3.10.0-py3-none-any.whl (28 kB)
Downloading sqlparse-0.4.4-py3-none-any.whl (41 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, min, count

def configure_spark_for_s3(spark: SparkSession):
    connection_time_out = "600000"
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.endpoint", os.environ["MINIO_URL"]
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"]
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"]
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.timeout", connection_time_out
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "spark.sql.debug.maxToStringFields", "100"
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.path.style.access", "true"
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.connection.ssl.enabled", "false"
    )
    spark.sparkContext._jsc.hadoopConfiguration().set(
        "fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )


def create_spark():
    spark = (
        SparkSession.builder
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .master("local")
        .getOrCreate()
    )
    configure_spark_for_s3(spark)
    return spark

In [2]:
spark = create_spark()

# RECORD GENERATOR

## Functions to generate a JSON dataset to pick up

In [3]:
import random
import string
from datetime import datetime
import time
import os

# Method to return a random User ID between 1 and 10 (set low for testing some stateful streaming aggregations, higher for more variability)
def returnUserId():
  return random.randint(1, 6)

# Return a random float value for different purposes, rounded to 4 places
def returnValue():
  return round(random.uniform(111.1111, 9999999999.9999), 4)

# Method to return a string of random characters - hard-coded to length of 30
def returnString():
  letters = string.ascii_letters
  return ( ''.join(random.choice(letters) for i in range(30)) )

def returnTransactionTimestamp():
  currentDateTime = datetime.now()
  return currentDateTime.strftime("%Y-%m-%d %H:%M:%S.%f")

# Generate a record
def generateRecord():
  return (returnUserId(), returnString(), returnValue(), returnValue(), returnValue(), returnTransactionTimestamp())
  
# Generate a list of records
def generateRecordSet(recordCount):
  recordSet = []
  for x in range(recordCount):
    recordSet.append(generateRecord())
  return recordSet

# Generate a set of data, convert it to a Dataframe, write it out as one json file in a temp location, 
# move the json file to the desired location that the Auto Loader will be watching and then delete the temp location
def writeJsonFile(recordCount):
  recordColumns = ["userId", "stringCode", "value1", "value2", "value3", "transactionTimestamp"]
  recordSet = generateRecordSet(recordCount)
  recordDf = spark.createDataFrame(data=recordSet, schema=recordColumns)
  
  # Write out the json file with Spark in a temp location - this will create a directory with the file we want the Auto Loader to
  # pick up underneath it
  #recordDf.coalesce(1).write.format("json").save(tempPath)
  
  # Grab the file from the temp location, write it to the location we want and then delete the temp directory
  #tempJson = os.path.join(tempPath, dbutils.fs.ls(tempPath)[3][1])
  #dbutils.fs.cp(tempJson, destinationPath)
  #dbutils.fs.rm(tempPath, True)

  
  #recordDf.write.format("delta").save("s3a://silver/users")
  recordDf.write.mode("append").format("delta").save("s3a://silver/users")
  

# Define Record Count, Temporary Location, Auto Loader-Monitored Location and Sleep Interval Here

In [8]:
import threading
import time

# Assuming your data generation and writeJsonFile functions are defined in previous cells or in the same cell

def continuous_data_write(recordCount, stop_event):
    while not stop_event.is_set():
        writeJsonFile(recordCount)  # Ensure this matches the definition of writeJsonFile
        time.sleep(sleepIntervalSeconds)

# Define parameters for the data generation
recordCount = 5
sleepIntervalSeconds = 1
destinationPath = "s3a://silver/users"

# Create a stop event
stop_event = threading.Event()

# Start the background thread
data_thread = threading.Thread(target=continuous_data_write, args=(recordCount, stop_event))
data_thread.start()


In [4]:
# To stop the data generation
stop_event.set()
data_thread.join()  # Ensure the thread has finished

NameError: name 'stop_event' is not defined

In [9]:
recordCount=5
# tempPath = "s3://my/path/here/temp"
sleepIntervalSeconds = 1

while True:
  writeJsonFile(recordCount)
  time.sleep(sleepIntervalSeconds)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [12]:
df = spark.read.format("delta").load("s3a://silver/users")
# display(df.select(count("*")).toPandas())
display(df.select("*").toPandas())

Unnamed: 0,userId,stringCode,value1,value2,value3,transactionTimestamp
0,6,zLwpeBrIXpdDxguPytJdzskbZZcuGj,2.329014e+08,9.562096e+09,6.677291e+09,2024-03-13 13:06:13.709474
1,5,DVsWzAuzWbcFVMJvwZXFfnDHzaFLFH,7.758462e+09,5.586205e+09,9.793474e+09,2024-03-13 13:06:13.710086
2,5,rikINBzelrOaQSrefDbkEpdgdZNmPc,4.799428e+09,2.648775e+09,2.412623e+09,2024-03-13 13:06:13.710198
3,2,DAQJzEOCqhYMULxfwEemAtJpqxlHui,9.954533e+09,6.120536e+09,3.858808e+09,2024-03-13 13:06:13.710265
4,3,awTOHjBiKDqglMKqNUGQCPPWxwwiRQ,4.247559e+09,4.874784e+09,3.916491e+09,2024-03-13 13:06:13.710417
...,...,...,...,...,...,...
535,1,rkgazhEfOsQDRwyxdKEMuUOdgIOJsl,2.178721e+08,5.362222e+09,4.471909e+09,2024-03-13 13:03:34.541247
536,4,FLsgKtfgVcIuPOsjQmbIxGrpTUhguh,7.982872e+09,7.151681e+07,8.475935e+09,2024-03-13 13:03:34.541321
537,5,OsZalcfOCYTuFTGZHMLzrKtBLOqGlL,1.301974e+09,3.290370e+09,4.548687e+08,2024-03-13 13:03:34.541347
538,2,bfidqLhzfkfNztiXLyPbFjZdhhmpSa,6.754345e+09,1.898216e+09,5.684968e+09,2024-03-13 13:03:34.541371
