In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Step 2.1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("Hudi Example") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


24/11/27 12:11:48 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.33 instead (on interface en0)
24/11/27 12:11:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/bhland/.ivy2/cache
The jars for the packages stored in: /Users/bhland/.ivy2/jars
org.apache.hudi#hudi-spark3.5-bundle_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e62f8b10-f645-4a07-b2ba-4ce0ed33acf3;1.0
	confs: [default]


:: loading settings :: url = jar:file:/Users/bhland/miniforge3/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.apache.hudi#hudi-spark3.5-bundle_2.12;0.15.0 in central
	found org.apache.hive#hive-storage-api;2.8.1 in central
	found org.slf4j#slf4j-api;1.7.36 in local-m2-cache
:: resolution report :: resolve 81ms :: artifacts dl 4ms
	:: modules in use:
	org.apache.hive#hive-storage-api;2.8.1 from central in [default]
	org.apache.hudi#hudi-spark3.5-bundle_2.12;0.15.0 from central in [default]
	org.slf4j#slf4j-api;1.7.36 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-e62f8b10-f645-4a07-b2ba-4ce0ed33acf3
	confs: [default]
	0 artifa

In [2]:
# Define schema for the JSON data
schema = StructType([
    StructField("patent_id", StringType(), True),
    StructField("patent_date", StringType(), True),
    StructField("num_claims", StringType(), True),
    StructField("branch", StringType(), True),
    StructField("inventors", ArrayType(StructType([
        StructField("inventor_name", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("state", StringType(), True)
    ])), True),
    StructField("applicants", ArrayType(StructType([
        StructField("organization", StringType(), True)
    ])), True)
])

# Read JSON files as streaming input
input_path = "file:///Users/bhland/Big_Data_Project/RealTimePOC/input"  # Replace with your JSON file path
json_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .load(input_path)

json_stream = json_stream.na.drop(how="any")

'''
# Write the json stream to the console (can be written to a file, Kafka, etc.)
print_query = json_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 10) \
    .start()

# Await termination
print_query.awaitTermination()

'''

# Extract year and month from the patent_date column
transformed_stream = json_stream \
    .withColumn("year", year(col("patent_date").cast("timestamp"))) \
    .withColumn("month", month(col("patent_date").cast("timestamp"))) \
    .withColumn("inventor", explode(col("inventors")))  # Flatten inventors array

# Count male and female inventors
aggregated_stream = transformed_stream \
    .withColumn("male_count", when(col("inventor.gender") == "M", lit(1)).otherwise(lit(0))) \
    .withColumn("female_count", when(col("inventor.gender") == "F", lit(1)).otherwise(lit(0))) \
    .groupBy("year", "month", "branch") \
    .agg(
        avg("num_claims").alias("avg_claims"),
        count("patent_id").alias("total_patents"),
        sum("male_count").alias("total_male_inventors"),
        sum("female_count").alias("total_female_inventors")
    )

patent_stream = aggregated_stream \
    .withColumn("year", make_date(col("year"), col("month"), lit(1))) \
    .withColumn("recordkey", concat_ws("_", col("year"), col("branch"))) \
    .select(
        col("year"),
        col("branch"),
        col("total_patents").alias("grant_count"),
        col("avg_claims")
    )



gender_stream = aggregated_stream \
    .withColumn("year", make_date(col("year"), col("month"), lit(1))) \
    .select(
        col("year"),
        col("branch"),
        col("total_female_inventors").alias("female_count"),
        col("total_male_inventors").alias("male_count")    
    )



'''
# Write the aggregated results to the console
output_patent_query = final_patent_stream.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Write the aggregated results to the console
output_gender_query = final_gender_stream.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Await termination
output_patent_query.awaitTermination()
output_gender_query.awaitTermination()

'''
# Step 2.4: Define Hudi Options
patent_trends_options = {
    'hoodie.table.name': 'patent_trends',
    'hoodie.datasource.write.recordkey.field': 'year,branch',
    "hoodie.datasource.write.precombine.field": 'grant_count',
    'hoodie.datasource.write.table.name': 'patent_trends',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.write.handle.insert.skip.null.record': 'true'
}

gender_trends_options = {
    'hoodie.table.name': 'gender_trends',
    'hoodie.datasource.write.recordkey.field': 'year,branch',
    "hoodie.datasource.write.precombine.field": 'male_count',
    'hoodie.datasource.write.table.name': 'gender_trends',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.write.handle.insert.skip.null.record': 'true'
}

# Output path for Hudi table
patent_trends_output_path = "file:///Users/bhland/Big_Data_Project/RealTimePOC/output/patent_trends" 
gender_trends_output_path = "file:///Users/bhland/Big_Data_Project/RealTimePOC/output/gender_trends" 

# Step 2.6: Write to Hudi Table Using Structured Streaming
patent_query = patent_stream.writeStream \
    .format("hudi") \
    .options(**patent_trends_options) \
    .outputMode("complete") \
    .option("checkpointLocation", "file:///Users/bhland/Big_Data_Project/RealTimePOC/output/checkpoint_patent_dir") \
    .start(patent_trends_output_path)
'''
gender_query = gender_stream.writeStream \
    .format("hudi") \
    .options(**gender_trends_options) \
    .outputMode("update") \
    .option("checkpointLocation", "file:///Users/bhland/Big_Data_Project/RealTimePOC/output/checkpoint_gender_dir") \
    .start(gender_trends_output_path)'''
# Await termination to keep the stream running
patent_query.awaitTermination()
#gender_query.awaitTermination()

24/11/27 12:12:54 WARN HoodieStreamingSink: Ignore TableNotFoundException as it is first microbatch.
24/11/27 12:12:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/11/27 12:13:03 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
24/11/27 12:13:03 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
24/11/27 12:13:04 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
24/11/27 12:13:05 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
                                                                                



24/11/27 12:13:10 WARN HoodieSparkSqlWriterInternal: Closing write client
24/11/27 12:13:16 WARN HoodieSparkSqlWriterInternal: hoodie table at file:/Users/bhland/Big_Data_Project/RealTimePOC/output/patent_trends already exists. Deleting existing data & overwriting with new data.
24/11/27 12:13:19 WARN HoodieSparkSqlWriterInternal: Closing write client       
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/bhland/miniforge3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/bhland/miniforge3/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Users/bhland/miniforge3/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [4]:
patent_trends_output_path = "file:///Users/bhland/Big_Data_Project/RealTimePOC/output/patent_trends" 
gender_trends_output_path = "file:///Users/bhland/Big_Data_Project/RealTimePOC/output" 


patent_df = spark.read.format("hudi").load(patent_trends_output_path)
patent_df.show(truncate=False)

#gender_df = spark.read.format("hudi").load(gender_trends_output_path)
#gender_df.show(truncate=False)

+-------------------+---------------------+---------------------------------------------------------+----------------------+-------------------------------------------------------------------------+----------+----------------------------------+-----------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                       |_hoodie_partition_path|_hoodie_file_name                                                        |year      |branch                            |grant_count|avg_claims        |
+-------------------+---------------------+---------------------------------------------------------+----------------------+-------------------------------------------------------------------------+----------+----------------------------------+-----------+------------------+
|20241127121316312  |20241127121316312_0_0|year:2024-08-01,branch:Data Science and Analytics        |                      |7dbec9b4-59b3-45a9-b6ea-348d84e05e94-0_0-68-459_