In [10]:

import findspark
from datetime import datetime
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, udf, col, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
import os
from dotenv import load_dotenv

load_dotenv()



True

In [11]:
findspark.init()
topic_name = os.getenv('kafka_topic_name')

# spark_path = findspark.find()
# print(spark_path)
print(topic_name)



ticketmaster


In [None]:
credentials_location = os.getenv('gcp_credentials_path')

# Spark Config
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('TwitterSentimentAnalysis') \
    .set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .set("spark.jars", "gs://path/to/spark-bigquery-latest.jar,gs://path/to/google-cloud-bigquery-latest.jar,/path/to/local-jar-file.jar") \
    .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)\
    .set("spark.jars", "gcs-connector-hadoop3-2.2.5.jar") 


sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

# Spark Context
# sc = spark.sparkContext
# sc.setLogLevel('ERROR')

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [13]:
schema = StructType([
    StructField("event_name", StringType()),
    StructField("event_type", StringType()),
    StructField("event_id", StringType()),
    StructField("event_url", StringType()),
    StructField("venue_name", StringType()),
    StructField("venue_id", StringType()),
    StructField("venue_zipcode", StringType()),
    StructField("venues_timezone", StringType()),
    StructField("venue_city", StringType()),
    StructField("venue_state_full", StringType()),
    StructField("venue_state_short", StringType()),
    StructField("venue_country_name", StringType()),
    StructField("venue_country_short", StringType()),
    StructField("venue_address", StringType()),
    StructField("venue_longitude", StringType()),
    StructField("venue_latitude", StringType()),
    StructField("attraction_name", StringType()),
    StructField("attraction_type", StringType()),
    StructField("attraction_id", StringType()),
    StructField("attraction_url", StringType()),
    StructField("attraction_segment_id", StringType()),
    StructField("attraction_segment_name", StringType()),
    StructField("attraction_genre_id", StringType()),
    StructField("attraction_genre_name", StringType()),
    StructField("attraction_subgenre_id", StringType()),
    StructField("attraction_subgenre_name", StringType()),
    StructField("event_start_date", StringType()),
    StructField("ticket_status", StringType()),
    StructField("event_start_time", StringType()),
    StructField("currency", StringType()),
    StructField("min_price", StringType()),
    StructField("max_price", StringType())
])

23/06/24 15:00:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-71d3a546-2d9e-4f76-a6b9-bc85d9902881--2001384727-driver-0-1, groupId=spark-kafka-source-71d3a546-2d9e-4f76-a6b9-bc85d9902881--2001384727-driver-0] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:00:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-71d3a546-2d9e-4f76-a6b9-bc85d9902881--2001384727-driver-0-1, groupId=spark-kafka-source-71d3a546-2d9e-4f76-a6b9-bc85d9902881--2001384727-driver-0] Resetting offset for partition ticketmaster-0 to offset 4833.
23/06/24 15:00:20 INFO CheckpointFileManager: Writing atomically to gs://kafka-spark-data/spark-metadata/offsets/205 using temp file gs://kafka-spark-data/spark-metadata/offsets/.205.6d021bce-9f83-4973-bbdc-70b32df0eb66.tmp


In [None]:

PROJECT_ID = os.getenv('GCP_PROJECT_ID')
BUCKET = os.getenv('GCP_BUCKET')

dataset = os.getenv('GCP_dataset')
table = os.getenv('GCP_table')

gcs_metadata_folder = os.getenv('GCP_metadata_bucket')
gcs_data_folder = os.getenv('GCP_data_bucket')

print(PROJECT_ID)
print(BUCKET)
print(dataset)
print(gcs_data_folder)

In [None]:
##### STREAMING DATA PROCESSING #####

# Read the data from kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "latest") \
    .option("header", "true") \
    .load() \
    .selectExpr("CAST(value AS STRING)")
    
# Apply deserialization or further processing if needed
df1 = df.withColumn("parsed_data", from_json("value", schema))

### DATA TYPE CONVERSIONS ####
# Extract and convert the "venue_zipcode" as Integer
# df1 = df1.withColumn("venue_zipcode", col("parsed_data.venue_zipcode").cast(IntegerType()))
# # Extract and convert the coordinates as Double
# df1 = df1.withColumn("venue_longitude", col("parsed_data.venue_longitude").cast(DoubleType()))
# df1 = df1.withColumn("venue_latitude", col("parsed_data.venue_latitude").cast(DoubleType()))
# # Extract and Convert the event_start_date as Date 
# df1 = df1.withColumn("event_start_date", col("parsed_data.event_start_date").cast(DateType()))

