Skip to content
kafka php client
PHP
Branch: master
Clone or download
Latest commit a8f5b01 Mar 31, 2018
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
docs modify qq group Mar 29, 2018
example Fix coding standard issues Feb 11, 2018
src Merge pull request #196 from simPod/connection-exception Mar 31, 2018
tests Add record validation Feb 12, 2018
.coveralls.yml Simplify the execution of tests Dec 3, 2017
.gitattributes Use infection for mutation testing Dec 3, 2017
.gitignore Use infection for mutation testing Dec 3, 2017
.travis.yml Reduce the mutation tests required metrics Jan 4, 2018
Dockerfile Implement functional tests Dec 10, 2017
LICENSE Update LICENSE Feb 16, 2017
README.md Update README.md Mar 27, 2018
README_CH.md Update README_CH.md Mar 27, 2018
composer.json Use doctrine/coding-standard:^2.1 as CS Jan 29, 2018
docker-compose.yml Implement functional tests Dec 10, 2017
infection.json.dist Reorganise test suites Dec 10, 2017
phpcs.xml.dist Use doctrine/coding-standard:^2.1 as CS Jan 29, 2018
phpstan.neon Improve configuration tests Jan 4, 2018
phpunit.xml.dist

README.md

Kafka-php

中文文档

QQ Group Build Status Packagist Packagist Packagist GitHub issues GitHub forks GitHub stars GitHub license

Kafka-php is a pure PHP kafka client that currently supports greater than 0.8.x version of Kafka, this project v0.2.x and v0.1.x are incompatible if using the original v0.1.x You can refer to the document Kafka PHP v0.1.x Document, but it is recommended to switch to v0.2.x . v0.2.x use PHP asynchronous implementation and kafka broker interaction, more stable than v0.1.x efficient, because the use of PHP language so do not compile any expansion can be used to reduce the access and maintenance costs

Requirements

  • Minimum PHP version: 7.1
  • Kafka version greater than 0.8
  • The consumer module needs kafka broker version greater than 0.9.0

Installation

Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).

Composer Install

Simply add a dependency nmred/kafka-php to your project if you use Composer to manage the dependencies of your project.

$ composer require nmred/kafka-php

Here is a minimal example of a composer.json file :

{
	"require": {
		"nmred/kafka-php": "0.2.*"
	}
}

Configuration

Configuration properties are documented in Configuration

Producer

Asynchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer(
    function() {
        return [
            [
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ],
        ];
    }
);
$producer->setLogger($logger);
$producer->success(function($result) {
	var_dump($result);
});
$producer->error(function($errorCode) {
		var_dump($errorCode);
});
$producer->send(true);

Synchronous mode

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9192');
$config->setBrokerVersion('1.0.0');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$producer = new \Kafka\Producer();
$producer->setLogger($logger);

for($i = 0; $i < 100; $i++) {
    $producer->send([
        [
            'topic' => 'test1',
            'value' => 'test1....message.',
            'key' => '',
        ],
    ]);
}

Consumer

<?php
require '../vendor/autoload.php';
date_default_timezone_set('PRC');
use Monolog\Logger;
use Monolog\Handler\StdoutHandler;
// Create the logger
$logger = new Logger('my_logger');
// Now add some handlers
$logger->pushHandler(new StdoutHandler());

$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('10.13.4.159:9192');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
//$config->setOffsetReset('earliest');
$consumer = new \Kafka\Consumer();
$consumer->setLogger($logger);
$consumer->start(function($topic, $part, $message) {
	var_dump($message);
});

Low-Level API

Refer Example

QQ Group

Group 1: 531522091 Group 2: 657517955 QQ Group

You can’t perform that action at this time.