In [11]:
from google.colab import userdata
!pip install boto3 s3fs

Collecting boto3
  Downloading boto3-1.35.74-py3-none-any.whl.metadata (6.7 kB)
Collecting s3fs
  Downloading s3fs-2024.10.0-py3-none-any.whl.metadata (1.7 kB)
Collecting botocore<1.36.0,>=1.35.74 (from boto3)
  Downloading botocore-1.35.74-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.4-py3-none-any.whl.metadata (1.7 kB)
Collecting aiobotocore<3.0.0,>=2.5.4 (from s3fs)
  Downloading aiobotocore-2.15.2-py3-none-any.whl.metadata (23 kB)
INFO: pip is looking at multiple versions of aiobotocore to determine which version is compatible with other requirements. This could take a while.
  Downloading aiobotocore-2.15.1-py3-none-any.whl.metadata (23 kB)
  Downloading aiobotocore-2.15.0-py3-none-any.whl.metadata (23 kB)
  Downloading aiobotocore-2.14.0-py3-none-any.whl.metadata (23 kB)
  Downloading aiobotocore-2.13

In [12]:
AWS_ACCESS_KEY = userdata.get('AWS_ACCESS_KEY')

In [13]:
AWS_SECRET_KEY = userdata.get('AWS_SECRET_KEY')

In [26]:
import boto3
from pyspark.sql import SparkSession

# Set up Spark session
spark = SparkSession.builder \
    .appName("ReadS3WithBoto3") \
    .getOrCreate()

# Set up boto3 S3 client
s3_client = boto3.client('s3', aws_access_key_id= AWS_ACCESS_KEY,
                                 aws_secret_access_key= AWS_SECRET_KEY ,
                                 region_name= "us-east-1")


In [28]:
bucket_name = "dataengineeringprojbucket"
file_key = "raw/uploaded_data.csv"
s3_client.download_file(bucket_name, file_key, "data.csv")

In [31]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("LoadCSVWithPySpark") \
    .getOrCreate()

file_path = "data.csv"

df = spark.read.csv(file_path, header=True, inferSchema=True)


df.show()


df.printSchema()


print(f"Total rows: {df.count()}")


+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  0|1467810369|Mon Apr 06 22:19:45 PDT 2009|NO_QUERY|_TheSpecialOne_|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D|
+---+----------+----------------------------+--------+---------------+-------------------------------------------------------------------------------------------------------------------+
|  0|1467810672|        Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|                                                                                               is upset that he ...|
|  0|1467810917|        Mon Apr 06 22:19:...|NO_QUERY|       mattycus|                                                                                               @Kenichan I dived...|
|  0|1467811184|        Mon Apr 06 22:19:...|NO_QUERY|        Ell

In [32]:
columns = ["sentiment", "id", "date", "query", "username", "content"]

In [33]:
data = df.toDF(*columns)

In [35]:
data.printSchema()

root
 |-- sentiment: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- query: string (nullable = true)
 |-- username: string (nullable = true)
 |-- content: string (nullable = true)



In [36]:
data.show(5)

+---------+----------+--------------------+--------+-------------+--------------------+
|sentiment|        id|                date|   query|     username|             content|
+---------+----------+--------------------+--------+-------------+--------------------+
|        0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|
|        0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|
|        0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|      ElleCTF|my whole body fee...|
|        0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|       Karoli|@nationwideclass ...|
|        0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|     joy_wolf|@Kwesidei not the...|
+---------+----------+--------------------+--------+-------------+--------------------+
only showing top 5 rows



In [40]:
# map sentiment to string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
decode_map = {0: "NEGATIVE", 2: "NEUTRAL", 4: "POSITIVE"}
@udf(StringType())
def decode_sentiment(label):
    return decode_map.get(label, "UNKNOWN")
data = data.withColumn("sentiment", decode_sentiment(data.sentiment))

In [46]:
data.show(5)

