Skip to content

Commit

Permalink
Merge pull request #32 from oslabs-beta/pre-main
Browse files Browse the repository at this point in the history
Pre main
  • Loading branch information
samarnold723 committed Oct 15, 2021
2 parents 95643b1 + 5a3dac1 commit 5430d8f
Show file tree
Hide file tree
Showing 431 changed files with 1,549 additions and 37,676 deletions.
2 changes: 2 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ MIT License

Copyright (c) 2021 OSLabs Beta

Copyright (c) 2018 Túlio Ornelas (ornelas.tulio@gmail.com)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
Expand Down
65 changes: 0 additions & 65 deletions examples/admin.js

This file was deleted.

84 changes: 0 additions & 84 deletions examples/consumer.ts

This file was deleted.

38 changes: 0 additions & 38 deletions examples/dinosaurs.ts

This file was deleted.

30 changes: 24 additions & 6 deletions examples/example_consumer.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
//deno-lint-ignore-file require-await
import { Kafka,logLevel } from '../index.ts';
import prettyConsolelogger2 from './prettyConsoleLogger2.ts'

//declare host name
const host = 'localhost'

//intialize broker
const kafka = new Kafka({
logLevel: logLevel.INFO,
logLevel: logLevel.DEMO,
logCreator: prettyConsolelogger2,
brokers: [`${host}:9092`],
clientId: 'example-consumer',
})

//declare topic name
const topic = 'topic-test'
//initialize producer and group ID
//initialize consumer and group ID
const consumer = kafka.consumer({ groupId: 'test-group' })

//main function to be run
Expand All @@ -21,11 +24,26 @@ const run = async () => {
await consumer.connect()
//subscribe to topic
await consumer.subscribe({ topic, fromBeginning: true })
//run a console.log on eachMessage
//run eachMessage function
await consumer.run({
//deno-lint-ignore require-await
eachMessage: async ({ message }: any) => {
console.log(message.key.toString(), message.value.toString())
eachMessage: async ({ topic, partition, message }: any) => {
//msgNumber++
kafka.logger().info('Message processed', {
topic,
partition,
offset: message.offset,
timestamp: message.timestamp,
headers: Object.keys(message.headers).reduce(
(headers, key) => ({
...headers,
[key]: message.headers[key].toString(),
}),
{}
),
key: message.key.toString(),
value: message.value.toString(),
//msgNumber,
})
},
})
}
Expand Down
25 changes: 17 additions & 8 deletions examples/example_producer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Kafka, logLevel } from '../index.ts';
import dinos from './dinosaurs.ts'
import prettyConsolelogger2 from './prettyConsoleLogger2.ts'

//declare host name
const host = 'localhost'

//intialize broker
const kafka = new Kafka({
logLevel: logLevel.DEBUG,
logLevel: logLevel.DEMO,
logCreator: prettyConsolelogger2,
brokers: [`${host}:9092`],
clientId: 'example_producer',
})
Expand All @@ -18,35 +19,43 @@ const producer = kafka.producer()

//helper functions for creating our randomized message
const getRandomNumber = () => Math.round(Math.random() * 1000);
const getRandomDino = () => dinos[Math.floor(Math.random() * (dinos.length - 1))]

//function for creating randomized message
const createMessage = (num: number, dino: string) => ({
const createMessage = (num: number) => ({
key: `key-${num}`,
value: `value-${num}-${dino}-${new Date().toISOString()}`,
value: `value-${num}-${new Date().toISOString()}`,
headers: {
'correlation-id': `${num}-${Date.now()}`,
},
})

//counters for logging purposes
let requestNumber = 0

//function for sending messages
const sendMessage = () => {
//create an array of messages
//create a randomly sized array of messages
const messages = Array(getRandomNumber())
.fill(undefined)
.map((_) => createMessage(getRandomNumber(), getRandomDino()))
.map((_) => createMessage(getRandomNumber()))
//increment the request number
const requestId = requestNumber++
//send the messages to the topic
kafka.logger().info(`Sending ${messages.length} messages on request number #${requestId}...`)
return producer
.send({
topic,
messages
})
.then(response => {
kafka.logger().info(`Message successfully sent to topic: ${response[0].topicName} on partition ${response[0].partition} `)
})
.catch((e: Error) =>
console.log(e)
)
}

//main function to be run
//main function to be run in setinterval
const run = async () => {
await producer.connect()
setInterval(sendMessage, 5000)
Expand Down
Loading

0 comments on commit 5430d8f

Please sign in to comment.