df2 = df1.select("parsed_data.*")


df2.printSchema()

path = '/Users/nicburkett/Desktop/spark_output'


# # # Write to a local file
# # file_query = df2.writeStream \
# #     .format("csv") \
# #     .outputMode("append") \
# #     .option("header", "true") \
# #     .option("checkpointLocation", path) \
# #     .trigger(processingTime="10 seconds") \
# #     .start(path)

# # # WRITE TO GCS BUCKET 
# gcs_write = df2.writeStream \
#     .format("csv") \
#     .outputMode("append") \
#     .option("path","gs://kafka-spark-data/raw-spark-data") \
#     .option("checkpointLocation", "gs://kafka-spark-data/spark-metadata") \
#     .trigger(processingTime="10 seconds") \
#     .start() 

# gcs_write.awaitTermination()

# WRITE TO CONSOLE TO LOG 
# console_query = df2.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .trigger(processingTime="10 seconds") \
#     .start() \
#     .awaitTermination()

    # .foreachBatch(write_batch) \

gcs_bigquery_stream = df2.writeStream \
    .format("bigquery") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", gcs_metadata_folder) \
    .option("temporaryGcsBucket", 'kafka-spark-data') \
    .option("table",'global-maxim-338114.twitter_kafka_pyspark_test.kafka_pyspark') \
    .option("mode", "FAILFAST") \
    .start()

    # .option("failOnDataLoss",'false') \

gcs_bigquery_stream.awaitTermination()

23/06/24 15:43:33 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-60b89fd2-ef43-43f2-8675-88739e516fa4/_temporary/0/_temporary/' directory.
23/06/24 15:43:33 INFO FileOutputCommitter: Saved output of task 'attempt_202306241543314112936720846474209_0221_m_000000_221' to gs://kafka-spark-data/.spark-bigquery-local-1687636163344-60b89fd2-ef43-43f2-8675-88739e516fa4/_temporary/0/task_202306241543314112936720846474209_0221_m_000000
23/06/24 15:43:33 INFO SparkHadoopMapRedUtil: attempt_202306241543314112936720846474209_0221_m_000000_221: Committed. Elapsed time: 986 ms.
23/06/24 15:43:33 INFO Executor: Finished task 0.0 in stage 221.0 (TID 221). 2536 bytes result sent to driver
23/06/24 15:43:33 INFO TaskSetManager: Finished task 0.0 in stage 221.0 (TID 221) in 2083 ms on nics-mbp.attlocal.net (executor driver) (1/1)
23/06/24 15:43:33 INFO TaskSchedulerImpl: Removed TaskSet 221.0, whose tasks have all completed, from pool 
2

                                                                                

23/06/24 15:43:34 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-60b89fd2-ef43-43f2-8675-88739e516fa4/_temporary/0/task_202306241543314112936720846474209_0221_m_000000/' directory.
23/06/24 15:43:34 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-60b89fd2-ef43-43f2-8675-88739e516fa4/' directory.
23/06/24 15:43:35 INFO FileFormatWriter: Write Job 9d99a4bb-46e7-4b03-a47d-8917f180ae82 committed. Elapsed time: 2358 ms.
23/06/24 15:43:35 INFO FileFormatWriter: Finished processing stats for write job 9d99a4bb-46e7-4b03-a47d-8917f180ae82.
23/06/24 15:43:35 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 222:>                                                        (0 + 1) / 1]

23/06/24 15:43:43 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:43:43 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:43:43 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5338.
23/06/24 15:43:44 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:43:45 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-4de41ce8-d7c6-4b53-9b38-b5f8616bc2b7/_temporary/0/task_202306241543422219220570663763894_0222_m_000000/' directory.
23/06/24 15:43:46 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-4de41ce8-d7c6-4b53-9b38-b5f8616bc2b7/' directory.
23/06/24 15:43:46 INFO FileFormatWriter: Write Job 1c28a817-3e7c-4880-a8ec-941c9b222993 committed. Elapsed time: 2097 ms.
23/06/24 15:43:46 INFO FileFormatWriter: Finished processing stats for write job 1c28a817-3e7c-4880-a8ec-941c9b222993.
23/06/24 15:43:46 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 223:>                                                        (0 + 1) / 1]

