In [None]:
%profile glue
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%list_sessions
%status
%connections ew_graph-ewtest-dev-neptune
%%configure
{
  "--session-language": "python",
  "--job-language": "python",
  "--continuous-log-logGroup": "ew_graph-ewtest-dev-job_vertices",
  "--enable-continuous-cloudwatch-log": "true",
  "--enable-continuous-log-filter" : "true",
  "--enable-metrics": "",
  "--additional-python-modules": "awswrangler==3.9.1,SPARQLWrapper==2.0.0,requests==2.32.3"
}


In [None]:
%reconnect bd2061c5-575b-42f4-9808-914cd4121720

In [None]:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Reuse the session's SparkContext (getOrCreate returns the existing one when
# this cell is re-run) and wrap it in a GlueContext for DynamicFrame I/O.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Glue Job handle. Nothing visible in this notebook calls job.init()/job.commit().
job = Job(glueContext)

# Plain SparkSession, for DataFrame-based alternatives to the DynamicFrame path.
spark = glueContext.spark_session


In [None]:
import gzip
import os
import shutil
import tempfile
from io import BytesIO

import awswrangler as wr
import boto3

In [None]:

# List the Common Crawl host-graph vertex part files.
# A single list_objects call returns at most 1,000 keys, so page through the
# listing with the list_objects_v2 paginator instead of reading one response.
s3_client = boto3.client('s3', region_name='us-east-1')
vertex_pages = s3_client.get_paginator('list_objects_v2').paginate(
    Bucket='commoncrawl',
    Prefix='projects/hyperlinkgraph/cc-main-2024-jun-jul-aug/host/vertices'
)
vertex_keys = [
    obj['Key']
    for page in vertex_pages
    # 'Contents' is omitted entirely (not an empty list) when nothing matches.
    for obj in page.get('Contents', [])
]
for key in vertex_keys:
    print(key)


In [None]:
# Stage one Common Crawl host-vertex part file, gunzipped, in the working bucket
# so the next cell can read it as plain tab-separated text
# (the earlier in-memory version took around 20s on one vertex file).
#
# The object is streamed to local temp files instead of being read fully into
# driver memory. It is decompressed exactly once, and every handle is closed.
# (Handing a GzipFile straight to upload_fileobj appears to let s3transfer size
# it by seeking to EOF, which decompresses the whole stream an extra time.)
SOURCE_BUCKET = "commoncrawl"
SOURCE_KEY = (
    "projects/hyperlinkgraph/cc-main-2024-jun-jul-aug/host/vertices/"
    "part-00000-3095dbcf-098e-45c9-a3a7-70c1b93b80fa-c000.txt.gz"
)
STAGING_BUCKET = "edgewalker-dev-ew-graph-ewtest-dev-working"
# Same file name minus the ".gz" suffix, under the prefix the next cell reads from.
STAGING_KEY = (
    "job_vertices/original_dataset/vertices/"
    + SOURCE_KEY.rsplit("/", 1)[-1][: -len(".gz")]
)

with tempfile.TemporaryDirectory() as tmp_dir:
    compressed_path = os.path.join(tmp_dir, "vertices.txt.gz")
    plain_path = os.path.join(tmp_dir, "vertices.txt")
    s3_client.download_file(SOURCE_BUCKET, SOURCE_KEY, compressed_path)
    with gzip.open(compressed_path, "rb") as src, open(plain_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    s3_client.upload_file(plain_path, STAGING_BUCKET, STAGING_KEY)

In [None]:
# Read the staged, uncompressed vertex file as tab-separated CSV (~2s).
# Alternatives tried and rejected as far slower:
#   - format="grokLog" with logFormat "%{INT:id}\t%{GREEDYDATA:domain}" (~2 minutes)
#   - connection option "compressionType": "gzip" (very slow)
vertices_path = "s3://edgewalker-dev-ew-graph-ewtest-dev-working/job_vertices/original_dataset/vertices/part-00000-3095dbcf-098e-45c9-a3a7-70c1b93b80fa-c000.txt"
raw_vertices = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [vertices_path]},
    format="csv",
    format_options={"separator": "\t"},
)

# The file has no header, so columns arrive as col0/col1; rename them to the
# ~id / ~label system columns used by the Neptune load further down.
df = raw_vertices.rename_field('col0', '~id').rename_field('col1', '~label')

df.show(10)

In [None]:
# Keep only New Zealand hosts (~3 min with a Python predicate over the DynamicFrame).
#
# Common Crawl host-graph vertex files list host names in reverse domain
# notation (www.example.co.nz -> nz.co.example.www), so the TLD is the *first*
# label. The previous endswith(".nz") test matched hosts whose leftmost label
# is "nz" (e.g. nz.example.com) rather than hosts under the .nz TLD.
# NOTE(review): confirm the reversed form against the df.show(10) output above.
def _is_nz_host(record):
    """Return True if the record's reversed host name (``~label``) is under .nz.

    Matches the bare TLD ("nz") and any name starting with "nz.". A null label
    is treated as "" (not NZ) instead of raising AttributeError.
    """
    host_rev = record["~label"] or ""
    return host_rev == "nz" or host_rev.startswith("nz.")


filtered_df = df.filter(f=_is_nz_host)
filtered_df.show(10)

In [None]:
# ~4mins
filtered_df.write(
    connection_type="s3",
    connection_options={
        "path": "s3://edgewalker-dev-ew-graph-ewtest-dev-working/job_vertices/nz_vertices",
    },
    format="parquet",
)

# Alt processing using spark
# df.toDF()
# df.write.parquet('s3://edgewalker-dev-ew-graph-ewtest-dev-working/job_vertices/nz_vertices')

In [None]:
client = wr.neptune.connect("ew-graph-ewtest-dev.cluster-c9v5npv1veav.us-east-1.neptune.amazonaws.com", 8182)

wr.neptune.bulk_load_from_files(
    client=client,
    path="s3://edgewalker-dev-ew-graph-ewtest-dev-working/job_vertices/nz_vertices",
    format="parquet",
    iam_role="arn:aws:iam::400678530796:role/ew_graph-ewtest-dev-job_vertices",
)