In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession   

In [0]:
spark

In [0]:
# Create a SparkSession

#sparksession is basically a entry point of our spark driver's
spark = SparkSession.builder \
    .appName("Transformation") \
    .getOrCreate()

In [0]:
#CREATE ELON_DF DATAFRAME FROM CSV FILE

elon_df=spark.read.format("csv").option("header","true").load("/FileStore/tables/elon_X_tweets-2.csv")

In [0]:
elon_df.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
| ID|                link|                text|                user|               likes|            comments|            retweets|       external-link|        quotes|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|  0|https://twitter.c...|This post was mad...|           Elon Musk|               91567|                7656|               11121|                null|           987|
|  1|https://twitter.c...|                null|           Elon Musk|               48095|                3256|                9219|                null|           506|
|  2|https://twitter.c...|Tesla manufacturi...|               Tesla|               12183|                 755|                1774|                null|        

In [0]:
#CREATE MRBEAST_DF FROM CSV FILE

In [0]:
mrbeast_df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/mr_beast_csv-3.csv")
   

In [0]:
mrbeast_df.show()

+---+--------------------+--------------------+-------+-------+--------+--------+-------------+------+
| ID|                link|                text|   user|  likes|comments|retweets|external-link|quotes|
+---+--------------------+--------------------+-------+-------+--------+--------+-------------+------+
|  0|https://twitter.c...|No takesies backsies|MrBeast|1263270|   38423|   67066|         null| 12458|
|  1|https://twitter.c...|The reason I’m do...|MrBeast|   9908|    1283|     503|         null|    37|
|  2|https://twitter.c...|If you run a busi...|MrBeast|   8898|     928|     484|         null|    33|
|  3|https://twitter.c...|Some stores I vis...|MrBeast|   6939|    1104|     382|         null|    41|
|  4|https://twitter.c...|Found another one...|MrBeast|   8008|    1043|     443|         null|    41|
|  5|https://twitter.c...|I spent 15 hours ...|MrBeast|   9960|    1140|     404|         null|   133|
|  6|https://twitter.c...|I’m seeing this a...|MrBeast|  30128|    2953| 

In [0]:
#DROP ALL NULL COLUMNS FROM BOTH DATAFRAME

elon_df1 =elon_df.drop('external-link')
mrbeast_df1=mrbeast_df.drop('external-link')


In [0]:
elon_df1.show()
mrbeast_df1.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
| ID|                link|                text|                user|               likes|            comments|            retweets|        quotes|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|  0|https://twitter.c...|This post was mad...|           Elon Musk|               91567|                7656|               11121|           987|
|  1|https://twitter.c...|                null|           Elon Musk|               48095|                3256|                9219|           506|
|  2|https://twitter.c...|Tesla manufacturi...|               Tesla|               12183|                 755|                1774|           204|
|  3|https://twitter.c...|San Francisco, th...|           Elon Musk|               52292|                3061|        

In [0]:
#SINCE OUR DATA IS SCATEERED, WE USED THIS TRANSFORMATION TO SET OUR COLUMN WIDTH TO GET IT IN CONCISE MANNER.

spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # Enable eager evaluation for better display
spark.conf.set("spark.sql.repl.eagerEval.truncate", 50)   # Set the column width



In [0]:
elon_df1.show()
mrbeast_df1.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
| ID|                link|                text|                user|               likes|            comments|            retweets|        quotes|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|  0|https://twitter.c...|This post was mad...|           Elon Musk|               91567|                7656|               11121|           987|
|  1|https://twitter.c...|                null|           Elon Musk|               48095|                3256|                9219|           506|
|  2|https://twitter.c...|Tesla manufacturi...|               Tesla|               12183|                 755|                1774|           204|
|  3|https://twitter.c...|San Francisco, th...|           Elon Musk|               52292|                3061|        

In [0]:
#BROADCAST THE SMALL DATAFRAME i.e elon_df1  before joining 

elon_df2=broadcast(elon_df1)

In [0]:
#CHNAGE NAME OF ELON DATAFRAME TO ELON_DF2

elon_df2.show()

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
| ID|                link|                text|                user|               likes|            comments|            retweets|        quotes|
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+
|  0|https://twitter.c...|This post was mad...|           Elon Musk|               91567|                7656|               11121|           987|
|  1|https://twitter.c...|                null|           Elon Musk|               48095|                3256|                9219|           506|
|  2|https://twitter.c...|Tesla manufacturi...|               Tesla|               12183|                 755|                1774|           204|
|  3|https://twitter.c...|San Francisco, th...|           Elon Musk|               52292|                3061|        

In [0]:


elon_df2.printSchema()


root
 |-- ID: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)



#fetch top 90 tweets of elon musk in terms of likes

In [0]:


## elon_df4 is basically a dataframe with top 50 tweets in terms of likes

elon_df4 = elon_df2.orderBy(elon_df2["likes"].desc()).limit(90)