23/06/24 15:43:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:43:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:43:55 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5340.
23/06/24 15:43:56 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:43:57 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-f6f5e83b-9ef4-4ab9-884d-6920ec553fd4/_temporary/0/task_202306241543546812901914039253015_0223_m_000000/' directory.
23/06/24 15:43:58 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-f6f5e83b-9ef4-4ab9-884d-6920ec553fd4/' directory.
23/06/24 15:43:58 INFO FileFormatWriter: Write Job cafbd1c0-3e21-414d-8520-279189665cab committed. Elapsed time: 2034 ms.
23/06/24 15:43:58 INFO FileFormatWriter: Finished processing stats for write job cafbd1c0-3e21-414d-8520-279189665cab.
23/06/24 15:43:59 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 224:>                                                        (0 + 1) / 1]

23/06/24 15:44:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:44:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:44:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5342.
23/06/24 15:44:09 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:44:10 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cba88029-3b89-47cf-9eed-703c850cd826/_temporary/0/task_202306241544074985391970721279483_0224_m_000000/' directory.
23/06/24 15:44:10 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cba88029-3b89-47cf-9eed-703c850cd826/' directory.
23/06/24 15:44:10 INFO BlockManagerInfo: Removed broadcast_224_piece0 on nics-mbp.attlocal.net:52028 in memory (size: 94.9 KiB, free: 434.4 MiB)
23/06/24 15:44:11 INFO FileFormatWriter: Write Job 672db8cf-33b2-4e14-9c83-0b7c4b5ed556 committed. Elapsed time: 2000 ms.
23/06/24 15:44:11 INFO FileFormatWriter: Finished processing stats for write job 672db8cf-33b2-4e14-9c83-0b7c4b5ed556.
23/06/24 15:44:11 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafk

[Stage 225:>                                                        (0 + 1) / 1]

23/06/24 15:44:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:44:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:44:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5345.
23/06/24 15:44:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:44:22 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-e6b771fd-70ae-4c1a-9b97-63627a797f51/_temporary/0/task_202306241544196630773960278918647_0225_m_000000/' directory.
23/06/24 15:44:22 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-e6b771fd-70ae-4c1a-9b97-63627a797f51/' directory.
23/06/24 15:44:23 INFO FileFormatWriter: Write Job 27a04ce7-fcde-4b26-ade6-e410e506fd54 committed. Elapsed time: 2006 ms.
23/06/24 15:44:23 INFO FileFormatWriter: Finished processing stats for write job 27a04ce7-fcde-4b26-ade6-e410e506fd54.
23/06/24 15:44:23 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 226:>                                                        (0 + 1) / 1]

23/06/24 15:44:31 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:44:31 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:44:31 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5347.
23/06/24 15:44:32 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:44:33 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-d8013a6a-587b-4eb1-afb0-3128e0ebc279/_temporary/0/task_202306241544305750021783249033278_0226_m_000000/' directory.
23/06/24 15:44:34 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-d8013a6a-587b-4eb1-afb0-3128e0ebc279/' directory.
23/06/24 15:44:35 INFO FileFormatWriter: Write Job ba35dbbb-98ec-41a7-b0a6-62a6ca021e4c committed. Elapsed time: 2196 ms.
23/06/24 15:44:35 INFO FileFormatWriter: Finished processing stats for write job ba35dbbb-98ec-41a7-b0a6-62a6ca021e4c.
23/06/24 15:44:35 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 227:>                                                        (0 + 1) / 1]

23/06/24 15:44:44 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:44:44 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:44:44 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5350.
23/06/24 15:44:45 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:44:46 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-c52e0172-b544-46ae-9710-354754fd83c2/_temporary/0/task_202306241544438373234766100047334_0227_m_000000/' directory.
23/06/24 15:44:46 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-c52e0172-b544-46ae-9710-354754fd83c2/' directory.
23/06/24 15:44:47 INFO FileFormatWriter: Write Job 0731c535-2be5-437b-b7eb-7b20954cd166 committed. Elapsed time: 2064 ms.
23/06/24 15:44:47 INFO FileFormatWriter: Finished processing stats for write job 0731c535-2be5-437b-b7eb-7b20954cd166.
23/06/24 15:44:47 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 228:>                                                        (0 + 1) / 1]

