In [1]:
import os
import pandas as pd
import csv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, expr, unbase64, decode, regexp_replace, from_json, to_date, count, to_timestamp, date_format, concat, format_string, year, month, date_format, lit, from_unixtime
from pyspark.sql.types import StringType, IntegerType, BooleanType, ArrayType, LongType
import re

In [2]:
def get_csv_columns_with_examples(repo_path):
    result = {}
    for root, _, files in os.walk(repo_path):
        for file in files:
            if file.endswith('.csv'):
                print("Trying", file)
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, repo_path)
                try:
                    # Read the CSV file
                    df = pd.read_csv(file_path, nrows=1)  # Only need the first row
                    # Create a dictionary of column names and their example values
                    columns_with_examples = {col: df[col].iloc[0] for col in df.columns}
                    # Add to the result dictionary
                    result[relative_path] = columns_with_examples
                except Exception as e:
                    print(f"Error reading file {relative_path}: {e}")
    return result

# Example usage
repo_path = "Datasets/"
columns_dict = get_csv_columns_with_examples(repo_path)

# Print or process the results
for file_path, columns in columns_dict.items():
    print(f"File: {file_path}")
    for column, example in columns.items():
        print(f"  {column}: {example}")


Trying Bitcoin_tweets.csv
Trying Bitcoin_tweets_dataset_2.csv
Trying tweets.csv
Trying bitcoin-tweets-2021.csv
Trying bitcoin-tweets-2022.csv
Trying Aug 20182018-08-19.csv
Trying Aug 20182018-08-25.csv
Trying 2018-04-07.csv
Trying 2018-04-11.csv
Trying 2018-04-13.csv
Trying 2018-04-14.csv
Trying 2018-04-15.csv
Trying 2018-04-17.csv
Trying 2018-04-18.csv
Trying 2018-04-19.csv
Trying 2018-04-20.csv
Trying 2018-04-21.csv
Trying 2018-04-22.csv
Trying 2018-04-23.csv
Trying 2018-04-24.csv
Trying 2018-04-28.csv
Trying 2018-04-30.csv
Trying 2017-08-02.csv
Trying 2017-08-04.csv
Trying 2017-08-05.csv
Trying 2017-08-13.csv
Trying 2017-08-15.csv
Trying 2017-08-16.csv
Trying 2017-08-18.csv
Trying 2017-08-22.csv
Trying 2017-08-23.csv
Trying 2017-08-26.csv
Trying 2018-08-01.csv
Trying 2018-08-03.csv
Trying 2018-08-04.csv
Trying 2018-08-06.csv
Trying 2018-08-16.csv
Trying 2018-08-17.csv
Trying 2018-08-18.csv
Trying 2018-08-20.csv
Trying 2018-08-21.csv
Trying 2018-08-22.csv
Trying 2018-08-24.csv
Trying

In [3]:
def get_csv_columns_with_examples_fast(repo_path):
    result = {}
    all_cols = set()
    for root, _, files in os.walk(repo_path):
        for file in files:
            if file.endswith('.csv'):
                print("Trying", file)
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, repo_path)
                try:
                    with open(file_path, 'r', newline='', encoding='utf-8') as csvfile:
                        reader = csv.reader(csvfile)
                        # Get the header (column names)
                        columns = next(reader)
                        # Get the first row of data
                        first_row = next(reader, [])
                        # Create a dictionary of column names and their first-row examples
                        columns_with_examples = {col: first_row[i] if i < len(first_row) else None for i, col in enumerate(columns)}
                        # Add to the result dictionary
                        result[relative_path] = columns_with_examples
                        all_cols.update(columns)

                except Exception as e:
                    print(f"Error reading file {relative_path}: {e}")
    return result, all_cols

# Example usage
repo_path = "Datasets/"
columns_dict, all_cols = get_csv_columns_with_examples_fast(repo_path)

# Print or process the results
for file_path, columns in columns_dict.items():
    print(f"File: {file_path}")
    for column, example in columns.items():
        print(f"  {column}: {example}")


Trying Bitcoin_tweets.csv
Trying Bitcoin_tweets_dataset_2.csv
Trying tweets.csv
Trying bitcoin-tweets-2021.csv
Trying bitcoin-tweets-2022.csv
Trying Aug 20182018-08-19.csv
Trying Aug 20182018-08-25.csv
Trying 2018-04-07.csv
Trying 2018-04-11.csv
Trying 2018-04-13.csv
Trying 2018-04-14.csv
Trying 2018-04-15.csv
Trying 2018-04-17.csv
Trying 2018-04-18.csv
Trying 2018-04-19.csv
Trying 2018-04-20.csv
Trying 2018-04-21.csv
Trying 2018-04-22.csv
Trying 2018-04-23.csv
Trying 2018-04-24.csv
Trying 2018-04-28.csv
Trying 2018-04-30.csv
Trying 2017-08-02.csv
Trying 2017-08-04.csv
Trying 2017-08-05.csv
Trying 2017-08-13.csv
Trying 2017-08-15.csv
Trying 2017-08-16.csv
Trying 2017-08-18.csv
Trying 2017-08-22.csv
Trying 2017-08-23.csv
Trying 2017-08-26.csv
Trying 2018-08-01.csv
Trying 2018-08-03.csv
Trying 2018-08-04.csv
Trying 2018-08-06.csv
Trying 2018-08-16.csv
Trying 2018-08-17.csv
Trying 2018-08-18.csv
Trying 2018-08-20.csv
Trying 2018-08-21.csv
Trying 2018-08-22.csv
Trying 2018-08-24.csv
Trying

