In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Connection settings for the Cassandra sink and the Kafka source.
# Each value can be overridden through an environment variable, so real
# credentials do not have to be committed with the notebook. The fallbacks are
# the values this notebook previously hard-coded, so behavior is unchanged when
# no variables are set.
cassandra_host = os.environ.get("CASSANDRA_HOST", "cassandra")
cassandra_user = os.environ.get("CASSANDRA_USER", "cassandra")
cassandra_pwd  = os.environ.get("CASSANDRA_PASSWORD", "cassandra")
cassandra_port = int(os.environ.get("CASSANDRA_PORT", "9042"))
key_space      = os.environ.get("CASSANDRA_KEYSPACE", "loganalysis")
table_name     = os.environ.get("CASSANDRA_TABLE", "nasalog")
kafka_server   = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
kafka_topic    = os.environ.get("KAFKA_TOPIC", "nasa_logs")

In [None]:
# Build (or reuse) the Spark session.
# The package coordinates pull in the Kafka source and the DataStax Cassandra
# connector: Scala 2.12 builds, version 3.0.0.
# NOTE: getOrCreate() returns an already-running session if one exists, in which
# case static settings such as spark.jars.packages are not re-applied.
spark = (
    SparkSession.builder
    .appName("log_analytics")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,"
        "com.datastax.spark:spark-cassandra-connector_2.12:3.0.0,"
        "com.datastax.spark:spark-cassandra-connector-driver_2.12:3.0.0",
    )
    .config("spark.cassandra.connection.host", cassandra_host)
    # cassandra_port was previously defined but never passed to the connector.
    .config("spark.cassandra.connection.port", cassandra_port)
    .config("spark.cassandra.auth.username", cassandra_user)
    .config("spark.cassandra.auth.password", cassandra_pwd)
    .getOrCreate()
)

# Subscribe to the Kafka topic as an unbounded streaming source.
kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_topic)
    .load()
)

# Kafka delivers the record value as binary. Keep only the value, decoded to a
# string column named "message", for the parsing step.
kafka_stream = kafka_raw.selectExpr("CAST(value AS STRING) as message")

In [None]:
# Parse each raw log line into typed columns and keep only fully parsed rows.
# Line shape implied by the patterns below (example):
#   host - - [01/Jul/1995:00:00:01 -0400] "GET /path HTTP/1.0" 200 6245
#
# regexp_extract yields "" (not NULL) when its pattern does not match. The
# string columns are therefore checked against ""; isNotNull() alone would
# never drop them. For the integer and timestamp columns, a failed "" is
# cast/parsed to NULL, so isNotNull() is the right test there.
#
# NOTE(review): a line whose size field is "-" has no trailing digits, so
# content_size is NULL and the row is dropped. Coalesce it to 0 instead if
# those requests should be kept.
preprocessed_stream = kafka_stream.select(
    # Leading non-whitespace token, without the separating space.
    regexp_extract('message', r'^(\S+)\s', 1).alias('host'),
    # Bracketed timestamp; accepts both positive and negative UTC offsets.
    to_timestamp(
        regexp_extract('message', r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})]', 1),
        'dd/MMM/yyyy:HH:mm:ss Z',
    ).alias('time'),
    # First word inside the quoted request.
    regexp_extract('message', r'"(.*?) [^"]+"', 1).alias('method'),
    # Request target; the trailing " HTTP/x.y" protocol token is optional.
    regexp_extract('message', r'"\w+\s+(\S+)(?:\s+HTTP[^"]*)?"', 1).alias('path'),
    # First token after the closing quote of the request.
    regexp_extract('message', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
    # Trailing run of digits at the end of the line.
    regexp_extract('message', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'),
).filter(
    # `!=` binds looser than `&`, so each comparison is parenthesized.
    (col('host') != '') &
    (col('method') != '') &
    (col('path') != '') &
    col('time').isNotNull() &
    col('status').isNotNull() &
    col('content_size').isNotNull()
)

In [None]:
# Cassandra sink settings, passed as options to the DataFrame writer.
cassandra_write_config = {
    "keyspace": key_space,
    "table": table_name,
    "mode": "append",
    "spark.cassandra.connection.host": cassandra_host,
    "spark.cassandra.auth.username": cassandra_user,
    "spark.cassandra.auth.password": cassandra_pwd,
}


def write_batch_to_cassandra(batch_df, batch_id):
    """Append one streaming micro-batch to the configured Cassandra table.

    Invoked by foreachBatch once per micro-batch.

    Args:
        batch_df: the micro-batch as a static DataFrame.
        batch_id: micro-batch identifier supplied by Spark (unused here).
    """
    (
        batch_df.write
        .format("org.apache.spark.sql.cassandra")
        .options(**cassandra_write_config)
        .mode("append")
        .save()
    )


# Write the preprocessed stream to Cassandra.
# Without a checkpointLocation, the query cannot resume from its committed Kafka
# offsets after a restart (Spark may also refuse to start it). The directory
# must be exclusive to this query; two queries must never share a checkpoint.
query_cassandra = (
    preprocessed_stream.writeStream
    .foreachBatch(write_batch_to_cassandra)
    .outputMode("append")
    .option("checkpointLocation", "checkpoint/cassandra")
    .start()
)

In [None]:
# HDFS sink settings. "path" is the output directory. "format" duplicates the
# .format("csv") call below and is simply forwarded as a writer option.
hdfs_write_config = dict(
    path="hdfs://namenode:8020/output/nasa_logs/",
    format="csv",
)

# Stream every parsed row into CSV part-files under the HDFS output directory.
# NOTE(review): "checkpoint/data" is relative, so it resolves against Spark's
# default filesystem / working directory -- confirm that location is durable.
query_hdfs = (
    preprocessed_stream.writeStream
    .format("csv")
    .options(**hdfs_write_config)
    .outputMode("append")
    .option("checkpointLocation", "checkpoint/data")
    .start()
)

In [None]:
# Both queries were already started by their .start() calls; this cell only
# blocks the notebook while they run.
# Waiting on query_cassandra alone would hide an HDFS failure until the
# Cassandra query stopped. Instead, poll both queries:
# awaitTermination(timeout) re-raises a query's exception as soon as it has
# failed, and returns (True/False) otherwise.
streaming_queries = (query_cassandra, query_hdfs)
poll_seconds = 5
while any(q.isActive for q in streaming_queries):
    for q in streaming_queries:
        q.awaitTermination(poll_seconds)

# Both queries have stopped. Surface a failure that happened after its last
# poll; these calls return immediately for queries that stopped cleanly.
for q in streaming_queries:
    q.awaitTermination()