# Monster Tracking

First import relevant modules:

In [2]:
%pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Note: you may need to restart the kernel to use updated packages.


In [2]:
import kafka
from kafka.admin import KafkaAdminClient, NewTopic
import json
import random
import time
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, rand
from geopy.geocoders import Nominatim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from geonamescache import GeonamesCache

 Create a topic called: monster_movement:

In [4]:
admin_client = KafkaAdminClient(bootstrap_servers="kafka:9092")
topics = [NewTopic(name="monster_movement", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topics)
admin_client.close()

In [None]:
# admin_client = KafkaAdminClient(bootstrap_servers="kafka:9092")
# topics = [NewTopic(name="country_damage", num_partitions=1, replication_factor=1)]
# admin_client.create_topics(new_topics=topics)
# admin_client.close()

Next we'll generate the data. We want to use the functions we create in our Spark streaming practice examples. What we want to achieve is:

- Create 200 events.
- The event should pick five random rows from our dnd_monsters.csv and return the monster's name and str characteristic.
- Along with the name and str we need the latitude and longitude of the monster and a timestamp. Latitude is between -90 and 90 and longitude is -180 to 180.
- Add a timestamp field called ts.
- Remember to put everything into JSON then into a field called value for the kafka message.
- Don't write to Kafka yet.

In [3]:
# Create a SparkSession and context
spark = SparkSession.builder.appName("MonsterTracker").config("spark.jars","work/data/commons-pool2-2.11.1.jar,work/data/spark-sql-kafka-0-10_2.12-3.4.0.jar,work/data/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar").getOrCreate()

# Load data into dataframe
path = "./work/data/dnd_monsters.csv"
df = spark.read.options(header=True, inferSchema=True).csv(path)
#df.show()

# Register the DataFrame as a temporary table
df.createOrReplaceTempView("monsters")

# Perform SQL query to get name and strength on the DataFrame
sql_query = """
    SELECT name, str
    FROM monsters
"""
result = spark.sql(sql_query)

# Show the query result
result.show()



+--------------------+----+
|                name| str|
+--------------------+----+
|           aarakocra|10.0|
|             abjurer|null|
|             aboleth|21.0|
|     abominable-yeti|null|
|            acererak|null|
|             acolyte|10.0|
|  adult-black-dragon|23.0|
|adult-blue-dracolich|null|
|   adult-blue-dragon|25.0|
|  adult-brass-dragon|23.0|
| adult-bronze-dragon|25.0|
| adult-copper-dragon|23.0|
|   adult-gold-dragon|27.0|
|  adult-green-dragon|23.0|
|       adult-kruthik|null|
|         adult-oblex|null|
|    adult-red-dragon|27.0|
| adult-silver-dragon|27.0|
|  adult-white-dragon|22.0|
|       air-elemental|14.0|
+--------------------+----+
only showing top 20 rows



In [6]:
# Creating a SparkSession with the extra jars we need for Kafka
stream_spark = SparkSession.builder.appName("Monster_Stream").config("spark.jars","work/data/commons-pool2-2.11.1.jar,work/data/spark-sql-kafka-0-10_2.12-3.4.0.jar,work/data/spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar").getOrCreate()

# Read Kafka as a stream, you can read it as a batch also.  Simple read everything
# on the topic into one DataFrame going back to the beginning
stream_spark_df = stream_spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("subscribe", "monster_movement").option("startingOffsets","earliest").load()

# Commented out code which would make the output more readable
# df = df.selectExpr("CAST(key AS STRING) as Key", "CAST(value AS STRING) as Value")

stream_spark_df = stream_spark_df.selectExpr("CAST(value AS STRING) as Value")
# Here we take the previous stream from Kafka and write that stream to the console 
rawQuery = stream_spark_df.writeStream.format("console").outputMode("append").start()


# Once you start the stream you either end it or keep it listening for more data with
# the awaitTermination option
rawQuery.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [7]:
def country_finder(latitude, longitude): 
    geolocator = Nominatim(user_agent="my_geocoder") # Create a geocoder instance
    location = geolocator.reverse((latitude, longitude), language='en') # Reverse geocode to get the location information
    try:
        country = location.raw['address']['country']
    except:
        return "sea"
    return country

def get_country_population(country_name):
    geonames = GeonamesCache()
    # population = 1
    try:
        country_data = geonames.get_countries_by_names().get(country_name)
        population = country_data.get('population')
    except:
        population = 1
        print("{} country not found".format(population))
    return population

for int in range(1,10):
    print('Event ', int)
    monster_json_string = ''
    random_lines = result.orderBy(rand()).limit(5)
    list_tuples = random_lines.rdd.map(tuple).collect()
    for monster in list_tuples:
        lat = random.randint(-90, 90)
        long = random.randint(-180, 180)
        country = country_finder(lat, long)
        if country != 'sea':
            population = get_country_population(country)
            if (population > 1) & (type(monster[1]) == int):
                damage = population * monster[1] / 1000
            else:
                damage = monster[1]
        else:
            population = None
            damage = None
        monster_data = {
            "name": monster[0],
            "str": monster[1],
            "lat": lat,
            "long": long,
            "ts": str(datetime.now()),
            "country": country,
            "population": population,
            "damage": damage
        }
        monster_json_string += json.dumps(monster_data)  
    # print(monster_json_string)
    monster_data = [(monster_json_string,),]
    random_monster_df = spark.createDataFrame(monster_data, ["value"])
    #random_monster_df.show(1, False)
    random_monster_df.write \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("topic", "monster_movements") \
        .save()

Event  1
Event  2
Event  3
Event  4




Event  5
Event  6
Event  7
Event  8
Event  9


In [18]:
%pip install geonamescache

Note: you may need to restart the kernel to use updated packages.
