# Consume API Data from Kafka with Spark 

To consume the API data, we use Spark to read directly from Kafka by subscribing to the corresponding topic. Using Spark's Kafka package (`spark-sql-kafka-0-10`), we read the messages, clean the data, and upload it to our MongoDB collection for the API data.

## Import Packages

In [107]:
from pyspark.sql import SparkSession
# Note: importing Spark's round() shadows Python's built-in round() in this notebook
from pyspark.sql.functions import from_json, col, udf, round
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
import pymongo as mdb

## Create Spark Session

In [108]:
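spark = (SparkSession
         .builder
         .appName('nbaConsumer')
         # Kafka source connector; the Scala suffix (_2.12) and version (3.3.0) must match the Spark installation
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
         .getOrCreate())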
sc = spark.sparkContext
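The `spark.jars.packages` coordinate above has to agree with the Spark installation: the `_2.12` suffix is the Scala version Spark was built with, and `3.3.0` should equal the Spark version. The hypothetical helper below (a sketch, not part of the original notebook) makes both values explicit; in a live session one could pass `pyspark.__version__` as the Spark version.

```python
def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Build the Maven coordinate for Spark's Kafka source (hypothetical helper)."""
    # Artifact: spark-sql-kafka-0-10_<Scala binary version>, versioned like Spark itself
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

print(kafka_package("3.3.0"))  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
```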

## Create Schema for Spark Dataframe

In [109]:
schema_player_stats = StructType([
    StructField("player_id", IntegerType(), True),
    StructField("full_name", StringType(), True),
    StructField("season", IntegerType(), True),
    StructField("ast", DoubleType(), True),
    StructField("blk", DoubleType(), True),
    StructField("dreb", DoubleType(), True),
    StructField("fg3_pct", DoubleType(), True),
    StructField("fg3a", DoubleType(), True),
    StructField("fg3m", DoubleType(), True),
    StructField("fg_pct", DoubleType(), True),
    StructField("fga", DoubleType(), True),
    StructField("fgm", DoubleType(), True),
    StructField("ft_pct", DoubleType(), True),
    StructField("fta", DoubleType(), True),
    StructField("ftm", DoubleType(), True),
    StructField("games_played", IntegerType(), True),
    StructField("min", StringType(), True),
    StructField("oreb", DoubleType(), True),
    StructField("pf", DoubleType(), True),
    StructField("pts", DoubleType(), True),
    StructField("reb", DoubleType(), True),
    StructField("stl", DoubleType(), True),
    StructField("turnover", DoubleType(), True)
])
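Each Kafka message value is assumed to be a flat JSON object whose keys match the schema fields, which is consistent with the parsed output further below. The pure-Python sketch here approximates what `CAST(value AS STRING)` followed by `from_json` does with such a message: decode the bytes as UTF-8, parse the JSON, and keep only the schema's fields, with absent fields coming back as null (`None`). Unlike Spark, it performs no type conversion. The sample message is hypothetical, built from values in the first output row.

```python
import json

# Field names in the same order as schema_player_stats (types omitted in this sketch)
FIELDS = [
    "player_id", "full_name", "season", "ast", "blk", "dreb", "fg3_pct",
    "fg3a", "fg3m", "fg_pct", "fga", "fgm", "ft_pct", "fta", "ftm",
    "games_played", "min", "oreb", "pf", "pts", "reb", "stl", "turnover",
]

def parse_value(raw: bytes) -> dict:
    """Rough analogue of CAST(value AS STRING) + from_json for one message."""
    obj = json.loads(raw.decode("utf-8"))           # bytes -> str -> dict
    return {name: obj.get(name) for name in FIELDS}  # missing fields -> None

# Hypothetical message; "turnover" is left out on purpose to show the null case
sample = json.dumps({
    "player_id": 67, "full_name": "MarShon Brooks", "season": 2011,
    "ast": 2.34, "min": "29:26", "games_played": 56,
}).encode("utf-8")

row = parse_value(sample)
print(row["full_name"], row["season"], row["min"], row["turnover"])
# MarShon Brooks 2011 29:26 None
```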

## Read Messages into Spark Dataframe 

In [110]:
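# Batch read of everything currently in the topic (earliest to latest offset)
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "NBA-API-TOPIC") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Kafka delivers key and value as binary; cast both to strings
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Parse the JSON payload in 'value' according to the schema
parsed_df = df.withColumn("parsed_value", from_json(col("value"), schema_player_stats))

# Flatten the parsed struct into top-level columns
final_df = parsed_df.select("key", "parsed_value.*")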

final_df.show()

+----+---------+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+-----+----+----+-----+-----+----+--------+
| key|player_id|     full_name|season| ast| blk|dreb|fg3_pct|fg3a|fg3m|fg_pct|  fga| fgm|ft_pct| fta| ftm|games_played|  min|oreb|  pf|  pts|  reb| stl|turnover|
+----+---------+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+-----+----+----+-----+-----+----+--------+
|null|       67|MarShon Brooks|  2011|2.34|0.27|2.32|  0.313|2.68|0.84| 0.428|11.43|4.89| 0.764|2.64|2.02|          56|29:26|1.25|2.07|12.64| 3.57|0.93|    2.11|
|null|       67|MarShon Brooks|  2012|1.04|0.22|0.99|  0.273|0.75|0.21| 0.463| 4.59|2.12| 0.734|1.29|0.95|          73|12:31|0.44|1.27|  5.4| 1.42|0.47|    0.95|
|null|       67|MarShon Brooks|  2013|0.76|0.12|1.33|   0.52|0.76|0.39| 0.456| 3.79|1.73| 0.727|1.33|0.97|          33| 9:37| 0.3|0.64| 4.82| 1.64|0.42|    0.73|
|null|       71| Lorenzo Bro

## View Schema of Dataframe

In [111]:
final_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- player_id: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- ast: double (nullable = true)
 |-- blk: double (nullable = true)
 |-- dreb: double (nullable = true)
 |-- fg3_pct: double (nullable = true)
 |-- fg3a: double (nullable = true)
 |-- fg3m: double (nullable = true)
 |-- fg_pct: double (nullable = true)
 |-- fga: double (nullable = true)
 |-- fgm: double (nullable = true)
 |-- ft_pct: double (nullable = true)
 |-- fta: double (nullable = true)
 |-- ftm: double (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- min: string (nullable = true)
 |-- oreb: double (nullable = true)
 |-- pf: double (nullable = true)
 |-- pts: double (nullable = true)
 |-- reb: double (nullable = true)
 |-- stl: double (nullable = true)
 |-- turnover: double (nullable = true)



## Clean Data

### Drop Columns

In [112]:
final_df = final_df.drop("key")

In [113]:
final_df = final_df.drop("player_id")

In [114]:
final_df.printSchema()

root
 |-- full_name: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- ast: double (nullable = true)
 |-- blk: double (nullable = true)
 |-- dreb: double (nullable = true)
 |-- fg3_pct: double (nullable = true)
 |-- fg3a: double (nullable = true)
 |-- fg3m: double (nullable = true)
 |-- fg_pct: double (nullable = true)
 |-- fga: double (nullable = true)
 |-- fgm: double (nullable = true)
 |-- ft_pct: double (nullable = true)
 |-- fta: double (nullable = true)
 |-- ftm: double (nullable = true)
 |-- games_played: integer (nullable = true)
 |-- min: string (nullable = true)
 |-- oreb: double (nullable = true)
 |-- pf: double (nullable = true)
 |-- pts: double (nullable = true)
 |-- reb: double (nullable = true)
 |-- stl: double (nullable = true)
 |-- turnover: double (nullable = true)



### Convert Minute String into Numeric Values

In [115]:
final_df.show(5)

+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+-----+----+----+-----+----+----+--------+
|     full_name|season| ast| blk|dreb|fg3_pct|fg3a|fg3m|fg_pct|  fga| fgm|ft_pct| fta| ftm|games_played|  min|oreb|  pf|  pts| reb| stl|turnover|
+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+-----+----+----+-----+----+----+--------+
|MarShon Brooks|  2011|2.34|0.27|2.32|  0.313|2.68|0.84| 0.428|11.43|4.89| 0.764|2.64|2.02|          56|29:26|1.25|2.07|12.64|3.57|0.93|    2.11|
|MarShon Brooks|  2012|1.04|0.22|0.99|  0.273|0.75|0.21| 0.463| 4.59|2.12| 0.734|1.29|0.95|          73|12:31|0.44|1.27|  5.4|1.42|0.47|    0.95|
|MarShon Brooks|  2013|0.76|0.12|1.33|   0.52|0.76|0.39| 0.456| 3.79|1.73| 0.727|1.33|0.97|          33| 9:37| 0.3|0.64| 4.82|1.64|0.42|    0.73|
| Lorenzo Brown|  2013|1.71|0.13|0.83|    0.1|1.25|0.13| 0.302| 3.58|1.08| 0.692|0.54|0.38|          24| 9:17|0.33|0.79| 2.6

In [116]:
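def convert_min_to_decimal(time_str):
    """Convert an 'MM:SS' string such as '29:26' to decimal minutes."""
    if time_str is None:
        return None  # keep nulls as null so na.drop() can remove them later
    minutes, seconds = time_str.split(':')
    return int(minutes) + int(seconds) / 60

# Register the function as a Spark UDF returning a double
convert_to_decimal_udf = udf(convert_min_to_decimal, DoubleType())

final_df = final_df.withColumn('min', convert_to_decimal_udf(final_df['min']))

# Round to one decimal place (Spark's round, imported above)
final_df = final_df.withColumn('min', round(final_df['min'], 1))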
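As a worked check of the conversion, `29:26` is 29 + 26/60, about 29.433 minutes, which rounds to 29.4, matching the `min` column in the output below. The plain-Python sketch here applies the same conversion (with a guard for `None`, an assumption since a null `min` would otherwise raise inside the UDF) to the four minute strings visible in the notebook. Spark's `round` uses HALF_UP rounding while Python's built-in `round` rounds half to even, so the two can differ on exact ties; none of these values is a tie. Because a Python UDF serializes rows between the JVM and a Python worker process, the same result could also be computed with built-in column functions (`split(col('min'), ':')` with `getItem(0)`/`getItem(1)` and `.cast('int')`), which avoids that overhead.

```python
from typing import Optional

def convert_min_to_decimal(time_str: Optional[str]) -> Optional[float]:
    """Convert an 'MM:SS' string to decimal minutes; None stays None."""
    if time_str is None:
        return None
    minutes, seconds = time_str.split(":")
    return int(minutes) + int(seconds) / 60

# Minute strings from the notebook output, paired with the rounded values Spark shows
examples = {"29:26": 29.4, "12:31": 12.5, "9:37": 9.6, "9:17": 9.3}
for raw, expected in examples.items():
    value = round(convert_min_to_decimal(raw), 1)
    print(raw, "->", value)
    assert value == expected
```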

In [117]:
final_df.show()

+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+----+----+----+-----+-----+----+--------+
|     full_name|season| ast| blk|dreb|fg3_pct|fg3a|fg3m|fg_pct|  fga| fgm|ft_pct| fta| ftm|games_played| min|oreb|  pf|  pts|  reb| stl|turnover|
+--------------+------+----+----+----+-------+----+----+------+-----+----+------+----+----+------------+----+----+----+-----+-----+----+--------+
|MarShon Brooks|  2011|2.34|0.27|2.32|  0.313|2.68|0.84| 0.428|11.43|4.89| 0.764|2.64|2.02|          56|29.4|1.25|2.07|12.64| 3.57|0.93|    2.11|
|MarShon Brooks|  2012|1.04|0.22|0.99|  0.273|0.75|0.21| 0.463| 4.59|2.12| 0.734|1.29|0.95|          73|12.5|0.44|1.27|  5.4| 1.42|0.47|    0.95|
|MarShon Brooks|  2013|0.76|0.12|1.33|   0.52|0.76|0.39| 0.456| 3.79|1.73| 0.727|1.33|0.97|          33| 9.6| 0.3|0.64| 4.82| 1.64|0.42|    0.73|
| Lorenzo Brown|  2013|1.71|0.13|0.83|    0.1|1.25|0.13| 0.302| 3.58|1.08| 0.692|0.54|0.38|          24| 9.3|0.33|0.79| 2.67

### Drop Null/NA Values

In [118]:
number_of_rows = final_df.count()
print(f"The DataFrame has {number_of_rows} rows.")

The DataFrame has 3923 rows.


In [119]:
final_df = final_df.na.drop()

In [120]:
number_of_rows = final_df.count()
print(f"The DataFrame has {number_of_rows} rows.")

The DataFrame has 3922 rows.
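`na.drop()` with no arguments uses `how='any'`, so a row is removed if any of its columns is missing; here that removed a single row. (Spark's `na.drop` also treats NaN in numeric columns as missing; the sketch below checks only for `None`.) If only some fields matter, for example the `full_name` and `season` used later as the MongoDB key, `final_df.na.drop(subset=["full_name", "season"])` is the narrower option. The pure-Python sketch below imitates these rules on hypothetical rows to show the difference; it is an illustration, not Spark's implementation.

```python
from typing import Iterable, Optional, Sequence

def na_drop(rows: Iterable[dict], how: str = "any",
            subset: Optional[Sequence[str]] = None) -> list:
    """Imitate DataFrame.na.drop(how=..., subset=...) on a list of dicts."""
    kept = []
    for row in rows:
        cols = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in cols]
        drop = any(nulls) if how == "any" else all(nulls)
        if not drop:
            kept.append(row)
    return kept

# Hypothetical rows: the second lacks 'pts', the third lacks everything
rows = [
    {"full_name": "A", "season": 2011, "pts": 12.6},
    {"full_name": "B", "season": 2012, "pts": None},
    {"full_name": None, "season": None, "pts": None},
]
print(len(na_drop(rows)))                                  # 1 (how='any', all columns)
print(len(na_drop(rows, how="all")))                       # 2 (only fully null rows go)
print(len(na_drop(rows, subset=["full_name", "season"])))  # 2 (only key columns checked)
```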


### Drop Duplicate Rows

In [121]:
number_of_rows = final_df.count()
print(f"The DataFrame has {number_of_rows} rows.")

The DataFrame has 3922 rows.


In [122]:
final_df = final_df.dropDuplicates()

In [124]:
number_of_rows = final_df.count()
print(f"The DataFrame has {number_of_rows} rows.")

The DataFrame has 3898 rows.
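`dropDuplicates()` without arguments compares every column, so it only removes rows that are identical in all fields (24 rows here). The MongoDB upsert later in the notebook identifies a document by `full_name` and `season`, so two rows sharing that pair but differing in any statistic would both survive this step and then overwrite each other, with the last write winning. Passing the key, `final_df.dropDuplicates(["full_name", "season"])`, aligns deduplication with the upsert key, although Spark does not guarantee which of the conflicting rows it keeps. Since `player_id` was dropped earlier, this key also assumes no two players share a name in the same season. The pure-Python sketch below, using hypothetical rows, contrasts the two behaviours.

```python
def drop_duplicates(rows, subset=None):
    """Keep one row per distinct value of `subset` (all columns if None)."""
    seen = set()
    kept = []
    for row in rows:
        cols = subset if subset is not None else sorted(row)
        key = tuple(row[c] for c in cols)
        if key not in seen:       # this sketch keeps the first occurrence;
            seen.add(key)         # Spark makes no such ordering guarantee
            kept.append(row)
    return kept

# Hypothetical rows: one exact duplicate and one same-key row with different stats
rows = [
    {"full_name": "A", "season": 2013, "pts": 4.8},
    {"full_name": "A", "season": 2013, "pts": 4.8},   # exact duplicate
    {"full_name": "A", "season": 2013, "pts": 5.0},   # same key, different stats
]
print(len(drop_duplicates(rows)))                                  # 2
print(len(drop_duplicates(rows, subset=["full_name", "season"])))  # 1
```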


## Upload Cleaned Data to MongoDB

### Convert Spark Dataframe into Pandas Dataframe

In [125]:
mongo_upload_df = final_df.toPandas()

In [126]:
mongo_upload_df.head()

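|   | full_name | season | ast | blk | dreb | fg3_pct | fg3a | fg3m | fg_pct | fga | ... | fta | ftm | games_played | min | oreb | pf | pts | reb | stl | turnover |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Malik Rose | 2005 | 0.93 | 0.19 | 2.43 | 1.0 | 0.01 | 0.01 | 0.374 | 3.9 | ... | 1.9 | 1.49 | 72 | 15.5 | 1.17 | 2.01 | 4.42 | 3.6 | 0.6 | 1.11 |
| 1 | Paul Pierce | 2013 | 2.37 | 0.41 | 4.29 | 0.373 | 4.0 | 1.49 | 0.451 | 9.49 | ... | 4.13 | 3.41 | 75 | 28.0 | 0.35 | 2.53 | 13.47 | 4.64 | 1.15 | 2.04 |
| 2 | Victor Claver | 2013 | 0.57 | 0.14 | 1.43 | 0.167 | 0.86 | 0.14 | 0.405 | 2.0 | ... | 0.52 | 0.48 | 21 | 8.7 | 0.43 | 0.76 | 2.24 | 1.86 | 0.14 | 0.52 |
| 3 | Thaddeus Young | 2013 | 2.3 | 0.46 | 3.92 | 0.308 | 3.7 | 1.14 | 0.454 | 16.24 | ... | 2.9 | 2.06 | 79 | 34.4 | 2.1 | 2.7 | 17.94 | 6.03 | 2.11 | 2.09 |
| 4 | Mitch Richmond | 1996 | 4.17 | 0.3 | 3.21 | 0.428 | 5.89 | 2.52 | 0.454 | 19.48 | ... | 6.56 | 5.64 | 81 | 37.8 | 0.73 | 2.6 | 25.86 | 3.94 | 1.46 | 2.93 |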


### Create MongoDB Connection

In [127]:
client = mdb.MongoClient("mongodb://pt-n20.p4001.w3.cs.technikum-wien.at:4001")
db = client.nba_data
collection = db.season_stats_api
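The connection string is hard-coded in the cell above. A common alternative, sketched here as an assumption rather than how the original notebook works, is to read it from an environment variable and fall back to the server used in this notebook; `MONGO_URI` and `get_mongo_uri` are hypothetical names.

```python
import os

# Hypothetical environment variable; falls back to the server used in this notebook
DEFAULT_MONGO_URI = "mongodb://pt-n20.p4001.w3.cs.technikum-wien.at:4001"

def get_mongo_uri() -> str:
    """Return the MongoDB URI from MONGO_URI, or the notebook's default."""
    return os.environ.get("MONGO_URI", DEFAULT_MONGO_URI)

print(get_mongo_uri())
```

The connection cell would then read `client = mdb.MongoClient(get_mongo_uri())`.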

### Convert Rows to Dictionaries (Key-Value Form)

In [128]:
records = mongo_upload_df.to_dict(orient='records')

### Iterate over Records and Insert or Update the Collection

In [129]:
for record in records:
    # Use 'full_name' and 'season' as the unique identifiers
    unique_query = {'full_name': record['full_name'], 'season': record['season']}
    
    # Set the fields of the matching document, or insert a new one if none matches (upsert)
    result = collection.update_one(unique_query, {'$set': record}, upsert=True)

    # Check the result to see what operation was performed
    if result.matched_count > 0:
        if result.modified_count > 0:
            print(f"Updated document: {unique_query}")
        else:
            print(f"Document already exists and no update needed: {unique_query}")
    elif result.upserted_id is not None:
        print(f"Inserted new document: {unique_query}")

Document already exists and no update needed: {'full_name': 'Malik Rose', 'season': 2005}
Document already exists and no update needed: {'full_name': 'Paul Pierce', 'season': 2013}
Document already exists and no update needed: {'full_name': 'Victor Claver', 'season': 2013}
Document already exists and no update needed: {'full_name': 'Thaddeus Young', 'season': 2013}
Document already exists and no update needed: {'full_name': 'Mitch Richmond', 'season': 1996}
Document already exists and no update needed: {'full_name': 'Zydrunas Ilgauskas', 'season': 2006}
Document already exists and no update needed: {'full_name': 'Damon Jones', 'season': 2005}
Document already exists and no update needed: {'full_name': 'Hedo Turkoglu', 'season': 2012}
Document already exists and no update needed: {'full_name': 'Tayshaun Prince', 'season': 2007}
Document already exists and no update needed: {'full_name': 'Flip Murray', 'season': 2007}
Document already exists and no update needed: {'full_name': 'Keith Bog
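The three messages in the loop correspond to MongoDB's upsert outcomes: with no match a new document is inserted; a match whose `$set` values equal the stored ones leaves `modified_count` at 0 (the "no update needed" case in the output above); a match with changed values is an update. The in-memory simulation below is a hypothetical sketch of those semantics keyed on `(full_name, season)`; it does not use pymongo, and it tallies outcomes with a `Counter` instead of printing one line per record. For the real collection, sending the same filters as `pymongo.UpdateOne(..., upsert=True)` operations through `collection.bulk_write(...)` would avoid one network round trip per record, and `collection.create_index([("full_name", 1), ("season", 1)], unique=True)` would let the database enforce the key.

```python
from collections import Counter

def upsert(store: dict, record: dict) -> str:
    """Simulate update_one(filter_on_name_and_season, {'$set': record}, upsert=True)."""
    key = (record["full_name"], record["season"])
    existing = store.get(key)
    if existing is None:
        store[key] = dict(record)   # no match: insert a new document
        return "inserted"
    changed = {k: v for k, v in record.items() if existing.get(k) != v}
    if not changed:
        return "unchanged"          # matched, but $set would not modify anything
    existing.update(changed)        # matched and modified
    return "updated"

store = {}
batch = [
    {"full_name": "Malik Rose", "season": 2005, "pts": 4.42},
    {"full_name": "Paul Pierce", "season": 2013, "pts": 13.47},
]
first = Counter(upsert(store, r) for r in batch)
second = Counter(upsert(store, r) for r in batch)  # re-running the same upload
print(dict(first))   # {'inserted': 2}
print(dict(second))  # {'unchanged': 2}

corrected = dict(batch[0], pts=4.5)  # hypothetical corrected value
print(upsert(store, corrected))      # updated
```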

### Close MongoDB Connection

In [132]:
client.close()

## Stop Spark Session

In [133]:
spark.stop()