23/06/24 15:44:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:44:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:44:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5352.
23/06/24 15:44:57 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:44:58 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-316a656c-9935-469c-9ca7-5ecf8e2ab1c6/_temporary/0/task_20230624154455860535605805953295_0228_m_000000/' directory.
23/06/24 15:44:59 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-316a656c-9935-469c-9ca7-5ecf8e2ab1c6/' directory.
23/06/24 15:44:59 INFO FileFormatWriter: Write Job 9b0fac8d-e493-425e-9803-09afba129b47 committed. Elapsed time: 2262 ms.
23/06/24 15:44:59 INFO FileFormatWriter: Finished processing stats for write job 9b0fac8d-e493-425e-9803-09afba129b47.
23/06/24 15:45:00 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDi

[Stage 229:>                                                        (0 + 1) / 1]

23/06/24 15:45:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:45:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:45:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5355.
23/06/24 15:45:10 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:45:11 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-b67f0e02-4b5d-4abd-acd5-075c6d204635/_temporary/0/task_202306241545087765221744616854592_0229_m_000000/' directory.
23/06/24 15:45:11 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-b67f0e02-4b5d-4abd-acd5-075c6d204635/' directory.
23/06/24 15:45:12 INFO FileFormatWriter: Write Job b8f8ef7c-d330-4c89-879c-68130ef698b6 committed. Elapsed time: 1888 ms.
23/06/24 15:45:12 INFO FileFormatWriter: Finished processing stats for write job b8f8ef7c-d330-4c89-879c-68130ef698b6.
23/06/24 15:45:12 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 230:>                                                        (0 + 1) / 1]

23/06/24 15:45:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:45:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:45:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5357.
23/06/24 15:45:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:45:22 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-edaae3d0-5c48-4892-a3d7-e1dc9d7651a7/_temporary/0/task_202306241545198428101437658095633_0230_m_000000/' directory.
23/06/24 15:45:22 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-edaae3d0-5c48-4892-a3d7-e1dc9d7651a7/' directory.
23/06/24 15:45:23 INFO FileFormatWriter: Write Job 49f880d1-a26e-496e-90f4-d6a816306c90 committed. Elapsed time: 1796 ms.
23/06/24 15:45:23 INFO FileFormatWriter: Finished processing stats for write job 49f880d1-a26e-496e-90f4-d6a816306c90.
23/06/24 15:45:23 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 231:>                                                        (0 + 1) / 1]

23/06/24 15:45:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:45:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:45:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5359.
23/06/24 15:45:33 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:45:34 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-ad3a8f3a-dcad-484d-9906-d70ec73d05e8/_temporary/0/task_202306241545316374939825352523789_0231_m_000000/' directory.
23/06/24 15:45:35 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-ad3a8f3a-dcad-484d-9906-d70ec73d05e8/' directory.
23/06/24 15:45:35 INFO FileFormatWriter: Write Job b19dbb10-0350-43e1-9cbb-34190c60fc34 committed. Elapsed time: 2011 ms.
23/06/24 15:45:35 INFO FileFormatWriter: Finished processing stats for write job b19dbb10-0350-43e1-9cbb-34190c60fc34.
23/06/24 15:45:36 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 232:>                                                        (0 + 1) / 1]

23/06/24 15:45:44 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cc3fb3c9-b4cd-461b-bee2-1ebfa4070989/_temporary/0/_temporary/' directory.
23/06/24 15:45:44 INFO FileOutputCommitter: Saved output of task 'attempt_202306241545426242691699373386998_0232_m_000000_232' to gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cc3fb3c9-b4cd-461b-bee2-1ebfa4070989/_temporary/0/task_202306241545426242691699373386998_0232_m_000000
23/06/24 15:45:44 INFO SparkHadoopMapRedUtil: attempt_202306241545426242691699373386998_0232_m_000000_232: Committed. Elapsed time: 1045 ms.
23/06/24 15:45:44 INFO Executor: Finished task 0.0 in stage 232.0 (TID 232). 2536 bytes result sent to driver
23/06/24 15:45:44 INFO TaskSetManager: Finished task 0.0 in stage 232.0 (TID 232) in 1794 ms on nics-mbp.attlocal.net (executor driver) (1/1)
23/06/24 15:45:44 INFO TaskSchedulerImpl: Removed TaskSet 232.0, whose tasks have all completed, from pool 


                                                                                

