In [15]:
import os
import pandas as pd
import boto3
from botocore.exceptions import NoCredentialsError

import ujson as json
import psycopg2
from botocore.exceptions import ClientError


AWS_REGION="ap-northeast-2"
S3_BUCKET="flitto"

REDSHIFT_CLUSTER="redshift-cluster-1"
IAM_ROLE_ARN="arn:aws:iam::946551077663:role/service-role/AmazonRedshift-CommandsAccessRole-20231219T175533"

DATABASE_NAME="corpus"
DATABASE_USER="awsuser"

In [16]:
# AWS 자격 증명 로드
try:
    s3 = boto3.client('s3')
    redshift = boto3.client('redshift-data')

except NoCredentialsError:
    print('AWS 자격 증명 파일을 찾을 수 없습니다. ~/.aws/credentials 파일이 올바르게 설정되었는지 확인하세요.')

dtype_to_redshift_data_types ={
    'int64': 'INTEGER',
    'float64': 'FLOAT',
    'object': 'VARCHAR(255)'
}

response = s3.list_objects_v2(Bucket=S3_BUCKET)


In [17]:
def get_csv_files():
    response = s3.list_objects_v2(Bucket=S3_BUCKET)

    # CSV 파일 목록 출력
    csv_files = [ (os.path.splitext(obj['Key'])[0], f"s3://{S3_BUCKET}/{obj['Key']}") for obj in response.get('Contents', []) if obj['Key'].endswith('.csv')]
    return csv_files

csv_files = get_csv_files()

In [19]:
def get_secret():
    secret_name = 'redshift!redshift-cluster-1-awsuser'
    region_name = 'ap-northeast-2'
    
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

secret = json.loads(get_secret())
conn = psycopg2.connect(
    dbname=DATABASE_NAME,
    user=secret['username'],
    password=secret['password'],
    host='redshift-cluster-1.chzlcffktkrj.ap-northeast-2.redshift.amazonaws.com',
    port=5439
)

cursor = conn.cursor()

In [27]:
def parse_s3_url(url):
    import re
    match = re.match(r's3://([^/]+)/(.+)', s3_url)
    if match:
        return match.groups()
    else:
        raise ValueError(f"Invalid S3 URL: {s3_url}")

table_name, s3_url = get_csv_files()[0]
b_name, obj_key = parse_s3_url(s3_url)

obj = s3.get_object(Bucket=b_name, Key=obj_key)
data = obj['Body'].read()
data

b'id,lang_id1,lang_id2,tag_id,cnt\n1,3,11,1,212370\n2,3,11,2,140894\n3,3,11,7,76085\n11,3,17,1,344402\n24382,3,17,21,105\n24383,3,17,19,25\n24384,3,17,20,17\n21093,3,17,17,3\n24392,3,20,21,47\n24397,3,20,19,16\n24398,3,20,20,12\n33,3,22,6,5031\n24408,3,22,19,9\n24409,3,22,20,4\n24410,3,22,21,27\n37,3,22,5,3189\n2488,3,22,0,225\n24431,3,27,21,54\n24433,3,27,19,12\n24434,3,27,20,6\n55,3,30,4,4855\n24445,3,30,21,54\n24446,3,30,19,18\n24447,3,30,20,6\n21144,3,30,17,3\n62,3,33,8,15215\n24459,3,33,21,75\n24460,3,33,19,28\n24461,3,33,20,15\n21155,3,33,17,3\n69,3,38,1,127814\n24468,3,38,21,43\n24471,3,38,19,11\n24474,3,38,20,5\n24482,3,45,21,1\n24488,3,45,20,1\n24497,3,48,19,14\n24498,3,48,20,7\n24499,3,48,21,18\n89,3,52,7,63852\n24511,3,52,19,18\n24512,3,52,20,10\n24513,3,52,21,29\n24520,3,56,21,41\n24522,3,56,19,11\n24525,3,56,20,8\n24533,3,57,21,1\n24544,3,61,19,12\n24547,3,61,21,38\n24548,3,61,20,7\n24555,3,62,21,1\n23897,3,63,7,2\n24562,3,63,21,2\n12409,3,63,1,1\n24574,11,17,21,273\n24575

In [None]:
import psycopg2

conn = psycopg2.connect(
    dbname=DATABASE_NAME,
    user=DATABASE_USER,
    password='YOUR_PASSWORD',
    host=redshift-'cluster-1.chzlcffktkrj.ap-northeast-2.redshift.amazonaws.com',
    port='5439'
)

In [14]:
conn.commit()

In [10]:
cursor = conn.cursor()

