# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


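#### Configure the session: enable Iceberg and register the AWS Glue Data Catalog as the `glue_catalog` Spark catalog. Run this cell before the session-setup cell below so these settings are applied when the session is created.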


In [2]:
%%configure
{
   "--datalake-formats": "iceberg",
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse --conf spark.sql.defaultCatalog=glue_catalog"
}  

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
The following configurations have been updated: {'--datalake-formats': 'iceberg', '--conf': 'spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse --conf spark.sql.defaultCatalog=glue_catalog'}


####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from datetime import datetime
import pandas as pd
from pyspark.sql.functions import to_timestamp

# Create the Spark/Glue handles used by the rest of the notebook:
# `glueContext` reads catalog tables as DynamicFrames (create_dynamic_frame.from_catalog below),
# `spark` is the SparkSession wrapped by the GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is never initialised (job.init) or committed (job.commit) in this
# notebook, so Glue job bookmarks are not in effect here.
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: d924d1ec-290a-4346-9ed3-0f8bda130194
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--datalake-formats iceberg
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse --conf spark.sql.defaultCatalog=glue_c

In [None]:
from datetime import datetime
import pandas as pd
from pyspark.sql.functions import to_timestamp, to_date, col, when, lit
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

In [8]:
# Cleaning steps performed below:
# - Delete rows where the ID is not an integer
# - Remove newline characters (\n) from text columns
# - Remove empty and almost-empty columns

### Load data from bronze layer

In [47]:
# Load the `sunbird_x_tweets` table from the `tgsn_bronze` Glue Data Catalog database.
# transformation_ctx is the key Glue uses for job-bookmark state of this source. It is
# named after the variable (as the users load does) so it clearly identifies the tweets table.
# NOTE: if bookmarks were ever enabled under the old "dyf_sunbird_bronze" key, this
# name starts a fresh bookmark for the source.
dyf_sunbird_tweets_bronze = glueContext.create_dynamic_frame.from_catalog(
    database="tgsn_bronze",
    table_name="sunbird_x_tweets",
    transformation_ctx="dyf_sunbird_tweets_bronze",
)




In [48]:
# Load the `sunbird_x_users` table from the `tgsn_bronze` Glue Data Catalog database.
dyf_sunbird_users_bronze = glueContext.create_dynamic_frame.from_catalog(
    database="tgsn_bronze",
    table_name="sunbird_x_users",
    transformation_ctx="dyf_sunbird_users_bronze",
)




### Convert from Glue DynamicFrame to Spark DataFrame

In [49]:
# Convert to a Spark DataFrame so the cleaning steps below can use pyspark.sql functions.
df_sunbird_tweets_bronze = dyf_sunbird_tweets_bronze.toDF()




In [50]:
# Convert to a Spark DataFrame so the cleaning steps below can use pyspark.sql functions.
df_sunbird_users_bronze = dyf_sunbird_users_bronze.toDF()




In [51]:
# Inspect the column names of the users table.
df_sunbird_users_bronze.columns

['id', 'name', 'username', 'created_at', 'protected', 'withheld', 'location', 'url', 'description', 'verified', 'verified_type', 'entities', 'profile_image_url', 'public_metrics', 'pinned_tweet_id', 'pinned_tweet_text', 'most_recent_tweet_id', 'most_recent_tweet_text', 'collection_date']


### Drop all rows where the tweet_id is not an integer

In [52]:
# Keep only rows whose tweet_id can be cast to a 64-bit integer (LongType). Values that
# fail the cast become null and are filtered out; null tweet_ids are dropped as well.
df_sunbird_tweets_bronze = df_sunbird_tweets_bronze.filter(
    F.col("tweet_id").cast(LongType()).isNotNull()
)




In [53]:
# Keep users whose most_recent_tweet_id casts to a 64-bit integer, plus users that have
# no most_recent_tweet_id at all (null or empty string).
df_sunbird_users_bronze = df_sunbird_users_bronze.filter(
    F.col("most_recent_tweet_id").cast(LongType()).isNotNull()
    | F.col("most_recent_tweet_id").isNull()
    | (F.col("most_recent_tweet_id") == "")
)




### Verify that no non-integer tweet IDs remain

In [54]:
# Sanity check: list tweet_id values that still fail the integer cast
# (an empty table means the filter above removed all of them).
(
    df_sunbird_tweets_bronze
    .select("tweet_id", F.col("tweet_id").cast(LongType()).isNotNull().alias("Value"))
    .where(~F.col("Value"))
    .show(20, False)
)

+--------+-----+
|tweet_id|Value|
+--------+-----+
+--------+-----+


In [55]:
# Sanity check: list non-null, non-empty most_recent_tweet_id values that still fail
# the integer cast (an empty table means none remain).
(
    df_sunbird_users_bronze
    .select(
        "most_recent_tweet_id",
        F.col("most_recent_tweet_id").cast(LongType()).isNotNull().alias("Value"),
    )
    .where(
        ~F.col("Value")
        & F.col("most_recent_tweet_id").isNotNull()
        & (F.col("most_recent_tweet_id") != "")
    )
    .show(20, False)
)

+--------------------+-----+
|most_recent_tweet_id|Value|
+--------------------+-----+
+--------------------+-----+


### Remove newline characters ('\n') from text columns

In [56]:
# Replace each run of line breaks (\n, and \r from CRLF text) with a single space.
# Deleting them outright would glue the last word of one line to the first word of
# the next (e.g. "hello\nworld" -> "helloworld").
df_sunbird_tweets_bronze = df_sunbird_tweets_bronze.withColumn(
    "tweet_content", F.regexp_replace("tweet_content", r"[\r\n]+", " ")
)




In [57]:
# Replace each run of line breaks (\n, and \r from CRLF text) in the free-text user
# columns with a single space, so words on adjacent lines are not merged together.
for text_column in ("description", "pinned_tweet_text", "most_recent_tweet_text"):
    df_sunbird_users_bronze = df_sunbird_users_bronze.withColumn(
        text_column, F.regexp_replace(text_column, r"[\r\n]+", " ")
    )




In [58]:
# Sanity check: no tweet_content value should still contain a newline, so this should
# print an empty table. Columns are selected by name rather than by position
# (columns[8:12]) so the check keeps showing these fields if the bronze schema gains
# or reorders columns.
df_sunbird_tweets_bronze.where(F.col("tweet_content").contains("\n")).select(
    "tweet_content",
    "edit_history_tweet_ids",
    "edit_controls_is_edit_eligible",
    "edit_controls_editable_until",
).show(5, False)

+-------------+----------------------+------------------------------+----------------------------+
|tweet_content|edit_history_tweet_ids|edit_controls_is_edit_eligible|edit_controls_editable_until|
+-------------+----------------------+------------------------------+----------------------------+
+-------------+----------------------+------------------------------+----------------------------+


### Check whether edit_history_tweet_ids contains anything other than tweet IDs

In [59]:
# Strip the comma separators and list every value that still contains a non-digit
# character; an empty table means the column holds only comma-separated tweet IDs.
# The regex is a raw string: '\D' in a normal string literal is an invalid escape
# sequence that Python warns about.
# NOTE: "Value" casts the concatenated digits to a long. When several IDs are joined,
# the number can exceed the LongType range and Value becomes false, so rely on the
# rlike filter, not on "Value".
df_sunbird_tweets_bronze.withColumn(
    "edit_history_tweet_ids_noComma", F.regexp_replace("edit_history_tweet_ids", ",", "")
).select(
    "edit_history_tweet_ids_noComma",
    F.col("edit_history_tweet_ids_noComma").cast(LongType()).isNotNull().alias("Value"),
).filter(F.col("edit_history_tweet_ids_noComma").rlike(r"\D+")).show(20, False)

+------------------------------+-----+
|edit_history_tweet_ids_noComma|Value|
+------------------------------+-----+
+------------------------------+-----+


No rows returned: every value in `edit_history_tweet_ids` contains only comma-separated tweet IDs.

### Replace empty strings ('') with None (null)

In [60]:
# Normalise empty strings to real nulls so missing values have a single representation.
# Only string columns can contain '', so only they are rewritten; other columns pass
# through untouched (this also avoids comparing complex types such as structs or
# arrays with a string literal). Building all columns in one select creates a single
# projection instead of one withColumn step per column, keeping the plan small.
df_sunbird_tweets_bronze = df_sunbird_tweets_bronze.select(
    [
        when(df_sunbird_tweets_bronze[column] == '', None)
        .otherwise(df_sunbird_tweets_bronze[column])
        .alias(column)
        if dtype == 'string'
        else df_sunbird_tweets_bronze[column]
        for column, dtype in df_sunbird_tweets_bronze.dtypes
    ]
)




In [61]:
# Normalise empty strings to real nulls so missing values have a single representation.
# Only string columns can contain '', so only they are rewritten; other columns pass
# through untouched (this also avoids comparing complex types such as structs or
# arrays with a string literal). Building all columns in one select creates a single
# projection instead of one withColumn step per column, keeping the plan small.
df_sunbird_users_bronze = df_sunbird_users_bronze.select(
    [
        when(df_sunbird_users_bronze[column] == '', None)
        .otherwise(df_sunbird_users_bronze[column])
        .alias(column)
        if dtype == 'string'
        else df_sunbird_users_bronze[column]
        for column, dtype in df_sunbird_users_bronze.dtypes
    ]
)




### Find and drop empty and almost-empty columns

In [62]:
empty_col = []
almost_empty_col = []
threshold = 0.1  # a column populated in fewer than 10% of rows counts as "almost empty"

total_count = df_sunbird_tweets_bronze.count()
if total_count == 0:
    # With no rows every column would look empty and all of them (including the
    # collection_date partition column used when writing to silver) would be dropped.
    raise ValueError("df_sunbird_tweets_bronze has no rows; refusing to drop every column")

# F.count(col) counts only non-null values, so one aggregation yields the populated-row
# count of every column in a single Spark job instead of one full scan per column.
non_null_counts = df_sunbird_tweets_bronze.agg(
    *[
        F.count(df_sunbird_tweets_bronze[column]).alias(column)
        for column in df_sunbird_tweets_bronze.columns
    ]
).first().asDict()

for column, cnt in non_null_counts.items():
    if cnt == 0:
        empty_col.append(column)
    elif cnt / total_count < threshold:
        almost_empty_col.append(column)

columns_to_drop = empty_col + almost_empty_col
if columns_to_drop:
    df_sunbird_tweets_bronze = df_sunbird_tweets_bronze.drop(*columns_to_drop)

print(f'Empty columns: {empty_col}')
print('-------------')
print(f'Almost empty columns: {almost_empty_col}')

Empty columns: ['poll_end', 'poll_ids', 'source_app', 'place_contained_within', 'place_country', 'place_country_code', 'place_full_name', 'place_name', 'place_type']
-------------
Almost empty columns: ['note_tweet_text', 'poll_duration_minutes', 'poll_options', 'poll_voting_status', 'geo_place_id', 'geo_coordinates', 'withheld_copyright', 'withheld_country_codes', 'place_id']


In [63]:
empty_col = []
almost_empty_col = []
threshold = 0.1  # a column populated in fewer than 10% of rows counts as "almost empty"

total_count = df_sunbird_users_bronze.count()
if total_count == 0:
    # With no rows every column would look empty and all of them (including the
    # collection_date partition column used when writing to silver) would be dropped.
    raise ValueError("df_sunbird_users_bronze has no rows; refusing to drop every column")

# F.count(col) counts only non-null values, so one aggregation yields the populated-row
# count of every column in a single Spark job instead of one full scan per column.
non_null_counts = df_sunbird_users_bronze.agg(
    *[
        F.count(df_sunbird_users_bronze[column]).alias(column)
        for column in df_sunbird_users_bronze.columns
    ]
).first().asDict()

for column, cnt in non_null_counts.items():
    if cnt == 0:
        empty_col.append(column)
    elif cnt / total_count < threshold:
        almost_empty_col.append(column)

columns_to_drop = empty_col + almost_empty_col
if columns_to_drop:
    df_sunbird_users_bronze = df_sunbird_users_bronze.drop(*columns_to_drop)

print(f'Empty columns: {empty_col}')
print('-------------')
print(f'Almost empty columns: {almost_empty_col}')

Empty columns: []
-------------
Almost empty columns: []


### Save cleaned data into Silver

In [66]:
additional_options = {}
# Create (or fully replace) the Iceberg format-v2 table tgsn_silver.sunbird_x_tweets at an
# explicit S3 location, with gzip as the Parquet compression codec, partitioned by collection_date.
(
    df_sunbird_tweets_bronze
    .writeTo("glue_catalog.tgsn_silver.sunbird_x_tweets")
    .tableProperty("format-version", "2")
    .tableProperty("location", "s3://tgsn-silver-bucket/sunbird/x/tweets/tgsn_silver/sunbird_x_tweets")
    .tableProperty("write.parquet.compression-codec", "gzip")
    .options(**additional_options)
    .partitionedBy("collection_date")
    .createOrReplace()
)




In [65]:
additional_options = {}
# Create (or fully replace) the Iceberg format-v2 table tgsn_silver.sunbird_x_users at an
# explicit S3 location, with gzip as the Parquet compression codec, partitioned by collection_date.
(
    df_sunbird_users_bronze
    .writeTo("glue_catalog.tgsn_silver.sunbird_x_users")
    .tableProperty("format-version", "2")
    .tableProperty("location", "s3://tgsn-silver-bucket/sunbird/x/users/tgsn_silver/sunbird_x_users")
    .tableProperty("write.parquet.compression-codec", "gzip")
    .options(**additional_options)
    .partitionedBy("collection_date")
    .createOrReplace()
)


