In [None]:
sc

### CONNECTING TO MONGODB

In [None]:
!mongo --version

In [None]:
!mongosh

In [None]:
from pyspark.sql import SparkSession

# Create Spark Session for MongoDB
spark = SparkSession.builder \
        .appName("DFToMongoDB") \
        .getOrCreate()

# # #
data = [("John", 28), ("Alice", 22), ("Bob", 32)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

In [None]:
df.show(3)

In [None]:
# Configure MongoDB Database Connection
df.write.format("mongodb") \
    .option("uri","mongodb://127.0.0.1:27017/") \
    .option("database","sample_db") \
    .option("collection","scb") \
    .mode("append").save()

### CONNECTING TO MYSQL

In [None]:
!mysql --version

In [None]:
from pyspark.sql import SparkSession

# Create Spark Session for MySQL
spark = SparkSession.builder \
    .appName("DFToMySQL") \
    .getOrCreate()

# # #
data = [("John", 28), ("Alice", 22), ("Bob", 32)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Configure MySQL Database Connection
mysql_options = {
    "url": "jdbc:mysql://localhost:3306/sample",  # MySQL bağlantı URL'si
    "driver": "com.mysql.cj.jdbc.Driver",  # MySQL JDBC sürücüsü
    "dbtable": "yourtable",  # Hedef MySQL tablo adı
    "user": "root",  # MySQL kullanıcı adı
    "password": "password"  # MySQL parola
}

# DataFrame'i MySQL veritabanına yükleyin
df.write.format("jdbc").options(**mysql_options).mode("overwrite").save()

In [None]:
!pwd

In [None]:
!cat zahid.txt

# ================================
# READ TO CSV FROM HDFS VIA SPARK
# ================================

In [1]:
from pyspark.sql import SparkSession

# Create Spark Session
spark = SparkSession.builder \
        .appName("HDFSToCSV") \
        .getOrCreate()

# Specify CSV file path throught HDFS
hdfs_file_path = "/ProjectTweets.csv"

# Read CSV file with Spark DataFrame
df = spark.read.csv(hdfs_file_path, header=False, inferSchema=True)

                                                                                

In [None]:
# Show DataFrame First 5 Rows
df.show(5)

In [None]:
df.printSchema()

In [None]:
# The first method for renamed the column names
df1 = df.withColumnRenamed("_c0", "id").withColumnRenamed("_c1", "stamp").withColumnRenamed("_c2", "date").withColumnRenamed("_c3", "flag").withColumnRenamed("_c4", "user").withColumnRenamed("_c5", "text")
df1.show(5)

In [2]:
# The second method for renamed the column names
df = df.selectExpr("_c0 as ID", "_c1 as STAMP", "_c2 as DATE", "_c3 as FLAG", "_c4 as USER", "_c5 as TEXT")
df.show(5)

+---+----------+--------------------+--------+---------------+--------------------+
| ID|     STAMP|                DATE|    FLAG|           USER|                TEXT|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|  1|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|  2|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|  3|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|  4|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows



In [None]:
# How many rows does the dataframe 
row_count = df.count()
# Print row_count
print("DataFrame has {} rows.".format(row_count))

In [None]:
from pyspark.sql.functions import col

columns = ["ID", "STAMP", "DATE", "FLAG", "USER", "TEXT"]

Columns = df.columns

# Check out the each column and Count unique values
for column in Columns:
    unique_values = df.select(column).distinct()
    unique_count = unique_values.count()
    
    if unique_count > 0:
        print(f"{column} has {unique_count} unique values:")
    else:
        print(f"{column} has no unique value.")

In [None]:
from pyspark.sql.functions import col

columns = ["ID", "STAMP", "DATE", "FLAG", "USER", "TEXT"]

Columns = df.columns

# Check out the each column and Count duplicate values
for column in Columns:
    count_df = df.groupBy(column).count()
    duplicate_values = count_df.filter(col("count") > 1).count()
    
    if duplicate_values > 0:
        print(f"{column} has {duplicate_values} duplicate values.")
    else:
        print(f"{column} has no duplicate value.")

In [None]:
# Drop the selected columns
df = df.drop("STAMP", "FLAG", "USER")
df.show(5)

In [None]:
df.describe().show()

In [None]:
df.summary().show()

In [None]:
from pyspark.sql.functions import col

# Do a grouping and counting operation to find duplicate values in the "TEXT" column
count_df = df.groupBy("TEXT").count()

# Filter rows containing duplicate values
duplicate_values = count_df.filter(col("count") > 1)

# If there are duplicate values, show them
if duplicate_values.count() > 0:
    print("Duplicate values:")
    duplicate_values.show(truncate=False)  # Display column values in full length
else:
    print("No duplicate values found.")

In [None]:
# How many rows does the dataframe 
row_count = df.count()
# Print row_count
print("DataFrame has {} rows.".format(row_count))

In [None]:
df.printSchema()

# ====================
# TEXT PRE-PROCESSING
# ====================

standard pre-processing techniques:

- Lower casing the corpus 
- Removing the punctuation 
- Removing the stopwords 
- Tokenizing the corpus 
- Stemming and Lemmatization
- Word embeddings using CountVectorizer and TF-IDF  

In [None]:
text_df = df.select("TEXT")
text_df.show(3, truncate=False)

In [None]:
from pyspark.sql.functions import col, udf, lower, regexp_replace
from pyspark.sql.types import ArrayType, StringType, FloatType

import string
import nltk
import re
import contractions
import torch

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.stem import SnowballStemmer

from textblob import TextBlob

#### TEXT CLEANING

In [None]:
import re

# Cleaning Function
def clean_text(text):
    text = re.sub(r"http\S+|www\S+|https\S+", '', text, flags=re.MULTILINE)
    text = re.sub(r'\@\w+|\#','', text)
    text = text.lower()
    return text

# Save as UDF
clean_text_udf = udf(clean_text, StringType())

# Create new column
text_df = text_df.withColumn("TEXT_C1", clean_text_udf(col("text")))
text_df.show(3, truncate=False)

#### EXPAND CONTRACTIONS

In [None]:
import contractions

# Function to expand contractions
def expand_contractions(text):
    expanded_text = contractions.fix(text)
    return expanded_text

# Save as UDF
expand_contractions_udf = udf(expand_contractions, StringType())

# Use the UDF and Create new column
text_df = text_df.withColumn("TEXT_C2", expand_contractions_udf(col("TEXT_C1")))

# Show the dataframe
text_df.show(3, truncate=False)

#### CLEAN THE PUNCTUATION CHARACTERS

In [None]:
# Define punctuation characters
punctuation_characters = r'[!\"#\$%&\'\(\)\*\+,\-./:;<=>\?@[\\]\^_`{|}~]'

# Remove punctuation characters
text_df = text_df.withColumn("TEXT_C3", regexp_replace(col("TEXT_C2"), punctuation_characters, ""))

# Show the dataframe
text_df.show(3, truncate=False)

#### CLEAN THE STOPWORDS

In [None]:
# Download "stopwords" from nltk dictionary
nltk.download("stopwords")

# Configure the language as english
stop_words = set(stopwords.words("english"))

# Define the udf 
remove_stopwords_udf = udf(lambda text: " ".join([word for word in text.split() if word not in stop_words]), StringType())

# Use the UDF in order to remove stopwords and Create new column
text_df = text_df.withColumn("TEXT_C4", remove_stopwords_udf(col("TEXT_C3")))

# Show the dataframe
text_df.show(3, truncate=False)

#### IMPLEMENT LEMMATIZATION

In [None]:
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# Download 'punkt','averaged_perceptron_tagger','wordnet' from nltk dictionary
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')

# Defining the function that implements the Lemmatization operation as a UDF
def lemmatize_text(text):
    lemmatizer = WordNetLemmatizer()
    lemmatized_sentence = []
    words = word_tokenize(text)
    for w in words:
        lemma = lemmatizer.lemmatize(w)
        lemmatized_sentence.append(lemma)
    lemmatized_text = " ".join(lemmatized_sentence)
    return lemmatized_text

# Define the UDF
lemmatize_text_udf = udf(lemmatize_text, StringType())

# Use the UDF and Create new column
text_df = text_df.withColumn("TEXT_C5", lemmatize_text_udf(text_df["TEXT_C4"]))

# Show the dataframe
text_df.show(3, truncate=False)

#### IMPLEMENT STEMMING

In [None]:
from nltk.stem import SnowballStemmer

# Defining the function that finds word roots as UDF 
def stem_text(text):
    snow = SnowballStemmer('english')
    stemmed_sentence = []
    words = word_tokenize(text)
    for w in words:
        stemmed_sentence.append(snow.stem(w))
    stemmed_text = " ".join(stemmed_sentence)
    return stemmed_text

# Define the UDF
stem_text_udf = udf(stem_text, StringType())

# Use the UDF and Create new column
text_df = text_df.withColumn("TEXT_C6", stem_text_udf(text_df["TEXT_C5"]))

# Show the dataframe
text_df.show(3, truncate=False)

#### IMPLEMENT TOKENIZATION

In [None]:
from pyspark.sql.types import ArrayType, StringType
from nltk.tokenize import word_tokenize

# Function that splits text into tokens using NLTK
def tokenize_text(text):
    words = word_tokenize(text)
    return words

# Define the UDF
tokenize_text_udf = udf(tokenize_text, ArrayType(StringType()))

# Use the UDF and Create new column
text_df = text_df.withColumn("tokens", tokenize_text_udf(text_df["TEXT_C6"]))

# Show the selected dataframe
text_df.select("TEXT_C6", "tokens").show(3, truncate=False)

#### IMPLEMENT TOKENIZATION AND SPLIT WORDS TO ROWS

In [None]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import explode, col

# Create The Tokenizer
tokenizer = Tokenizer(inputCol="TEXT_C6", outputCol="words")
tokenizer_df = tokenizer.transform(text_df)

# Separate words into individual lines
tokenizer_df = tokenizer_df.select(explode(col("words")).alias("word"))

# Show the dataframe
tokenizer_df.show(10, truncate=False)

#### COUNT THE TOKENIZER WORDS

In [None]:
# Count the tokenizer words
tokenizer_df_count = tokenizer_df.groupBy("word").count().orderBy("count", ascending=False)

# Show the dataframe
tokenizer_df_count.show(50)

#### SENTIMENT LABEL ( POSITIVE - NEGATIVE - NEUTRAL )

In [None]:
from textblob import TextBlob

# Sentimental Analysis Function
def get_sentiment(text):
    analysis = TextBlob(text)
    return analysis.sentiment.polarity

# Sentiment Label Function
def label_sentiment(score):
    if score > 0:
        return 'positive'
    elif score < 0:
        return 'negative'
    else:
        return 'neutral'
===================================================
from pyspark.sql.types import FloatType, StringType

# Define the UDF with Functions
sentiment_udf = udf(get_sentiment, FloatType())
label_udf = udf(label_sentiment, StringType())

# Use the UDF and Create new columns
text_df = text_df.withColumn('sentiment_score', sentiment_udf(text_df['TEXT_C6']))
text_df = text_df.withColumn('sentiment_label', label_udf(text_df['sentiment_score']))

# Count and Show the 'sentiment_label' column
text_df.groupBy('sentiment_label').count().show()`

In [None]:
# Count the whole text 
total_count = text_df.count()

# Count the positive, neutral and negative sentiment label
positive_count = text_df.filter(text_df.sentiment_label == "positive").count()
neutral_count = text_df.filter(text_df.sentiment_label == "neutral").count()
negative_count = text_df.filter(text_df.sentiment_label == "negative").count()

# Calculate the positive, neutral and negative rate
positive_rate = (positive_count / total_count) * 100
neutral_rate = (neutral_count / total_count) * 100
negative_rate = (negative_count / total_count) * 100

# Print the positive, neutral and negative rate
print(f"Positive rate: {positive_rate}%")
print(f"Neutral rate: {neutral_rate}%")
print(f"Negative rate: {negative_rate}%")

In [None]:
from pyspark.sql.functions import when, col

# Duygu etiketlerini yeni bir sütunda saklama
text_df = text_df.withColumn("sentiment_label_column", 
     when(text_df["sentiment_label"] == "positive", "positive")
    .when(text_df["sentiment_label"] == "neutral", "neutral")
    .when(text_df["sentiment_label"] == "negative", "negative")
    .otherwise("unknown")
)

# Show the selected columns
text_df.select("TEXT_C6", "sentiment_label_column").show(truncate=False)

In [None]:
# Function that measures polarity
def get_polarity(text):
    analysis = TextBlob(text)
    return analysis.sentiment.polarity 

# Define UDF
polarity_udf = udf(get_polarity, FloatType())

# Calculate polarity score for each text in column 'TEXT_C6' and add to a new column
text_df = text_df.withColumn("polarity_score", polarity_udf(text_df['TEXT_C6']))

# Show the new column
text_df.select("TEXT_C6", "polarity_score").show(truncate=False)

# ====================
# TIME SERIES ANALYSIS
# ====================

In [3]:
# Select the DATE column
date_df = df.select("DATE")

# Show the DATE column
date_df.show(truncate=False)

+----------------------------+
|DATE                        |
+----------------------------+
|Mon Apr 06 22:19:45 PDT 2009|
|Mon Apr 06 22:19:49 PDT 2009|
|Mon Apr 06 22:19:53 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
|Mon Apr 06 22:19:57 PDT 2009|
|Mon Apr 06 22:20:00 PDT 2009|
|Mon Apr 06 22:20:03 PDT 2009|
|Mon Apr 06 22:20:03 PDT 2009|
|Mon Apr 06 22:20:05 PDT 2009|
|Mon Apr 06 22:20:09 PDT 2009|
|Mon Apr 06 22:20:16 PDT 2009|
|Mon Apr 06 22:20:17 PDT 2009|
|Mon Apr 06 22:20:19 PDT 2009|
|Mon Apr 06 22:20:19 PDT 2009|
|Mon Apr 06 22:20:20 PDT 2009|
|Mon Apr 06 22:20:20 PDT 2009|
|Mon Apr 06 22:20:22 PDT 2009|
|Mon Apr 06 22:20:25 PDT 2009|
|Mon Apr 06 22:20:31 PDT 2009|
|Mon Apr 06 22:20:34 PDT 2009|
+----------------------------+
only showing top 20 rows



In [None]:
# Show the schema of the dataframe
date_df.printSchema()

In [5]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "Legacy")

from pyspark.sql.functions import to_timestamp

timestamp = to_timestamp("Mon Apr 06 22:19:45 PDT 2009", "EEE MMM dd HH:mm:ss Z yyyy")

In [6]:
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

date_df = date_df.withColumn("TIMESTAMP", F.to_timestamp(date_df["DATE"], "EEE MMM dd HH:mm:ss Z yyyy"))

date_df = date_df.withColumn("TIMESTAMP", F.col("TIMESTAMP").cast(TimestampType()))

date_df.show(5, truncate=False)

+----------------------------+-------------------+
|DATE                        |TIMESTAMP          |
+----------------------------+-------------------+
|Mon Apr 06 22:19:45 PDT 2009|2009-04-07 06:19:45|
|Mon Apr 06 22:19:49 PDT 2009|2009-04-07 06:19:49|
|Mon Apr 06 22:19:53 PDT 2009|2009-04-07 06:19:53|
|Mon Apr 06 22:19:57 PDT 2009|2009-04-07 06:19:57|
|Mon Apr 06 22:19:57 PDT 2009|2009-04-07 06:19:57|
+----------------------------+-------------------+
only showing top 5 rows



In [None]:
# Sort ascending via TIMESTAMP column
date_df = date_df.orderBy("TIMESTAMP", ascending=True)

# Show sorted DataFrame
date_df.show(truncate=False)

In [7]:
# Select THE NEWEST DATE
newest_date = date_df.select("TIMESTAMP").first()[0]

# Print THE NEWEST DATE
print("THE NEWEST DATE:", newest_date)

THE OLDEST DATE: 2009-04-07 06:19:45


In [8]:
# Select THE OLDEST DATE
oldest_date = date_df.select("TIMESTAMP").collect()[-1][0]

# Print THE OLDEST DATE
print("THE OLDEST DATE:", oldest_date)

                                                                                

THE OLDEST DATE: 2009-06-16 16:40:50


In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import date_format

# 1-Week Analysis
weekly_data = date_df.groupBy(date_format("TIMESTAMP", "yyyy-ww")).count()
weekly_data = weekly_data.withColumnRenamed("date_format(TIMESTAMP, yyyy-ww)", "week")
weekly_data = weekly_data.orderBy("week", ascending=True)
weekly_data.show()

# Get the result and visualize it
weekly_data_pd = weekly_data.toPandas()

# Plot for 1-Week Time Series
plt.figure(figsize=(12, 6))
plt.bar(weekly_data_pd["week"], weekly_data_pd["count"], width=0.5)
plt.title("Weekly Tweet Count")
plt.xlabel("Week")
plt.ylabel("Tweet Count")
plt.xticks(rotation=45)
plt.show()

In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import date_format

# 1-Month Analysis
monthly_data = date_df.groupBy(date_format("TIMESTAMP", "yyyy-MM")).count()
monthly_data = monthly_data.withColumnRenamed("date_format(TIMESTAMP, yyyy-MM)", "month")
monthly_data.show()

# Get the results and visualize it
monthly_data_pd = monthly_data.toPandas()

# Plot for 1-Month Time Series
plt.figure(figsize=(12, 6))
plt.bar(monthly_data_pd["month"], monthly_data_pd["count"], width=0.5)
plt.title("Monthly Tweet Count")
plt.xlabel("Month")
plt.ylabel("Tweet Count")
plt.xticks(rotation=45)
plt.show()

In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import date_format

# 3-Month Analysis
quarterly_data = date_df.groupBy(date_format("TIMESTAMP", "yyyy-MM")).count()
quarterly_data = quarterly_data.withColumnRenamed("date_format(TIMESTAMP, yyyy-MM)", "quarter")
quarterly_data.show()

# Get the results and visualize it
quarterly_data_pd = quarterly_data.toPandas()

# Plot for 3-Month Time Series
plt.figure(figsize=(12, 6))
plt.bar(quarterly_data_pd["quarter"], quarterly_data_pd["count"], width=0.5)
plt.title("3-Month Tweet Count")
plt.xlabel("Quarter")
plt.ylabel("Tweet Count")
plt.xticks(rotation=45)
plt.show()

In [10]:
# Boş değerleri kontrol etmek için isNull() kullanın
nan_value = date_df.filter(F.col("TIMESTAMP").isNull())

# Hangi satırlarda boş değerler olduğunu gösterin
print("Boş değerlerin olduğu satırlar:")
nan_value.show()

# Toplam boş değer sayısını alın
nan_value_count = nan_value.count()
print("Toplam boş değer sayısı:", nan_value_count)

Boş değerlerin olduğu satırlar:


                                                                                

+----+---------+
|DATE|TIMESTAMP|
+----+---------+
+----+---------+





Toplam boş değer sayısı: 0


                                                                                