In [38]:
import os
import sys

from confluent_kafka import Consumer, KafkaError, KafkaException
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.types import *

# Kafka consumer settings: broker on localhost, consumer group "foo", and the
# 'smallest' offset-reset policy (librdkafka's legacy alias for 'earliest').
conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "foo",
    'auto.offset.reset': 'smallest',
}

# Consumer instance shared by the cells below
consumer = Consumer(conf)

In [39]:
# Bare expression: the notebook shows the consumer's repr as the cell output
consumer

<cimpl.Consumer at 0x7f7e7c4d0e00>

In [40]:
# Build the SparkSession for this notebook (getOrCreate reuses an existing one)
spark = (
    SparkSession.builder
    .appName("KafkaStreamReader")
    .getOrCreate()
)

In [41]:
# Column names, listed in schema order
column_names = ['TvC', 'date', 'traffic_source', 'device_type', 'browser_language', 'login_y_n', 'region', 'return_y_n', 'conversion']

# Schema for the DataFrame: one non-nullable string field per column name
schema = StructType([
    StructField(name, StringType(), nullable=False)
    for name in column_names
])

# An empty RDD gives a DataFrame that carries the schema but holds no rows
emptyRDD = spark.sparkContext.emptyRDD()
df = spark.createDataFrame(emptyRDD, schema)

df.printSchema()

root
 |-- TvC: string (nullable = false)
 |-- date: string (nullable = false)
 |-- traffic_source: string (nullable = false)
 |-- device_type: string (nullable = false)
 |-- browser_language: string (nullable = false)
 |-- login_y_n: string (nullable = false)
 |-- region: string (nullable = false)
 |-- return_y_n: string (nullable = false)
 |-- conversion: string (nullable = false)



In [42]:
import csv

# Flag read by basic_consume_loop's `while running:` loop; polling continues while it is True
running = True

def basic_consume_loop(consumer, topics, df):
    """Poll Kafka for comma-separated messages and collect them into a DataFrame.

    Subscribes ``consumer`` to ``topics`` and keeps polling while the
    module-level ``running`` flag is true. Every well-formed message (one value
    per entry in ``column_names``) is echoed to stdout, appended to the CSV
    file through ``write_to_csv`` and buffered. When the loop ends, the
    buffered rows are unioned onto ``df`` in a single step.

    Args:
        consumer: Kafka consumer exposing ``subscribe``, ``poll`` and ``close``.
        topics: List of topic names to subscribe to.
        df: Spark DataFrame the consumed rows are appended to.

    Returns:
        ``df`` extended with every well-formed row that was consumed. If the
        loop is left through an exception (including a kernel interrupt), the
        exception propagates and nothing is returned.

    Raises:
        KafkaException: For any consumer error other than end-of-partition.
    """
    # Rows are buffered and unioned once at the end; a union per message would
    # stack one more plan node onto the DataFrame for every record consumed.
    rows = []
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=3.0)
            if msg is None:
                # Poll timed out without a message; check the flag and retry.
                continue

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # End of partition event: informational only
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                    continue
                raise KafkaException(error)

            raw_value = msg.value()
            if raw_value is None:
                # A message without a payload cannot be decoded; skip it.
                print(f"Skipping message without a payload at offset {msg.offset()}")
                continue

            # Decode once and reuse the text for logging, parsing and the CSV.
            text = raw_value.decode('utf-8')
            print(msg.topic(), msg.partition(), msg.offset(), msg.key(), text)
            print('==========')

            # Split the row string into a list of values
            row_values = text.split(",")

            # Check if the message has the correct number of values
            if len(row_values) != len(column_names):
                print(f"Skipping malformed message: {text}")
                continue

            write_to_csv(text)
            rows.append(row_values)

        # Append all buffered rows to the main DataFrame in one union.
        if rows:
            df = df.union(spark.createDataFrame(rows, column_names))
        return df
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    """Ask the consume loop to stop by clearing the module-level ``running`` flag."""
    # The global declaration is required: a plain assignment would only bind a
    # local name and the flag read by basic_consume_loop would never change.
    global running
    running = False
    