23/06/24 15:45:45 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cc3fb3c9-b4cd-461b-bee2-1ebfa4070989/_temporary/0/task_202306241545426242691699373386998_0232_m_000000/' directory.
23/06/24 15:45:46 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-cc3fb3c9-b4cd-461b-bee2-1ebfa4070989/' directory.
23/06/24 15:45:46 INFO FileFormatWriter: Write Job ddcaf957-0eca-41d6-986d-1a141e6995ec committed. Elapsed time: 2090 ms.
23/06/24 15:45:46 INFO FileFormatWriter: Finished processing stats for write job ddcaf957-0eca-41d6-986d-1a141e6995ec.
23/06/24 15:45:47 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 233:>                                                        (0 + 1) / 1]

23/06/24 15:45:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:45:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:45:56 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5364.
23/06/24 15:45:57 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:45:58 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-ecffd656-4fee-4a14-8fc6-d6d0d67397ef/_temporary/0/task_202306241545555701057066379465333_0233_m_000000/' directory.
23/06/24 15:45:59 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-ecffd656-4fee-4a14-8fc6-d6d0d67397ef/' directory.
23/06/24 15:45:59 INFO FileFormatWriter: Write Job 96034c67-62e9-470d-943c-03cf0651790e committed. Elapsed time: 1893 ms.
23/06/24 15:45:59 INFO FileFormatWriter: Finished processing stats for write job 96034c67-62e9-470d-943c-03cf0651790e.
23/06/24 15:46:00 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 234:>                                                        (0 + 1) / 1]

23/06/24 15:46:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:46:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:46:07 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5366.
23/06/24 15:46:09 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:46:10 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-7870f21e-d406-493f-8e5c-52aafb726bd7/_temporary/0/task_20230624154607668688817708867172_0234_m_000000/' directory.
23/06/24 15:46:10 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-7870f21e-d406-493f-8e5c-52aafb726bd7/' directory.
23/06/24 15:46:11 INFO FileFormatWriter: Write Job b51c467e-fc0e-4632-9a81-5543f6b4664d committed. Elapsed time: 2196 ms.
23/06/24 15:46:11 INFO FileFormatWriter: Finished processing stats for write job b51c467e-fc0e-4632-9a81-5543f6b4664d.
23/06/24 15:46:11 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDi

[Stage 235:>                                                        (0 + 1) / 1]

23/06/24 15:46:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:46:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:46:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5369.
23/06/24 15:46:20 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:46:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-baa70b38-90fe-4c8d-a67f-b228d4140518/_temporary/0/task_202306241546183320456958041564764_0235_m_000000/' directory.
23/06/24 15:46:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-baa70b38-90fe-4c8d-a67f-b228d4140518/' directory.
23/06/24 15:46:22 INFO FileFormatWriter: Write Job 6c002eed-f225-4712-ad59-6c5fc7092d91 committed. Elapsed time: 2094 ms.
23/06/24 15:46:22 INFO FileFormatWriter: Finished processing stats for write job 6c002eed-f225-4712-ad59-6c5fc7092d91.
23/06/24 15:46:22 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 236:>                                                        (0 + 1) / 1]

23/06/24 15:46:33 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:46:33 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:46:33 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5371.
23/06/24 15:46:34 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:46:35 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-1c514615-e52f-41e8-8338-cc7f4addb6d2/_temporary/0/task_202306241546326010417713276490322_0236_m_000000/' directory.
23/06/24 15:46:36 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-1c514615-e52f-41e8-8338-cc7f4addb6d2/' directory.
23/06/24 15:46:36 INFO FileFormatWriter: Write Job eccba82d-bff6-4a4a-a56e-a838e7a82e2d committed. Elapsed time: 2271 ms.
23/06/24 15:46:36 INFO FileFormatWriter: Finished processing stats for write job eccba82d-bff6-4a4a-a56e-a838e7a82e2d.
23/06/24 15:46:36 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 237:>                                                        (0 + 1) / 1]

23/06/24 15:47:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:47:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:47:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-5, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5380.
23/06/24 15:47:24 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:47:26 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-5019e852-d7ac-43e2-b496-f2f319ccf148/_temporary/0/task_202306241547231561773205856950615_0237_m_000000/' directory.
23/06/24 15:47:26 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-5019e852-d7ac-43e2-b496-f2f319ccf148/' directory.
23/06/24 15:47:27 INFO FileFormatWriter: Write Job a0025a09-45ff-4d04-b862-613980e4099f committed. Elapsed time: 2075 ms.
23/06/24 15:47:27 INFO FileFormatWriter: Finished processing stats for write job a0025a09-45ff-4d04-b862-613980e4099f.
23/06/24 15:47:27 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 238:>                                                        (0 + 1) / 1]