In [0]:
elon_df4.show(truncate=False) 

+---+--------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------

In [0]:
elon_df4.count()

Out[18]: 90

In [0]:
elon_df4.printSchema()

root
 |-- ID: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)




NOW WE WILL FIND THE ROLLING COMMENTS OF USER "ELON_MUSK" AFTER EVERY 10 TEXT

In [0]:
from pyspark.sql import Window
import pyspark.sql.functions as F

In [0]:
#DEFINE WINDOW SPECIFICATION // SPECIFY ON HOW MANY ROWS SHOULD WE WINDOW FOR A RSULATANT

#HERE WE ARE WINDOWING OVER 10 ROWS

window_spec = Window.orderBy("ID").rowsBetween(Window.currentRow, 9)

In [0]:
#COLUMN  ("ROLLING COMMENTS")  WILL BE POPULATED WITH THE ROLLING SUM OF COMMENTS AFTER EVERY 10 DAYS 

elon_rolling = elon_df4.withColumn("rolling_comments", F.sum("comments").over(window_spec))

elon_rolling.show()

+---+--------------------+--------------------+--------------------+------------------+--------+--------+------+----------------+
| ID|                link|                text|                user|             likes|comments|retweets|quotes|rolling_comments|
+---+--------------------+--------------------+--------------------+------------------+--------+--------+------+----------------+
|  0|https://twitter.c...|This post was mad...|           Elon Musk|             91567|    7656|   11121|   987|         34269.0|
| 10|https://twitter.c...|Over 1 million jo...|              Hiring|              7817|     980|    1358|   114|         47886.0|
|103|https://twitter.c...|Congrats to @Spac...|           Elon Musk|             71405|    3546|    9237|   282|         47422.0|
|105|https://twitter.c...|Watch Falcon 9 la...|              SpaceX|              7908|     648|    1456|    96|         44498.0|
|108|https://twitter.c...|BREAKING: 𝕏 outp...|        DogeDesigner|              9321|    

In [0]:
elon_rolling.printSchema()

root
 |-- ID: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)
 |-- rolling_comments: double (nullable = true)




#fetch top 50 records of mr beast in terms of likes



In [0]:
mrbeast_df4=mrbeast_df1.orderBy(desc("likes")).limit(90)

In [0]:
mrbeast_df4.show()

+---+--------------------+--------------------+-------+------+--------+--------+------+
| ID|                link|                text|   user| likes|comments|retweets|quotes|
+---+--------------------+--------------------+-------+------+--------+--------+------+
|420|https://twitter.c...|Would you sit in ...|MrBeast| 99752|    4021|    1152|   270|
|  5|https://twitter.c...|I spent 15 hours ...|MrBeast|  9960|    1140|     404|   133|
|  1|https://twitter.c...|The reason I’m do...|MrBeast|  9908|    1283|     503|    37|
|250|https://twitter.c...|We recreated Will...|MrBeast| 98937|    1902|    2686|   271|
|193|https://twitter.c...|I found an old ph...|MrBeast| 98490|    2343|    2297|   133|
|219|https://twitter.c...|I’m curious what ...|MrBeast| 98468|    3135|    1825|   471|
|174|https://twitter.c...|Can I be the new ...|MrBeast|980836|   25663|   38852|  5010|
|324|https://twitter.c...|Today was Beast P...|MrBeast| 97831|    1104|    2859|   199|
|352|https://twitter.c...|I’m in

In [0]:
mrbeast_df4.count()

Out[26]: 90


NOW WE WILL FIND THE ROLLING COMMENTS OF USER "ELON_MUSK" AFTER EVERY 10 TEXT

In [0]:
#DEFINE WINDOW SPECIFICATION // SPECIFY ON HOW MANY ROWS SHOULD WE WINDOW FOR A RSULATANT

#HERE WE ARE WINDOWING OVER 10 ROWS

window_spec1 = Window.orderBy("ID").rowsBetween(Window.currentRow, 9)

In [0]:
#COLUMN  ("ROLLING COMMENTS")  WILL BE POPULATED WITH THE ROLLING SUM OF COMMENTS AFTER EVERY 10 DAYS 

mrbeast_rolling = mrbeast_df4.withColumn("rolling_comments", F.sum("comments").over(window_spec1))

mrbeast_rolling.show()

+---+--------------------+--------------------+---------------+------+--------+--------+------+----------------+
| ID|                link|                text|           user| likes|comments|retweets|quotes|rolling_comments|
+---+--------------------+--------------------+---------------+------+--------+--------+------+----------------+
|  1|https://twitter.c...|The reason I’m do...|        MrBeast|  9908|    1283|     503|    37|         56987.0|
| 10|https://twitter.c...|If you don’t watc...|        MrBeast| 81351|   12234|    4528|   626|         58636.0|
|102|https://twitter.c...|Woke up and reali...|        MrBeast|730869|   25930|   22436|  8246|         50024.0|
|108|https://twitter.c...|Was 40%+ body fat...|        MrBeast| 80482|    1489|     938|    81|         45715.0|
|112|https://twitter.c...|We compared a $1 ...|        MrBeast| 74084|    2077|    1682|   181|         47502.0|
|118|https://twitter.c...|New video almost ...|        MrBeast| 82479|    3178|    2555|   281| 


