# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: c8c46827-5483-49a0-bfc9-c37dd418fc7c
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session c8c46827-5483-49a0-bfc9-c37dd418fc7c to get into ready status...
Session c8c46827-5483-49a0-bfc9-c37dd418fc7c ha

#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog, convert it to a Spark DataFrame, and display its schema and first rows


In [10]:
df = glueContext.create_dynamic_frame.from_catalog(
    database="project",
    table_name="reddit_data"
).toDF()

df.printSchema()
df.show(5)

root
 |-- col0: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)

+--------------------+--------------------+-------------------+
|                col0|                col1|               col2|
+--------------------+--------------------+-------------------+
|               title|                body|       createad_utc|
|Time to Shake Thi...|**Posting again i...|2025-03-08T14:47:17|
|98.3% of ultrasou...|                    |2025-04-18T07:50:00|
|This ‘College Pro...|Massive Blue is h...|2025-04-17T11:02:16|
|What are some of ...|I've recently bee...|2025-04-17T15:37:28|
+--------------------+--------------------+-------------------+
only showing top 5 rows



In [11]:
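from pyspark.sql.functions import col

# The columns arrive as col0..col2, so give them meaningful names.
df = df.withColumnRenamed("col0", "title") \
       .withColumnRenamed("col1", "body") \
       .withColumnRenamed("col2", "created_utc")
# Drop the CSV header row, which was read as data. Comparisons against NULL
# evaluate to NULL, so rows with a NULL in any of these columns are dropped too.
df = df.filter((col("title") != "title") & (col("body") != "body") & (col("created_utc") != "created_utc"))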
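The filter in this cell depends on Spark SQL's three-valued logic, which is easy to misread. The sketch below is a plain-Python imitation (not Spark code) of how `!=` and `&` treat NULL. The helper names `spark_not_equal`, `spark_and`, and `keeps_row` are hypothetical, and the third sample row is invented to show the NULL case.

```python
from typing import Optional


def spark_not_equal(value: Optional[str], literal: str) -> Optional[bool]:
    """Imitate Spark SQL `!=`: comparing NULL (None) yields NULL (None)."""
    if value is None:
        return None
    return value != literal


def spark_and(*terms: Optional[bool]) -> Optional[bool]:
    """Imitate Spark SQL AND: False wins, otherwise any NULL makes the result NULL."""
    if any(term is False for term in terms):
        return False
    if any(term is None for term in terms):
        return None
    return True


def keeps_row(title: Optional[str], body: Optional[str], created_utc: Optional[str]) -> bool:
    """filter() keeps a row only when the predicate evaluates to exactly True."""
    predicate = spark_and(
        spark_not_equal(title, "title"),
        spark_not_equal(body, "body"),
        spark_not_equal(created_utc, "created_utc"),
    )
    return predicate is True


rows = [
    ("title", "body", "createad_utc"),                     # header row as it appears in the data
    ("98.3% of ultrasou...", "", "2025-04-18T07:50:00"),   # empty body is a string, not NULL
    ("Some post", None, "2025-04-17T11:02:16"),            # hypothetical row with a NULL body
]

kept = [row for row in rows if keeps_row(*row)]
print(kept)
```

Only the second row survives: the header row fails on `title`, and the row with a NULL body is dropped because its predicate is NULL rather than True. If rows with a missing body should be kept, the condition would need to handle NULL explicitly.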




In [12]:
from pyspark.sql.functions import concat_ws, regexp_replace, trim, split, col
from pyspark.ml.feature import StopWordsRemover

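# 1. Concatenate title + body into full_text
# (concat_ws skips NULL inputs and does not return NULL here, so the filter is only a safeguard)
df_cleaned = df.withColumn("full_text", concat_ws(" ", col("title"), col("body"))) \
    .filter(col("full_text").isNotNull())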

# 2. Clean the text: strip URLs, drop everything except letters and whitespace, collapse whitespace, trim
df_cleaned = df_cleaned.withColumn("full_text", col("full_text").cast("string")) \
    .withColumn("full_text", regexp_replace(col("full_text"), r"http\S+|www\S+|https\S+", "")) \
    .withColumn("full_text", regexp_replace(col("full_text"), r"[^a-zA-Z\s]", "")) \
    .withColumn("full_text", regexp_replace(col("full_text"), r"\s+", " ")) \
    .withColumn("full_text", trim(col("full_text")))


# 3. Split the text into words
df_cleaned = df_cleaned.withColumn("words", split(col("full_text"), " "))

# 4. Remove stop words (default English list)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df_cleaned = remover.transform(df_cleaned)

df_cleaned = df_cleaned.drop("title", "body", "words")
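To sanity-check the cleaning patterns without starting a Glue session, the same steps can be approximated in plain Python. This is a sketch under assumptions, not the Spark implementation: `clean_text` is a hypothetical helper, the sample strings are made up, and `re.ASCII` is used so that `\s` and `\S` cover only ASCII whitespace, which is how Java regular expressions (used by Spark's `regexp_replace`) behave by default.

```python
import re

# Same patterns as in the cell above, compiled with ASCII-only character classes.
URL_PATTERN = re.compile(r"http\S+|www\S+|https\S+", flags=re.ASCII)
NON_LETTER_PATTERN = re.compile(r"[^a-zA-Z\s]", flags=re.ASCII)
WHITESPACE_PATTERN = re.compile(r"\s+", flags=re.ASCII)


def clean_text(title, body):
    """Approximate the full_text pipeline for a single (title, body) pair."""
    # concat_ws(" ", ...) skips missing (NULL) parts instead of returning NULL.
    full_text = " ".join(part for part in (title, body) if part is not None)
    full_text = URL_PATTERN.sub("", full_text)          # strip URLs
    full_text = NON_LETTER_PATTERN.sub("", full_text)   # drop digits, punctuation, symbols
    full_text = WHITESPACE_PATTERN.sub(" ", full_text)  # collapse runs of whitespace
    return full_text.strip(" ")                         # trim leading/trailing spaces


sample = clean_text("98.3% of ultrasound exams", "see https://example.com/report now")
print(sample)
```

Because digits and punctuation are removed outright, a title that begins with `98.3%` ends up starting at the next word, which matches the `of ultrasound exa...` value in the output of the following cell.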




In [13]:
df_cleaned.show()

+-------------------+--------------------+--------------------+
|        created_utc|           full_text|      filtered_words|
+-------------------+--------------------+--------------------+
|2025-03-08T14:47:17|Time to Shake Thi...|[Time, Shake, Thi...|
|2025-04-18T07:50:00|of ultrasound exa...|[ultrasound, exam...|
|2025-04-17T11:02:16|This College Prot...|[College, Protest...|
|2025-04-17T15:37:28|What are some of ...|[biggest, fears, ...|
|2025-04-17T19:56:52|An AI bot just us...|[AI, bot, used, n...|
|2025-04-17T20:48:28|what major should...|[major, choose, d...|
|2025-04-18T04:41:37|OneMinute Daily A...|[OneMinute, Daily...|
|2025-04-17T15:50:05|Just like ChatGPT...|[like, ChatGPT, G...|
|2025-04-17T22:47:05|I had no idea how...|[idea, much, stuf...|
|2025-04-18T03:42:55|What if AI was as...|[AI, advanced, po...|
|2025-04-17T16:20:36|Court documents r...|[Court, documents...|
|2025-04-16T21:54:30|Why nobody use AI...|[nobody, use, AI,...|
|2025-04-16T18:13:52|Whats the most un..
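
The `filtered_words` column is the result of splitting on a single space and dropping stop words. A rough plain-Python equivalent is sketched below; `remove_stop_words` and the tiny `STOP_WORDS` set are hypothetical stand-ins (`StopWordsRemover` uses a much longer default English list), and the sample sentence is invented.

```python
# Tiny illustrative stop-word list; the real default list is far longer.
STOP_WORDS = {"a", "are", "of", "some", "the", "to", "what"}


def remove_stop_words(full_text, stop_words=STOP_WORDS):
    """Split on single spaces, then drop stop words case-insensitively."""
    words = full_text.split(" ")  # same delimiter as split(col("full_text"), " ")
    # Lower-casing before lookup imitates StopWordsRemover's default caseSensitive=False.
    return [word for word in words if word.lower() not in stop_words]


tokens = remove_stop_words("What are some of the biggest fears")
print(tokens)

empty_tokens = remove_stop_words("")
print(empty_tokens)
```

In this sketch an empty `full_text` produces a list containing one empty token rather than an empty list. It may be worth checking whether the Spark pipeline does the same, and filtering out empty strings if downstream steps count tokens.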

#### Example: Write the data in the DynamicFrame to a location in Amazon S3


In [14]:
from awsglue.dynamicframe import DynamicFrame

# Convert the DataFrame to a DynamicFrame
dyf_cleaned = DynamicFrame.fromDF(df_cleaned, glueContext, "dyf_cleaned")




In [15]:
glueContext.write_dynamic_frame.from_options(
    frame=dyf_cleaned,
    connection_type="s3",
    connection_options={
        "path": "s3://aws-nlp-project/cleaned_csv/",
        "partitionKeys": []  # Add partition keys here if needed
    },
    format="parquet",
    format_options={"compression": "snappy"},
    transformation_ctx="write_cleaned_reddit"
)

<awsglue.dynamicframe.DynamicFrame object at 0x7fdb647f5010>


In [23]:
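# CSV cannot store array columns, so join filtered_words into a space-separated string first.
df_cleaned.withColumn("filtered_words", concat_ws(" ", col("filtered_words"))) \
    .write.mode("overwrite").option("header", "true") \
    .csv("s3://aws-nlp-project/cleaned_csv/")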
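
CSV has no array type, so a column such as `filtered_words` has to be serialised to text before a CSV write and parsed back on read. The sketch below shows one possible encoding in plain Python using the standard `csv` module. `encode_words` and `decode_words` are hypothetical helpers, and the sample row is a shortened illustration rather than real output.

```python
import csv
import io


def encode_words(words):
    """Join tokens with a single space; lossless as long as no token contains a space."""
    return " ".join(words)


def decode_words(cell):
    """Reverse encode_words; an empty cell becomes an empty list."""
    return cell.split(" ") if cell else []


rows = [
    {"created_utc": "2025-04-17T15:37:28", "filtered_words": ["biggest", "fears"]},
]

# Write the rows to an in-memory CSV with the token list flattened to a string.
buffer = io.StringIO(newline="")
writer = csv.DictWriter(buffer, fieldnames=["created_utc", "filtered_words"])
writer.writeheader()
for row in rows:
    writer.writerow({**row, "filtered_words": encode_words(row["filtered_words"])})

# Read the CSV back and rebuild the token lists.
buffer.seek(0)
restored = [
    {**row, "filtered_words": decode_words(row["filtered_words"])}
    for row in csv.DictReader(buffer)
]
print(restored == rows)
```

The space delimiter works here because the tokens were produced by splitting on spaces in the first place. A format with native list support, such as Parquet, avoids the round trip entirely.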


