Skip to content

Commit

Permalink
Merge pull request #839 from tulios/improve-fetch-topic-offset-types
Browse files Browse the repository at this point in the history
Improve types
  • Loading branch information
Nevon committed Aug 20, 2020
2 parents 2c9cddb + d698c04 commit 983c3d7
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 11 deletions.
12 changes: 11 additions & 1 deletion src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ const findTopicPartitions = async (cluster, topic) => {
.sort()
}

/**
*
* @param {Object} params
* @param {import("../../types").Logger} params.logger
* @param {import('../instrumentation/emitter')} [params.instrumentationEmitter]
* @param {import('../../types').RetryOptions} params.retry
* @param {import("../../types").Cluster} params.cluster
*
* @returns {import("../../types").Admin}
*/
module.exports = ({
logger: rootLogger,
instrumentationEmitter: rootInstrumentationEmitter,
Expand Down Expand Up @@ -285,7 +295,7 @@ module.exports = ({

/**
* @param {string} topic
* @param {number=} timestamp
* @param {number} [timestamp]
*/

const fetchTopicOffsetsByTimestamp = async (topic, timestamp) => {
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const mergeTopics = (obj, { topic, partitions }) => ({
* @param {number} metadataMaxAge - in milliseconds
* @param {boolean} allowAutoTopicCreation
* @param {number} maxInFlightRequests
* @param {IsolationLevel} isolationLevel
* @param {number} isolationLevel
* @param {Object} retry
* @param {Logger} logger
* @param {Map} offsets
Expand Down
18 changes: 18 additions & 0 deletions src/consumer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ const specialOffsets = [
]

/**
* @param {Object} params
* @param {import("../../types").Cluster} params.cluster
* @param {String} params.groupId
* @param {import('../../types').RetryOptions} params.retry
* @param {import('../../types').Logger} params.logger
* @param {import('../../types').PartitionAssigner[]} [params.partitionAssigners]
* @param {number} [params.sessionTimeout]
* @param {number} [params.rebalanceTimeout]
* @param {number} [params.heartbeatInterval]
* @param {number} [params.maxBytesPerPartition]
* @param {number} [params.minBytes]
* @param {number} [params.maxBytes]
* @param {number} [params.maxWaitTimeInMs]
* @param {number} [params.isolationLevel]
* @param {string} [params.rackId]
* @param {import('../instrumentation/emitter')} [params.instrumentationEmitter]
* @param {number} params.metadataMaxAge
*
* @returns {import("../../types").Consumer}
*/
module.exports = ({
Expand Down
3 changes: 0 additions & 3 deletions src/instrumentation/emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ const EventEmitter = require('events')
const InstrumentationEvent = require('./event')
const { KafkaJSError } = require('../errors')

/**
* @typedef InstrumentationEventEmitter
*/
module.exports = class InstrumentationEventEmitter {
constructor() {
this.emitter = new EventEmitter()
Expand Down
14 changes: 14 additions & 0 deletions src/producer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ const eventKeys = keys(events)

const { CONNECT, DISCONNECT } = events

/**
*
* @param {Object} params
* @param {import('../../types').Cluster} params.cluster
* @param {import('../../types').Logger} params.Logger
* @param {import('../../types').ICustomPartitioner} [params.createPartitioner]
* @param {import('../../types').RetryOptions} params.retry
* @param {boolean} [params.idempotent]
* @param {string} [params.transactionalId]
* @param {number} [params.transactionTimeout]
* @param {import('../instrumentation/emitter')} [params.instrumentationEmitter]
*
* @returns {import('../../types').Producer}
*/
module.exports = ({
cluster,
logger: rootLogger,
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/isolationLevel.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Enum for isolation levels
* @readonly
* @enum {IsolationLevel}
* @enum {number}
*/
module.exports = {
// Makes all records visible
Expand Down
11 changes: 6 additions & 5 deletions types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import * as tls from 'tls'
import * as net from 'net'

type Without<T, U> = { [P in Exclude<keyof T, keyof U>]?: never };
type XOR<T, U> = (T | U) extends object ? (Without<T, U> & U) | (Without<U, T> & T) : T | U;

export class Kafka {
constructor(config: KafkaConfig)
producer(config?: ProducerConfig): Producer
Expand Down Expand Up @@ -136,9 +139,7 @@ export type Cluster = {
topics: Array<{
topic: string
partitions: Array<{ partition: number }>
fromBeginning?: boolean
fromTimestamp?: number
}>
} & XOR<{ fromBeginning: boolean }, { fromTimestamp: number }>>
): Promise<{ topic: string; partitions: Array<{ partition: number; offset: string }> }>
}

Expand Down Expand Up @@ -423,7 +424,7 @@ export type Broker = {
maxWaitTime?: number,
minBytes?: number,
maxBytes?: number,
topics: Array<{ topic: string, partitions: Array<{ partition: number; fetchOffset: string; maxBytes: number }>}>,
topics: Array<{ topic: string, partitions: Array<{ partition: number; fetchOffset: string; maxBytes: number }> }>,
rackId?: string,
}): Promise<any>
}
Expand Down Expand Up @@ -797,7 +798,7 @@ export class KafkaJSUnsupportedMagicByteInMessageSet extends KafkaJSError {
}

export class KafkaJSDeleteGroupsError extends KafkaJSError {
constructor(e: Error | string, groups?: KafkaJSDeleteGroupsErrorGroups[] )
constructor(e: Error | string, groups?: KafkaJSDeleteGroupsErrorGroups[])
}

export interface KafkaJSDeleteGroupsErrorGroups {
Expand Down

0 comments on commit 983c3d7

Please sign in to comment.