## <font color='#0000FF'>MSC_DA_CA2 - INTEGRATEDCA2<font color='#1ABC9D'>
### <font color='#'>**Advanced Data Analytics  & Big Data Storage and Processing**
### <font color='#1ABC9C'>**Lecturer(s): David McQuaid and Muhammad Iqbal**
------
<font color='#1ABC9C'>**Student name / ID** // Rosilene Francisca da Silva - 2021090

### Data Integration and Preprocessing: Use Apache Spark to populate MySQL databases with large datasets efficiently. 
This process includes data integration and preprocessing, making it easier to manage data effectively.

This continuous assessment requires identifying and analysing a large dataset gathered from the Twitter API, `ProjectTweets.csv`.

MySQL was selected over a NoSQL database because the dataset has a fixed, tabular structure. MySQL enforces an organised schema, provides dependable transactional integrity and strong query capabilities, and integrates with Apache Spark through a standard JDBC connector. These advantages make MySQL suitable for storing the data and for further analytical processing.


"The dataset used in this analysis, ProjectTweets.csv, was provided by Professor [McQuaid] via the Moodle [https://moodle.cct.ie/mod/assign/view.php?id=44089]course [MSc in Data Analytics] CCT College Dublin on 18 April 2024."

### Configure Apache Spark to connect to MySQL
Start Spark Session:
Set up PySpark, including necessary packages for MySQL connections.

Rationale: This PySpark setup is designed to process the data for the database comparison in a scalable manner. The SparkSession is created with the application name "Database Comparative Analysis", and the legacy time parser policy keeps timestamp parsing compatible with the behaviour of Spark versions before 3.0. To handle large data volumes, the driver and executor memory are set to 8 GB each, and each executor is given 4 cores for parallel processing.

With dynamic allocation enabled, the number of executors can scale between a minimum of 2 and a maximum of 100 according to the workload. The number of shuffle partitions is raised from the default of 200 to 1000 so that shuffles over a large dataset are split into smaller tasks, which is intended to reduce execution time.

Finally, the MySQL JDBC connector jar is added to the session so that Spark can connect to MySQL directly for data intake and analysis.

In [59]:
spark.stop()

In [60]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os

spark = SparkSession.builder \
    .appName("Database Comparative Analysis") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.sql.shuffle.partitions", "1000") \
    .config("spark.jars", "/home/hduser/Downloads/mysql-connector-j-8.0.33.jar") \
    .getOrCreate()

24/05/11 18:40:04 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [61]:
spark

In [62]:
import py4j
print(py4j.__version__)

0.10.9.7



#### Before reading the data, test the connection to identify any issues

In [63]:
# Test loading a simple query to ensure connectivity
try:
    jdbc_df = (
        spark.read.format("jdbc")
        .option("url", "jdbc:mysql://localhost/twitter_data")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", "tweet_details")
        .option("user", "root")
        .option("password", "password")
        .load()
    )
    jdbc_df.show(1)
    print("Connection established successfully.")
    
except Exception as e:
    print("An error occurred:", e)


+---+----------+-------------------+------------+-------------+--------------------+
| id|   user_id|               date|query_status|  user_handle|          tweet_text|
+---+----------+-------------------+------------+-------------+--------------------+
|  1|1467810672|2009-04-06 22:19:49|    NO_QUERY|scotthamilton|is upset that he ...|
+---+----------+-------------------+------------+-------------+--------------------+
only showing top 1 row

Connection established successfully.
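
Both JDBC cells in this notebook hard-code the `root` user and its password. As a minimal sketch of one alternative, assuming the credentials are exported as environment variables, the connection settings can be built in one place. The variable names `MYSQL_USER` and `MYSQL_PASSWORD` and the helper `mysql_jdbc_options` are hypothetical and not part of the original notebook.

```python
import os


def mysql_jdbc_options(table, database="twitter_data", host="localhost"):
    """Build the option dict for spark.read.format("jdbc").

    MYSQL_USER and MYSQL_PASSWORD are hypothetical environment variable
    names chosen for this sketch; the fallback user mirrors the notebook.
    """
    return {
        "url": f"jdbc:mysql://{host}/{database}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "dbtable": table,
        "user": os.environ.get("MYSQL_USER", "root"),
        "password": os.environ.get("MYSQL_PASSWORD", ""),
    }


options = mysql_jdbc_options("tweet_details")
print(options["url"])

# With an active SparkSession named `spark`, the dict can be passed directly:
# jdbc_df = spark.read.format("jdbc").options(**options).load()
```

Keeping the options in a single dict also means the connectivity test and the full load are guaranteed to use the same settings.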


                                                                                

### Load the data from MySQL into a new DataFrame
The database `twitter_data` was created in MySQL and contains the `tweet_details` table.  
Its columns were defined and named "id", "user_id", "date", "query_status", "user_handle", and "tweet_text".

In [64]:
# Load the data from MySQL into a new DataFrame
mysql_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost/twitter_data",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="tweet_details",
    user="root",
    password="password"
).load()

# Show the first few rows of the DataFrame to verify the data
mysql_df.show(2)


+---+----------+-------------------+------------+-------------+--------------------+
| id|   user_id|               date|query_status|  user_handle|          tweet_text|
+---+----------+-------------------+------------+-------------+--------------------+
|  1|1467810672|2009-04-06 22:19:49|    NO_QUERY|scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-06 22:19:53|    NO_QUERY|     mattycus|@Kenichan I dived...|
+---+----------+-------------------+------------+-------------+--------------------+
only showing top 2 rows



                                                                                

This code loads the data from the `tweet_details` table in MySQL into a new DataFrame called `mysql_df`. The rows are displayed correctly, which indicates that PySpark read the data from MySQL successfully.
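
A plain JDBC read like the one above pulls the whole table through a single connection. As a hedged sketch, Spark's JDBC source can split the read on a numeric column using the `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` options. The helper name and the choice of 8 partitions are assumptions for illustration; the bounds use the `id` range of 1 to 1,599,999 reported later in the notebook.

```python
def partitioned_read_options(base_options, column, lower, upper, num_partitions):
    """Return a copy of base_options extended with Spark's JDBC partitioning options."""
    options = dict(base_options)
    options.update(
        {
            "partitionColumn": column,
            "lowerBound": str(lower),
            "upperBound": str(upper),
            "numPartitions": str(num_partitions),
        }
    )
    return options


base = {
    "url": "jdbc:mysql://localhost/twitter_data",
    "driver": "com.mysql.cj.jdbc.Driver",
    "dbtable": "tweet_details",
}
options = partitioned_read_options(base, "id", 1, 1_599_999, 8)
print(options["numPartitions"])

# With an active SparkSession named `spark` (credentials omitted here):
# mysql_df = spark.read.format("jdbc").options(**options).load()
```

The bounds only decide how the `id` range is divided among partitions; they do not filter rows, so values outside the range would still be read.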

#### Verify the DataFrame
After loading the dataset, it is a good idea to check the schema and the first few rows of the DataFrame to confirm that everything loaded correctly.

In [65]:
# Print DataFrame schema
mysql_df.printSchema()

mysql_df.show(5, truncate=True)

root
 |-- id: integer (nullable = true)
 |-- user_id: long (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- query_status: string (nullable = true)
 |-- user_handle: string (nullable = true)
 |-- tweet_text: string (nullable = true)




+---+----------+-------------------+------------+-------------+--------------------+
| id|   user_id|               date|query_status|  user_handle|          tweet_text|
+---+----------+-------------------+------------+-------------+--------------------+
|  1|1467810672|2009-04-06 22:19:49|    NO_QUERY|scotthamilton|is upset that he ...|
|  2|1467810917|2009-04-06 22:19:53|    NO_QUERY|     mattycus|@Kenichan I dived...|
|  3|1467811184|2009-04-06 22:19:57|    NO_QUERY|      ElleCTF|my whole body fee...|
|  4|1467811193|2009-04-06 22:19:57|    NO_QUERY|       Karoli|@nationwideclass ...|
|  5|1467811372|2009-04-06 22:20:00|    NO_QUERY|     joy_wolf|@Kwesidei not the...|
+---+----------+-------------------+------------+-------------+--------------------+
only showing top 5 rows



                                                                                

The first part outlines the schema definition, detailing the structure of a dataset with fields such as id, user_id, date, query_status, user_handle, and tweet_text, specifying data types and their nullable status. The second part presents a sample of actual data extracted from this schema, displaying records of tweets with timestamps, user handles, and partial tweet contents. Each row represents individual tweets, marked with an ID and user ID, and classified under "NO_QUERY", indicating that these tweets were not part of a specific query filter. This output is typical of database management or data analysis tasks, providing insights into the nature of the data handled, likely for further processing or analysis.

#### Dataframe Information
Show the number of rows and columns.

In [66]:
num_rows = mysql_df.count()
num_columns = len(mysql_df.columns)
print(f"Number of rows: {num_rows}, Number of columns: {num_columns}")


Number of rows: 1599999, Number of columns: 6


                                                                                

With 1,599,999 rows and 6 columns, the dataset is large enough to be examined thoroughly. Each row holds the information for one tweet, and the six columns include both numerical and categorical data. This structure supports exploratory data analysis (EDA) of user behaviour and data patterns, as well as more complex analysis or machine learning tasks.

### Data Pre-Processing in PySpark

#### Checking for missing Values

In [67]:
from pyspark.sql.functions import col

# Initialize a flag to track whether missing values are found
missing_values_found = False

for column in mysql_df.columns:
    missing_count = mysql_df.filter(col(column).isNull() | (col(column) == '')).count()
    if missing_count > 0:
        print(f"Column {column} has {missing_count} missing values")
        missing_values_found = True

# Check the flag after checking all columns, and print a message if no missing values were found
if not missing_values_found:
    print("No missing values found in any column.")


No missing values found in any column.


                                                                                

Based on the output, the dataset has no missing values: no column contains nulls or empty strings.

In [68]:
# Displaying dtypes of columns
mysql_df.dtypes

[('id', 'int'),
 ('user_id', 'bigint'),
 ('date', 'timestamp'),
 ('query_status', 'string'),
 ('user_handle', 'string'),
 ('tweet_text', 'string')]

According to the DataFrame schema, the columns `id` and `user_id` are numerical. 
The column `date` has the type timestamp (a datetime field), while `query_status`, `user_handle`, and `tweet_text` have the type string, meaning they are textual fields (VARCHAR or TEXT in SQL).

#### Displaying some columns

In [69]:
mysql_df.select('user_id').show(n=5, truncate=False)


+----------+
|user_id   |
+----------+
|1467810672|
|1467810917|
|1467811184|
|1467811193|
|1467811372|
+----------+
only showing top 5 rows



                                                                                

In [70]:
mysql_df.select('date').show(n=5, truncate=False)


+-------------------+
|date               |
+-------------------+
|2009-04-06 22:19:49|
|2009-04-06 22:19:53|
|2009-04-06 22:19:57|
|2009-04-06 22:19:57|
|2009-04-06 22:20:00|
+-------------------+
only showing top 5 rows



                                                                                

The output illustrates the temporal character of the data by showing the first five rows of the date column from the dataset. Each entry in the date column is a timestamp indicating when a particular event, such as a tweet, occurred.

#### Use the `.agg()` method with the `min` and `max` aggregation functions to find and print the first and last tweet dates

In [71]:
# Import Aggregation Functions
from pyspark.sql import functions as F

# Finding the first and last tweet dates
first_last_dates = mysql_df.agg(
    F.min("date").alias("first_date"),
    F.max("date").alias("last_date")
).collect()[0]

first_date = first_last_dates["first_date"]
last_date = first_last_dates["last_date"]

print(f"First tweet date: {first_date}")
print(f"Last tweet date: {last_date}")


First tweet date: 2009-04-06 22:19:49
Last tweet date: 2009-06-25 10:28:31


                                                                                

The earliest and latest timestamps for tweets suggest that the dataset covers the period of April 6, 2009, to June 25, 2009. The last tweet was sent out on June 25, 2009, at 10:28:31, and the first one was sent out on April 6, 2009, at 22:19:49. 

The dataset therefore covers roughly 80 days, a little under three months, which gives it a temporal range suitable for studying tweet patterns and trends during that period. This result clarifies the historical bounds of the dataset and guides the choice of later analysis techniques.
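
The length of this window can be checked with plain `datetime` arithmetic, using the two timestamps printed above. This is a small sketch outside Spark, with the values copied by hand from the output.

```python
from datetime import datetime

# Timestamps copied from the min/max output above
first_date = datetime(2009, 4, 6, 22, 19, 49)
last_date = datetime(2009, 6, 25, 10, 28, 31)

span = last_date - first_date
print(f"Span: {span.days} days, {span.seconds // 3600} hours")
```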

In [72]:
# Selecting multiple columns
mysql_df.select(['user_handle','tweet_text']).show(n=5, truncate=False)


+-------------+---------------------------------------------------------------------------------------------------------------+
|user_handle  |tweet_text                                                                                                     |
+-------------+---------------------------------------------------------------------------------------------------------------+
|scotthamilton|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|
|mattycus     |@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                      |
|ElleCTF      |my whole body feels itchy and like its on fire                                                                 |
|Karoli       |@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. |
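|joy_wolf     |@Kwesidei not the whole crew                                                                                   |
+-------------+---------------------------------------------------------------------------------------------------------------+
only showing top 5 rows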

                                                                                

The first rows give insight into the `user_handle` and `tweet_text` columns. The `user_handle` column contains the Twitter handle of each tweet's author, while the `tweet_text` column holds the text of the tweet. For example, user scotthamilton complains about not being able to update Facebook via text, whereas mattycus describes diving for a ball. Other tweets show personal discomfort (ElleCTF), bewilderment (Karoli), and interactions with other users (joy_wolf). This selection ranges from personal updates to social interactions and illustrates the diversity of content in the dataset. The `truncate=False` argument ensures that the complete text of each tweet is displayed.

#### Data Distribution
Count distinct values of a categorical column.

In [73]:
# Selecting 'query_status' column
mysql_df.groupBy("query_status").count().show(10)


+------------+-------+
|query_status|  count|
+------------+-------+
|    NO_QUERY|1599999|
+------------+-------+



                                                                                

The output shows the distribution of the `query_status` column: the value NO_QUERY applies to all 1,599,999 tweets. The column is therefore constant, which suggests either that the field was not meaningfully used in the dataset or that none of the tweets were connected to a particular search query. A constant column carries no information for analysis, so it is a candidate for being dropped, and attention can go to the other columns.

In [74]:
# Selecting 'user_handle' column
mysql_df.groupBy("user_handle").count().show(10)


+---------------+-----+
|    user_handle|count|
+---------------+-----+
|     megan_rice|   15|
|         MeghTW|    1|
|stranger_danger|   14|
|       kyrabeth|    1|
|    lovelylivxo|   16|
|      tink68113|    1|
|     Svalentyna|    1|
|     bakerbelle|    1|
|  somethingalex|    1|
|     sexy_ass_T|    1|
+---------------+-----+
only showing top 10 rows



                                                                                

The number of tweets per user is calculated by grouping the dataset by the `user_handle` column and counting the rows in each group. Each of the first ten rows of the output shows one Twitter user and the number of tweets they sent. For example, megan_rice has 15 tweets, stranger_danger has 14, and lovelylivxo has 16, while MeghTW, kyrabeth, and the remaining users shown each have only one. Because the result is not sorted, these ten rows are an arbitrary sample rather than the most active users. Even so, they show that some users tweet frequently while others appear only once in the dataset.

### Summary Statistics
Generate summary statistics

In [75]:
# Basic statistics
mysql_df.describe().show()


+-------+-----------------+--------------------+------------+--------------------+--------------------+
|summary|               id|             user_id|query_status|         user_handle|          tweet_text|
+-------+-----------------+--------------------+------------+--------------------+--------------------+
|  count|          1599999|             1599999|     1599999|             1599999|             1599999|
|   mean|         800000.0|1.9988178841753244E9|        null| 4.325887521835714E9|                null|
| stddev|461880.0710141109| 1.935756789172917E8|        null|5.162733218454889E10|                null|
|    min|                1|          1467810672|    NO_QUERY|        000catnap000|                 ...|
|    max|          1599999|          2329205794|    NO_QUERY|          zzzzeus111|ï¿½ï¿½ï¿½ï¿½ï¿½ß§...|
+-------+-----------------+--------------------+------------+--------------------+--------------------+



                                                                                

The output gives summary statistics for the dataset, shedding light on the distribution and features of each column. 

With a mean of 800,000.0 and a standard deviation of roughly 461,880, the `id` column is an evenly spread set of unique identifiers ranging from 1 to 1,599,999. The `user_id` column ranges from 1,467,810,672 to 2,329,205,794, with a mean of almost 1.999 billion. All rows in the `query_status` column have the same value, NO_QUERY. The `user_handle` column spans from 000catnap000 to zzzzeus111, indicating a wide range of users; the mean and standard deviation reported for it are not meaningful, since they are presumably computed only from handles that happen to parse as numbers. 
Finally, the `tweet_text` column contains a variety of text, including garbled characters visible in its maximum value, which may indicate encoding problems. Overall, the summary shows that most columns are diverse and well populated, that `query_status` is uniform, and that special characters in the text would need to be cleaned up.

#### Correlation Analysis
Find correlations between numerical columns.

In [76]:
# List of numerical columns
numerical_columns = [name for name, dtype in mysql_df.dtypes if dtype in ('int', 'bigint', 'double', 'float')]  # `name` avoids reusing the imported `col`
print(f"Numerical columns: {numerical_columns}")

Numerical columns: ['id', 'user_id']


In [77]:
# Summary statistics only for numerical columns
mysql_df.select(numerical_columns).describe().show()


+-------+-----------------+--------------------+
|summary|               id|             user_id|
+-------+-----------------+--------------------+
|  count|          1599999|             1599999|
|   mean|         800000.0|1.9988178841753244E9|
| stddev|461880.0710141109| 1.935756789172917E8|
|    min|                1|          1467810672|
|    max|          1599999|          2329205794|
+-------+-----------------+--------------------+



                                                                                

In [78]:
# Calculate correlations between numerical columns
from pyspark.sql.functions import corr

for column1 in numerical_columns:
    for column2 in numerical_columns:
        if column1 != column2:
            correlation = mysql_df.select(corr(column1, column2)).first()[0]
            print(f"Correlation between {column1} and {column2}: {correlation}")

                                                                                

Correlation between id and user_id: 0.22304937463402058



Correlation between user_id and id: 0.22304937463402053


                                                                                

The correlation between `id` and `user_id` is roughly 0.223, a weak positive linear relationship. The two printed values differ only in the last decimal places because correlation is symmetric and the difference is floating-point rounding. Because both columns are identifiers, the value says more about how records are ordered than about user behaviour, so it should be interpreted with caution in later analysis or modelling.
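
For reference, `corr` computes the Pearson coefficient. A minimal pure-Python sketch (toy values, not the notebook's data) shows the calculation and why swapping the two columns gives the same value up to floating-point rounding, so one call per pair of columns is enough.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Toy values, not taken from the dataset
ids = [1, 2, 3, 4, 5]
user_ids = [10, 12, 11, 15, 14]

print(pearson(ids, user_ids), pearson(user_ids, ids))
```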

#### Adding a New Column with the current timestamp & Extract only those tweets that contain the keyword "summer" in the tweet_text column.
The `inserted_at` column will contain the timestamp indicating when each row was processed.

In [79]:
# Filtering tweets containing a specific keyword 'summer'
filtered_df = mysql_df.filter(mysql_df.tweet_text.contains("summer"))

# Add a current timestamp column for the insert time
from pyspark.sql.functions import current_timestamp

final_df = filtered_df.withColumn("inserted_at", current_timestamp())

# Select only the desired columns and print it
final_df.select("date", "tweet_text", "inserted_at").show(10, truncate=True)


+-------------------+--------------------+--------------------+
|               date|          tweet_text|         inserted_at|
+-------------------+--------------------+--------------------+
|2009-04-06 22:27:00|@jacobsummers Sor...|2024-05-11 18:41:...|
|2009-04-06 23:40:34|@kimmyawesome Ohh...|2024-05-11 18:41:...|
|2009-04-06 23:48:30|It's official! I'...|2024-05-11 18:41:...|
|2009-04-07 00:12:36|ok my TWEET PEEP ...|2024-05-11 18:41:...|
|2009-04-07 00:13:15|summer camp or su...|2024-05-11 18:41:...|
|2009-04-07 00:34:43|Downy weather  Wh...|2024-05-11 18:41:...|
|2009-04-07 00:35:12|Craaaaap. My Macb...|2024-05-11 18:41:...|
|2009-04-07 00:41:46|@dadi_iyal and yo...|2024-05-11 18:41:...|
|2009-04-07 01:57:26|@meatrack no more...|2024-05-11 18:41:...|
|2009-04-07 01:57:35|searching for a j...|2024-05-11 18:41:...|
+-------------------+--------------------+--------------------+
only showing top 10 rows



                                                                                

After filtering the dataset, this result displays the first ten tweets that contain the keyword "summer". Each tweet is shown with its original timestamp (`date`), its text (`tweet_text`), and a new column (`inserted_at`) holding the timestamp at which the row was processed. The `date` column shows how tweets mentioning "summer" are distributed over time. For example, one user is undecided between "summer camp or summer school", while another tweets about "Downy weather". Note that `contains` is a substring match, so the first row is included because of the handle @jacobsummers rather than the word itself. The `inserted_at` column separates the original tweet timestamps from the time of processing. 

This allows for the tracking of when the filtered data was curated. Overall, this result displays a wide range of attitudes and interests relating to "summer," from plans to technical concerns, providing useful insights into user interactions about this topic.
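
`contains("summer")` is a case-sensitive substring test, so it matches text such as a handle ending in "summers" and misses "Summer". A hedged sketch of a stricter rule, using Python's `re` module on made-up sample strings, matches the whole word regardless of case.

```python
import re

# Whole-word, case-insensitive match for "summer"
SUMMER = re.compile(r"\bsummer\b", re.IGNORECASE)

# Made-up sample texts for illustration only
samples = [
    "@jacobsummers thanks for the follow",  # substring inside a handle
    "summer camp or summer school?",
    "Cannot wait for Summer",
]

matches = [bool(SUMMER.search(text)) for text in samples]
print(matches)

# A similar filter in Spark could use Column.rlike with an inline flag:
# mysql_df.filter(mysql_df.tweet_text.rlike(r"(?i)\bsummer\b"))
```

Whether the stricter rule is preferable depends on the analysis; hashtags such as "#summer" still match because `#` is not a word character.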

#### Word Frequency Count

In [80]:
# Frequency of words in the "tweet_text" column (raw string keeps the regex backslash intact)
from pyspark.sql.functions import explode, split
words_df = mysql_df.withColumn("word", explode(split(mysql_df["tweet_text"], r"\s+")))
words_df.groupBy("word").count().orderBy("count", ascending=False).show(10)


+----+-------+
|word|  count|
+----+-------+
|    |1184159|
|  to| 552961|
|   I| 496616|
| the| 487501|
|   a| 366211|
|  my| 280025|
| and| 275263|
|   i| 249976|
|  is| 217692|
| you| 213871|
+----+-------+
only showing top 10 rows



                                                                                

This cell analyses word frequency in the `tweet_text` column. Each tweet is split on whitespace and exploded into one row per word; the DataFrame is then grouped by the `word` column and `count()` gives the number of occurrences of each word. The results are sorted in descending order of `count` and the top ten are shown. The most frequent token is the empty string, with 1,184,159 occurrences, followed by common English words such as "to", "I", and "the". Because the split pattern `\s+` already collapses runs of whitespace, the empty tokens most likely come from tweets that begin or end with whitespace rather than from consecutive spaces. The counts are also case-sensitive, so "I" and "i" are reported separately. 

Rationale: This analysis shows the dataset's typical word usage, reveals data cleaning problems (such as the large number of empty tokens), and provides a foundation for further text analysis tasks such as sentiment analysis or topic modelling.
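
The origin of the empty tokens can be illustrated outside Spark. In this sketch on two made-up tweets, Python's `re.split` with the same `\s+` pattern produces an empty string wherever the text starts or ends with whitespace, while `str.split()` with lower-casing gives cleaner tokens. That Spark's `split` behaves the same way on this dataset is an assumption here, not something the notebook verifies.

```python
import re
from collections import Counter

# Made-up tweets; the second one starts and ends with a space
tweets = ["I love  the summer", " going to the beach "]

# Same pattern as the notebook: runs of spaces collapse, edges leave ''
raw_tokens = [tok for t in tweets for tok in re.split(r"\s+", t)]

# str.split() drops edge whitespace; lower() merges "I" and "i"
clean_tokens = [tok.lower() for t in tweets for tok in t.split()]

print(Counter(raw_tokens).most_common(3))
print(Counter(clean_tokens).most_common(3))
```

In Spark, a comparable clean-up could apply `trim` and `lower` from `pyspark.sql.functions` before splitting and then filter out empty words.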

In [81]:
# Print current DataFrame columns
print(mysql_df.columns)

['id', 'user_id', 'date', 'query_status', 'user_handle', 'tweet_text']


In [82]:
# Displaying dtypes of columns
mysql_df.dtypes

[('id', 'int'),
 ('user_id', 'bigint'),
 ('date', 'timestamp'),
 ('query_status', 'string'),
 ('user_handle', 'string'),
 ('tweet_text', 'string')]

### Converting the dataset from MySQL to Pandas

#### Before converting the dataset to pandas, drop columns that are unnecessary for further analysis

In [83]:
mysql_df = mysql_df.drop('id','query_status')

In [84]:
mysql_df.show(2)


+----------+-------------------+-------------+--------------------+
|   user_id|               date|  user_handle|          tweet_text|
+----------+-------------------+-------------+--------------------+
|1467810672|2009-04-06 22:19:49|scotthamilton|is upset that he ...|
|1467810917|2009-04-06 22:19:53|     mattycus|@Kenichan I dived...|
+----------+-------------------+-------------+--------------------+
only showing top 2 rows



                                                                                

#### Checking for missing values again after dropping some columns

In [85]:
from pyspark.sql import functions as F

# Count nulls per column in a single pass (F.sum avoids shadowing Python's built-in sum)
null_count = mysql_df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in mysql_df.columns])
null_count.show()


+-------+----+-----------+----------+
|user_id|date|user_handle|tweet_text|
+-------+----+-----------+----------+
|      0|   0|          0|         0|
+-------+----+-----------+----------+



                                                                                

The output shows that there are no missing values, so the dataset is ready for further processing.

### Collect the Dataset to Pandas 
Objective: Gather all rows from the Spark DataFrame (mysql_df) into a list of rows.

Functionality:
.collect(): Fetches all data rows from the distributed environment into the driver node as a list of Row objects.

Rationale: Because this operation brings every row into the driver's memory, the driver and executor memory were raised from 4 GB to 8 GB each in the Spark configuration at the start of the notebook, and unnecessary columns were removed before running the process:

Setup before:

    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.sql.shuffle.partitions", "1000") \

Setup upgraded:

    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .config("spark.sql.shuffle.partitions", "1000") \

In [86]:
# Collect the Dataset to Pandas
new_df = mysql_df.collect()

                                                                                

### Create Pandas DataFrame from MySQL
Objective: Convert the list of rows into a Pandas DataFrame for further analysis.

Functionality: pd.DataFrame(new_df): Initializes a Pandas DataFrame from the list of rows.
twitter_df.columns = mysql_df.columns: Assigns the original column names from the Spark DataFrame (mysql_df) to the Pandas DataFrame.

In [87]:
#Create Pandas DataFrame from MySQL

import pandas as pd

twitter_df = pd.DataFrame(new_df)
twitter_df.columns =  mysql_df.columns
twitter_df.head(5)

Unnamed: 0,user_id,date,user_handle,tweet_text
0,1467810672,2009-04-06 22:19:49,scotthamilton,is upset that he can't update his Facebook by ...
1,1467810917,2009-04-06 22:19:53,mattycus,@Kenichan I dived many times for the ball. Man...
2,1467811184,2009-04-06 22:19:57,ElleCTF,my whole body feels itchy and like its on fire
3,1467811193,2009-04-06 22:19:57,Karoli,"@nationwideclass no, it's not behaving at all...."
4,1467811372,2009-04-06 22:20:00,joy_wolf,@Kwesidei not the whole crew


In [88]:
twitter_df.shape

(1599999, 4)

These steps move a large dataset from a Spark DataFrame (`mysql_df`) into a pandas DataFrame (`twitter_df`) for further analysis. 

The `.collect()` call gathers all rows from the Spark DataFrame into a list of Row objects, which is then converted to a pandas DataFrame. The resulting `twitter_df` has 1,599,999 rows and 4 columns, which confirms that every row was transferred. However, at this scale memory usage needs attention: pandas holds the entire dataset in the memory of a single machine, whereas Spark can distribute it across executors.
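
As a hedged alternative to `collect()` followed by `pd.DataFrame(...)`, Spark DataFrames also provide `toPandas()`, which performs the conversion in one call and keeps the column names. The helper below is hypothetical (its name and the `max_rows` parameter are assumptions for this sketch); it optionally caps the row count so that a preview can be pulled without loading everything onto the driver.

```python
def spark_to_pandas(spark_df, max_rows=None):
    """Convert a Spark DataFrame to pandas, optionally capping the row count.

    Hypothetical helper for this sketch: `limit` and `toPandas` are standard
    Spark DataFrame methods, and toPandas() keeps the column names, so no
    separate column assignment is needed.
    """
    if max_rows is not None:
        spark_df = spark_df.limit(max_rows)
    return spark_df.toPandas()


# Usage with the notebook's DataFrame (needs a running Spark session):
# twitter_df = spark_to_pandas(mysql_df)
# preview_df = spark_to_pandas(mysql_df, max_rows=1000)
```

Like `collect()`, `toPandas()` still brings all selected rows to the driver, so the memory considerations above apply equally.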

### Save the Dataset as CSV

In [89]:
# Save the Pandas DataFrame to a CSV file
twitter_df.to_csv('twitter_df.csv', index = False)

In [90]:
!ls -lh

total 401M
-rw-r--r-- 1 hduser hadoopgroup  52K May 11 18:43 'Integrated_CA2MScDA_ BD_ADA -BDS1.ipynb'
-rw-r--r-- 1 hduser hadoopgroup 101K May 10 17:02 'Integrated_CA2MScDA_ BD_ADA .ipynb'
drwxr-xr-x 4 hduser hadoopgroup 4.0K May  6 10:02  MSCCA12023V2
-rw-r--r-- 1 hduser hadoopgroup 2.4M May  5 15:20  mysql-connector-j-8.0.33.jar
-rw-r--r-- 1 hduser hadoopgroup 219M Apr 28 09:50  ProjectTweets.csv
-rw-r--r-- 1 hduser hadoopgroup   32 Apr 25 09:23  README.md
-rw-r--r-- 1 hduser hadoopgroup 180M May 11 18:42  twitter_df.csv


The collected DataFrame (twitter_df) is saved to a CSV file named twitter_df.csv via the to_csv() method. The index=False argument ensures that the index column does not appear in the output. This method facilitates data export, allowing the dataset to be shared, analysed, or preserved in CSV format. The generated CSV file has 1,599,999 rows and four columns (user_id, date, user_handle, and tweet_text), offering a complete and portable snapshot of Twitter data for further analysis.

The command `!ls -lh` lists all files in the current directory with details such as size, permissions, owner, and timestamps in a readable format. Here it confirms that `twitter_df.csv` (180 MB) was saved in the working directory.
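
A directory listing confirms that the file exists and shows its size, but not its contents. A hedged sketch of a follow-up check counts the data rows with Python's `csv` module; counting raw lines could overcount, because tweet text may contain line breaks inside quoted fields. The helper name and the tiny sample file are made up for illustration.

```python
import csv
import os
import tempfile


def count_csv_rows(path):
    """Count data rows (header excluded) using the csv module, so quoted
    fields that contain commas or line breaks are still one row each."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header row
        return sum(1 for _ in reader)


# Tiny made-up file standing in for twitter_df.csv
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample.csv")
    with open(sample_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(handle)
        writer.writerow(["user_handle", "tweet_text"])
        writer.writerow(["a_user", "hello, world"])
        writer.writerow(["b_user", "line one\nline two"])
    sample_rows = count_csv_rows(sample_path)

print(sample_rows)
```

Applied to the saved file, `count_csv_rows('twitter_df.csv')` would be expected to match the 1,599,999 rows reported by `twitter_df.shape`.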

### Transitioning to Pandas for Continuous Analysis in a New Jupyter Notebook (Integrated_CA2MScDA_ BD_ADA_V2)

#### Rationale:

The decision to continue the analysis in Python with pandas on Windows was influenced by several factors. Primarily, difficulties integrating TensorFlow on the Linux virtual machine (VM), compounded by the VM's restrictive memory limits, prompted the switch. In addition, pandas offers a more extensive and detailed library for in-memory data manipulation than PySpark, with a broad range of functionality available out of the box, which suits the analysis that follows.

#### New Jupyter Notebook: Integrated_CA2MScDA_ BD_ADA_V2, using the same dataset, processed and saved as `twitter_df.csv`