-
Notifications
You must be signed in to change notification settings - Fork 81
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7c7247c
commit bfe14a5
Showing
2 changed files
with
322 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,228 @@ | ||
<?php | ||
|
||
namespace Marquine\Etl\Loaders; | ||
|
||
use Generator; | ||
use Marquine\Etl\Database\Statement; | ||
use Marquine\Etl\Database\Transaction; | ||
|
||
class InsertUpdate extends Loader | ||
{ | ||
/** | ||
* The connection name. | ||
* | ||
* @var string | ||
*/ | ||
public $connection = 'default'; | ||
|
||
/** | ||
* The primary key. | ||
* | ||
* @var mixed | ||
*/ | ||
public $key = ['id']; | ||
|
||
/** | ||
* The columns to insert/update. | ||
* | ||
* @var array | ||
*/ | ||
public $columns = []; | ||
|
||
/** | ||
* Indicates if the table has timestamps columns. | ||
* | ||
* @var bool | ||
*/ | ||
public $timestamps = false; | ||
|
||
/** | ||
* Transaction mode. | ||
* | ||
* @var mixed | ||
*/ | ||
public $transaction = 'single'; | ||
|
||
/** | ||
* The database table. | ||
* | ||
* @var string | ||
*/ | ||
protected $table; | ||
|
||
/** | ||
* Timestamps columns value. | ||
* | ||
* @var string | ||
*/ | ||
protected $time; | ||
|
||
/** | ||
* The select statement. | ||
* | ||
* @var \PDOStatement | ||
*/ | ||
protected $select; | ||
|
||
/** | ||
* The insert statement. | ||
* | ||
* @var \PDOStatement | ||
*/ | ||
protected $insert; | ||
|
||
/** | ||
* The update statement. | ||
* | ||
* @var \PDOStatement | ||
*/ | ||
protected $update; | ||
|
||
/** | ||
* Load data into the given destination. | ||
* | ||
* @param \Generator $data | ||
* @param string $destination | ||
* @return void | ||
*/ | ||
public function load(Generator $data, $destination) | ||
{ | ||
$this->normalizeColumns($data); | ||
|
||
$this->normalizeKey(); | ||
|
||
$this->table = $destination; | ||
|
||
$this->time = date('Y-m-d G:i:s'); | ||
|
||
Transaction::connection($this->connection)->mode($this->transaction)->data($data)->run(function ($row) { | ||
if ($this->exists($row)) { | ||
$this->update($row); | ||
} else { | ||
$this->insert($row); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Verify if a row exists on the database. | ||
* | ||
* @param array $row | ||
* @return bool | ||
*/ | ||
protected function exists($row) | ||
{ | ||
if (! $this->select) { | ||
$this->select = Statement::connection($this->connection)->select($this->table)->where($this->key)->prepare(); | ||
} | ||
|
||
$this->select->execute(array_intersect_key($row, $this->key)); | ||
|
||
return (bool) $this->select->fetch(); | ||
} | ||
|
||
/** | ||
* Insert the row. | ||
* | ||
* @param array $row | ||
* @return void | ||
*/ | ||
protected function insert($row) | ||
{ | ||
if (! $this->insert) { | ||
$this->insert = Statement::connection($this->connection)->insert($this->table, $this->getInsertColumns())->prepare(); | ||
} | ||
|
||
$row = array_intersect_key($row, $this->columns + $this->key); | ||
|
||
if ($this->timestamps) { | ||
$row['created_at'] = $this->time; | ||
$row['updated_at'] = $this->time; | ||
} | ||
|
||
$this->insert->execute($row); | ||
} | ||
|
||
/** | ||
* Update the row. | ||
* | ||
* @param array $row | ||
* @return void | ||
*/ | ||
protected function update($row) | ||
{ | ||
if (! $this->update) { | ||
$this->update = Statement::connection($this->connection)->update($this->table, $this->getUpdateColumns())->where($this->key)->prepare(); | ||
} | ||
|
||
$row = array_intersect_key($row, $this->columns + $this->key); | ||
|
||
if ($this->timestamps) { | ||
$row['updated_at'] = $this->time; | ||
} | ||
|
||
$this->update->execute($row); | ||
} | ||
|
||
/** | ||
* Get the columns for the insert statement. | ||
* | ||
* @return array | ||
*/ | ||
protected function getInsertColumns() | ||
{ | ||
$columns = array_values($this->columns); | ||
|
||
if ($this->timestamps) { | ||
$columns[] = 'created_at'; | ||
$columns[] = 'updated_at'; | ||
} | ||
|
||
return $columns; | ||
} | ||
|
||
/** | ||
* Get the columns for the update statement. | ||
* | ||
* @return void | ||
*/ | ||
protected function getUpdateColumns() | ||
{ | ||
$columns = array_values(array_diff_key($this->columns, $this->key)); | ||
|
||
if ($this->timestamps) { | ||
$columns[] = 'updated_at'; | ||
} | ||
|
||
return $columns; | ||
} | ||
|
||
/** | ||
* Normalize the columns list. | ||
* | ||
* @param \Generator $data | ||
* @return void | ||
*/ | ||
protected function normalizeColumns($data) | ||
{ | ||
if (empty($this->columns)) { | ||
$this->columns = array_keys($data->current()); | ||
} | ||
|
||
$this->columns = array_combine($this->columns, $this->columns); | ||
} | ||
|
||
/** | ||
* Normalize the primary key. | ||
* | ||
* @return void | ||
*/ | ||
protected function normalizeKey() | ||
{ | ||
if (is_string($this->key)) { | ||
$this->key = [$this->key]; | ||
} | ||
|
||
$this->key = array_combine($this->key, $this->key); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
<?php | ||
|
||
namespace Tests\Loaders; | ||
|
||
use Tests\TestCase; | ||
use Marquine\Etl\Loaders\InsertUpdate; | ||
use Marquine\Etl\Database\Manager as DB; | ||
|
||
class InsertUpdateTest extends TestCase | ||
{ | ||
/** @test */ | ||
public function insert_and_update_data() | ||
{ | ||
$this->createUsersTable('default'); | ||
|
||
DB::connection('default')->exec("insert into users values (1, 'Jane', 'janedoe@example.com')"); | ||
|
||
$data = function () { | ||
yield ['id' => '1', 'name' => 'Jane Doe', 'email' => 'janedoe@example.com']; | ||
yield ['id' => '2', 'name' => 'John Doe', 'email' => 'johndoe@example.com']; | ||
}; | ||
|
||
$loader = new InsertUpdate; | ||
|
||
$loader->load($data(), 'users'); | ||
|
||
$expected = [ | ||
['id' => '1', 'name' => 'Jane Doe', 'email' => 'janedoe@example.com'], | ||
['id' => '2', 'name' => 'John Doe', 'email' => 'johndoe@example.com'], | ||
]; | ||
|
||
$query = DB::connection('default')->query('select * from users'); | ||
|
||
$this->assertEquals($expected, $query->fetchAll()); | ||
} | ||
|
||
/** @test */ | ||
public function insert_and_update_specified_columns() | ||
{ | ||
$this->createUsersTable('default'); | ||
|
||
DB::connection('default')->exec("insert into users values (1, 'Jane', 'oldemail@example.com')"); | ||
|
||
$data = function () { | ||
yield ['id' => '1', 'name' => 'Jane Doe', 'email' => 'newemail@example.com']; | ||
yield ['id' => '2', 'name' => 'John Doe', 'email' => 'johndoe@example.com']; | ||
}; | ||
|
||
$loader = new InsertUpdate; | ||
|
||
$loader->columns = ['id', 'name']; | ||
|
||
$loader->load($data(), 'users'); | ||
|
||
$expected = [ | ||
['id' => '1', 'name' => 'Jane Doe', 'email' => 'oldemail@example.com'], | ||
['id' => '2', 'name' => 'John Doe', 'email' => ''], | ||
]; | ||
|
||
$query = DB::connection('default')->query('select * from users'); | ||
|
||
$this->assertEquals($expected, $query->fetchAll()); | ||
} | ||
|
||
/** @test */ | ||
public function insert_and_update_data_with_timestamps() | ||
{ | ||
$this->createUsersTable('default', true); | ||
|
||
$timestamp = '2005-03-24 15:00:00'; | ||
|
||
DB::connection('default')->exec("insert into users values (1, 'Jane', 'jane@example.com', '$timestamp', '$timestamp', null)"); | ||
|
||
$data = function () { | ||
yield ['id' => '1', 'name' => 'Jane Doe', 'email' => 'janedoe@example.com']; | ||
yield ['id' => '2', 'name' => 'John Doe', 'email' => 'johndoe@example.com']; | ||
}; | ||
|
||
$loader = new InsertUpdate; | ||
|
||
$loader->timestamps = true; | ||
|
||
$loader->load($data(), 'users'); | ||
|
||
$expected = [ | ||
['id' => '1', 'name' => 'Jane Doe', 'email' => 'janedoe@example.com', 'created_at' => '2005-03-24 15:00:00', 'updated_at' => date('Y-m-d G:i:s'), 'deleted_at' => null], | ||
['id' => '2', 'name' => 'John Doe', 'email' => 'johndoe@example.com', 'created_at' => date('Y-m-d G:i:s'), 'updated_at' => date('Y-m-d G:i:s'), 'deleted_at' => null], | ||
]; | ||
|
||
$query = DB::connection('default')->query('select * from users'); | ||
|
||
$this->assertEquals($expected, $query->fetchAll()); | ||
} | ||
} |