+---------+----------+--------------------+--------+-------------+--------------------+
|sentiment|        id|                date|   query|     username|             content|
+---------+----------+--------------------+--------+-------------+--------------------+
| NEGATIVE|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|
| NEGATIVE|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|
| NEGATIVE|1467811184|Mon Apr 06 22:19:...|NO_QUERY|      ElleCTF|my whole body fee...|
| NEGATIVE|1467811193|Mon Apr 06 22:19:...|NO_QUERY|       Karoli|@nationwideclass ...|
| NEGATIVE|1467811372|Mon Apr 06 22:20:...|NO_QUERY|     joy_wolf|@Kwesidei not the...|
+---------+----------+--------------------+--------+-------------+--------------------+
only showing top 5 rows



In [58]:
from pyspark.sql.functions import udf, year, month, dayofmonth,hour
from pyspark.sql.types import TimestampType
from datetime import datetime
@udf(returnType=TimestampType())
def parse_date(date_str):
    try:

        date_without_tz = " ".join(date_str.split()[:-2]) + " " + date_str.split()[-1]
        return datetime.strptime(date_without_tz, "%a %b %d %H:%M:%S %Y")
    except ValueError:
        return None


data = data.withColumn("parsed_date", parse_date(data.date))


data = data.withColumn("year", year(data.parsed_date)) \
       .withColumn("month", month(data.parsed_date)) \
       .withColumn("day", dayofmonth(data.parsed_date)) \
       .withColumn("hour", hour(data.parsed_date))

data.select("sentiment", "id", "parsed_date", "year", "month", "day", "hour").show()

+---------+----------+-------------------+----+-----+---+----+
|sentiment|        id|        parsed_date|year|month|day|hour|
+---------+----------+-------------------+----+-----+---+----+
| NEGATIVE|1467810672|2009-04-06 22:19:49|2009|    4|  6|  22|
| NEGATIVE|1467810917|2009-04-06 22:19:53|2009|    4|  6|  22|
| NEGATIVE|1467811184|2009-04-06 22:19:57|2009|    4|  6|  22|
| NEGATIVE|1467811193|2009-04-06 22:19:57|2009|    4|  6|  22|
| NEGATIVE|1467811372|2009-04-06 22:20:00|2009|    4|  6|  22|
| NEGATIVE|1467811592|2009-04-06 22:20:03|2009|    4|  6|  22|
| NEGATIVE|1467811594|2009-04-06 22:20:03|2009|    4|  6|  22|
| NEGATIVE|1467811795|2009-04-06 22:20:05|2009|    4|  6|  22|
| NEGATIVE|1467812025|2009-04-06 22:20:09|2009|    4|  6|  22|
| NEGATIVE|1467812416|2009-04-06 22:20:16|2009|    4|  6|  22|
| NEGATIVE|1467812579|2009-04-06 22:20:17|2009|    4|  6|  22|
| NEGATIVE|1467812723|2009-04-06 22:20:19|2009|    4|  6|  22|
| NEGATIVE|1467812771|2009-04-06 22:20:19|2009|    4|  

In [57]:
data.show(5)

+---------+----------+--------------------+--------+-------------+--------------------+-------------------+----+-----+---+
|sentiment|        id|                date|   query|     username|             content|        parsed_date|year|month|day|
+---------+----------+--------------------+--------+-------------+--------------------+-------------------+----+-----+---+
| NEGATIVE|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|2009-04-06 22:19:49|2009|    4|  6|
| NEGATIVE|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|2009-04-06 22:19:53|2009|    4|  6|
| NEGATIVE|1467811184|Mon Apr 06 22:19:...|NO_QUERY|      ElleCTF|my whole body fee...|2009-04-06 22:19:57|2009|    4|  6|
| NEGATIVE|1467811193|Mon Apr 06 22:19:...|NO_QUERY|       Karoli|@nationwideclass ...|2009-04-06 22:19:57|2009|    4|  6|
| NEGATIVE|1467811372|Mon Apr 06 22:20:...|NO_QUERY|     joy_wolf|@Kwesidei not the...|2009-04-06 22:20:00|2009|    4|  6|
+---------+-----