23/06/24 15:53:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:53:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:53:24 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5383.
23/06/24 15:53:26 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:53:27 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-6531f00d-af48-4c37-b72e-a5864b285562/_temporary/0/task_202306241553238060599031040295961_0238_m_000000/' directory.
23/06/24 15:53:27 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-6531f00d-af48-4c37-b72e-a5864b285562/' directory.
23/06/24 15:53:28 INFO BlockManagerInfo: Removed broadcast_238_piece0 on nics-mbp.attlocal.net:52028 in memory (size: 95.0 KiB, free: 434.4 MiB)
23/06/24 15:53:28 INFO FileFormatWriter: Write Job 7744cf02-1910-4f7b-9d04-e20135216392 committed. Elapsed time: 2314 ms.
23/06/24 15:53:28 INFO FileFormatWriter: Finished processing stats for write job 7744cf02-1910-4f7b-9d04-e20135216392.
23/06/24 15:53:29 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafk

[Stage 239:>                                                        (0 + 1) / 1]

23/06/24 15:53:37 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:53:37 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:53:37 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5385.
23/06/24 15:53:39 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:53:40 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-6ff3057f-163d-45f4-91bb-276d29a149a4/_temporary/0/task_202306241553367973894061413901518_0239_m_000000/' directory.
23/06/24 15:53:41 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-6ff3057f-163d-45f4-91bb-276d29a149a4/' directory.
23/06/24 15:53:41 INFO BlockManagerInfo: Removed broadcast_239_piece0 on nics-mbp.attlocal.net:52028 in memory (size: 95.0 KiB, free: 434.4 MiB)
23/06/24 15:53:41 INFO FileFormatWriter: Write Job bacf3884-8e1f-4c40-acdb-e19fb2a076d2 committed. Elapsed time: 2426 ms.
23/06/24 15:53:41 INFO FileFormatWriter: Finished processing stats for write job bacf3884-8e1f-4c40-acdb-e19fb2a076d2.
23/06/24 15:53:42 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafk

[Stage 240:>                                                        (0 + 1) / 1]

23/06/24 15:53:51 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:53:51 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:53:51 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5388.
23/06/24 15:53:53 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:53:54 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-900e5a0e-5503-4f57-95ba-8fd0f4fa1b7a/_temporary/0/task_202306241553503275691611424142084_0240_m_000000/' directory.
23/06/24 15:53:54 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-900e5a0e-5503-4f57-95ba-8fd0f4fa1b7a/' directory.
23/06/24 15:53:55 INFO BlockManagerInfo: Removed broadcast_240_piece0 on nics-mbp.attlocal.net:52028 in memory (size: 95.0 KiB, free: 434.4 MiB)
23/06/24 15:53:55 INFO FileFormatWriter: Write Job 2d6a6d3c-09db-49dc-b743-ef8243e25e7c committed. Elapsed time: 2222 ms.
23/06/24 15:53:55 INFO FileFormatWriter: Finished processing stats for write job 2d6a6d3c-09db-49dc-b743-ef8243e25e7c.
23/06/24 15:53:55 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafk

[Stage 241:>                                                        (0 + 1) / 1]

23/06/24 15:54:04 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:54:04 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:54:04 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5391.
23/06/24 15:54:05 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:54:06 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-eba74d4b-ec03-4f61-9858-b56734d1cf03/_temporary/0/task_202306241554037237758450032569003_0241_m_000000/' directory.
23/06/24 15:54:07 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-eba74d4b-ec03-4f61-9858-b56734d1cf03/' directory.
23/06/24 15:54:07 INFO FileFormatWriter: Write Job 3da2b4f6-2d7d-4fa2-a740-084a138b6577 committed. Elapsed time: 2365 ms.
23/06/24 15:54:07 INFO FileFormatWriter: Finished processing stats for write job 3da2b4f6-2d7d-4fa2-a740-084a138b6577.
23/06/24 15:54:08 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createD

[Stage 242:>                                                        (0 + 1) / 1]

