In [1]:
! pip3 install -q -r ./requirements.txt

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

In [3]:
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers="kafka:9092",
    client_id='test'
)

topic_list = admin_client.list_topics()
print(topic_list)

def create_new_topic():
    try:
        topic_list = []
        topic_list.append(NewTopic(name="test", num_partitions=1, replication_factor=1))
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
        return "Kafka Topic created" 
    except:
        return "Kafka Topic already exists"

create_new_topic()

['connect-offsets', 'connect-status', 'connect-config', '__consumer_offsets']


'Kafka Topic created'

In [4]:
admin_client.list_topics()

['test',
 'connect-config',
 'connect-status',
 'connect-offsets',
 '__consumer_offsets']

In [5]:
import os
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import concat_ws, col, when 

spark = SparkSession.builder \
        .appName("Spark Read Write") \
        .master("local[*]") \
        .getOrCreate()

# Read the CSV file into a DataFrame
df = spark.read.csv("./data/e-shop_clothing_2008.csv", header=True, sep=";") 

# Print the schema of the DataFrame 
df.printSchema() 

# Show the first 5 rows of the DataFrame 
df.show(5)
 

# Create a DataFrame with the required schema
click_data_df = df.select(
    concat_ws("_", col("year"), col("month"), col("day"), col("order")).alias("click_id"),
    col("session ID").alias("user_id"),
    concat_ws(" ", col("year"), col("month"), col("day")).alias("timestamp"),
    col("page").alias("url")
)

# Show the first 5 rows of the DataFrame
click_data_df.show(5)
from pyspark.sql.functions import *

country_map = {1: 'Australia', 2: 'Austria', 3: 'Belgium', 4: 'British Virgin Islands', 5: 'Cayman Islands', 6: 'Christmas Island', 7: 'Croatia', 8: 'Cyprus', 9: 'Czech Republic', 10: 'Denmark', 11: 'Estonia', 12: 'unidentified', 13: 'Faroe Islands', 14: 'Finland', 15: 'France', 16: 'Germany', 17: 'Greece', 18: 'Hungary', 19: 'Iceland', 20: 'India', 21: 'Ireland', 22: 'Italy', 23: 'Latvia', 24: 'Lithuania', 25: 'Luxembourg', 26: 'Mexico', 27: 'Netherlands', 28: 'Norway', 29: 'Poland', 30: 'Portugal', 31: 'Romania', 32: 'Russia', 33: 'San Marino', 34: 'Slovakia', 35: 'Slovenia', 36: 'Spain', 37: 'Sweden', 38: 'Switzerland', 39: 'Ukraine', 40: 'United Arab Emirates', 41: 'United Kingdom', 42: 'USA', 43: 'biz (*.biz)', 44: 'com (*.com)', 45: 'int (*.int)', 46: 'net (*.net)', 47: 'org (*.org)'}

# get geo_data_df which contains the country name for each click_id 
geo_data_df = df.select(
    concat_ws("_", col("year"), col("month"), col("day"), col("order")).alias("click_id"),
    col("country").alias("geo")
)

# Show the first 5 rows of the DataFrame
geo_data_df.show(5)

# Join the two DataFrames on click_id to create a single DataFrame with the required schema
final_df = click_data_df.join(geo_data_df, "click_id", "inner")

# Show the first 5 rows of the DataFrame
final_df.show(5)

# convert the dataframe to json 
# final_df_json = final_df.toJSON()

# json_rdd = final_df.map(lambda row: json.dumps(json.loads(row), default=str))
# final_df_list = [row.asDict() for row in final_df.collect()]
# final_df_json = json.dumps(final_df_list)
# final_df_json.saveAsTextFile("./data/e-shop_clothing_2008.json")
# Set the number of partitions to use when writing to disk
num_partitions = 10

# Write the DataFrame to disk in small batches
final_df.repartition(num_partitions).write.json("./data/e-shop_clothing_2008.json", mode="overwrite")

# # Read the JSON file back into a DataFrame
# final_df_json = spark.read.json("./data/e-shop_clothing_2008.json")

