In [0]:
#Import libraries
from pyspark.sql.functions import mean
from pyspark.sql.functions import desc
from pyspark.sql.functions import col, sum, when, split, regexp_replace, explode, length, size

In [0]:
# Load data into the big data cluster as a dataframe
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/bernardo.machado4@protonmail.com/cryptonews.csv")
df.printSchema() 

root
 |-- date: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- source: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [0]:
#Print all the database
df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph|   bitcoin|Ether has broken ...|ETH hits 7-month ...|https://cointeleg...|
|2023-04-05 01:09:52|{'class': 'positi...|CoinTelegraph|   bitcoin|With a new quarte...|Marathon Digital ...|https://cointeleg...|
|2023-04-04 23:49:00|{'class': 'positi...| CryptoPotato|   altcoin|The stablecoin B

In [0]:
#Find source with most entries
df.groupBy('source').count().orderBy('count', ascending = 0).show()

+-------------+-----+
|       source|count|
+-------------+-----+
|CoinTelegraph| 8675|
|   CryptoNews| 5261|
| CryptoPotato| 4607|
+-------------+-----+



In [0]:
#Find most comun subject
df.groupBy('subject').count().orderBy('count', ascending = 0).show()

+----------+-----+
|   subject|count|
+----------+-----+
|   bitcoin| 6439|
|   altcoin| 5264|
|blockchain| 3445|
|  ethereum| 1524|
|       nft| 1164|
|      defi|  707|
+----------+-----+



In [0]:
"""We see we have more data the the sentiment evaluation (polarity and subjectivity) but we decided to ingro them and just use the sentiment in our ML apporoach"""
df.select("sentiment").show(truncate=False)

+--------------------------------------------------------------+
|sentiment                                                     |
+--------------------------------------------------------------+
|{'class': 'negative', 'polarity': -0.03, 'subjectivity': 0.2} |
|{'class': 'neutral', 'polarity': 0.0, 'subjectivity': 0.0}    |
|{'class': 'negative', 'polarity': -0.04, 'subjectivity': 0.31}|
|{'class': 'positive', 'polarity': 0.07, 'subjectivity': 0.23} |
|{'class': 'positive', 'polarity': 0.4, 'subjectivity': 0.4}   |
|{'class': 'neutral', 'polarity': 0.0, 'subjectivity': 0.0}    |
|{'class': 'positive', 'polarity': 0.27, 'subjectivity': 0.47} |
|{'class': 'neutral', 'polarity': 0.0, 'subjectivity': 0.0}    |
|{'class': 'negative', 'polarity': -0.1, 'subjectivity': 0.4}  |
|{'class': 'neutral', 'polarity': 0.0, 'subjectivity': 0.0}    |
|{'class': 'positive', 'polarity': 0.25, 'subjectivity': 0.25} |
|{'class': 'neutral', 'polarity': 0.0, 'subjectivity': 0.0}    |
|{'class': 'positive', 'p

In [0]:
#Create a new column with only the sentiment information
df = df.withColumn("OnlySentiment", split(df.sentiment, " ").getItem(1))
df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|  'negative',|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|   'neutral',|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph|   bitcoin|Ether has broken ...|ETH hits 7-month ...|https://cointeleg...|  'negative',|
|2023-04-05 01:09:52|{'class': 'positi...|CoinTelegraph|   bitcoin|With a new quarte...|Marathon Digital ...|https://cointeleg...|

In [0]:
#Clean the strings in cloumn OnlySentiment
df = df.withColumn("OnlySentiment", split(df.OnlySentiment, "'").getItem(1))
df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|     negative|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|      neutral|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph|   bitcoin|Ether has broken ...|ETH hits 7-month ...|https://cointeleg...|     negative|
|2023-04-05 01:09:52|{'class': 'positi...|CoinTelegraph|   bitcoin|With a new quarte...|Marathon Digital ...|https://cointeleg...|

In [0]:
#Analise of the general sentiment
df.groupBy('OnlySentiment').count().orderBy('count', ascending=0).show()

+-------------+-----+
|OnlySentiment|count|
+-------------+-----+
|     positive| 8296|
|      neutral| 6417|
|     negative| 3830|
+-------------+-----+



In [0]:
#Create a new column with integers to use in a machine learning model
df = df.withColumn("IntSentiment", when(df.OnlySentiment == "negative", 0)
                                        .when(df.OnlySentiment == "positive", 1)
                                        .otherwise(3))

df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|IntSentiment|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|     negative|           0|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|      neutral|           3|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph|   bitcoin|Ether has broken ...|ETH hits 7-month ...|https://cointeleg...|     negative|           0|
|2023-04-05 01:09:52|{'class': 'positi...|CoinTelegr

In [0]:
df.printSchema() #confirm data in IntSentiment is an integer

root
 |-- date: string (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- source: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- OnlySentiment: string (nullable = true)
 |-- IntSentiment: integer (nullable = false)



In [0]:
#Find the most comun sentiment in March 2023
df.where(col("date").between("2023-03-01", "2023-03-31")).show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|IntSentiment|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+
|2023-03-30 23:51:00|{'class': 'positi...| CryptoPotato|   altcoin|The proposal is e...|New Binance Smart...|https://cryptopot...|     positive|           1|
|2023-03-30 23:26:00|{'class': 'negati...|   CryptoNews|   altcoin|Bitcoin, the worl...|Bitcoin Price Pre...|https://cryptonew...|     negative|           0|
|2023-03-30 23:03:00|{'class': 'positi...|   CryptoNews|   altcoin|Bitcoin defies US...|Best Crypto to Bu...|https://cryptonew...|     positive|           1|
|2023-03-30 22:03:00|{'class': 'neutra...| CryptoPot

In [0]:
#Create a new column with only the sentiment information
df = df.withColumn("wordsDict", split(df.text, " "))
df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+--------------------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|IntSentiment|           wordsDict|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+--------------------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|     negative|           0|[The, compensatio...|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|      neutral|           3|[On-chain, analyt...|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph|   bitcoin|Ether has broken ...|ETH hits 7-month 

In [0]:
df = df.withColumn("wordCount", size(df["wordsDict"]))
df.show()

+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+--------------------+---------+
|               date|           sentiment|       source|   subject|                text|               title|                 url|OnlySentiment|IntSentiment|           wordsDict|wordCount|
+-------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+-------------+------------+--------------------+---------+
|2023-04-05 06:52:09|{'class': 'negati...|CoinTelegraph|      defi|The compensation ...|Allbridge to firs...|https://cointeleg...|     negative|           0|[The, compensatio...|       22|
|2023-04-05 06:19:00|{'class': 'neutra...| CryptoPotato|   bitcoin|On-chain analytic...|Bitcoin Hodl Patt...|https://cryptopot...|      neutral|           3|[On-chain, analyt...|       17|
|2023-04-05 05:09:44|{'class': 'negati...|CoinTelegraph

In [0]:
# Convert DataFrame to RDD
rdd = df.rdd

# Extract the desired column
text_rdd = rdd.map(lambda row: row.text)

# Split lines into words
words_rdd = text_rdd.flatMap(lambda line: line.split(" "))

# Count the occurrences of each word
word_counts = words_rdd.countByValue()

# print the word count
for word, count in word_counts.items():
    print(f"{word}: {count}")

The: 4631
compensation: 8
process: 50
is: 4732
expected: 118
to: 12112
start: 124
next: 303
week,: 104
starting: 59
with: 2727
users: 491
who: 281
had: 261
funds: 305
on: 3185
the: 19868
bridge: 62
“shortly: 1
before: 262
shutdown.”: 1
On-chain: 30
analytics: 37
revealed: 126
a: 8408
sentiment: 75
shift: 29
for: 3802
BTC: 1006
holders,: 5
suggesting: 38
cycle: 15
inflection: 5
point: 74
could: 1107
be: 1834
occurring.: 1
Ether: 84
has: 3597
broken: 19
$1,900: 1
resistance: 208
level: 199
first: 413
time: 326
in: 7030
months: 143
and: 9011
currently: 165
sitting: 8
above: 530
$1,911.: 1
With: 199
new: 1281
quarterly: 16
production: 18
record,: 2
Marathon: 13
Digital: 137
now: 498
track: 35
meet: 26
its: 2654
mid-year: 1
target: 48
of: 9703
23: 4
exahashes.: 1
stablecoin: 202
BTG: 2
Dol: 1
will: 1864
supposedly: 60
become: 263
23rd: 1
token: 389
available: 76
crypto: 3521
platform: 336
Mynt.: 1
As: 257
concerns: 78
about: 537
regulatory: 188
crackdown: 35
US: 470
heat: 15
up,: 13
flows: 

In [0]:
# Sort the word count in descending order
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)

# Print the word count in descending order
for word, count in sorted_word_counts:
    print(f"{word}: {count}")
    

the: 19868
to: 12112
of: 9703
and: 9011
a: 8408
in: 7030
is: 4732
The: 4631
for: 3802
has: 3597
crypto: 3521
that: 3487
on: 3185
Bitcoin: 2776
with: 2727
as: 2720
its: 2654
are: 2351
price: 2237
by: 2046
will: 1864
be: 1834
from: 1667
have: 1563
it: 1456
an: 1326
market: 1309
at: 1284
new: 1281
but: 1263
their: 1177
could: 1107
USD: 1100
Ethereum: 1049
over: 1014
BTC: 1006
more: 1001
this: 922
after: 897
been: 828
cryptocurrency: 820
NFT: 819
digital: 803
said: 759
trading: 752
A: 748
not: 744
million: 737
blockchain: 718
was: 709
: 680
up: 663
than: 629
can: 622
into: 591
investors: 588
exchange: 575
some: 559
out: 558
about: 537
above: 530
which: 523
while: 508
FTX: 500
his: 499
now: 498
users: 491
one: 491
past: 484
CEO: 474
may: 472
US: 470
last: 470
they: 454
or: 449
still: 434
DeFi: 426
your: 422
assets: 418
first: 413
support: 409
financial: 404
other: 402
recent: 397
below: 396
most: 394
Binance: 393
all: 391
token: 389
major: 378
down: 377
how: 376
asset: 375
NFTs: 370
latest:

In [0]:
#Word count with RDD

# Convert DataFrame to RDD and calculate sum of words for each row
word_count_rdd = df.select("text").rdd.map(lambda row: (row[0], len(row[0].split(" "))))
text_list = list()
count_list = list()
# Print the sum of words for each row
for row in word_count_rdd.collect():
    print(f"Text: {row[0]} <---> Word Count: {row[1]}")
    text_list.append(row[0])
    count_list.append(row[1])

Text: The compensation process is expected to start next week, starting with users who had funds on the bridge “shortly before the shutdown.” <---> Word Count: 22
Text: On-chain analytics revealed a sentiment shift for BTC holders, suggesting a cycle inflection point could be occurring. <---> Word Count: 17
Text: Ether has broken the $1,900 resistance level for the first time in months and is currently sitting above $1,911. <---> Word Count: 19
Text: With a new quarterly production record, Marathon Digital is now on track to meet its mid-year target of 23 exahashes. <---> Word Count: 20
Text: The stablecoin BTG Dol will supposedly become the 23rd token available on the crypto platform Mynt. <---> Word Count: 16
Text: As concerns about a regulatory crackdown in the US heat up, flows could head disproportionately towards the one cryptocurrency that seems to be in the regulatory all-clear - Bitcoin. <---> Word Count: 29
Text: Amid concerns of a potential recession due to declining job ope

In [0]:
#Count all word of text column using an RDD
word_count_rdd.reduce(lambda x, y : ("Total", x[1]+y[1]))

Out[209]: ('Total', 404280)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-2782059379121781>:1[0m
[0;32m----> 1[0m [43mwords_rdd[49m[38;5;241;43m.[39;49m[43mfilter[49m[43m([49m[38;5;28;43;01mlambda[39;49;00m[43m [49m[43mx[49m[43m:[49m[43m [49m[43mx[49m[43m[[49m[38;5;241;43m1[39;49m[43m][49m[43m [49m[38;5;241;43m>[39;49m[43m [49m[38;5;241;43m1000[39;49m[43m)[49m[38;5;241;43m.[39;49m[43mcollect[49m[43m([49m[43m)[49m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     47[0m [38;5;28;01mtry[39;00m:
[0;32m---> 48[0m     res [38;5;241m=[39m [43mfunc[49m[43m([49m[38;5;241;43m*[39;49m[43margs[49m[43m,[49m[43m [49m[38;5;241;43