23/06/24 15:54:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:54:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:54:18 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5394.
23/06/24 15:54:20 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-loca

                                                                                

23/06/24 15:54:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-e66b52c5-8a04-4dda-a4b0-ba63bec5d534/_temporary/0/task_20230624155418287911688369781841_0242_m_000000/' directory.
23/06/24 15:54:21 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-e66b52c5-8a04-4dda-a4b0-ba63bec5d534/' directory.
23/06/24 15:54:22 INFO FileFormatWriter: Write Job c125a5a2-bd37-493c-b04d-c44314a9b97f committed. Elapsed time: 2237 ms.
23/06/24 15:54:22 INFO FileFormatWriter: Finished processing stats for write job c125a5a2-bd37-493c-b04d-c44314a9b97f.
23/06/24 15:54:23 INFO BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=twitter_kafka_pyspark_test, projectId=global-maxim-338114, tableId=kafka_pyspark}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDi

[Stage 243:>                                                        (0 + 1) / 1]

23/06/24 15:54:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 0.
23/06/24 15:54:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Seeking to LATEST offset of partition ticketmaster-0
23/06/24 15:54:32 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor-7, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-executor] Resetting offset for partition ticketmaster-0 to offset 5396.
23/06/24 15:54:33 INFO AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb

                                                                                

23/06/24 15:54:34 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-driver-0-6, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-driver-0] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
23/06/24 15:54:35 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-driver-0-6, groupId=spark-kafka-source-ba1563fc-d255-476e-bc57-dbb77866df2e--2001384727-driver-0] Connection to node 1 (/127.0.0.1:9092) could not be established. Broker may not be available.
23/06/24 15:54:35 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/.spark-bigquery-local-1687636163344-7f4a908b-c60a-485d-90dc-974263385e0a/_temporary/0/task_202306241554313259584058231161412_0243_m_000000/' directory.
23/06/24 15:54:35 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://kafka-spark-data/

In [42]:
# WRITE TO CONSOLE TO LOG 
console_query = df2.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime="10 seconds") \
    .start() \
    .awaitTermination()

    # .foreachBatch(write_batch) \

23/06/22 20:39:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/gr/l9vf1gtd2_s_3tz5n15mgkn80000gn/T/temporary-7718572b-fe8e-461e-9e94-f12a490f995c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/06/22 20:39:55 INFO ResolveWriteToStream: Checkpoint root /private/var/folders/gr/l9vf1gtd2_s_3tz5n15mgkn80000gn/T/temporary-7718572b-fe8e-461e-9e94-f12a490f995c resolved to file:/private/var/folders/gr/l9vf1gtd2_s_3tz5n15mgkn80000gn/T/temporary-7718572b-fe8e-461e-9e94-f12a490f995c.
23/06/22 20:39:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/22 20:39:55 INFO CheckpointFileManager: Writing atomically to file:/private/var/folders/gr/l9vf1gtd2_s_3tz5n15mgkn80000gn/T/te

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/nicburkett/opt/anaconda3/envs/spark/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicburkett/opt/anaconda3/envs/spark/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/nicburkett/opt/anaconda3/envs/spark/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

23/06/22 20:40:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-2b8cbc25-4bf7-465c-b61f-6e26ba34a2b9--2109907342-driver-0-33, groupId=spark-kafka-source-2b8cbc25-4bf7-465c-b61f-6e26ba34a2b9--2109907342-driver-0] Seeking to LATEST offset of partition ticketmaster-0
23/06/22 20:40:20 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-2b8cbc25-4bf7-465c-b61f-6e26ba34a2b9--2109907342-driver-0-33, groupId=spark-kafka-source-2b8cbc25-4bf7-465c-b61f-6e26ba34a2b9--2109907342-driver-0] Resetting offset for partition ticketmaster-0 to offset 1393.
23/06/22 20:40:20 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "5ff3f0d9-8649-4084-a0de-9c984d47f474",
  "runId" : "061d6392-20e9-47dc-a164-da2a57f51557",
  "name" : null,
  "timestamp" : "2023-06-23T01:40:20.005Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 10,
    "triggerExecution" : 

In [23]:
spark.stop()