# # Convert the DataFrame to a list of dictionaries
# final_df_list = [row.asDict() for row in final_df_json.collect()]

# # Serialize the list of dictionaries to JSON
# final_df_json = json.dumps(final_df_list)
# print(final_df_json.collect())


root
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- order: string (nullable = true)
 |-- country: string (nullable = true)
 |-- session ID: string (nullable = true)
 |-- page 1 (main category): string (nullable = true)
 |-- page 2 (clothing model): string (nullable = true)
 |-- colour: string (nullable = true)
 |-- location: string (nullable = true)
 |-- model photography: string (nullable = true)
 |-- price: string (nullable = true)
 |-- price 2: string (nullable = true)
 |-- page: string (nullable = true)

+----+-----+---+-----+-------+----------+----------------------+-----------------------+------+--------+-----------------+-----+-------+----+
|year|month|day|order|country|session ID|page 1 (main category)|page 2 (clothing model)|colour|location|model photography|price|price 2|page|
+----+-----+---+-----+-------+----------+----------------------+-----------------------+------+--------+-----------------+-----+-------+

In [6]:
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient 
from kafka.admin import NewTopic 
from kafka.errors import TopicAlreadyExistsError 
import ujson
import logging 
import time 
from typing import Dict, List
import json 

class KafkaAdapter:
    def __init__(self, 
                  value_deserializer = lambda x:ujson.loads(x.decode('utf-16')),
                  value_serializer = lambda value: ujson.dumps(value).encode('utf-16')
                  ):
        self.name = "KafkaAdapter"
        self.value_deserializer = value_deserializer 
        self.value_serializer = value_serializer

    def produce(self, bootstrap_servers: List, topic_name, data, partition):
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                 value_serializer=self.value_serializer,
                                 acks="all",
                                 retries=3
                                 )
    
        try:
            producer.send(topic=topic_name, value=data, partition=partition)
            logging.info(f"Data: {data} Sent to topic: {topic_name} on partition: {partition}")

        except Exception as e:
            logging.error(f"Error while sending data to topic: {topic_name} on partition: {partition} with error: {e}")
        
        finally:
            producer.close()

    def consume(self, bootstrap_servers: List, topic_name, group_id, auto_offset_reset="earliest", enable_auto_commit=True, customer_timeout=3000):
        try:
            consumer = KafkaConsumer(
                topic_name,
                bootstrap_servers=bootstrap_servers,
                group_id=group + topic_name,
                value_deserializer=self.value_deserializer,
                enable_auto_commit=enable_auto_commit,
                auto_offset_reset=auto_offset_reset,
                customer_timeout=customer_timeout
            )
            logging.info(f"Data consumed from {topic_name}")
            return consumer
        except Exception as e:
            logging.error(f"Error while consuming data from {topic_name} with error: {e}")

    topic_config_list = [{"topic_name": "click_data", "num_partitions": 1, "replication_factor": 1}]

    @staticmethod
    def create_topic(bootstrap_servers: List, topic_config_list:List[Dict]):
        client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        topics, topics_list = [], []
        for topic_config in topic_config_list:
            new_topic = NewTopic(name=topic_config["topic_name"], num_partitions=topic_config["num_partitions"], replication_factor=topic_config["replication_factor"])
            topics.append(new_topic)
            topics_list.append(topic_config["topic_name"])
        try:
            client.create_topics(new_topics=topics)
            logging.info(f"Topics {topics_list} created successfully")
        except TopicAlreadyExistsError as e:
            client.delete_topics(topics_list, timeout_ms=1000)
            time.sleep(1)
            client.create_topics(new_topics=topics)
            logging.info(f"Topics {topics_list} deleted and recreated successfully")

    def create_producer(self, bootstrap_servers: List):
        try:
            producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                                 value_serializer=self.value_serializer,
                                 acks="all",
                                 retries=3
                                 )
            return producer
        except Exception as e:
            logging.error(f"Error while creating producer with error: {e}")

