# Kappa App for nasa log view calculation

In [9]:
from pyspark.sql import SparkSession

# Set the necessary variables
# The following links were used to determine the necessary packages to include:
# - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html and 
# - https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb  

scala_version = '2.12'  
spark_version = '3.5.3'
bootstrap_servers = ['localhost:9092']
topic_name = 'kappa-topic'
consumer_group_id = 'my_group_id'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.9.0'
]

spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .config("SQLConf.ADAPTIVE_EXECUTION_ENABLED.key", "false")\
   .getOrCreate()

In [10]:
from pyspark.sql.functions import col, concat, lit

kafkaDf = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", *bootstrap_servers)\
  .option("subscribe", topic_name)\
  .option("startingOffsets", "earliest")\
  .load()
print(kafkaDf.isStreaming)    # Returns True for DataFrames that have streaming sources
kafkaDf.printSchema()
query = kafkaDf.select(
    concat(col("topic"), lit(':'), col("partition").cast("string")).alias("topic_partition"),
    col("offset"),
    col("value").cast("string"),
    col("timestamp"),
    col("timestampType")
    ).writeStream\
      .format("console")\
      .option("checkpointLocation", "/tmp") \
      .start()



True
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



25/02/23 17:50:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/23 17:50:20 WARN StreamingQueryManager: Stopping existing streaming query [id=07d4ecd4-f3c5-4a24-87d0-59ce6eb47af9, runId=a84cc6d3-9791-40d0-b215-e689d7d9473a], as a new run is being started.


## Structure the value column

In [11]:
import re
from pyspark.sql.functions  import split, from_csv
col_schema = ["line_no INTEGER","host STRING","time TIMESTAMP","method STRING","url STRING","response INTEGER","bytes INTEGER"]
schema_str = ",".join(col_schema)

df_csv = query.select(from_csv(query.value, schema_str).alias("value_parsed"))
df_csv.show()


#split_col = split(query['value'], ',')
#df = df.withColumn('NAME1', split_col.getItem(0))
#df = df.withColumn('NAME2', split_col.getItem(1))

#filter_pattern = re.compile(".*GET,/.*\.html")
#match_pattern = re.compile('GET,/.*\.html')
# select only lines with get Requests
#filtered = input.filter(lambda s: filter_pattern.match(s)) 

# search for the match_pattern in each line and return the matching part as a key/value pair e.g. ('/ksc.html', 1) 
#pages = filtered.map(lambda s: (match_pattern.search(s).group()[4:], 1))
# sum up the entries with the same key
#popularity = pages.reduceByKey(lambda a, b: a+b)

AttributeError: 'StreamingQuery' object has no attribute 'select'

In [None]:
query.awaitTermination()