In [19]:
pip install pyspark



In [20]:
from pyspark.sql import SparkSession
import os

In [21]:
spark = SparkSession.builder \
    .appName("Election Prediction") \
    .getOrCreate()

In [22]:
sc = spark.sparkContext

In [23]:
sc

Reading from podscribe through all the nested folders

In [24]:
from pyspark.sql.functions import input_file_name

# df = spark.read.option("recursiveFileLookup", "true").text("/content/drive/Shareddrives/DATA228/Data/podscribe_app")

df = spark.read.option("recursiveFileLookup", "true").text("/content/drive/Shareddrives/DATA228/Data/podscribe_app") \
        .withColumn("file_name", input_file_name())

# # Extract the file names from the DataFrame
# file_names = df.select(input_file_name().alias("file_path")).distinct()

# # Extract only the file names from the file paths
# file_names = file_names.rdd.map(lambda row: os.path.basename(row["file_path"])).collect()

# # Print the list of file names
# for file_name in file_names:
#     print(file_name)

In [25]:
file_names = df.select(input_file_name().alias("file_path")).distinct()

# Extract only the file names from the file paths
file_names_rdd = file_names.rdd.map(lambda row: os.path.basename(row["file_path"]))

# Count the number of files
file_count = file_names_rdd.count()

# Print the count
print("Number of files read:", file_count)

Number of files read: 331


In [26]:
from pyspark.sql.functions import *

df = df.groupBy("file_name").agg(
    collect_list("value").alias("value")  # Collect all contents into a list
)


In [27]:
df = df.withColumn(
    "podcast_name_cleaned",
    regexp_replace(
        regexp_replace(
            regexp_extract(df["file_name"], r"/([^/]+?)(?:/[^/]+?\.[a-z]+?$|$)", 1),
            "%20", " "
        ),
        "[^a-zA-Z0-9\\s]", ""
    )
)

In [28]:
df = df.select("podcast_name_cleaned","value")


In [29]:
df = df.withColumn("cleaned_value", concat_ws(" ", df["value"]))

# Lowercase the text
df = df.withColumn("cleaned_value", lower(df["cleaned_value"]))

In [30]:
df = df.withColumn("cleaned_value", regexp_replace(df["cleaned_value"], r"\[\d{2}:\d{2}:\d{2}\]|\d{2}:\d{2}", ""))


In [31]:
df = df.withColumn("cleaned_value", regexp_replace(df["cleaned_value"], r"\b\d+\b", ""))

In [32]:
df = df.withColumn("cleaned_value", regexp_replace(df["cleaned_value"], r"[^a-zA-Z0-9\s]+", ""))

In [33]:
df = df.withColumn("cleaned_value", trim(df["cleaned_value"]))

In [34]:
from pyspark.sql import functions as F


# Apply the regex pattern to the 'transcript' column
df = df.withColumn("cleaned_transcript", F.regexp_replace("cleaned_value", r'\b\d+m\s*\d+s|\b\d+s', ""))


In [35]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize the cleaned transcripts column
tokenizer = Tokenizer(inputCol="cleaned_transcript", outputCol="tokens")
tokenized_df = tokenizer.transform(df)

# Remove stopwords from the tokenized text
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
filtered_df = remover.transform(tokenized_df)

# Show the resulting DataFrame with the original data plus tokenized and filtered text
df = filtered_df.select("podcast_name_cleaned", "cleaned_transcript", "filtered_tokens")
df.show()

