In [3]:
import os
# Tell PySpark which JDK to launch. The Homebrew OpenJDK location is only a
# fallback: if JAVA_HOME is already set in the environment it is left untouched,
# so the notebook does not clobber a JDK configured on another machine.
os.environ.setdefault('JAVA_HOME', '/opt/homebrew/opt/openjdk')

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta import *

# Build a SparkSession with the Delta Lake SQL extension and catalog registered,
# then let the delta-spark helper finish configuring the builder before the
# session is created (or reused if one already exists).
builder = (
    SparkSession.builder
    .appName("DeltaLakeExample")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [5]:
# Preview the gold-layer Parquet dataset (default show(): 20 rows, truncated cells).
spark.read.load('../../data/gold/bets_interview_completed.parquet/', format="parquet").show()

+--------------------+----------+--------------------+--------------------+
|       sportsbook_id|account_id|            outcomes|        transactions|
+--------------------+----------+--------------------+--------------------+
|66ce7381-8a05-4aa...| 112164134|[{{2, 1, 3, +200}...|[{125a6cc9-7de5-9...|
|4e781821-9bd5-43d...| 112164134|[{{2, 1, 3, +200}...|[{c62c05b8-a0df-f...|
|dcac73b6-18e4-4c9...| 114856230|[{{37, 50, 1.74, ...|[{07862be1-5c33-7...|
|abda5fcf-9d5d-453...| 112164134|[{{13, 25, 1.52, ...|[{23cb5474-2337-c...|
+--------------------+----------+--------------------+--------------------+



In [2]:
# Locations of the bronze-layer Delta tables, relative to this notebook.
bets_path, trans_path = "../../data/bronze/bets_v1", "../../data/bronze/trans_v1"

# Load both Delta tables as DataFrames.
bets_df = spark.read.load(bets_path, format="delta")
trans_df = spark.read.load(trans_path, format="delta")

24/03/29 19:07:30 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
# Print the schema tree of bets_df, including the nested `legs` and `markets`
# array-of-struct columns that later cells explode.
bets_df.printSchema()

root
 |-- sportsbook_id: string (nullable = true)
 |-- account_id: string (nullable = true)
 |-- legs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- price: struct (nullable = true)
 |    |    |    |-- num: integer (nullable = true)
 |    |    |    |-- den: integer (nullable = true)
 |    |    |    |-- decimal: string (nullable = true)
 |    |    |    |-- americanOdds: string (nullable = true)
 |    |    |-- result: string (nullable = true)
 |    |    |-- legPart: struct (nullable = true)
 |    |    |    |-- outcomeRef: string (nullable = true)
 |    |    |    |-- marketRef: string (nullable = true)
 |    |    |    |-- eventRef: string (nullable = true)
 |-- markets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- outcomeRef: string (nullable = true)
 |    |    |-- marketRef: string (nullable = true)
 |    |    |-- eventRef: string (nullable = true)
 |    |    |-- categoryRef: string (nullable = true)
 |    

In [6]:
# count() triggers a Spark job; compute it first, then report it.
bets_row_count = bets_df.count()
print("Bets rows Count:", bets_row_count)

Bets rows Count: 4


In [16]:
# Project only the nested `legs` array column for inspection.
bets_df.select(F.col('legs'))

legs
"[{{7, 4, 2.75, +1..."
"[{{7, 4, 2.75, +1..."
"[{{7, 4, 2.75, +1..."
"[{{37, 50, 1.74, ..."


24/03/29 11:22:51 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1011200 ms exceeds timeout 120000 ms
24/03/29 11:22:51 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/29 11:38:48 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [12]:
# Bring at most 10 rows to the driver as a pandas DataFrame for inspection.
bets_pandas_df = bets_df.limit(10).toPandas()

# Leave the frame as the cell's last expression so Jupyter renders it as a rich
# table; print() falls back to plain text that wraps and truncates wide columns.
bets_pandas_df

                          sportsbook_id account_id  \
0  4e781821-9bd5-43df-a886-55dd15614ea1  112164134   
1  66ce7381-8a05-4aa3-aae5-c6bc65295a0c  112164134   
2  abda5fcf-9d5d-4534-954e-aacc42432725  112164134   
3  dcac73b6-18e4-4c9a-8900-9935e0e6b2fa  114856230   

                                                legs  \
0  [((7, 4, 2.75, +175), -, (35487833, 10303766, ...   
1  [((7, 4, 2.75, +175), -, (35487833, 10303766, ...   
2  [((7, 4, 2.75, +175), -, (35487833, 10303766, ...   
3  [((37, 50, 1.74, -136), L, (35644601, 10344031...   

                                             markets  
0  [(35382838, 10203679, 196004, FOOTBALL, |Atlet...  
1  [(35382838, 10203679, 196004, FOOTBALL, |Atlet...  
2  [(35508011, 10307803, 196518, HANDBALL, |HSG G...  
3  [(35644601, 10344031, 226772, FOOTBALL, |No|, ...  


In [23]:
# One row per leg: explode the `legs` array, then flatten the leg struct so the
# three legPart references become top-level columns usable as join keys.
legs_exploded_df = (
    bets_df
    .withColumn("leg", F.explode("legs"))
    .select(
        "sportsbook_id",
        "account_id",
        F.col("leg.price").alias("price"),
        F.col("leg.result").alias("result"),
        F.col("leg.legPart.outcomeRef").alias("outcomeRef"),
        F.col("leg.legPart.marketRef").alias("marketRef"),
        F.col("leg.legPart.eventRef").alias("eventRef"),
    )
)

# One row per market: explode the `markets` array and expand every field of the
# market struct into its own column.
markets_exploded_df = (
    bets_df
    .withColumn("market", F.explode("markets"))
    .select("sportsbook_id", "account_id", "market.*")
)

# Inner-join each leg to its market on the bet identifiers plus the three refs.
leg_market_keys = ["sportsbook_id", "account_id", "outcomeRef", "marketRef", "eventRef"]
joined_df = legs_exploded_df.join(markets_exploded_df, on=leg_market_keys, how="inner")

In [24]:
# Display the joined leg/market rows using show()'s defaults, spelled out.
joined_df.show(n=20, truncate=True)

+--------------------+----------+----------+---------+--------+--------------------+------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+
|       sportsbook_id|account_id|outcomeRef|marketRef|eventRef|               price|result|categoryRef|         outcomeName|       eventTypeName|      className|          marketName|           eventName|      eventStartTime|
+--------------------+----------+----------+---------+--------+--------------------+------+-----------+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+
|4e781821-9bd5-43d...| 112164134|  35382838| 10203679|  196004|     {2, 1, 3, +200}|     -|   FOOTBALL|  |Atletico Tucuman|||Argentina Copa d...|    |Argentina||      |Match Result|||Atletico Tucuman...|2022-03-22T22:15:...|
|4e781821-9bd5-43d...| 112164134|  35516061| 10309513|  225456|{131, 100, 2.31, ...|     -|   FOOTBA

In [26]:

# Columns packed into each outcome struct, in order. The first four come from
# the leg's price struct; the three refs were originally inside legPart; the
# rest come from the joined market row.
outcome_fields = [
    "price.num", "price.den", "price.decimal", "price.americanOdds",
    "result",
    "outcomeRef", "marketRef", "eventRef",
    "categoryRef", "outcomeName", "eventTypeName", "className",
    "marketName", "eventName", "eventStartTime",
]
outcomes_struct = F.struct(*outcome_fields).alias("outcomes")

# Collapse back to one row per bet, gathering its outcome structs into a list.
final_df = (
    joined_df
    .groupBy("sportsbook_id", "account_id")
    .agg(F.collect_list(outcomes_struct).alias("outcomes"))
)

final_df.show(truncate=False)

+------------------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [27]:
# Display the raw transactions without truncating cell contents.
trans_df.show(n=20, truncate=False)

+------------------------------------+------------------------------------+---------+-----------------+---------+
|sportsbook_id                       |trans_uuid                          |transType|config           |deltaCash|
+------------------------------------+------------------------------------+---------+-----------------+---------+
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|07862be1-5c33-75ab-4b41-43d73e982405|ADJRESULT|Unsettlement_Lose|0.0      |
|66ce7381-8a05-4aa3-aae5-c6bc65295a0c|125a6cc9-7de5-9cf8-15c2-e6cbfce633fd|RESULT   |Settlement_Lose  |0.0      |
|abda5fcf-9d5d-4534-954e-aacc42432725|23cb5474-2337-ce1d-2400-53077e7909e1|RESULT   |Settlement_Lose  |0.0      |
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|65d2188b-a3ab-41e7-670d-c1bb89d673f6|WAGER    |SBOB_WAGER       |-0.34    |
|dcac73b6-18e4-4c9a-8900-9935e0e6b2fa|68f7914a-17bb-9b5d-d677-c204ce045548|ADJRESULT|Resettlement_Win |0.7      |
|66ce7381-8a05-4aa3-aae5-c6bc65295a0c|866205be-b9da-4887-49af-c081696505f3|WAGER    |SBO

In [28]:
# Pack the per-transaction fields into a struct and gather them into one list
# per sportsbook_id.
transaction_struct = F.struct("trans_uuid", "transType", "config", "deltaCash")
trans_agg_df = (
    trans_df
    .groupBy("sportsbook_id")
    .agg(F.collect_list(transaction_struct).alias("transactions"))
)

# Attach the transaction lists to the aggregated bets. The left outer join keeps
# bets that have no transactions (their `transactions` column is null).
final_with_trans_df = final_df.join(trans_agg_df, on="sportsbook_id", how="left_outer")

# Show the combined result with full, untruncated cell contents.
final_with_trans_df.select(
    "sportsbook_id", "account_id", "outcomes", "transactions"
).show(truncate=False)

+------------------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------