for csv_file in csv_files:
    s3_url = f"s3://{S3_BUCKET}/{csv_file}"
    df = pd.read_csv(s3_url, nrows=1000)

    table_name = os.path.splitext(csv_file)[0]
    
    create_table_query = f"CREATE TABLE {table_name} (\n"

    for field_name, data_type in zip(df.columns, df.dtypes):
        
        if data_type == 'object' and table_name == 'corpus':
            create_table_query += f"{field_name} VARCHAR(MAX),\n"
            
        else:
            create_table_query += f"{field_name} {dtype_to_redshift_data_types[str(data_type)]},\n"
    
    create_table_query = create_table_query.rstrip(',\n') + "\n);"
    
    copy_query = f"COPY {table_name} FROM '{s3_url}' IAM_ROLE '{IAM_ROLE_ARN}' FORMAT CSV IGNOREHEADER 1"

    try:    
        cursor.execute(create_table_query)
        print(f"Created {table_name} --")
        
        cursor.execute(copy_query)
        print(f"Updated rows --")
    
    except Exception as error:
        conn.rollback()
        print(error)
        break

Created cnt_2pairs_corpus --
Updated rows --
Created cnt_3pairs_corpus --
Updated rows --
Created cnt_mono_corpus --
Updated rows --
Created corpus --
Updated rows --
Created corpus_tags_map --
Updated rows --
Created duplicated_corpus --
Updated rows --
Created group_tags_map --
Updated rows --
Created languages --
Updated rows --
Created parallel_corpus --
Updated rows --
Created partners --
Updated rows --
Created project_corpus_map --
Updated rows --
Created projects --
Updated rows --
Created tags --
Updated rows --
Created user_meta --
Updated rows --
Created voice_corpus --
Updated rows --


In [9]:
cursor = conn.cursor()

for table, s3_url in get_csv_files():
    cursor.execute(f'drop table {table}')
    
conn.commit()

In [21]:
cursor = conn.cursor()
query='''
    SELECT
        A.group_id,
        A.corpus_id AS en_id,
        B.corpus_id AS zh_id,
        corpusA.text AS en,
        corpusB.text AS zh,
        project.title AS en_delivery,
        project.title AS zh_delivery,
        COALESCE(tag1.name, '') AS tag_1,
        COALESCE(gtm1.score, 0) AS score_1,
        COALESCE(tag2.name, '') AS tag_2,
        COALESCE(gtm2.score, 0) AS score_2,
        COALESCE(tag3.name, '') AS tag_3,
        COALESCE(gtm3.score, 0) AS score_3
    FROM 
        parallel_corpus A
        JOIN parallel_corpus B ON A.group_id = B.group_id AND A.lang_id = 17 AND B.lang_id = 11
        LEFT JOIN corpus corpusA ON A.corpus_id = corpusA.id
        LEFT JOIN corpus corpusB ON B.corpus_id = corpusB.id
        LEFT JOIN (
            SELECT corpus_id, MAX(project_id) AS max_project_id
            FROM project_corpus_map
            GROUP BY corpus_id
        ) pcm ON A.corpus_id = pcm.corpus_id
        LEFT JOIN projects project ON pcm.max_project_id = project.id
        LEFT JOIN group_tags_map gtm1 ON A.group_id = gtm1.group_id AND gtm1.priority = 1
        LEFT JOIN tags tag1 ON gtm1.tag_id = tag1.id
        LEFT JOIN group_tags_map gtm2 ON A.group_id = gtm2.group_id AND gtm2.priority = 2
        LEFT JOIN tags tag2 ON gtm2.tag_id = tag2.id
        LEFT JOIN group_tags_map gtm3 ON A.group_id = gtm3.group_id AND gtm3.priority = 3
        LEFT JOIN tags tag3 ON gtm3.tag_id = tag3.id
    
    '''

cursor.execute(query)

In [22]:
cursor.fetchall()

[(893789,
  13632279,
  2393789,
  'By then, There are people who look me as role model.',
  '到时候会有人想要把我作为榜样的。',
  '2019_BYTEDANCE_중일',
  '2019_BYTEDANCE_중일',
  'society',
  0.61,
  'beauty',
  0.57,
  '',
  0.0),
 (909136,
  2914272,
  2455224,
  'I sent 30 million won for the budget related to this.',
  '我们关于这些的预算汇款了3千万。',
  '2015_MEGADIC_중영한',
  '2015_MEGADIC_중영한',
  'financial',
  0.99,
  'business',
  0.95,
  'economy',
  0.83),
 (666243,
  1820690,
  1706546,
  'Especially secret village must be very popular.',
  '特别是秘密村在孩子们中间很受欢迎。',
  '2019_BYTEDANCE_중일',
  '2019_BYTEDANCE_중일',
  'beauty',
  0.69,
  '',
  0.0,
  '',
  0.0),
 (5735244,
  34739201,
  22215786,
  'If I stay here, I think I can have a dream with a mighty scale.',
  '如果住在这里的话,好像可以做一个大大的美梦呢。',
  None,
  None,
  'beauty',
  0.85,
  'liberal arts',
  0.56,
  '',
  0.0),
 (202094,
  8517732,
  2866926,
  "It's already been a month since I entered school.",
  '我进入学校已经一个月了。',
  '2019_현대자동차_영한',
  '2019_현대자동차_영한',
  'educat