In [68]:
from time import sleep
from json import dumps
from kafka import KafkaProducer

DER_TOPIC = 'der_item'

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
producer

<kafka.producer.kafka.KafkaProducer at 0x126dafd90>

In [69]:
import os
from time import strftime
import datetime
from faker import Faker
from faker.providers import BaseProvider
import random
import csv


class GenreProvider(BaseProvider):
    def movie_genre(self):
        return random.choice(['Documentary', 'Thriller', 'Mystery', 'Horror', 'Action', 'Comedy', 'Drama', 'Romance'])


class LanguageProvider(BaseProvider):
    def language(self):
        return random.choice(['English', 'Chinese', 'Italian', 'Spanish', 'Hindi', 'Japanese'])


fake = Faker()

fake.add_provider(GenreProvider)
fake.add_provider(LanguageProvider)


def get_movie_name():
    words = fake.words()
    capitalized_words = list(map(str.capitalize, words))
    return ' '.join(capitalized_words)


def get_movie_date():
    return fake.date_time_this_decade().strftime("%B %d, %Y")


def get_movie_len():
    return random.randrange(50, 150)


def get_movie_rating():
    return round(random.uniform(1.0, 5.0), 1)


def generate_movie():
    return [get_movie_name(), fake.movie_genre(), get_movie_date(), get_movie_len(), get_movie_rating(),
            fake.language()]


def write_csv(file_name='movie_data.csv', folder='data', row_count=10):
    file_path = os.path.join(folder, file_name)
    with open(file_path, 'w') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Title', 'Genre', 'Premiere', 'Runtime', 'IMDB_Score', 'Language'])
        for n in range(1, row_count):
            writer.writerow(generate_movie())
    return file_path

In [70]:
def generate_fake_fn(word_count=3):
    f = '-'.join(fake.words(word_count))
    return f'{f}.csv'


generate_fake_fn()

'stock-office-trouble.csv'

In [71]:
csv_files_topic = 'csv_files'

In [72]:
write_csv()

'data/movie_data.csv'

In [73]:

for i in range(1, 10):
    fn = generate_fake_fn()
    fp = write_csv(file_name=fn, row_count=5)
    data = {'filename': fp}
    producer.send(csv_files_topic, value=data)
    print('.', end='.')

..................

In [None]:
from kafka import KafkaConsumer
from json import loads
import pandas as pd

consumer = KafkaConsumer(
    csv_files_topic,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
    fn = message.value['filename']
    # print(fn)
    df = pd.read_table(fn, sep=",")
    df = df.reset_index()
    [print(row_dict) for row_dict in df.to_dict(orient="records")]
    print('.', end='.')
    [producer.send(DER_TOPIC, value=row_dict) for row_dict in df.to_dict(orient="records")]

{'index': 0, 'Title': 'Through Vote Bag', 'Genre': 'Drama', 'Premiere': 'January 02, 2020', 'Runtime': 128, 'IMDB_Score': 3.5, 'Language': 'English'}
{'index': 1, 'Title': 'Short Newspaper Week', 'Genre': 'Mystery', 'Premiere': 'March 21, 2022', 'Runtime': 103, 'IMDB_Score': 3.9, 'Language': 'Italian'}
{'index': 2, 'Title': 'Without Throughout Car', 'Genre': 'Horror', 'Premiere': 'July 19, 2021', 'Runtime': 62, 'IMDB_Score': 2.1, 'Language': 'English'}
{'index': 3, 'Title': 'Make Military Song', 'Genre': 'Documentary', 'Premiere': 'April 26, 2021', 'Runtime': 75, 'IMDB_Score': 4.2, 'Language': 'English'}
..{'index': 0, 'Title': 'Too Cost Surface', 'Genre': 'Action', 'Premiere': 'March 31, 2020', 'Runtime': 70, 'IMDB_Score': 5.0, 'Language': 'English'}
{'index': 1, 'Title': 'Indicate Scientist Wide', 'Genre': 'Action', 'Premiere': 'October 06, 2021', 'Runtime': 116, 'IMDB_Score': 4.9, 'Language': 'English'}
{'index': 2, 'Title': 'Forward Project Exactly', 'Genre': 'Documentary', 'Premie

In [None]:
def process(data=None):
    sleep_duration = int(random.randint(20, 100) / 60)
    print(f'z' * sleep_duration, end='')
    sleep(sleep_duration)
    print(f'z' * sleep_duration, end='')


process()

In [None]:


DERConsumer = KafkaConsumer(
    DER_TOPIC,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in DERConsumer:
    data = message.value
    print('.', end='')
    process(data)
