Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipedrive activities support #7233

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 97 additions & 2 deletions plugins/MauticCrmBundle/Api/PipedriveApi.php
Expand Up @@ -11,13 +11,18 @@ class PipedriveApi extends CrmApi
const ORGANIZATIONS_API_ENDPOINT = 'organizations';
const PERSONS_API_ENDPOINT = 'persons';
const USERS_API_ENDPOINT = 'users';
const ACTIVITY_TYPES = 'activityTypes';
const ACTIVITIES = 'activities';
/**
* @var TransportInterface
*/
private $transport;

private $apiFields = [];

/** @var null|array */
private $activities = null;

/**
* PipedriveApi constructor.
*
Expand Down Expand Up @@ -168,8 +173,7 @@ public function getDataByEndpoint(array $query, $endpoint)
'query' => array_merge($this->getAuthQuery(), $query),
];

$url = sprintf('%s/%s', $this->integration->getApiUrl(), $endpoint);

$url = sprintf('%s/%s', $this->integration->getApiUrl(), $endpoint);
$response = $this->transport->get($url, $params);

return json_decode($response->getBody(), true);
Expand Down Expand Up @@ -230,6 +234,97 @@ private function getRequestParameters(array $data = [])
];
}

/**
* @param array $data
*
* @return array
*/
public function addActivity($data)
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
{
$params = $this->getRequestParameters($data);
$url = sprintf('%s/%s', $this->integration->getApiUrl(), self::ACTIVITIES);
$response = $this->transport->post($url, $params);

return $data = $this->getResponseData($response);
}

/**
* @param array $event
*
* @return array|mixed
*/
public function getActivityType($event)
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
{
$activityName = $event['eventType'];
$activities = $this->getActivityTypes();
// return If already exists
if (isset($activities[$activityName])) {
return $activities[$activityName];
}

return false;
}

/**
* Create activity log on Pipedrive.
*
* @param array $event
*
* @return array
*/
public function createActivityType($event)
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
{
$icon = 'pricetag';

switch ($event['event']) {
case 'email.sent':
case 'email.read':
case 'email.replied':
case 'email.failed':
$icon = 'email';
break;
case 'campaign.event':
case 'campaign.event.scheduled':
$icon = 'task';
break;
case 'lead.source.identified':
case 'lead.source.created':
case 'segment_membership':
case 'lead.donotcontact':
$icon = 'addressbook';
break;
}

$activityName = $event['eventType'];
$data = [];
$data['name'] = $activityName;
$data['icon_key'] = $icon;
$data['color'] = '4e5e9e';

$params = $this->getRequestParameters($data);
$url = sprintf('%s/%s', $this->integration->getApiUrl(), self::ACTIVITY_TYPES);
$response = $this->transport->post($url, $params);

$data = $this->getResponseData($response);

return $this->activities[$activityName] = $data;
}

/**
* @return array
*/
private function getActivityTypes()
{
if (null !== $this->activities) {
return $this->activities;
}

$activities = $this->getDataByEndpoint([], 'activityTypes');

//return data by name
return $this->activities = array_combine(array_column($activities['data'], 'name'), $activities['data']);
}

/**
* @return array
*/
Expand Down
138 changes: 138 additions & 0 deletions plugins/MauticCrmBundle/Command/PushActivitiesToPipedriveCommand.php
@@ -0,0 +1,138 @@
<?php

namespace MauticPlugin\MauticCrmBundle\Command;

use Mautic\CoreBundle\Helper\ProgressBarHelper;
use Mautic\PluginBundle\Entity\IntegrationEntity;
use MauticPlugin\MauticCrmBundle\Integration\Pipedrive\Export\ActivitiesPipedriveExport;
use MauticPlugin\MauticCrmBundle\Integration\Pipedrive\Export\LeadExport;
use MauticPlugin\MauticCrmBundle\Integration\PipedriveIntegration;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class PushActivitiesToPipedriveCommand extends ContainerAwareCommand
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this->setName('mautic:integration:pipedrive:pushactivities')
->setDescription('Pushes the data from Mautic to Pipedrive')
->addOption(
'time-interval',
't',
InputOption::VALUE_OPTIONAL,
'Send time interval to check updates, it should be a correct php formatted time interval in the past eg:(15 minutes)',
'15 minutes'
);

