In [None]:
import sqlalchemy as sql
import pymysql
import datetime as dt
import polars as pl
import boto3
from boto3.s3.transfer import TransferConfig
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment; the cells
# below read their database and S3 settings through os.environ.
load_dotenv()

In [77]:
# Name of the subset being registered; later cells reuse it as the table name,
# the S3 object name and the Batch job name.
SUBSET_NAME = "subset_gb_1"

# Read the subset's CSV export into a polars DataFrame.
subset_csv_path = f"subsets/{ SUBSET_NAME }.csv"
df = pl.read_csv(subset_csv_path)

# Preview the loaded frame.
df

text,new_speaker
str,str
"""moved that Lord Walsingham be …","""unknown"""
"""in seconding the motion, took …","""john_scott_6733"""
"""—The question was put, and the…","""john_scott_6733"""
"""then rose, and observed, that …","""unknown"""
"""He had to return his thanks to…","""unknown"""
…,…
"""The best proof he could give t…","""william_portman_4287"""
"""Mr. Grierson told the Committe…","""william_portman_4287"""
"""The proposed change in the law…","""william_portman_4287"""
"""On the contrary, it would impo…","""william_portman_4287"""


In [None]:
# Database settings come from the environment (populated by load_dotenv() at the
# top of the notebook) so that no credentials are stored in the notebook itself.
# NOTE(review): this cell previously contained a hard-coded database password;
# treat that password as compromised and rotate it.
_DB_ENV_KEYS = ("DATABASE_USERNAME", "PASSWORD", "HOST", "PORT", "DATABASE")
_missing_db_keys = [name for name in _DB_ENV_KEYS if not os.environ.get(name)]
if _missing_db_keys:
    # Fail early with a clear message: formatting os.environ.get() results into a
    # URL would otherwise silently embed the text "None".
    raise RuntimeError(
        "Missing database environment variables: " + ", ".join(_missing_db_keys)
    )

# Connect to database
# URL.create() escapes special characters (such as "@" or "/") in the username
# and password, which a hand-formatted connection string does not.
_db_url = sql.engine.URL.create(
    drivername="mysql+pymysql",
    username=os.environ["DATABASE_USERNAME"],
    password=os.environ["PASSWORD"],
    host=os.environ["HOST"],
    port=int(os.environ["PORT"]),
    database=os.environ["DATABASE"],
)
# Kept as a plain string for anything that still expects `conn_str`.
conn_str = _db_url.render_as_string(hide_password=False)

engine = sql.create_engine(_db_url)

# Reflect the existing schema so later cells can look tables up by name.
meta = sql.MetaData()
meta.reflect(engine)

In [79]:
# Registration settings for this subset. Each top-level entry is consumed by one
# of the insert cells that follow.
METADATA_TEMPLATE = dict(
    # Values for the dataset_metadata row; table_name and date_posted are added
    # when the row is built.
    metadata=dict(
        email="rdschaefer@smu.edu",
        title="test",
        description="test",
        is_public=False,
        preprocessing_type="lemma",  # or "stem" or "none"
        embeddings=True,
        embed_col="new_speaker",
        language="English",
        num_records=len(df),
    ),
    # Tag names inserted into the tags table (none for this subset).
    tags=[],
    # Columns inserted into dataset_text_cols.
    text=["text"],
    # Columns inserted into dataset_embed_cols.
    embed=["new_speaker"],
)

In [80]:
# Delete any dataset_metadata row already stored under this subset's table name
# before the insert in the next cell.
metadata_table = meta.tables["dataset_metadata"]
delete_existing = sql.delete(metadata_table).where(
    metadata_table.c.table_name == SUBSET_NAME
)

with engine.connect() as conn:
    conn.execute(delete_existing)
    conn.commit()
    

In [81]:
# Build the dataset_metadata row: the identifying fields first, followed by every
# template entry whose value is not the empty string.
dataset_metadata = {
    "table_name": SUBSET_NAME,
    "date_posted": dt.date.today(),
    **{
        field: setting
        for field, setting in METADATA_TEMPLATE["metadata"].items()
        if setting != ""
    },
}
query = sql.insert(meta.tables["dataset_metadata"]).values(dataset_metadata)

# Insert the row and commit it.
with engine.connect() as conn:
    conn.execute(query)
    conn.commit()

In [82]:
# Build one INSERT into the tags table per tag listed in the template.
tags = [
    sql.insert(meta.tables["tags"]).values(
        {
            "table_name": SUBSET_NAME,
            "tag_name": tag,
        }
    )
    for tag in METADATA_TEMPLATE["tags"]
]

# Execute every insert and commit once after the loop, matching the other
# column-registration cells. Committing inside the loop would leave a partial
# set of tags stored if a later insert failed.
with engine.connect() as conn:
    for query in tags:
        conn.execute(query)
    conn.commit()

In [83]:
# One INSERT into dataset_all_cols for each column currently present in df.
all_cols = [
    sql.insert(meta.tables["dataset_all_cols"]).values(
        {"table_name": SUBSET_NAME, "col": column_name}
    )
    for column_name in df.columns
]

# Execute all inserts on one connection, then commit once.
with engine.connect() as conn:
    for statement in all_cols:
        conn.execute(statement)
    conn.commit()

In [84]:
# One INSERT into dataset_text_cols for each column named under "text" in the template.
text_cols = [
    sql.insert(meta.tables["dataset_text_cols"]).values(
        {"table_name": SUBSET_NAME, "col": text_column}
    )
    for text_column in METADATA_TEMPLATE["text"]
]

# Execute all inserts on one connection, then commit once.
with engine.connect() as conn:
    for statement in text_cols:
        conn.execute(statement)
    conn.commit()

In [85]:
# One INSERT into dataset_embed_cols for each column named under "embed" in the template.
embed_cols = [
    sql.insert(meta.tables["dataset_embed_cols"]).values(
        {"table_name": SUBSET_NAME, "col": embed_column}
    )
    for embed_column in METADATA_TEMPLATE["embed"]
]

# Execute all inserts on one connection, then commit once.
with engine.connect() as conn:
    for statement in embed_cols:
        conn.execute(statement)
    conn.commit()

In [86]:
# Prepend a row-index column named "record_id". The guard keeps this cell safe to
# re-run: calling with_row_index() again on a frame that already has the column
# fails on the duplicate name.
if "record_id" not in df.columns:
    df = df.with_row_index("record_id")

# Write the indexed frame next to the source CSV as parquet.
df.write_parquet(f"subsets/{ SUBSET_NAME }.parquet", use_pyarrow=True)

df

record_id,text,new_speaker
u32,str,str
0,"""moved that Lord Walsingham be …","""unknown"""
1,"""in seconding the motion, took …","""john_scott_6733"""
2,"""—The question was put, and the…","""john_scott_6733"""
3,"""then rose, and observed, that …","""unknown"""
4,"""He had to return his thanks to…","""unknown"""
…,…,…
6390179,"""The best proof he could give t…","""william_portman_4287"""
6390180,"""Mr. Grierson told the Committe…","""william_portman_4287"""
6390181,"""The proposed change in the law…","""william_portman_4287"""
6390182,"""On the contrary, it would impo…","""william_portman_4287"""


In [None]:
# S3 client whose credentials and region are taken from the environment.
s3_settings = {
    "aws_access_key_id": os.environ.get("S3_KEY"),
    "aws_secret_access_key": os.environ.get("S3_SECRET"),
    "region_name": os.environ.get("S3_REGION"),
}
s3_client = boto3.client("s3", **s3_settings)

In [None]:
# Multipart upload settings for the parquet file.
# The previous value (1024 * 500) was about 500 KB even though its comment said
# 100MB, and it is below S3's 5 MiB minimum part size; the size is now spelled
# out explicitly.
MULTIPART_SIZE_BYTES = 100 * 1024 * 1024  # 100 MiB
config = TransferConfig(
    multipart_threshold=MULTIPART_SIZE_BYTES,  # switch to multipart above 100 MiB
    max_concurrency=10,
    multipart_chunksize=MULTIPART_SIZE_BYTES,  # upload in 100 MiB parts
    use_threads=True,
)

# Upload the parquet file written earlier to the dataset's location in the bucket.
s3_client.upload_file(
    f"subsets/{ SUBSET_NAME }.parquet",
    # Index access raises a KeyError naming the variable if it is unset, instead
    # of passing None as the bucket name.
    os.environ["S3_BUCKET"],
    f"tables/datasets_{ SUBSET_NAME }/{ SUBSET_NAME }.parquet",
    Config=config,
)

In [89]:
# AWS Batch queue and job definition used for the processing job.
BATCH_QUEUE = "democracy-viewer-processing-queue-large"
BATCH_DEF = "democracy-viewer-processing-large-def"

# Initialize the AWS Batch client
batch_client = boto3.client("batch")

# Describe the job: it is named after the subset and receives the table name and
# thread count as job parameters.
job_request = {
    "jobName": SUBSET_NAME,
    "jobQueue": BATCH_QUEUE,
    "jobDefinition": BATCH_DEF,
    "parameters": {
        "table_name": SUBSET_NAME,
        "num_threads": "15",
    },
}

# Submit the job and show the service response.
response = batch_client.submit_job(**job_request)

response

{'ResponseMetadata': {'RequestId': 'ddaf5e4b-6900-4d6d-a2d2-dc59669f5449',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 13 Nov 2024 04:51:31 GMT',
   'content-type': 'application/json',
   'content-length': '161',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'ddaf5e4b-6900-4d6d-a2d2-dc59669f5449',
   'access-control-allow-origin': '*',
   'x-amz-apigw-id': 'BKx9IF_rCYcEZdg=',
   'access-control-expose-headers': 'X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date',
   'x-amzn-trace-id': 'Root=1-67343053-1d5218c922eba3aa423fab59'},
  'RetryAttempts': 0},
 'jobArn': 'arn:aws:batch:us-east-2:399426333101:job/a6435516-d465-4e40-9b0e-3efef8d70685',
 'jobName': 'subset_gb_1',
 'jobId': 'a6435516-d465-4e40-9b0e-3efef8d70685'}

In [90]:
# Report the on-disk size (in bytes) of the CSV and parquet files, then the row count.
size_targets = (
    ("Initial size:", f"subsets/{ SUBSET_NAME }.csv"),
    ("Compressed size:", f"subsets/{ SUBSET_NAME }.parquet"),
)
for label, file_path in size_targets:
    print(label, os.path.getsize(file_path))
print("Record count:", len(df))

Initial size: 1159456082
Compressed size: 403142581
Record count: 6390184