In [60]:
#processing text
from pyspark.sql import functions as F
import re

def remove_emojis(text):
    emoji_pattern = re.compile("[\U00010000-\U0010ffff]", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)


remove_emojis_udf = F.udf(remove_emojis)


data = data.withColumn("clean_content",
                   F.lower(F.col("content"))  #lowercase
                   .alias("content_lower")
                  )

data = data.withColumn("clean_content",
                   F.regexp_replace(F.col("clean_content"), r"http\S+", "")  #  URLs
                   )

data = data.withColumn("clean_content",
                   remove_emojis_udf(F.col("clean_content"))  #  emojis
                   )

data = data.withColumn("clean_content",
                   F.regexp_replace(F.col("clean_content"), r"[^a-zA-Z\s]", "")  #special characters
                   )

# Show the processed DataFrame
data.select("sentiment", "content", "clean_content").show(truncate=False)

+---------+---------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+
|sentiment|content                                                                                                              |clean_content                                                                                                |
+---------+---------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+
|NEGATIVE |is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!      |is upset that he cant update his facebook by texting it and might cry as a result  school today also blah    |
|NEGATIVE |@Kenichan I dived many times 

In [61]:
data.show(5)

+---------+----------+--------------------+--------+-------------+--------------------+-------------------+----+-----+---+----+--------------------+
|sentiment|        id|                date|   query|     username|             content|        parsed_date|year|month|day|hour|       clean_content|
+---------+----------+--------------------+--------+-------------+--------------------+-------------------+----+-----+---+----+--------------------+
| NEGATIVE|1467810672|Mon Apr 06 22:19:...|NO_QUERY|scotthamilton|is upset that he ...|2009-04-06 22:19:49|2009|    4|  6|  22|is upset that he ...|
| NEGATIVE|1467810917|Mon Apr 06 22:19:...|NO_QUERY|     mattycus|@Kenichan I dived...|2009-04-06 22:19:53|2009|    4|  6|  22|kenichan i dived ...|
| NEGATIVE|1467811184|Mon Apr 06 22:19:...|NO_QUERY|      ElleCTF|my whole body fee...|2009-04-06 22:19:57|2009|    4|  6|  22|my whole body fee...|
| NEGATIVE|1467811193|Mon Apr 06 22:19:...|NO_QUERY|       Karoli|@nationwideclass ...|2009-04-06 22:19:57

In [62]:
final_data = data.select("id", "username", "clean_content", "year", "month", "day", "hour", "parsed_date", "sentiment")

In [63]:
final_data.show(5)

+----------+-------------+--------------------+----+-----+---+----+-------------------+---------+
|        id|     username|       clean_content|year|month|day|hour|        parsed_date|sentiment|
+----------+-------------+--------------------+----+-----+---+----+-------------------+---------+
|1467810672|scotthamilton|is upset that he ...|2009|    4|  6|  22|2009-04-06 22:19:49| NEGATIVE|
|1467810917|     mattycus|kenichan i dived ...|2009|    4|  6|  22|2009-04-06 22:19:53| NEGATIVE|
|1467811184|      ElleCTF|my whole body fee...|2009|    4|  6|  22|2009-04-06 22:19:57| NEGATIVE|
|1467811193|       Karoli|nationwideclass n...|2009|    4|  6|  22|2009-04-06 22:19:57| NEGATIVE|
|1467811372|     joy_wolf|kwesidei not the ...|2009|    4|  6|  22|2009-04-06 22:20:00| NEGATIVE|
+----------+-------------+--------------------+----+-----+---+----+-------------------+---------+
only showing top 5 rows



In [66]:
local_path = "out_data.csv"
final_data.coalesce(1).write.csv(local_path, header=True, mode="overwrite")
import os
from glob import glob


csv_file = glob(os.path.join(local_path, "*.csv"))[0]

s3_file_path = "cleaned/data.csv"
s3_client.upload_file(csv_file, bucket_name, s3_file_path)