In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
%pip install azure-eventhub


In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# CricAPI / Event Hubs settings. Secrets come from the environment instead of
# being committed with the notebook.
# NOTE(review): the API key and SAS key that used to be hard-coded in this cell
# were stored in plain text and should be treated as leaked and rotated.
import os

_missing_settings = [
    name for name in ("CRICAPI_API_KEY", "EVENT_HUB_CONN_STR") if not os.environ.get(name)
]
if _missing_settings:
    # Fail fast with a clear message rather than later with an opaque HTTP/AMQP error.
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_settings)
    )

# CricAPI credentials
API_KEY = os.environ["CRICAPI_API_KEY"]
# The key travels in the query string, so avoid printing or logging CRICAPI_URL.
CRICAPI_URL = "https://api.cricapi.com/v1/cricScore?apikey=" + API_KEY

# Event Hub credentials; the connection string's EntityPath should match EVENT_HUB_NAME.
EVENT_HUB_CONN_STR = os.environ["EVENT_HUB_CONN_STR"]
EVENT_HUB_NAME = "cricket-live-data"

def fetch_cricket_data(timeout=10):
    """Fetch the current live-score snapshot from CricAPI.

    Args:
        timeout: Seconds to wait for CricAPI to respond before giving up.
            Defaults to 10; without a timeout ``requests`` may wait indefinitely.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (a message
        describing the failure is printed).
    """
    try:
        response = requests.get(CRICAPI_URL, timeout=timeout)
    except requests.RequestException as exc:
        # Print only the exception type: request errors can embed the URL, and
        # CRICAPI_URL carries the API key in its query string.
        print(f"Failed to fetch data from CricAPI: {type(exc).__name__}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch data from CricAPI, status code: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError:
        # requests' JSON decode errors subclass ValueError.
        print("Failed to fetch data from CricAPI: response body is not valid JSON")
        return None

def send_data_to_event_hub(data):
    """Serialize ``data`` to JSON and publish it to Event Hubs as a single event.

    Args:
        data: Any JSON-serializable object (here, the CricAPI response payload).

    Raises:
        TypeError: if ``data`` is not JSON-serializable (from ``json.dumps``).
        ValueError: from ``EventDataBatch.add`` if the event does not fit in one
            batch (per the azure-eventhub documentation).
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONN_STR, eventhub_name=EVENT_HUB_NAME
    )
    # The context manager closes the producer even when batching or sending raises,
    # instead of leaking the connection as a trailing close() would.
    with producer:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(data)))
        producer.send_batch(event_data_batch)

# Entry point: pull one snapshot of live scores and forward it to Event Hubs.
if __name__ == "__main__":
    cricket_data = fetch_cricket_data()
    # fetch_cricket_data() returns None on a non-200 response; nothing is
    # published then (any other falsy payload is skipped as well).
    should_publish = bool(cricket_data)
    if should_publish:
        print("Fetched cricket data successfully")
        send_data_to_event_hub(cricket_data)
        print("Data sent to Event Hub successfully")


In [0]:
# Create the `streaming` catalog and one schema per medallion layer.
# Each statement is attempted independently and failures are printed rather than
# raised, so the remaining statements still run (e.g. when Unity Catalog is not
# enabled or the user lacks permission to create catalogs).
CATALOG = "streaming"
MEDALLION_LAYERS = ("bronze", "silver", "gold")

try:
    spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
    print(f"Catalog '{CATALOG}' created or already exists.")
except Exception as e:
    print(f"Error creating catalog: {e}")

for layer in MEDALLION_LAYERS:
    try:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{layer}")
        print(f"Schema '{CATALOG}.{layer}' created or already exists.")
    except Exception as e:
        print(f"Error creating {layer} schema: {e}")


In [0]:
# List the schemas in the `streaming` catalog; bronze, silver and gold should
# appear if the previous cell succeeded.
spark.sql("SHOW SCHEMAS IN streaming").show()

# List the available catalogs to confirm that `streaming` exists.
spark.sql("SHOW CATALOGS").show()


In [0]:
from pyspark.sql import SparkSession

# Bronze ingestion: stream raw CricAPI events from Event Hubs into a Delta table.
import os

from pyspark.sql.functions import array, col, explode, from_json, when
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

sc = spark.sparkContext  # assumes `spark` is the notebook's SparkSession

# Read the connection string from the environment rather than hard-coding it.
# NOTE(review): the SAS key previously hard-coded here should be rotated.
# NOTE(review): that string used the policy "Data-send", which suggests send-only
# rights; consuming needs a policy with Listen rights — confirm which one is set.
event_hub_connection_string = os.environ.get("EVENT_HUB_CONN_STR")
if not event_hub_connection_string:
    raise RuntimeError("Set EVENT_HUB_CONN_STR to the Event Hubs connection string.")

# The azure-event-hubs-spark connector expects the connection string encrypted via
# EventHubsUtils.encrypt; encrypt once and use this single config for the stream.
encrypted_connection_string = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
    event_hub_connection_string
)
ehConf = {
    "eventhubs.connectionString": encrypted_connection_string,
}
# NOTE(review): without "eventhubs.startingPosition" the connector presumably starts
# at the end of the stream on a first run (no checkpoint yet), so events sent before
# this query starts may be skipped — confirm and set it if that matters.

event_stream = (
    spark.readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

# Schema of a single match record.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("dateTimeGMT", StringType(), True),
    StructField("matchType", StringType(), True),
    StructField("status", StringType(), True),
    StructField("ms", StringType(), True),
    StructField("t1", StringType(), True),
    StructField("t2", StringType(), True),
    StructField("t1s", StringType(), True),
    StructField("t2s", StringType(), True),
    StructField("series", StringType(), True),
    StructField("t1img", StringType(), True),
    StructField("t2img", StringType(), True)
])

# The producer cell sends the whole CricAPI response as one event. Assumption
# (TODO confirm against the CricAPI docs): that response wraps the matches in a
# top-level "data" array. Both that wrapper and a bare match object are accepted.
payload_schema = StructType([StructField("data", ArrayType(schema), True)])

# Event bodies arrive as binary; decode them as (presumably UTF-8) JSON text.
cricket_data = event_stream.withColumn("body", col("body").cast("string"))

wrapped_matches = from_json(col("body"), payload_schema).getField("data")
# A bare match object has no "data" key, so fall back to parsing the body as one
# match; this reproduces the one-row-per-event result for that shape.
matches = when(wrapped_matches.isNotNull(), wrapped_matches).otherwise(
    array(from_json(col("body"), schema))
)
cricket_data_json = cricket_data.select(explode(matches).alias("data")).select("data.*")

# Append raw match rows to the Bronze Delta location; keep the handle so the
# query can be monitored or stopped (bronze_query.stop()).
bronze_query = (
    cricket_data_json.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/cricket-data")
    .start("/mnt/bronze/cricket-data")
)