def write_to_csv(new_row, csv_file='Data/data.csv'):
    """Append one comma-separated record to a CSV file.

    The header row is written first whenever the target file does not exist
    yet or exists but is empty. Missing parent directories are created.

    Args:
        new_row: Record as a single comma-separated string, with values in the
            same order as the column names below.
        csv_file: Path of the CSV file to append to.
    """
    # Define the column names
    fieldnames = ['TvC', 'date', 'traffic_source', 'device_type', 'browser_language', 'login_y_n', 'region', 'return_y_n', 'conversion']

    # Make sure the target directory exists so opening the file cannot fail
    # merely because this is the first run.
    parent_dir = os.path.dirname(csv_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # A header is needed for a brand-new file and for an existing empty one.
    needs_header = not os.path.isfile(csv_file) or os.path.getsize(csv_file) == 0

    # Open the file in append mode; newline='' lets the csv module control line endings
    with open(csv_file, 'a', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)

        if needs_header:
            writer.writeheader()

        # Split the input and create a dictionary using fieldnames as keys
        row_data = dict(zip(fieldnames, new_row.split(',')))

        # Write the new row to the CSV file
        writer.writerow(row_data)

    print("Rows added to the CSV file.")

In [None]:
# Subscribe to the 'sparkab' topic and consume into df; the call blocks while the loop polls
basic_consume_loop(consumer, ['sparkab'], df)

sparkab 0 391 None C,2021-02-06,Email Marketing,Android,Spanish,n,Southwest,y,0
Rows added to the CSV file.
sparkab 0 392 None C,2021-02-07,Organic,Iphone,English,n,West,n,0
Rows added to the CSV file.
sparkab 0 393 None V1,2021-02-16,Organic,Desktop,English,y,Southeast,n,0
Rows added to the CSV file.
sparkab 0 394 None C,2021-02-04,Referrals,Mweb,English,n,Northeast,y,1
Rows added to the CSV file.
sparkab 0 395 None C,2021-02-13,Social Media,Mweb,Spanish,y,Southeast,n,0
Rows added to the CSV file.
sparkab 0 396 None V1,2021-02-04,Direct,Desktop,Spanish,n,Southwest,y,1
Rows added to the CSV file.
sparkab 0 397 None V1,2021-02-11,Organic,Iphone,English,n,West,y,0
Rows added to the CSV file.
sparkab 0 398 None C,2021-02-07,Organic,Iphone,English,n,West,y,0
Rows added to the CSV file.
sparkab 0 399 None C,2021-02-07,Social Media,Mweb,Other,y,Northeast,n,0
Rows added to the CSV file.
sparkab 0 400 None C,2021-02-10,Direct,Android,English,n,Southwest,n,0
Rows added to the CSV file.
sparkab 

sparkab 0 467 None V1,2021-02-09,Organic,Mweb,English,n,Southwest,y,0
Rows added to the CSV file.
sparkab 0 468 None V1,2021-02-01,Organic,Desktop,English,n,Southwest,y,0
Rows added to the CSV file.
sparkab 0 469 None V1,2021-02-07,Email Marketing,Mweb,English,n,Southeast,y,0
Rows added to the CSV file.
sparkab 0 470 None C,2021-02-02,Direct,Mweb,Spanish,y,West,n,0
Rows added to the CSV file.
sparkab 0 471 None C,2021-02-11,Organic,Android,English,n,West,y,0
Rows added to the CSV file.
sparkab 0 472 None C,2021-02-09,Direct,Iphone,Spanish,n,Southeast,n,1
Rows added to the CSV file.
sparkab 0 473 None V1,2021-02-12,Social Media,Iphone,Spanish,n,Southwest,y,1
Rows added to the CSV file.
sparkab 0 474 None V1,2021-02-13,Direct,Desktop,Spanish,y,West,n,0
Rows added to the CSV file.
sparkab 0 475 None C,2021-02-16,Email Marketing,Mweb,English,y,West,n,0
Rows added to the CSV file.
sparkab 0 476 None V1,2021-02-13,Direct,Desktop,Spanish,n,West,y,1
Rows added to the CSV file.
sparkab 0 477 No