This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

How do I use this extension in a long-running Swoole process? #260

Open
laiason opened this issue Apr 13, 2019 · 7 comments

Comments

laiason commented Apr 13, 2019

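I've recently been building a project with PHP and Swoole. It starts a long-running process that needs to send API logs to Kafka, and I've run into the following problems:
1. In the examples on GitHub, both the asynchronous and the synchronous producer create a new producer on every send. In a long-running process, how can I implement a producer connection pool so that I don't have to create a new producer each time?
2. When testing with the asynchronous producer, each message is sent to Kafka multiple times. Why does that happen?
3. With the synchronous producer, I assigned the Kafka producer to a class property so that I don't have to create a new one each time, but then I get the following error: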
Fatal error: Uncaught Kafka\Exception\Socket: After 4 attempts could not write 444 bytes to stream, completed writing only 0 bytes in kafka/nmred/kafka-php/src/Kafka/SocketSync.php:380

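laiason changed the title from "在swoole中,作为常驻进程,如何使用该扩展中呢?" to "在swoole中,作为常驻进程,如何使用该扩展呢?" (a one-character typo fix; both read "How do I use this extension in a long-running Swoole process?") Apr 13, 2019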

lragon commented Apr 19, 2019

use Kafka\Producer;
use Kafka\ProducerConfig;
use Monolog\Handler\ErrorLogHandler;
use Monolog\Logger;
use Swoole\Http\Request;
use Swoole\Http\Response;

// Autoloader
require 'vendor/autoload.php';

$server = new \swoole_http_server("0.0.0.0", 9501);
$config = ProducerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('192.168.111.200:9092');
$config->setRequiredAck(1);
$config->setIsAsyn(false);
$config->setProduceInterval(500);
$config->setBrokerVersion('2.2.0');
$logger = new Logger('my_logger');
$logger->pushHandler(new ErrorLogHandler());

$producer = new Producer();
//$producer->setLogger($logger);

$server->on('request', function (Request $request, Response $response) use (&$producer) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $ret = $producer->send([[
        'topic' => 'test',
        'value' => 'test....message.',
        'key' => '',
    ]]);
    $response->end(json_encode($ret));
});
$server->start();

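I ran into this problem too. It works fine at first, but after the server has been idle for a while the next request fails and the Worker exits. My guess is that the client lost its connection.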

PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Notice:  fwrite(): send of 94 bytes failed with errno=32 Broken pipe in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 372
PHP Fatal error:  Uncaught Kafka\Exception\Socket: After 4 attempts could not write 94 bytes to stream, completed writing only 0 bytes in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php:380
Stack trace:
#0 /var/www/vendor/nmred/kafka-php/src/Kafka/Producer/SyncProcess.php(84): Kafka\SocketSync->write('\x00\x00\x00Z\x00\x00\x00\x02\x00\x00\x00\x00\x00\tk...')
#1 /var/www/vendor/nmred/kafka-php/src/Kafka/Producer.php(81): Kafka\Producer\SyncProcess->send(Array)
#2 /var/www/server.php(99): Kafka\Producer->send(Array)
#3 {main}
  thrown in /var/www/vendor/nmred/kafka-php/src/Kafka/SocketSync.php on line 380
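
A minimal sketch of one possible workaround, assuming the guess above is right and the socket went stale while idle: catch the exception, rebuild the producer, and retry once. `ReconnectingSender` and its factory argument are hypothetical names, not part of kafka-php, and whether constructing a new `Kafka\Producer` really opens a fresh socket depends on library internals that this thread does not confirm.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of kafka-php): wraps any object that exposes
 * send(array $messages) and rebuilds it once if a send throws.
 */
final class ReconnectingSender
{
    /** @var callable Factory that returns a fresh sender object. */
    private $factory;

    /** @var object|null The sender currently in use, created lazily. */
    private $sender = null;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    /**
     * Send the messages; on failure, discard the sender and retry exactly once.
     * A failure on the retry is allowed to propagate to the caller.
     */
    public function send(array $messages)
    {
        try {
            return $this->current()->send($messages);
        } catch (\Exception $e) {
            // Assumption: the failure means the connection is dead,
            // so drop the old sender and build a new one.
            $this->sender = null;
            return $this->current()->send($messages);
        }
    }

    private function current(): object
    {
        if ($this->sender === null) {
            $this->sender = ($this->factory)();
        }
        return $this->sender;
    }
}
```

With kafka-php the factory would presumably be `function () { return new Kafka\Producer(); }`, and the exception to catch would be `Kafka\Exception\Socket`, as in the stack trace above. Retrying only once keeps a genuinely unreachable broker from stalling the request indefinitely.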

laiason commented Apr 20, 2019

lragon commented Apr 20, 2019

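I tried php-rdkafka, written the way I showed above with multiple requests sharing one connection, but Kafka never received the data. How did you solve the shared-connection problem?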

laiason commented Apr 21, 2019

This is what I'm using now:
class kafka
{
    private $producer = null;   // RdKafka\Producer instance
    private $msgflags = 0;      // second argument of produce(): message flags, not a partition
    private $topic_conf = null;
    private $topic_obj = null;
    private $topic;
    private $broker_list;

    public function __construct($topic, $broker_list)
    {
        if (empty($topic)) {
            throw new Exception('topic is empty');
        }
        if (empty($broker_list)) {
            throw new Exception('broker_list is empty');
        }

        $this->topic = $topic;
        $this->broker_list = $broker_list;

        $conf = new RdKafka\Conf();
        // Delivery report callback; it only runs when poll() is called.
        $conf->setDrMsgCb(function ($kafka, $message) {
            if ($message->err) {
                error_log('Kafka delivery failed: ' . rd_kafka_err2str($message->err));
            }
            // file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
        });
        $conf->setErrorCb(function ($kafka, $err, $reason) {
            error_log(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
            // file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
        });

        $this->producer = new RdKafka\Producer($conf);
        $this->producer->setLogLevel(LOG_DEBUG);
        $this->producer->addBrokers($this->broker_list);

        $this->topic_conf = new RdKafka\TopicConf();
        // -1: wait for all in-sync replicas to acknowledge; 1: the leader acknowledges; 0: no acknowledgement.
        // With 0 the delivery callback receives no offset; with 1 or -1 the offset is returned.
        // This can be used to confirm that a message was produced, though not with 100% certainty,
        // because the Kafka server may still go down midway.
        $this->topic_conf->set('request.required.acks', '0');

        $this->topic_obj = $this->producer->newTopic($this->topic, $this->topic_conf);
    }

    public function send($messages = [])
    {
        // Arguments: partition (RD_KAFKA_PARTITION_UA lets librdkafka choose), msgflags, payload.
        $this->topic_obj->produce(RD_KAFKA_PARTITION_UA, $this->msgflags, json_encode($messages));
        // Serve pending delivery-report and error callbacks without blocking.
        $this->producer->poll(0);
        return true;
    }
}

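And this is how it is used in Swoole:
public function onWorkerStart($server, $worker_id)
{
    // Assumption: $CONFIG is a global configuration array defined elsewhere in the project.
    global $CONFIG;

    if ($server->taskworker) {
        // Create the Kafka object once per task worker.
        require_once(__DIR__ . '/kafka.php');
        $server->kafka = new kafka($CONFIG['kafka']['topic'], $CONFIG['kafka']['broker_list']);
    }
}

Each task process has its own Kafka connection, which can be reused for the lifetime of that task process.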

lragon commented Apr 21, 2019

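@laiason Thanks. My approach is to keep the connection on the server object and publish messages synchronously (blocking). I think publishing synchronously is acceptable: Kafka is a queueing system to begin with, so publishing a message is cheap, and there is no need to move it into another process.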

use Swoole\Http\Request;
use Swoole\Http\Response;

// Autoloader
require 'vendor/autoload.php';

$server = new \swoole_http_server("0.0.0.0", 9501);

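$server->on('WorkerStart', function (\Swoole\Http\Server $server, int $workerId) {
    // WorkerStart runs inside each worker process, so every worker builds its own producer.
    // isset() avoids reading a property that has not been defined yet.
    if (!isset($server->topic)) {
        $rk = new RdKafka\Producer();
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers("10.0.75.1:9092,10.0.75.1:9092");
        $topic = $rk->newTopic("test");
        $server->topic = $topic;
    }
});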

$server->on('request', function (Request $request, Response $response) use ($server) {
    $response->header("Content-Type", "text/html; charset=utf-8");
    $server->topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
    $response->end(json_encode([]));
});

$server->start();

hhxsv5 commented Apr 21, 2019

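  • I recommend one Kafka client per Worker, shared by all requests that Worker handles. That way, even if one client's connection dies, the other Workers can keep sending data.
  • Make the Kafka call after end(), so it does not slow down the normal HTTP response, similar to fastcgi_finish_request under FPM.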
<?php
$http = new Swoole\Http\Server('127.0.0.1', 5200);
$http->on('workerStart', function (Swoole\Http\Server $server, $workerId) {
    // One topic handle per worker, kept in a global so the request handler can reach it.
    global $topic;

    $brokerList = '192.168.15.83:9092';
    $topicName = 'bi_test';

    $conf = new RdKafka\Conf();
    // RdKafka\Conf::set() takes string values.
    $conf->set('queue.buffering.max.messages', '1000000');

    $rk = new RdKafka\Producer($conf);
    $rk->setLogLevel(LOG_WARNING);
    $rk->addBrokers($brokerList);

    $topic = $rk->newTopic($topicName);
});
$http->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
    global $topic;
    $payload = 'Message payload: ' . date('Y-m-d H:i:s ') . microtime(true);
    $response->end($payload);
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
});
$http->start();
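
One caveat worth sketching: with php-rdkafka, `produce()` only places the message on a local queue; librdkafka delivers it in the background, and delivery callbacks run from `poll()`. Messages still queued when a worker exits may therefore be lost. The helper below is a hypothetical sketch (the name `drainProducer` and its parameters are assumptions, not part of php-rdkafka) that polls until the outbound queue is empty or a deadline passes. It relies only on `poll()` and `getOutQLen()`, which `RdKafka\Producer` provides, and takes a plain `object` so it can be exercised without the extension.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: serve callbacks and wait until the producer's outbound
 * queue is empty or the deadline passes.
 *
 * $producer must offer poll(int $timeoutMs) and getOutQLen(), as
 * RdKafka\Producer does.
 *
 * @return int Number of messages still queued when the function returns.
 */
function drainProducer(object $producer, int $maxWaitMs = 5000, int $pollMs = 100): int
{
    $deadline = microtime(true) + $maxWaitMs / 1000;

    // Keep polling while messages remain and the deadline has not passed.
    while ($producer->getOutQLen() > 0 && microtime(true) < $deadline) {
        $producer->poll($pollMs);
    }

    return $producer->getOutQLen();
}
```

In the example above this would mean keeping `$rk` reachable (for instance as a second global), calling `$rk->poll(0)` after each `produce()`, and calling `drainProducer($rk)` from a `workerStop` handler. Newer php-rdkafka releases also provide a `flush()` method that serves the same purpose.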

@kirachen1991

The rdkafka package is too large; it adds several hundred MB to a Docker image.
