# DnD Monster Tracker

In [None]:
from kafka.admin import KafkaAdminClient, NewTopic


# Admin connection to the broker, used only to make sure the topic exists.
admin_client = KafkaAdminClient(
    bootstrap_servers="kafka:9092",
    client_id='test'
)

try:
    # Request only the topics the broker does not already report, so this cell
    # can be re-run without asking Kafka to create a topic a second time.
    existing_topics = set(admin_client.list_topics())
    topic_list = [
        NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        for topic_name in ("monster_movement",)
        if topic_name not in existing_topics
    ]
    if topic_list:
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
finally:
    # The client is not used again in this notebook; release its connection.
    admin_client.close()

Create 200 events.
Each event should pick five random rows from our dnd_monsters.csv and return each monster's name and `str` characteristic.

In [1]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import lit, udf


# Build (or reuse) the Spark session, attaching the Kafka connector jars.
spark = (
    SparkSession.builder
    .appName("MonsterTracker")
    .config(
        "spark.jars",
        "work/data/commons-pool2-2.11.1.jar,"
        "work/data/spark-sql-kafka-0-10_2.12-3.4.0.jar,"
        "work/data/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar",
    )
    .getOrCreate()
)

# Read the monster sheet with a header row and keep only rows that have a `str` value.
monsters_reader = spark.read.format('csv').option('header', 'true')
df = monsters_reader.load('./work/data/dnd_monsters.csv').filter("str is not null")



In [None]:
%pip install geopy

In [2]:
from geopy.geocoders import Nominatim   
import random
from pyspark.sql.functions import lit, udf
from datetime import datetime

def get_location_and_country():
    """Pick a random coordinate and reverse-geocode it to a country name.

    Returns:
        list[str]: ``[country, longitude, latitude]``, all as strings.
        ``country`` is ``'Sea'`` when the geocoder returns no result or the
        result carries no country (a random point often has none).

    Errors raised by the geocoder call itself are not caught here.
    """
    # Random whole-degree coordinates; longitude is drawn first, then latitude.
    longitude = random.randint(-180, 180)
    latitude = random.randint(-90, 90)

    # Create a geocoder instance for finding country from monster long&lat
    geolocator = Nominatim(user_agent="my_geocoder")

    # Reverse geocode to get the location information
    location = geolocator.reverse((latitude, longitude), language='en')

    # Walk the result defensively instead of catching every exception:
    # a missing result, a missing address, or a missing country all mean 'Sea'.
    raw = getattr(location, 'raw', None) or {}
    address = raw.get('address') or {}
    country = address.get('country', 'Sea')

    return [str(country), str(longitude), str(latitude)]


# location and country udf
# Yields the [country, longitude, latitude] string array returned by
# get_location_and_country.
location_udf = udf(get_location_and_country, ArrayType(StringType(), False))

# get_location_and_country returns a list of strings, so pick the matching
# element and convert it to int to agree with the declared IntegerType.
# Each call draws its own random coordinate, independent of location_udf.
longitude_udf = udf(lambda: int(get_location_and_country()[1]), IntegerType())
latitude_udf = udf(lambda: int(get_location_and_country()[2]), IntegerType())

# timestamp udf

def get_time():
    """Return the current local date and time as text, date and time separated by a space."""
    now = datetime.now()
    return now.isoformat(sep=" ")

time_udf = udf(get_time, StringType())

# adding columns to DF
# Invoke the location UDF once per row into a scratch column and read lat,
# long and country out of that single array, rather than invoking the random,
# network-backed UDF separately for each derived column.
located = df['name', 'str'].withColumn('_location', location_udf())
selection = (
    located
    .withColumn('lat', located['_location'][2])
    .withColumn('long', located['_location'][1])
    .withColumn('ts', time_udf())
    .withColumn('country', located['_location'][0])
    .drop('_location')
)

# Cache so later actions reuse the generated rows.
selection = selection.cache()

In [3]:
# NOTE(review): this repeats the cache() call that ends the previous cell — likely redundant.
selection = selection.cache()

In [None]:
import random

# Choose a random end index x and look at the five rows just before it.
x = random.randint(5, 200)
y = x - 5
window = slice(y, x)

# Pull every row to the driver, then display only the chosen window.
selection.collect()[window]

In [4]:
import time
from pyspark.sql.functions import rand


print("starting")

# Each iteration samples five monsters and publishes them to the
# "monster_movement" topic, one Kafka message per monster.
# change to 200 after testing
for i in range(20):

    print('point 1 in loop')
    # Randomly select 5 lines
    rows_df = selection.orderBy(rand()).limit(5)
    print('point 2')

    # convert to JSON (one JSON document per row)
    five_batch = rows_df.toJSON().collect()

    print("Batch DataFrame\n")
    rows_df.show()
    time.sleep(1)

    print("Batch JSON\n")
    print(five_batch)
    time.sleep(1)

    # Nothing to publish; also avoids building a DataFrame from an empty list.
    if not five_batch:
        continue

    # Write the whole batch to Kafka with a single DataFrame and a single write,
    # instead of creating a one-row DataFrame and running a write per message.
    write_df = spark.createDataFrame([(row_json,) for row_json in five_batch], ['value'])
    write_df.show(len(five_batch), False)
    write_df.write.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("topic", "monster_movement").save()





starting
point 1 in loop
point 2
Batch DataFrame

+-----------------+----+---+----+--------------------+-------+
|             name| str|lat|long|                  ts|country|
+-----------------+----+---+----+--------------------+-------+
|        blink-dog|12.0| 37|  98|2023-07-21 09:50:...|  China|
|     phase-spider|15.0| 42|  81|2023-07-21 09:52:...|  China|
|minotaur-skeleton|18.0|-49| -88|2023-07-21 09:52:...|    Sea|
|          mammoth|24.0|-31|-105|2023-07-21 09:52:...|    Sea|
|              elk|16.0|-86|  57|2023-07-21 09:51:...|    Sea|
+-----------------+----+---+----+--------------------+-------+