JOIN BOTH DATAFRAME (ELON_DF4)  & (MRBEAST_DF4)

In [0]:
elon_mrbeast_df=elon_rolling.join(mrbeast_rolling, elon_df4["id"]==mrbeast_df4["id"], "inner")

In [0]:
elon_mrbeast_df.count()

Out[30]: 14

In [0]:
elon_mrbeast_df.show()

+---+--------------------+--------------------+---------------+-----+--------+--------+------+----------------+---+--------------------+--------------------+-------+-----+--------+--------+------+----------------+
| ID|                link|                text|           user|likes|comments|retweets|quotes|rolling_comments| ID|                link|                text|   user|likes|comments|retweets|quotes|rolling_comments|
+---+--------------------+--------------------+---------------+-----+--------+--------+------+----------------+---+--------------------+--------------------+-------+-----+--------+--------+------+----------------+
| 10|https://twitter.c...|Over 1 million jo...|         Hiring| 7817|     980|    1358|   114|         47886.0| 10|https://twitter.c...|If you don’t watc...|MrBeast|81351|   12234|    4528|   626|         58636.0|
|108|https://twitter.c...|BREAKING: 𝕏 outp...|   DogeDesigner| 9321|    1326|    1469|   117|         50097.0|108|https://twitter.c...|Was 40%+ 

In [0]:
elon_mrbeast_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)
 |-- rolling_comments: double (nullable = true)
 |-- ID: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)
 |-- rolling_comments: double (nullable = true)




after joining dataframe, we want to select user and all their columns who has highest no of rolling comments after every 10 tweets, i have columns like id, tweets, retweets, like, comments


WRITE DF (ELON_MRBEAST_DF TO S3 BUCKET)

In [0]:
import boto3


SINCE WE HAVE DUPLICATE NAMES IN COLUMN, S3 IS ENCOUNTERING AN ERROR :

AnalysisException: [COLUMN_ALREADY_EXISTS] The column `comments` already exists. Consider to choose another name or rename the existing column.


In [0]:
# Get the list of all columns
all_columns = elon_mrbeast_df.columns

In [0]:
# Create a dictionary to keep track of column name counts
column_counts = {}

In [0]:
#loop through each column and add suffix to only duplicate names leaving uniwue names unchanged

new_columns=[]

for column in all_columns:
   if column in column_counts:
       column_counts[column] += 1
       new_columns.append(f"{column}_{column_counts[column]}")
   else:
       column_counts[column] = 1
       new_columns.append(column)


In [0]:
elon_mrbeast_df=elon_mrbeast_df.toDF(*new_columns)


In [0]:
elon_mrbeast_df.show()

+---+--------------------+--------------------+---------------+-----+--------+--------+------+----------------+----+--------------------+--------------------+-------+-------+----------+----------+--------+------------------+
| ID|                link|                text|           user|likes|comments|retweets|quotes|rolling_comments|ID_2|              link_2|              text_2| user_2|likes_2|comments_2|retweets_2|quotes_2|rolling_comments_2|
+---+--------------------+--------------------+---------------+-----+--------+--------+------+----------------+----+--------------------+--------------------+-------+-------+----------+----------+--------+------------------+
| 10|https://twitter.c...|Over 1 million jo...|         Hiring| 7817|     980|    1358|   114|         47886.0|  10|https://twitter.c...|If you don’t watc...|MrBeast|  81351|     12234|      4528|     626|           58636.0|
|108|https://twitter.c...|BREAKING: 𝕏 outp...|   DogeDesigner| 9321|    1326|    1469|   117|       

In [0]:
import os

# Set AWS credentials as environment variables
os.environ["AWS_ACCESS_KEY_ID"] = "MMMMMMMMMMMM"
os.environ["AWS_SECRET_ACCESS_KEY"] = "XXXXXXX"


In [0]:
# Configure Spark with AWS credentials

#Configure Spark to use the AWS credentials you've set as environment variables.

spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])



In [0]:
# Assuming you already have a DataFrame named elon_beast_data that you want to save to S3

# Step 2: Write the DataFrame to S3 in CSV format

elon_mrbeast_df.write.format("csv").mode("overwrite").option("header", "true").save("s3://mydatabrickss3bucket/BUCK/")



REASON WHY WE WROTE OPTION("HEADER, "TRUE)  BECAUSE WE HAVE ALREADY LOADED A CSV FILE IN S3 BUCKET WHICH DID'T HAVE ANY HEADER / COLUMN NAME IN IT.

THIS MAY, WE INCLUDED ("MODE", "OVERWRITE) AND "HEADER" OPTION IN QUERY.