In [4]:
cols_serie = pd.Series(list(all_cols))
cols_serie.to_csv("col-names.csv", index=False)


In [5]:
print(columns_dict.keys())

dict_keys(['archive\\Bitcoin_tweets.csv', 'archive\\Bitcoin_tweets_dataset_2.csv', 'archive (1)\\tweets.csv', 'archive (10)\\bitcoin-tweets-2021.csv', 'archive (10)\\bitcoin-tweets-2022.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Aug 20182018-08-19.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Aug 20182018-08-25.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-07.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-11.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-13.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-14.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-15.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-17.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-18.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-19.csv', 'archive (11)\\Raw-20201031T194820Z-001\\Raw\\Abril 2018\\2018-04-

In [46]:
# Step 1: Initialize SparkSession
spark = SparkSession.builder \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .appName("CSV Date Occurrence Analysis") \
    .getOrCreate()

# Step 2: Read the CSV file into a Spark DataFrame
dir = "Datasets"
folder = "archive"
file = "Bitcoin_tweets.csv"  # Replace with your CSV file path

file_path = "/".join([dir, folder, file])

initialDF = spark.read.csv(
    file_path, 
    header=True,  # Use the first row as column names
    inferSchema=True,  # Infer data types
    multiLine=True,  # Handle newlines within fields
    escape='"',  # Escape character for double quotes
    quote='"',  # Define the quote character

    # Different depending on the document
    sep=",",  # Specify the correct delimiter


    mode="PERMISSIVE"  # Handle malformed rows gracefully
)
# Show the schema of the data
initialDF.printSchema()

root
 |-- user_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_created: string (nullable = true)
 |-- user_followers: string (nullable = true)
 |-- user_friends: string (nullable = true)
 |-- user_favourites: string (nullable = true)
 |-- user_verified: string (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- source: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)



In [38]:
initialDF.show(truncate=False)

+--------------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+--------------+------------+---------------+-------------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+-----------------------------------------+-------------------+
|user_name                                   |user_location  |user_description                                                                                                                                                |user_created       |user_followers|user_friends|user_favourites|user_verified|date               |text                                                             

In [39]:
if len(columns_dict[folder+'\\'+file]) > 1:
    dict_items = list(columns_dict[folder+'\\'+file].items())
else:
    keys = [key.split(';') for key in columns_dict[folder+'\\'+file].keys()]
    values = [value.split(';') for value in columns_dict[folder+'\\'+file].values()]
    dict_items = zip(keys[0], values[0])
df = pd.DataFrame(dict_items, columns=['Column', 'Example'])
df

Unnamed: 0,Column,Example
0,user_name,DeSota Wilson
1,user_location,"Atlanta, GA"
2,user_description,"Biz Consultant, real estate, fintech, startups..."
3,user_created,2009-04-26 20:05:09
4,user_followers,8534.0
5,user_friends,7605
6,user_favourites,4838
7,user_verified,False
8,date,2021-02-10 23:59:04
9,text,Blue Ridge Bank shares halted by NYSE after #b...


In [47]:
columns_required = ["date", "id", "text", "hashtags", "is_retweet", "retweets", "likes", "replies", "language", "user_id", "user_name", "user_location", "user_description", "user_creation_date", "user_followers", "user_friends", "user_favourites", "user_verified"]

columns = initialDF.columns

while len(columns) > 0:
    if columns[0] in columns_required:
        columns_required.remove(columns[0])
        columns.pop(0)
        continue
    column = input(f'Rename {columns[0]}:')
    column = str(column)
    if column == "drop":
        initialDF = initialDF.drop(columns[0])
        columns.pop(0)
    elif column in columns_required:
        initialDF = initialDF.withColumnRenamed(columns[0], column)
        columns_required.remove(column)
        columns.pop(0)
    
for column in columns_required:
    initialDF = initialDF.withColumn(column, lit(None))

columns_required_types = {
    "date": StringType(),  # Typically, date would be a string or timestamp
    "id": LongType(),  # IDs are usually integers
    "text": StringType(),  # Text is typically a string
    "hashtags": StringType(),  # Hashtags are a list of strings
    "is_retweet": BooleanType(),  # Whether a tweet is a retweet is a boolean
    "retweets": IntegerType(),  # Retweets count is an integer
    "likes": IntegerType(),  # Likes count is an integer
    "replies": IntegerType(),  # Replies count is an integer
    "language": StringType(), 
    "user_id": StringType(),  # User ID is usually an integer
    "user_name": StringType(),  # User name is a string
    "user_location": StringType(),  # User location is a string
    "user_description": StringType(),  # User description is a string
    "user_creation_date": StringType(),  # User creation date is typically a string or timestamp
    "user_followers": IntegerType(),  # User followers count is an integer
    "user_friends": IntegerType(),  # User friends count is an integer
    "user_favourites": IntegerType(),  # User favourites count is an integer
    "user_verified": BooleanType(),  # Whether a user is verified is a boolean
}

for column in columns_required_types.keys():
    initialDF = initialDF.withColumn(column, col(column).cast(columns_required_types[column]))


resultDF = initialDF.select("date", "id", "text", "hashtags", "is_retweet", "retweets", "likes", "replies", "language", "user_name", "user_id", "user_location")
userDF = initialDF.select( "user_id", "user_name", "user_location", "user_description", "user_creation_date", "user_followers", "user_friends", "user_favourites", "user_verified")

resultDF.printSchema()
userDF.printSchema()

root
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- retweets: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- replies: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_location: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_creation_date: string (nullable = true)
 |-- user_followers: integer (nullable = true)
 |-- user_friends: integer (nullable = true)
 |-- user_favourites: integer (nullable = true)
 |-- user_verified: boolean (nullable = true)



In [48]:
resultDF.show()

+-------------------+----+--------------------+--------------------+----------+--------+-----+-------+--------+--------------------+-------+---------------+
|               date|  id|                text|            hashtags|is_retweet|retweets|likes|replies|language|           user_name|user_id|  user_location|
+-------------------+----+--------------------+--------------------+----------+--------+-----+-------+--------+--------------------+-------+---------------+
|2021-02-10 23:59:04|NULL|Blue Ridge Bank s...|         ['bitcoin']|     false|    NULL| NULL|   NULL|    NULL|       DeSota Wilson|   NULL|    Atlanta, GA|
|2021-02-10 23:58:48|NULL|😎 Today, that's ...|['Thursday', 'Btc...|     false|    NULL| NULL|   NULL|    NULL|            CryptoND|   NULL|           NULL|
|2021-02-10 23:54:48|NULL|Guys evening, I h...|                NULL|     false|    NULL| NULL|   NULL|    NULL|           Tdlmatias|   NULL|London, England|
|2021-02-10 23:54:33|NULL|$BTC A big chance...|['Bitcoin', 

In [49]:
resultDF = resultDF.withColumn("parsed_date", date_format(to_timestamp(col("date")), "yyyy-MM-dd HH:mm:ss"))
resultDF.printSchema()
# userDF = userDF.withColumn("user_creation_date", date_format(to_timestamp(col("date")), "yyyy-MM-dd HH:mm:ss"))
# resultDF.printSchema()

root
 |-- date: string (nullable = true)
 |-- id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- is_retweet: boolean (nullable = true)
 |-- retweets: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- replies: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_location: string (nullable = true)
 |-- parsed_date: string (nullable = true)



In [50]:
null_dateDF = resultDF.filter(col("parsed_date").isNull())

In [52]:
null_dateDF.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+-----------------------------------------------------------------------------------------------------------------------+-------------------+----------+--------+-----+-------+--------+------------------------------------------------------------------------------------+-------+-------------------------------------------------------------------------------------------------------+-----------+
|date                                                                                                                                                                                                                          |id  |text                                                                                                                   |hashtags           |is_retwee

In [178]:
resultDF = resultDF.dropna(how='all')
resultDF.show(truncate=False)
userDF = userDF.dropna(how='all')
userDF.show(truncate=False)

+-------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------+--------+----------+--------+-----+-------+--------+---------+-------+-------------+
|date               |id  |text                                                                                                                                             |hashtags|is_retweet|retweets|likes|replies|language|user_name|user_id|user_location|
+-------------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------+--------+----------+--------+-----+-------+--------+---------+-------+-------------+
|2021-02-11 00:00:00|NULL|Blue Ridge Bank shares halted by NYSE after #bitcoin ATM announcement https://t.co/xaaZmaJKiV @MyBlueRidgeBank… https://t.co/sgBxMkP1SI          |NULL    |NULL      |NULL    |NULL |NULL   |NULL    |NULL 

In [179]:
tweets_folder = "Datasets/TweetDB/"
users_folder = "Datasets/UserDB/"
resultDF.write.option("header",True).csv(tweets_folder + folder + "_" + file)
userDF.write.option("header",True).csv(users_folder + folder + "_" + file)

In [170]:
readDF = spark.read.csv(
    tweets_folder + folder + "_" + file,
    header=True,  # Use the first row as column names
    inferSchema=True,  # Infer data types
    multiLine=True,  # Handle newlines within fields
)

In [171]:
readDF.show(truncate=False)

+-------------------+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+----------+--------+-----+-------+--------+---------+---------------+-------------+
|date               |id  |text                                                                                                                                                                                                                                                                                                                |hashtags|is_retweet|retweets|likes|replies|language|user_name|user_id        |user_location|
+-------------------+----+----------------------------------------------------------------------------------------------------------------------