Batch JSON

['{"name":"blink-dog","str":"12.0","lat":"37","long":"98","ts":"2023-07-21 09:50:50.747819","country":"China"}', '{"name":"phase-spider","str":"15.0","lat":"42","long":"81","ts":"2023-07-21 09:52:38.795465","country":"China"}', '{"name":"minotaur-skeleton","str":"18.0","lat":"-49","long":"-88","ts":"2023-07-21 09:52:23.750891","country":"Sea"}', '{"name":"mammoth","st

In [None]:
%pip install geonamescache

* Create a field called population. If the country is "Sea" (the monster is at sea), there is no population to look up; otherwise use the country's population.
* Create a field called damage. If the population is greater than 1, divide the population by 1000 and multiply that by the monster's str. Otherwise, just return the monster's str.
* Create one stream to console that shows the name of the country and how much damage it has taken.
* Create another stream that shows a dataset with the monsters and how much damage they have caused.

In [None]:
from geonamescache import GeonamesCache

def get_country_population(country_name):
    """Look up a country's population in GeonamesCache.

    Args:
        country_name: Country name to look up, or 'Sea' for a monster that
            is not in any country.

    Returns:
        The ``population`` value of the matching GeonamesCache entry, or 0
        when the name is 'Sea' or has no entry.
    """
    if country_name != 'Sea':
        country_data = GeonamesCache().get_countries_by_names().get(country_name)
        # An unknown name yields no entry; fall through to the 0 default
        # instead of failing on the missing entry.
        if country_data is not None:
            return country_data.get('population')

    print(f'country not found, likely ocean: {country_name}, population likely 0.')
    return 0

# Wrap the population lookup so it can be applied to a DataFrame column.
population_udf = udf(get_country_population, IntegerType())

country_column = selection['Country']
print(country_column)

# adding columns to DF, then cache the result and display it
selection = selection.withColumn('Population', population_udf(country_column)).cache()

selection.show()



In [5]:
from geonamescache import GeonamesCache
from pyspark.sql.functions import col, when
from pyspark.sql.functions import sum as _sum

def get_country_population(country_name):
    """Look up a country's population, falling back to 1 when there is none.

    Args:
        country_name: Country name from the event, or 'Sea' for a monster
            that is not in any country.

    Returns:
        The population from GeonamesCache, or 1 when the monster is at sea,
        the name is missing, or the country has no usable entry.
    """
    # At sea (or no name at all) there is nothing to look up. Return the same
    # fallback used for unknown countries rather than leaving the result unset.
    if country_name is None or country_name == 'Sea':
        return 1

    country_data = GeonamesCache().get_countries_by_names().get(country_name)
    population = country_data.get('population') if country_data else None
    if population is None:
        # Report the name that failed to resolve.
        print("{} country not found".format(country_name))
        return 1
    return population

# Example usage


from pyspark.sql.functions import udf

from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import from_json
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import DecimalType

# Build (or reuse) the Spark session with the Kafka connector jars listed.
spark = (
    SparkSession.builder
    .appName("Monsters")
    .config(
        "spark.jars",
        "commons-pool2-2.11.1.jar,"
        "spark-sql-kafka-0-10_2.12-3.4.0.jar,"
        "spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar",
    )
    .getOrCreate()
)
get_country_population_udf = udf(get_country_population, IntegerType())

# Schema of one monster event; every field is read as a string.
schema = StructType()
for field_name in ("name", "str", "ts", "lat", "long", "country"):
    schema.add(field_name, StringType())

# Stream the monster events from Kafka, starting from the earliest offset and
# taking at most one offset per trigger.
# `groupIdPrefix` is used instead of a fixed `kafka.group.id`: two streaming
# queries are started from this source further down, and a single hard-coded
# consumer group id would be shared by both of them.
df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "monster_movement")
    .option("groupIdPrefix", "tester")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1)
    .load()
    .selectExpr("CAST(value AS STRING)")
)

# Parse the JSON data and apply the schema
df = df.select(from_json(df.value, schema).alias("data")).select("data.*")

# Attach the population of the country each monster is in.
withPopulation = df.withColumn("population", get_country_population_udf(col("country")))

# `str` is parsed as a string by the schema; cast it to a number explicitly so
# both branches of the damage rule are numeric rather than relying on implicit
# coercion.
monster_str = col("str").cast("double")

# Damage rule: population / 1000 * str when population > 1, otherwise just str.
withDamage = withPopulation.withColumn(
    "damage",
    when(col("population") > 1, (col("population") / 1000) * monster_str).otherwise(monster_str),
)

# Total damage per monster and per country.
damage_as_decimal = col("damage").cast(DecimalType())
strongest_monster = withDamage.groupBy("name").agg(_sum(damage_as_decimal).alias("monster_damage"))
worst_country = withDamage.groupBy("country").agg(_sum(damage_as_decimal).alias("country_damage"))

# Forget queries that terminated in an earlier run of this cell; otherwise
# awaitAnyTermination() below could return immediately.
spark.streams.resetTerminated()

# Console stream: total damage taken per country.
query = (
    worst_country
    .writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Console stream: total damage caused per monster.
query2 = (
    strongest_monster
    .writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

try:
    # Wait on both queries at once so a failure in either one is surfaced,
    # instead of blocking on the first query before looking at the second.
    spark.streams.awaitAnyTermination()
finally:
    # Stop both streams on exit (including a kernel interrupt) so they do not
    # keep running in the background.
    for running_query in (query, query2):
        running_query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 