23/06/19 19:50:27 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor-9, groupId=spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001:
org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
23/06/19 19:50:42 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor-9, groupId=spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor] Connection to node 1001 (host.docker.internal/143.244.220.150:9092) could not be established. Broker may not be available.
23/06/19 19:50:42 INFO FetchSessionHandler: [Consumer clientId=consumer-spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor-9, groupId=spark-kafka-source-d2941ca6-4ad1-48ba-bda3-65af4267d63f--2001384727-executor] Erro

In [None]:

## BATCH DATA PROCESSING 

df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", topic_name) \
  .load()\
  .selectExpr("CAST(value AS STRING)")

# Apply deserialization or further processing if needed
df1 = df.withColumn("parsed_data", from_json("value", schema))
## Select the data from the parsed_data column
df2 = df1.select("parsed_data.*")

gcs_metadata_folder = "gs://kafka-spark-data/spark-metadata"
gcs_data_folder = "gs://kafka-spark-data/raw-spark-data"

print(df2.schema)

## WRITE TO LOCAL STORAGE
# gcs_write = df2.write \
#   .format("csv") \
#   .option("checkpointLocation", "/Users/nicburkett/Desktop/spark_output") \
#   .option("path","/Users/nicburkett/Desktop/spark_output") \
#   .mode("overwrite")\
#   .save()

##WRITE TO GCS BUCKET
# gcs_write_newfolder = df2.write \
#   .format("parquet") \
#   .option("checkpointLocation", gcs_metadata_folder) \
#   .option("path",gcs_data_folder) \
#   .mode("overwrite")\
#   .save()


dataset = 'global-maxim-338114.twitter_kafka_pyspark_test'
table = 'twitter_kafka_pyspark_test'

# Write the DataFrame to BigQuery
 ## this is the bucket where the data is stored temporarily
df2.write \
    .format("bigquery") \
    .mode("overwrite") \
    .option("checkpointLocation", gcs_metadata_folder) \
    .option("temporaryGcsBucket", 'kafka-spark-data') \
    .option("table",'global-maxim-338114.twitter_kafka_pyspark_test.kafka_pyspark') \
    .option("mode", "FAILFAST") \
    .save()


In [None]:
import pandas as pd
# Function to save the data to GCS with a custom filename
def save_to_gcs(batch_df, batch_id):
    # Convert the batch dataframe to a pandas dataframe
    pandas_df = batch_df.toPandas()

    # Get the values of the desired columns from the first row
    column1_value = pandas_df.loc[0, "column1"]
    column2_value = pandas_df.loc[0, "column2"]

    # Get the current time
    current_time = pd.Timestamp.now()

    # Generate the custom filename
    filename = f"file_{column1_value}_{column2_value}_{current_time}.parquet"

    # Save the dataframe to GCS with the custom filename
    pandas_df.to_parquet(f"gs://{bucket_name}/{path}/{filename}", index=False)


In [None]:
# DATA TYPES THOUGHOUT KAFKA SERVER
import json
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, udf
import findspark

findspark.init()
topic_name = 'twitter'
# Config
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("TwitterSentimentAnalysis") \
    .getOrCreate()

prod = {'user_id': 19, 'recipient_id': 57, 'message': 'YbfyRHyWgjuGlzOiudEcVMLJNzqUPDvV'}
print(type(prod))
##convert dictionary to json string (BYTES)
serialized_prod = json.dumps(prod).encode('utf-8')

print(f'The producer dtype is {type(serialized_prod)} and output is {(serialized_prod)}')

## turn from string/bytes into a dictionary again
deserializer_function = lambda x: json.loads(x.decode('utf-8'))
deserialized_cons = deserializer_function(serialized_prod)

print(f'The producer dtype is {type(deserialized_cons)} and output is {(deserialized_cons)}')

### PARSING THE JSON COMING OUT 
user_id = deserialized_cons.get('user_id')
recipient_id = deserialized_cons.get('recipient_id')
message = deserialized_cons.get('message')
output_parsed = print(f'UserID: {user_id}, RecipientID: {recipient_id}, Message:{message}')


df_pandas = pd.DataFrame([deserialized_cons])
df_pandas


# df_spark = spark.createDataFrame(df_pandas)
# df_spark.show()

# Create a spark schema/column headers
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("recipient_id", IntegerType(), True),
    StructField("message", StringType(), True)
])

# Create DataFrame from a single row
# data = [(user_id, recipient_id, message)]
df_spark = spark.createDataFrame(df_pandas,schema)
df_spark.show()
# df.write.csv('/path/to/output.csv', header=True, mode='overwrite')