+--------------------+--------------------+--------------------+
|podcast_name_cleaned|  cleaned_transcript|     filtered_tokens|
+--------------------+--------------------+--------------------+
|Bill OReillys No ...|samsung  tonight ...|[samsung, , tonig...|
|Bill OReillys No ...| it is ryan here ...|[, ryan, question...|
|Bill OReillys No ...|samsung  tonight ...|[samsung, , tonig...|
|       Candace Owens| happy wednesday ...|[, happy, wednesd...|
|       Candace Owens| all right guys h...|[, right, guys, h...|
|       Candace Owens| alright everybod...|[, alright, every...|
|       Candace Owens| alright right gu...|[, alright, right...|
|Common Sense with...| common sense lin...|[, common, sense,...|
|Common Sense with...| hes dan carlin a...|[, hes, dan, carl...|
|Common Sense with...| hes dan carlin a...|[, hes, dan, carl...|
|FiveThirtyEight P...| also im gonna ri...|[, also, im, gonn...|
|FiveThirtyEight P...| the hunt for a n...|[, hunt, new, new...|
|         Steve Deace| wa

In [18]:
from pyspark.sql.functions import expr

# Find the index of the first occurrence of "biden" or "trump" in the filtered_tokens column
df = df.withColumn("biden_index", expr("CASE WHEN array_contains(filtered_tokens, 'biden') THEN array_position(filtered_tokens, 'biden') ELSE -1 END"))
df = df.withColumn("trump_index", expr("CASE WHEN array_contains(filtered_tokens, 'trump') THEN array_position(filtered_tokens, 'trump') ELSE -1 END"))

# Slice the filtered_tokens column based on the index of "biden" or "trump"
df = df.withColumn("biden_context", expr("slice(filtered_tokens, greatest(1, biden_index - 100), least(size(filtered_tokens), biden_index + 100))"))
df = df.withColumn("trump_context", expr("slice(filtered_tokens, greatest(1, trump_index - 100), least(size(filtered_tokens), trump_index + 100))"))

# Show the DataFrame with the new columns
df.show(truncate=False)


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [36]:
# Count the number of rows
num_rows = df.count()

# Get the list of column names
columns = df.columns

# Get the number of columns
num_cols = len(columns)

# Print the shape of the DataFrame
print("Shape of the DataFrame: {} rows, {} columns".format(num_rows, num_cols))


Shape of the DataFrame: 331 rows, 3 columns


In [51]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define a function to extract context tokens
def extract_context(tokens, keywords):
    contexts = []
    for i, token in enumerate(tokens):
        if any(keyword in token for keyword in keywords):
            start_index = max(0, i - 100)
            end_index = min(len(tokens), i + 101)
            context = tokens[start_index:end_index]
            contexts.append(context)
    return contexts

# Define a UDF to apply the extract_context function to each row of the DataFrame
extract_context_udf = udf(lambda tokens: extract_context(tokens, ["biden", "trump"]), ArrayType(ArrayType(StringType())))

# Apply the UDF to the filtered_tokens column and create a new column context_tokens
df = df.withColumn("context_tokens", extract_context_udf(df["filtered_tokens"]))

# Show the DataFrame with the new column
df.show(truncate=False)


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-51-2fb11a668ad1>", line 16, in <lambda>
  File "<ipython-input-51-2fb11a668ad1>", line 9, in extract_context
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 174, in wrapped
    return f(*args, **kwargs)
TypeError: max() takes 1 positional argument but 2 were given


In [53]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define a function to extract context tokens
def extract_context(tokens, keywords):
    contexts = []
    for i in range(len(tokens)):
        token = tokens[i]
        if any(keyword in token for keyword in keywords):
            start_index = max(0, i - 100)
            end_index = min(len(tokens), i + 101)
            context = tokens[start_index:end_index]
            contexts.append(context)
    return contexts

# Define a UDF to apply the extract_context function to each row of the DataFrame
extract_context_udf = udf(lambda tokens: extract_context(tokens, ["biden", "trump"]), ArrayType(ArrayType(StringType())))

# Apply the UDF to the filtered_tokens column and create a new column context_tokens
df = df.withColumn("context_tokens", extract_context_udf(df["filtered_tokens"]))

# Show the DataFrame with the new column
df.show(truncate=False)


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-53-af22b2300dfe>", line 17, in <lambda>
  File "<ipython-input-53-af22b2300dfe>", line 10, in extract_context
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 174, in wrapped
    return f(*args, **kwargs)
TypeError: max() takes 1 positional argument but 2 were given


In [56]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, IntegerType

# Define a UDF to find the index of the first occurrence of a keyword in an array column
find_index_udf = udf(lambda tokens, keyword: tokens.index(keyword) if keyword in tokens else -1, IntegerType())

# Define a UDF to slice the array column based on the index of the keyword
slice_array_udf = udf(lambda tokens, index: tokens[max(0, index - 100): min(len(tokens), index + 101)], ArrayType(StringType()))

# Find the index of the first occurrence of "biden" or "trump" in the filtered_tokens column
df = df.withColumn("biden_index", find_index_udf(col("filtered_tokens"), "biden"))
df = df.withColumn("trump_index", find_index_udf(col("filtered_tokens"), "trump"))

# Slice the filtered_tokens column based on the index of "biden" or "trump"
df = df.withColumn("biden_context", slice_array_udf(col("filtered_tokens"), col("biden_index")))
df = df.withColumn("trump_context", slice_array_udf(col("filtered_tokens"), col("trump_index")))

# Show the DataFrame with the new columns
df.show(truncate=False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `biden` cannot be resolved. Did you mean one of the following? [`filtered_tokens`, `context_tokens`, `cleaned_transcript`, `podcast_name_cleaned`].;
'Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, context_tokens#1282, <lambda>(filtered_tokens#752, 'biden)#1358 AS biden_index#1359]
+- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#1281 AS context_tokens#1282]
   +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#1206 AS context_tokens#1207]
      +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#1131 AS context_tokens#1132]
         +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#1056 AS context_tokens#1057]
            +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#981 AS context_tokens#982]
               +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#906 AS context_tokens#907]
                  +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752, <lambda>(filtered_tokens#752)#831 AS context_tokens#832]
                     +- Project [podcast_name_cleaned#702, cleaned_transcript#732, filtered_tokens#752]
                        +- Project [podcast_name_cleaned#702, value#699, cleaned_value#728, cleaned_transcript#732, tokens#740, UDF(tokens#740) AS filtered_tokens#752]
                           +- Project [podcast_name_cleaned#702, value#699, cleaned_value#728, cleaned_transcript#732, UDF(cleaned_transcript#732) AS tokens#740]
                              +- Project [podcast_name_cleaned#702, value#699, cleaned_value#728, regexp_replace(cleaned_value#728, \b\d+m\s*\d+s|\b\d+s, , 1) AS cleaned_transcript#732]
                                 +- Project [podcast_name_cleaned#702, value#699, trim(cleaned_value#724, None) AS cleaned_value#728]
                                    +- Project [podcast_name_cleaned#702, value#699, regexp_replace(cleaned_value#720, [^a-zA-Z0-9\s]+, , 1) AS cleaned_value#724]
                                       +- Project [podcast_name_cleaned#702, value#699, regexp_replace(cleaned_value#716, \b\d+\b, , 1) AS cleaned_value#720]
                                          +- Project [podcast_name_cleaned#702, value#699, regexp_replace(cleaned_value#712, \[\d{2}:\d{2}:\d{2}\]|\d{2}:\d{2}, , 1) AS cleaned_value#716]
                                             +- Project [podcast_name_cleaned#702, value#699, lower(cleaned_value#708) AS cleaned_value#712]
                                                +- Project [podcast_name_cleaned#702, value#699, concat_ws( , value#699) AS cleaned_value#708]
                                                   +- Project [podcast_name_cleaned#702, value#699]
                                                      +- Project [file_name#690, value#699, regexp_replace(regexp_replace(regexp_extract(file_name#690, /([^/]+?)(?:/[^/]+?\.[a-z]+?$|$), 1), %20,  , 1), [^a-zA-Z0-9\s], , 1) AS podcast_name_cleaned#702]
                                                         +- Aggregate [file_name#690], [file_name#690, collect_list(value#688, 0, 0) AS value#699]
                                                            +- Project [value#688, input_file_name() AS file_name#690]
                                                               +- Relation [value#688] text


In [57]:
df.select("context_tokens").show(truncate=False)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-53-af22b2300dfe>", line 17, in <lambda>
  File "<ipython-input-53-af22b2300dfe>", line 10, in extract_context
  File "/usr/local/lib/python3.10/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 174, in wrapped
    return f(*args, **kwargs)
TypeError: max() takes 1 positional argument but 2 were given


In [37]:
%cd /content/drive/Shareddrives/DATA228

/content/drive/Shareddrives/DATA228


In [38]:
!git clone https://github.com/soniawmeyer/PodPresPred.git

Cloning into 'PodPresPred'...
remote: Enumerating objects: 77, done.[K
remote: Counting objects: 100% (77/77), done.[K
remote: Compressing objects: 100% (67/67), done.[K
remote: Total 77 (delta 26), reused 53 (delta 8), pack-reused 0[K
Receiving objects: 100% (77/77), 557.40 KiB | 8.99 MiB/s, done.
Resolving deltas: 100% (26/26), done.


In [40]:
!cp /content/drive/Shareddrives/DATA228/datacleaningexceptpickingouttokens.ipynb /content/drive/Shareddrives/DATA228/PodPresPred

In [41]:
%cd /content/drive/Shareddrives/DATA228/PodPresPred
!git add .
!git commit -m "everything except the token picking on the text files."
!git push origin main


/content/drive/Shareddrives/DATA228/PodPresPred
Author identity unknown

*** Please tell me who you are.

Run

  git config --global user.email "you@example.com"
  git config --global user.name "Your Name"

to set your account's default identity.
Omit --global to set the identity only in this repository.

fatal: unable to auto-detect email address (got 'root@1a2dc8d9d7c1.(none)')
fatal: could not read Username for 'https://github.com': No such device or address


In [42]:
!git config --global user.email "bhuvanck8@gmail.com"
!git config --global user.name "Your Name"
!git add .
!git commit -m "data cleaning except 100 tokens is done on the text files"
!git push origin main


shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
fatal: Unable to read current working directory: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
fatal: Unable to read current working directory: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
fatal: Unable to read current working directory: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
fatal: Unable to read current working directory: No such file or directory
shell-init: error retrieving current directory: getcwd: cannot access parent directories: No such file or directory
fatal: Unable to read current working directory: No such file or directory
