Skip to content

Commit

Permalink
fix: formatting and type cast
Browse files Browse the repository at this point in the history
  • Loading branch information
Scarjit committed Mar 8, 2024
1 parent bbe2a11 commit fe42b5e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,35 @@ func (c *Connection) InsertProductTypeCreate(msg *sharedStructs.ProductTypeCreat
// Start tx (this shouln't take more then 1 minute)
ctx, cncl := get1MinuteContext()
defer cncl()
tx, err := c.db.Begin(ctx)
tx, err := c.Db.Begin(ctx)
if err != nil {
return err
}

// Insert product_type
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
INSERT INTO product_types (externalProductTypeId, cycleTime, assetId)
VALUES ($1, $2, $3)
ON CONFLICT (externalProductTypeId, assetId) DO NOTHING
INSERT INTO product_type
(
external_product_type_id,
cycle_time_ms,
asset_id
)
VALUES
(
$1,
$2,
$3
)
on conflict
(
external_product_type_id,
asset_id
)
do nothing
`, msg.ExternalProductTypeId, int(msg.CycleTimeMs), int(assetId))
if err != nil {
zap.S().Warnf("Error inserting product-type: %v (externalProductTypeId: %v) [%s]", err, msg.ExternalProductTypeId, cmdTag)
zap.S().Warnf("Error inserting product-type: %v (external_product_type_id: %v) [%s]", err, msg.ExternalProductTypeId, cmdTag)
zap.S().Debugf("Message: %v (Topic: %v)", msg, topic)
errR := tx.Rollback(ctx)
if errR != nil {
Expand Down
42 changes: 26 additions & 16 deletions golang/cmd/kafka-to-postgresql-v2/postgresql/analytics-product.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,27 @@ func (c *Connection) InsertProductAdd(msg *sharedStructs.ProductAddMessage, topi
// Insert producth
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
INSERT INTO product(external_product_type_id, product_batch_id, asset_id, start_time, end_time, quantity, bad_quantity)
VALUES (
$1,
$2,
$3,
CASE
WHEN $4 IS NOT NULL THEN to_timestamp($4/1000)
END,
to_timestamp($5/1000),
$6,
$7
INSERT INTO product
(
external_product_type_id,
product_batch_id,
asset_id,
start_time,
end_time,
quantity,
bad_quantity
)
VALUES
(
$1,
$2,
$3,
CASE
WHEN $4::int IS NOT NULL THEN To_timestamp($4::int/1000)
END::timestamptz,
To_timestamp($5/1000),
$6,
$7::int
)
`, int(productTypeId), helper.StringPtrToNullString(msg.ProductBatchId), int(assetId), helper.Uint64PtrToNullInt64(msg.StartTimeUnixMs), msg.EndTimeUnixMs, int(msg.Quantity), helper.Uint64PtrToNullInt64(msg.BadQuantity))
if err != nil {
Expand Down Expand Up @@ -74,11 +84,11 @@ func (c *Connection) UpdateBadQuantityForProduct(msg *sharedStructs.ProductSetBa
// Update bad quantity with check integrated in WHERE clause
cmdTag, err := tx.Exec(ctx, `
UPDATE product
SET bad_quantity = bad_quantity + $1
WHERE external_product_type_id = $2
AND asset_id = $3
AND end_time = to_timestamp($4/1000)
AND (quantity - bad_quantity) >= $1
SET bad_quantity = bad_quantity + $1
WHERE external_product_type_id = $2
AND asset_id = $3
AND end_time = To_timestamp($4 / 1000)
AND ( quantity - bad_quantity ) >= $1
`, int(msg.BadQuantity), int(productTypeId), int(assetId), msg.EndTimeUnixMs)

if err != nil {
Expand Down
27 changes: 19 additions & 8 deletions golang/cmd/kafka-to-postgresql-v2/postgresql/analytics-shift.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,28 @@ func (c *Connection) InsertShiftAdd(msg *sharedStructs.ShiftAddMessage, topic *s
// Start tx (this shouln't take more then 1 minute)
ctx, cncl := get1MinuteContext()
defer cncl()
tx, err := c.db.Begin(ctx)
tx, err := c.Db.Begin(ctx)
if err != nil {
return err
}

// Insert shift
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
INSERT INTO shifts (assetId, startTime, endTime)
VALUES ($1, to_timestamp($2 / 1000), to_timestamp($3 / 1000))
ON CONFLICT ON CONSTRAINT shift_start_asset_uniq
DO NOTHING;
INSERT INTO shift
(
asset_id,
start_time,
end_time
)
VALUES
(
$1,
To_timestamp($2 / 1000),
To_timestamp($3 / 1000)
)
on conflict
ON CONSTRAINT shift_start_asset_uniq do nothing;
`, int(assetId), msg.StartTimeUnixMs, msg.EndTimeUnixMs)

if err != nil {
Expand All @@ -49,16 +59,17 @@ func (c *Connection) DeleteShiftByStartTime(msg *sharedStructs.ShiftDeleteMessag

ctx, cncl := get1MinuteContext()
defer cncl()
tx, err := c.db.Begin(ctx)
tx, err := c.Db.Begin(ctx)
if err != nil {
return err
}

// Delete shift
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
DELETE FROM shifts
WHERE assetId = $1 AND startTime = to_timestamp($2 / 1000);
DELETE FROM shift
WHERE asset_id = $1
AND start_time = To_timestamp($2 / 1000);
`, int(assetId), msg.StartTimeUnixMs)

if err != nil {
Expand Down
71 changes: 44 additions & 27 deletions golang/cmd/kafka-to-postgresql-v2/postgresql/analytics-state.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ func (c *Connection) InsertStateAdd(msg *sharedStructs.StateAddMessage, topic *s
// Start tx (this shouln't take more then 1 minute)
ctx, cncl := get1MinuteContext()
defer cncl()
tx, err := c.db.Begin(ctx)
tx, err := c.Db.Begin(ctx)
if err != nil {
return err
}

// If there is already a previous state, set it's end time to the new state's start time
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
UPDATE states
SET endTime = to_timestamp($2/1000)
WHERE assetId = $1
AND endTime IS NULL
AND startTime < to_timestamp($2/1000)
UPDATE state
SET end_time = To_timestamp($2 / 1000)
WHERE asset_id = $1
AND end_time IS NULL
AND start_time < To_timestamp($2 / 1000)
`, int(assetId), msg.StartTimeUnixMs)

if err != nil {
Expand All @@ -42,10 +42,20 @@ func (c *Connection) InsertStateAdd(msg *sharedStructs.StateAddMessage, topic *s

// Insert state
cmdTag, err = tx.Exec(ctx, `
INSERT INTO states (assetId, startTime, state)
VALUES ($1, to_timestamp($2/1000), $3)
ON CONFLICT ON CONSTRAINT state_start_asset_uniq
DO NOTHING
INSERT INTO state
(
asset_id,
start_time,
state
)
VALUES
(
$1,
To_timestamp($2/1000),
$3
)
on conflict
ON CONSTRAINT state_start_asset_uniq do nothing
`, int(assetId), msg.StartTimeUnixMs, int(msg.State))

if err != nil {
Expand Down Expand Up @@ -75,18 +85,18 @@ func (c *Connection) OverwriteStateByStartEndTime(msg *sharedStructs.StateOverwr
// Start tx (this shouln't take more then 1 minute)
ctx, cncl := get1MinuteContext()
defer cncl()
tx, err := c.db.Begin(ctx)
tx, err := c.Db.Begin(ctx)
if err != nil {
return err
}

// Delete states between start and end time (inclusive) for the asset
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
DELETE FROM states
WHERE assetId = $1
AND startTime >= to_timestamp($2/1000)
AND startTime <= to_timestamp($3/1000)
DELETE FROM state
WHERE asset_id = $1
AND start_time >= To_timestamp($2 / 1000)
AND start_time <= To_timestamp($3 / 1000)
`, int(assetId), msg.StartTimeUnixMs, msg.EndTimeUnixMs)

if err != nil {
Expand All @@ -101,11 +111,11 @@ func (c *Connection) OverwriteStateByStartEndTime(msg *sharedStructs.StateOverwr

// Check for overlapping state and modify it's end time
cmdTag, err = tx.Exec(ctx, `
UPDATE states
SET endTime = to_timestamp($2/1000)
WHERE assetId = $1
AND endTime > to_timestamp($2/1000)
AND endTime <= to_timestamp($3/1000)
UPDATE state
SET end_time = To_timestamp($2 / 1000)
WHERE asset_id = $1
AND end_time > To_timestamp($2 / 1000)
AND end_time <= To_timestamp($3 / 1000)
`, int(assetId), msg.StartTimeUnixMs, msg.EndTimeUnixMs)

if err != nil {
Expand All @@ -120,11 +130,11 @@ func (c *Connection) OverwriteStateByStartEndTime(msg *sharedStructs.StateOverwr

// Check for overlapping state and modify it's start time
cmdTag, err = tx.Exec(ctx, `
UPDATE states
SET startTime = to_timestamp($3/1000)
WHERE assetId = $1
AND startTime >= to_timestamp($2/1000)
AND startTime < to_timestamp($3/1000)
UPDATE state
SET start_time = To_timestamp($3 / 1000)
WHERE asset_id = $1
AND start_time >= To_timestamp($2 / 1000)
AND start_time < To_timestamp($3 / 1000)
`, int(assetId), msg.StartTimeUnixMs, msg.EndTimeUnixMs)

if err != nil {
Expand All @@ -140,8 +150,15 @@ func (c *Connection) OverwriteStateByStartEndTime(msg *sharedStructs.StateOverwr
// Insert state

cmdTag, err = tx.Exec(ctx, `
INSERT INTO states (assetId, startTime, endTime, state)
VALUES ($1, to_timestamp($2/1000), to_timestamp($3/1000), $4)
INSERT INTO state
(asset_id,
start_time,
end_time,
state)
VALUES ($1,
To_timestamp($2 / 1000),
To_timestamp($3 / 1000),
$4)
`, int(assetId), msg.StartTimeUnixMs, msg.EndTimeUnixMs, int(msg.State))

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,27 @@ func (c *Connection) InsertWorkOrderCreate(msg *sharedStructs.WorkOrderCreateMes
zap.S().Debugf("Inserting work order: %+v", values)
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
INSERT INTO work_order(external_work_order_id, asset_id, product_type_id, quantity, status, start_time, end_time)
VALUES ($1, $2, $3, $4, $5, CASE WHEN $6 IS NOT NULL THEN to_timestamp($6/1000) ELSE NULL END::timestamptz, CASE WHEN $7 IS NOT NULL THEN to_timestamp($7/1000) ELSE NULL END::timestamptz)
INSERT INTO work_order
(external_work_order_id,
asset_id,
product_type_id,
quantity,
status,
start_time,
end_time)
VALUES ($1,
$2,
$3,
$4,
$5,
CASE
WHEN $6 :: INT IS NOT NULL THEN To_timestamp($6 :: INT / 1000)
ELSE NULL
END :: timestamptz,
CASE
WHEN $7 :: INT IS NOT NULL THEN To_timestamp($7 :: INT / 1000)
ELSE NULL
END :: timestamptz)
`, values...)
if err != nil {
zap.S().Warnf("Error inserting work order: %v (workOrderId: %v) [%s]", err, msg.ExternalWorkOrderId, cmdTag)
Expand Down Expand Up @@ -61,11 +80,12 @@ func (c *Connection) UpdateWorkOrderSetStart(msg *sharedStructs.WorkOrderStartMe
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
UPDATE work_order
SET status = 1, start_time = to_timestamp($2 / 1000)
WHERE external_work_order_id = $1
AND status = 0
AND start_time IS NULL
AND asset_id = $3
SET status = 1,
start_time = To_timestamp($2 / 1000)
WHERE external_work_order_id = $1
AND status = 0
AND start_time IS NULL
AND asset_id = $3
`, msg.ExternalWorkOrderId, msg.StartTimeUnixMs, int(assetId))
if err != nil {
zap.S().Warnf("Error updating work order: %v (workOrderId: %v) [%s]", err, msg.ExternalWorkOrderId, cmdTag)
Expand Down Expand Up @@ -102,11 +122,12 @@ func (c *Connection) UpdateWorkOrderSetStop(msg *sharedStructs.WorkOrderStopMess
var cmdTag pgconn.CommandTag
cmdTag, err = tx.Exec(ctx, `
UPDATE work_order
SET status = 2, end_time = to_timestamp($2 / 1000)
WHERE external_work_order_id = $1
AND status = 1
AND end_time IS NULL
AND asset_id = $3
SET status = 2,
end_time = To_timestamp($2 / 1000)
WHERE external_work_order_id = $1
AND status = 1
AND end_time IS NULL
AND asset_id = $3
`, msg.ExternalWorkOrderId, msg.EndTimeUnixMs, int(assetId))
if err != nil {
zap.S().Warnf("Error updating work order: %v (workOrderId: %v) [%s]", err, msg.ExternalWorkOrderId, cmdTag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestWorkOrder(t *testing.T) {
// Expect Exec from InsertWorkOrderCreate
mock.ExpectBeginTx(pgx.TxOptions{})
mock.ExpectExec(`
INSERT INTO work_order\(external_work_order_id, asset_id, product_type_id, quantity, status, start_time, end_time\) VALUES \(\$1, \$2, \$3, \$4, \$5, CASE WHEN \$6 IS NOT NULL THEN to_timestamp\(\$6/1000\) ELSE NULL END\:\:timestamptz, CASE WHEN \$7 IS NOT NULL THEN to_timestamp\(\$7/1000\) ELSE NULL END\:\:timestamptz\)
INSERT INTO work_order\(external_work_order_id, asset_id, product_type_id, quantity, status, start_time, end_time\) VALUES \(\$1, \$2, \$3, \$4, \$5, CASE WHEN \$6\:\:int IS NOT NULL THEN to_timestamp\(\$6\:\:int/1000\) ELSE NULL END\:\:timestamptz, CASE WHEN \$7\:\:int IS NOT NULL THEN to_timestamp\(\$7\:\:int/1000\) ELSE NULL END\:\:timestamptz\)
`).WithArgs("#1274", 1, 1, 0, 0, helper.Uint64PtrToNullInt64(helper.IntToUint64Ptr(0)), helper.Uint64PtrToNullInt64(helper.IntToUint64Ptr(0))).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
mock.ExpectCommit()
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestProduct(t *testing.T) {
// Expect Exec from InsertProductAdd
mock.ExpectBeginTx(pgx.TxOptions{})
mock.ExpectExec(`INSERT INTO product\(external_product_type_id, product_batch_id, asset_id, start_time, end_time, quantity, bad_quantity\)
VALUES \( \$1, \$2, \$3, CASE WHEN \$4 IS NOT NULL THEN to_timestamp\(\$4\/1000\) ELSE NULL END\:\:timestamptz, to_timestamp\(\$5\/1000\), \$6, \$7 \)`).
VALUES \( \$1, \$2, \$3, CASE WHEN \$4\:\:int IS NOT NULL THEN to_timestamp\(\$4\:\:int\/1000\) ELSE NULL END\:\:timestamptz, to_timestamp\(\$5\/1000\), \$6, \$7 \)`).
WithArgs(1, helper.StringToNullString("0000-1234"), 1, helper.Uint64PtrToNullInt64(helper.IntToUint64Ptr(0)), uint64(10), 512, helper.Uint64PtrToNullInt64(helper.IntToUint64Ptr(0))).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
mock.ExpectCommit()
Expand Down

0 comments on commit fe42b5e

Please sign in to comment.