adapter = KafkaAdapter() 
bootstrap_servers = ["kafka:9092"]
topic_config_list = [{"topic_name": "click_data", "num_partitions": 1, "replication_factor": 1}]
def init_kafka():
    try:
        print("Creating topic")
        logging.info("Creating a topic in kafka")
        adapter.create_topic(bootstrap_servers=bootstrap_servers, topic_config_list=topic_config_list)
    except Exception as e:
        logging.error(f"Error while creating topic with error: {e}")

def send_to_kafka(data):
    try:
        adapter.produce(bootstrap_servers=bootstrap_servers, topic_name="click_data", directory_location=data, partition= 0)
    except Exception as e:
        logging.error(f"Error while sending data to kafka with error: {e}")

import os

def send_to_kafka(data_dir):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
        .appName("Spark Read Write") \
        .master("local[*]") \
        .getOrCreate()
    
    for filename in os.listdir(data_dir):
        if filename.endswith(".json"):
            file_path = os.path.join(data_dir, filename)
            print(f"this is the file {file_path}")
            df = spark.read.json(file_path)
            for row in df.collect():
                # print(row)
                try:
                    adapter.produce(
                        bootstrap_servers=bootstrap_servers,
                        topic_name="click_data",
                        data=row.asDict(),
                        partition=0
                    )
                    logging.info(f"Data: {row} Sent to topic: click_data on partition: 0")
                except Exception as e:
                    logging.error(f"Error while sending data to kafka with error: {e}")

            

init_kafka()
send_to_kafka("./data/e-shop_clothing_2008.json/")

Creating topic
this is the file ./data/e-shop_clothing_2008.json/part-00000-3378a02e-3d33-4d9d-b84c-bac17fbf29d3-c000.json


KeyboardInterrupt: 

In [7]:
admin_client.list_topics()

['click_data',
 'test',
 'connect-config',
 'connect-status',
 'connect-offsets',
 '__consumer_offsets']

In [8]:
admin_client.delete_topics(topics = ["test"])

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='test', error_code=0)])

In [9]:
admin_client.list_topics()

['click_data',
 'connect-config',
 'connect-status',
 'connect-offsets',
 '__consumer_offsets']

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, struct

spark_read_stream = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()

def read_stream_from_kafka():    
    spark_read = spark_read_stream \
                    .readStream \
                    .format("kafka") \
                    .option("kafka.bootstrap.servers", "kafka:9092") \
                    .option("subscribe", "click_data") \
                    .load() 
                    # .select(from_json(col("value").cast("string"), json_schema).alias("parsed_data"))
    
    print(spark_read.isStreaming)

    spark_read.printSchema()

#     spark_read.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#         .select(from_json(col("value"), struct).alias("data"))
    
#     spark_read.printSchema()
    return spark_read

spark_kafka_consume_read  = read_stream_from_kafka()

True
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [26]:
# def writeToElasticsearch(df):

#     df.write \
#         .format("org.elasticsearch.spark.sql") \
#         .option("es.nodes", "localhost") \
#         .option("es.port", "9200") \
#         .option("es.resource", "office-index") \
#         .mode("append") \
# #         .save()


# streamingQuery = spark_kafka_consume_read.writeStream \
#     .outputMode("append") \
#     .format("json") \
#     .option("es.nodes", "localhost") \
#     .option("es.port", "9200") \
#     .start()
#     # .option("checkpointLocation", checkpointDir) \
#     # .option("numRows", 4) \
#     # .option("truncate", False) \
#     # .start()

# streamingQuery.awaitTermination()
spark_write_stream = SparkSession.builder \
    .appName("MyAppspark_write") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()


def write_stream_to_elasticsearch(spark_read): 

    spark_read \
        .writeStream \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "elasticsearch") \
        .option("es.port", "9200") \
        .option("checkpointLocation", "checkpoint") \
        .option("es.resource", "click_data") \
        .start()
    
    spark_read.awaitTermination()

write_stream_to_elasticsearch(spark_kafka_consume_read)


Py4JJavaError: An error occurred while calling o228.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.elasticsearch.spark.sql. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:367)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.spark.sql.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 14 more