parent::configure();
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$integrationHelper = $this->getContainer()->get('mautic.helper.integration');
/** @var PipedriveIntegration $integrationObject */
$integrationObject = $integrationHelper->getIntegrationObject(PipedriveIntegration::INTEGRATION_NAME);
$em = $this->getContainer()->get('doctrine')->getManager();

if (!$integrationObject->getIntegrationSettings()->getIsPublished()) {
$output->writeln('Pipedrive integration id disabled.');
kuzmany marked this conversation as resolved.
Show resolved Hide resolved

return;
}

$output->writeln('<comment>Push activities</comment>');

/** @var ActivitiesPipedriveExport $activitiesExport */
$activitiesExport = $this->getContainer()->get('mautic_integration.pipedrive.export.activities');
$activitiesExport->setIntegration($integrationObject);

$pushed = 0;
$start = 0;
$limit = 50;
kuzmany marked this conversation as resolved.
Show resolved Hide resolved

// now() - time-interval
$timeInterval = str_replace('--', '-', '-'.$input->getOption('time-interval'));
$lastPossibleSync = (new \DateTime())->setTimestamp(strtotime($timeInterval));

$progress = ProgressBarHelper::init($output, $limit);
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second param should be max count (the count of 100%). 50 is a wild guess of how many integration entities there are. You should count the amount of integration entities that will be processed instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is we don't know how many items will be processed.
We store lastActivitySync timestamp to internal serialize column.
This is reason why we have to run all integration entity in batch, then just skipped already updated https://github.com/mautic/mautic/pull/7233/files#diff-6bdf0596bca5f7bff88ba72d06c7683dR78

I know It's not the best solution, but I didn't better idea how to figure out.
Maybe we can use regexp on intetrnal column and this should work too.


while (true) {
$integrationEntities = $this->getIntegrationEntities($integrationObject, $start, $limit);

if (!$integrationEntities) {
break;
}

/** @var IntegrationEntity $integrationEntity */
foreach ($integrationEntities as $integrationEntity) {
$internal = $integrationEntity->getInternal();
$lastActivitySync = null;
if (isset($internal['last_activity_sync'])) {
$lastActivitySync = (new \DateTime())->setTimestamp($internal['last_activity_sync']);
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
// stop If last activity sync is over timeout
if ($lastActivitySync > $lastPossibleSync) {
continue;
}
}
if ($activitiesExport->createActivities($integrationEntity, $lastActivitySync) !== false) {
++$pushed;
if ($pushed % $limit == 0) {
$progress->setProgress($pushed);
}
}
}
$start = $start + $limit;
$em->clear();
}

$progress->finish();
$output->writeln('');
$output->writeln('Pushed activities to '.$pushed.' contacts');
$output->writeln('Execution time: '.number_format(microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'], 3));
}

/**
* @param PipedriveIntegration $integrationObject
* @param int $start
* @param int $limit
*
* @return array|null
*/
private function getIntegrationEntities($integrationObject, $start, $limit)
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
{
return $integrationObject->getIntegrationEntityRepository()->getEntities(
[
'filter' => [
'force' => [
[
'column' => 'e.integration',
'expr' => 'eq',
'value' => $integrationObject->getName(),
],
[
'column' => 'e.integrationEntity',
'expr' => 'eq',
'value' => LeadExport::PERSON_ENTITY_TYPE,
],
[
'column' => 'e.internalEntity',
'expr' => 'eq',
'value' => LeadExport::LEAD_ENTITY_TYPE,
],
],
],
'start' => $start,
'limit' => $limit,
'ignore_paginator' => true,
]
);
}
}
72 changes: 66 additions & 6 deletions plugins/MauticCrmBundle/Command/PushDataToPipedriveCommand.php
Expand Up @@ -2,6 +2,8 @@

