Skip to content

Commit

Permalink
Adds context deadline to connection endpoints + alerts to schema tabl…
Browse files Browse the repository at this point in the history
…es (#926)
  • Loading branch information
alishakawaguchi committed Dec 19, 2023
1 parent 5753041 commit 79665b1
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 11 deletions.
16 changes: 12 additions & 4 deletions backend/services/mgmt/v1alpha1/connection-service/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ func (s *Service) GetConnectionForeignConstraints(
if err != nil {
return nil, err
}
allConstraints, err := getAllPostgresFkConstraints(pgquerier, ctx, pool, schemas)
cctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()
allConstraints, err := getAllPostgresFkConstraints(pgquerier, cctx, pool, schemas)
if err != nil {
return nil, err
}
Expand All @@ -436,7 +438,9 @@ func (s *Service) GetConnectionForeignConstraints(
logger.Error(fmt.Errorf("failed to close connection: %w", err).Error())
}
}()
allConstraints, err := getAllMysqlFkConstraints(mysqlquerier, ctx, conn, schemas)
cctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()
allConstraints, err := getAllMysqlFkConstraints(mysqlquerier, cctx, conn, schemas)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -496,10 +500,12 @@ func (s *Service) GetConnectionInitStatements(
if err != nil {
return nil, err
}
cctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()
for k, v := range schemaTableMap {
statements := []string{}
if req.Msg.Options.InitSchema {
stmt, err := dbschemas_postgres.GetTableCreateStatement(ctx, pool, pgquerier, v.Schema, v.Table)
stmt, err := dbschemas_postgres.GetTableCreateStatement(cctx, pool, pgquerier, v.Schema, v.Table)
if err != nil {
return nil, err
}
Expand All @@ -525,10 +531,12 @@ func (s *Service) GetConnectionInitStatements(
logger.Error(fmt.Errorf("failed to close connection: %w", err).Error())
}
}()
cctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()
for k, v := range schemaTableMap {
statements := []string{}
if req.Msg.Options.InitSchema {
stmt, err := dbschemas_mysql.GetTableCreateStatement(ctx, conn, &dbschemas_mysql.GetTableCreateStatementRequest{
stmt, err := dbschemas_mysql.GetTableCreateStatement(cctx, conn, &dbschemas_mysql.GetTableCreateStatementRequest{
Schema: v.Schema,
Table: v.Table,
})
Expand Down
7 changes: 5 additions & 2 deletions backend/services/mgmt/v1alpha1/connection-service/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"time"

"connectrpc.com/connect"
_ "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -81,10 +82,12 @@ func (s *Service) GetConnectionSchema(
logger.Error(fmt.Errorf("failed to close sql connection: %w", err).Error())
}
}()
cctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second))
defer cancel()

switch connCfg.Config.(type) {
case *mgmtv1alpha1.ConnectionConfig_PgConfig:
dbSchema, err := getDatabaseSchema(ctx, conn, getPostgresTableSchemaSql)
dbSchema, err := getDatabaseSchema(cctx, conn, getPostgresTableSchemaSql)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +97,7 @@ func (s *Service) GetConnectionSchema(
}), nil

case *mgmtv1alpha1.ConnectionConfig_MysqlConfig:
dbSchema, err := getDatabaseSchema(ctx, conn, getMysqlTableSchemaSql)
dbSchema, err := getDatabaseSchema(cctx, conn, getMysqlTableSchemaSql)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
UpdateJobSourceConnectionResponse,
} from '@neosync/sdk';
import { ExclamationTriangleIcon } from '@radix-ui/react-icons';
import { ReactElement } from 'react';
import { ReactElement, useEffect } from 'react';
import { useForm } from 'react-hook-form';
import { SchemaMap, getColumnMapping } from './DataSyncConnectionCard';
import { getFkIdFromGenerateSource } from './util';
Expand All @@ -66,8 +66,21 @@ export default function DataGenConnectionCard({ jobId }: Props): ReactElement {
isLoading: isJobLoading,
} = useGetJob(account?.id ?? '', jobId);
const fkSourceConnectionId = getFkIdFromGenerateSource(data?.job?.source);
const { data: schema, isLoading: isGetConnectionsSchemaLoading } =
useGetConnectionSchema(account?.id ?? '', fkSourceConnectionId);
const {
data: schema,
isLoading: isGetConnectionsSchemaLoading,
error,
} = useGetConnectionSchema(account?.id ?? '', fkSourceConnectionId);

useEffect(() => {
if (error) {
toast({
title: 'Unable to get connection schema',
description: getErrorMessage(error),
variant: 'destructive',
});
}
}, [error]);

const allJobMappings =
schema?.schemas.map((r) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import {
UpdateJobSourceConnectionRequest,
UpdateJobSourceConnectionResponse,
} from '@neosync/sdk';
import { ReactElement } from 'react';
import { ReactElement, useEffect } from 'react';
import { useForm } from 'react-hook-form';
import * as Yup from 'yup';
import { getConnection } from '../../util';
Expand Down Expand Up @@ -93,7 +93,7 @@ export default function DataSyncConnectionCard({ jobId }: Props): ReactElement {
const { account } = useAccount();
const { data, mutate } = useGetJob(account?.id ?? '', jobId);
const sourceConnectionId = getConnectionIdFromSource(data?.job?.source);
const { data: schema } = useGetConnectionSchema(
const { data: schema, error } = useGetConnectionSchema(
account?.id ?? '',
sourceConnectionId
);
Expand All @@ -102,6 +102,16 @@ export default function DataSyncConnectionCard({ jobId }: Props): ReactElement {

const connections = connectionsData?.connections ?? [];

useEffect(() => {
if (error) {
toast({
title: 'Unable to get connection schema',
description: getErrorMessage(error),
variant: 'destructive',
});
}
}, [error]);

const form = useForm({
resolver: yupResolver<SourceFormValues>(FORM_SCHEMA),
defaultValues: {
Expand Down

0 comments on commit 79665b1

Please sign in to comment.