namespace MauticPlugin\MauticCrmBundle\Command;

use Doctrine\ORM\EntityManager;
use Mautic\CoreBundle\Helper\ProgressBarHelper;
use Mautic\LeadBundle\Entity\Company;
use Mautic\LeadBundle\Entity\Lead;
use MauticPlugin\MauticCrmBundle\Integration\PipedriveIntegration;
Expand Down Expand Up @@ -41,6 +43,7 @@ protected function configure()
protected function execute(InputInterface $input, OutputInterface $output)
{
$integrationHelper = $this->getContainer()->get('mautic.helper.integration');
/** @var PipedriveIntegration $integrationObject */
$integrationObject = $integrationHelper->getIntegrationObject(PipedriveIntegration::INTEGRATION_NAME);
$this->io = new SymfonyStyle($input, $output);
$em = $this->getContainer()->get('doctrine')->getManager();
Expand Down Expand Up @@ -77,18 +80,75 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->io->text('Pushed '.$pushed);
}

$leads = $em->getRepository(Lead::class)->findAll();
$this->io->title('Pushing Leads');

$leadExport = $this->getContainer()->get('mautic_integration.pipedrive.export.lead');
$leadExport->setIntegration($integrationObject);
$pushed = 0;
foreach ($leads as $lead) {
if ($leadExport->create($lead)) {
++$pushed;

$pushed = 0;
$start = 0;
$limit = 50;
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, may be good idea to make the limit configurable

$progress = ProgressBarHelper::init($output, $limit);
Copy link
Sponsor Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second param should be max count (the count of 100%). 50 is a wild guess of how many integration entities there are. You should count the amount of integration entities that will be processed instead.

while (true) {
$leads = $this->getLeads($em, $start, $limit);

if (!$leads) {
break;
}

foreach ($leads as $lead) {
if ($leadExport->create($lead)) {
++$pushed;
if ($pushed % $limit == 0) {
$progress->setProgress($pushed);
}
}
}
$start = $start + $limit;
$em->clear();
}
$this->io->text('Pushed '.$pushed);

$progress->finish();

$output->writeln('');
$this->io->text('Pushed total '.$pushed);
$this->io->success('Execution time: '.number_format(microtime(true) - $_SERVER['REQUEST_TIME_FLOAT'], 3));
}

/**
* @param EntityManager $em
* @param $start
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
* @param $limit
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
*
* @return array
*/
private function getLeads(EntityManager $em, $start, $limit)
{
return $em->getRepository(Lead::class)->getEntities(
[
'filter' => [
'force' => [
[
'column' => 'l.email',
'expr' => 'neq',
'value' => '',
kuzmany marked this conversation as resolved.
Show resolved Hide resolved
],
[
'column' => 'l.firstname',
'expr' => 'neq',
'value' => '',
],
[
'column' => 'l.lastname',
'expr' => 'neq',
'value' => '',
],
],
],
'start' => $start,
'limit' => $limit,
'ignore_paginator' => true,
]
);
}
}
7 changes: 7 additions & 0 deletions plugins/MauticCrmBundle/Config/config.php
Expand Up @@ -144,6 +144,13 @@
'mautic_integration.pipedrive.export.company',
],
],
'mautic_integration.pipedrive.export.activities' => [
'class' => \MauticPlugin\MauticCrmBundle\Integration\Pipedrive\Export\ActivitiesPipedriveExport::class,
'arguments' => [
'doctrine.orm.entity_manager',
'mautic.lead.model.lead',
],
],
],
'forms' => [
'mautic.form.type.connectwise.campaignaction' => [
Expand Down