# Process Stock Price Data (Structured)
Upload the csv file in a folder named tesla using hadoop web ui at [http://localhost:9870](http://localhost:9870/explorer.html#/)

### Install libraries

In [None]:
!pip install transformers[torch]
!pip install pymongo
!pip install mysql-connector-python
!pip install pandas
!pip install numpy
!pip install nltk

Collecting transformers[torch]
  Downloading transformers-4.26.1-py3-none-any.whl (6.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.3/6.3 MB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting pyyaml>=5.1
  Downloading PyYAML-6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (661 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m661.8/661.8 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting filelock
  Downloading filelock-3.9.0-py3-none-any.whl (9.7 kB)
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1
  Downloading tokenizers-0.13.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.6/7.6 MB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hCollecting tqdm>=4.27
  Downloading tqdm-4.65.0-py3-none-any.whl (77 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [1]:
from pymongo import MongoClient
from datetime import datetime
import pandas as pd
import torch
from transformers import RobertaTokenizer, RobertaForSequenceClassification
import mysql.connector as msql
from mysql.connector import Error
import pandas as pd
import numpy as np
import re
import string
import nltk
from nltk.corpus import stopwords

  from .autonotebook import tqdm as notebook_tqdm


### Load Data from Hadoop

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [None]:
spark_df = spark.read.option("header",True).csv("hdfs://hadoop-namenode:9000/tesla/TSLA.csv")

                                                                                

In [None]:
spark_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [None]:
spark_df.show()

+----------+--------+--------+--------+--------+---------+--------+
|      Date|    Open|    High|     Low|   Close|Adj Close|  Volume|
+----------+--------+--------+--------+--------+---------+--------+
|2010-06-29|3.800000|5.000000|3.508000|4.778000| 4.778000|93831500|
|2010-06-30|5.158000|6.084000|4.660000|4.766000| 4.766000|85935500|
|2010-07-01|5.000000|5.184000|4.054000|4.392000| 4.392000|41094000|
|2010-07-02|4.600000|4.620000|3.742000|3.840000| 3.840000|25699000|
|2010-07-06|4.000000|4.000000|3.166000|3.222000| 3.222000|34334500|
|2010-07-07|3.280000|3.326000|2.996000|3.160000| 3.160000|34608500|
|2010-07-08|3.228000|3.504000|3.114000|3.492000| 3.492000|38557000|
|2010-07-09|3.516000|3.580000|3.310000|3.480000| 3.480000|20253000|
|2010-07-12|3.590000|3.614000|3.400000|3.410000| 3.410000|11012500|
|2010-07-13|3.478000|3.728000|3.380000|3.628000| 3.628000|13400500|
|2010-07-14|3.588000|4.030000|3.552000|3.968000| 3.968000|20976000|
|2010-07-15|3.988000|4.300000|3.800000|3.978000|

In [None]:
price_df = spark_df.toPandas()
price_df

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume
0,2010-06-29,3.800000,5.000000,3.508000,4.778000,4.778000,93831500
1,2010-06-30,5.158000,6.084000,4.660000,4.766000,4.766000,85935500
2,2010-07-01,5.000000,5.184000,4.054000,4.392000,4.392000,41094000
3,2010-07-02,4.600000,4.620000,3.742000,3.840000,3.840000,25699000
4,2010-07-06,4.000000,4.000000,3.166000,3.222000,3.222000,34334500
...,...,...,...,...,...,...,...
2951,2022-03-18,874.489990,907.849976,867.390015,905.390015,905.390015,33408500
2952,2022-03-21,914.979980,942.849976,907.090027,921.159973,921.159973,27327200
2953,2022-03-22,930.000000,997.859985,921.750000,993.979980,993.979980,35289500
2954,2022-03-23,979.940002,1040.699951,976.400024,999.109985,999.109985,40225400


### Create operational database

In [None]:
try:
    conn = msql.connect(host='mysql-server', user='root', password='Secret1234')
    if conn.is_connected():
        cursor = conn.cursor()
        cursor.execute("DROP DATABASE IF EXISTS tesla_db")
        cursor.execute("CREATE DATABASE tesla_db")
        print("tesla_db database is created")

except Error as e:
    print("Error while connecting to MySQL", e)

### Create price table

In [None]:
try:
    conn = msql.connect(host='mysql-server', database='tesla_db', user='root', password='Secret1234')
    if conn.is_connected():
        cursor = conn.cursor()
        cursor.execute('DROP TABLE IF EXISTS price;')
        print('Creating table...')
        cursor.execute("CREATE TABLE price (date DATE, open FLOAT(12,7), high FLOAT(12,7), low FLOAT(12,7), close FLOAT(12,7), adj_close FLOAT(12,7), volume INT UNSIGNED); ")
        conn.commit()
        cursor.close()
        print("price table is created...")
    conn.close()
except Error as e:
    print("Error while connecting to MySQL", e)

### Insert data to price table

In [None]:
try:
    conn = msql.connect(host='mysql-server', database='tesla_db', user='root', password='Secret1234')
    if conn.is_connected():
        cursor = conn.cursor()
        for i, row in price_df.iterrows():
            sql = "INSERT INTO tesla_db.price VALUES (%s,%s,%s,%s,%s,%s,%s)"
            cursor.execute(sql, tuple(row))
            conn.commit()
        print("{} records added".format(i))
        cursor.close()
    conn.close()
except Error as e:
    print("Error while connecting to MySQL", e)

# Process News Sentiment Data (Unstrctured)

### Load Data from MondoDB

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.\
builder.\
appName("pyspark-notebook3").\
master("spark://spark-master:7077").\
config("spark.executor.memory", "512m").\
config("spark.mongodb.input.uri","mongodb+srv://student:student@cluster0.uscm4nx.mongodb.net/?retryWrites=true&w=majority").\
config("spark.mongodb.output.uri","mongodb+srv://student:student@cluster0.uscm4nx.mongodb.net/?retryWrites=true&w=majority").\
config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").\
getOrCreate()

In [4]:
spark_df2 = spark.read.format("mongo").option('database', 'tesla_news').option('collection', 'articles').load()

                                                                                

In [5]:
spark_df2.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
|                 _id|             authors|      published_date|summary|                text|               title|
+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
|[64008c4c2b736756...|[Matt Pressman, G...| 2019-01-01 00:00:00|       |Originally publis...|This Hypnotic Tes...|
|[64008c4c2b736756...|[Matt Pressman, G...| 2019-01-01 00:00:00|       |Originally publis...|Tesla’s Direct Sa...|
|[64008c4d2b736756...|           [Deleted]|            19-01-01|       |Resident Evil 2 r...|Resident Evil 2 r...|
|[64008c4d2b736756...|   [Dacia J. Ferris]|2019-01-01 16:58:...|       |ByUsing the “Orga...|First living tiss...|
|[64008c4e2b736756...|   [January, Am Est]| 2019-01-01 00:00:00|       |Large Cap Value f...|Is American Funds...|
|[64008c4f2b736756...|      [Kwan-Chen Ma]|2019-01-02 12:58:...|       |For 2018

                                                                                

In [6]:
news_df = spark_df2.toPandas()
news_df

                                                                                

Unnamed: 0,_id,authors,published_date,summary,text,title
0,"(64008c4c2b7367568d5a53f6,)","[Matt Pressman, Guest Contributor, José Pontes...",2019-01-01 00:00:00,,"Originally published on EVANNEX.Let’s face it,...",This Hypnotic Tesla Model X ASMR Video Has 2½ ...
1,"(64008c4c2b7367568d5a53f7,)","[Matt Pressman, Guest Contributor, José Pontes...",2019-01-01 00:00:00,,Originally published on EVANNEX.Buying a car c...,Tesla’s Direct Sales Model An “Inspiration” Fo...
2,"(64008c4d2b7367568d5a53f8,)",[Deleted],19-01-01,,"Resident Evil 2 remake release date PS4, Xbox ...","Resident Evil 2 remake release date PS4, Xbox ..."
3,"(64008c4d2b7367568d5a53f9,)",[Dacia J. Ferris],2019-01-01 16:58:53+00:00,,"ByUsing the “Organaut”, a 3D bioprinter design...",First living tissue 3D printed in space aboard...
4,"(64008c4e2b7367568d5a53fa,)","[January, Am Est]",2019-01-01 00:00:00,,Large Cap Value fund seekers should consider t...,Is American Funds Mutual Fund A (AMRMX) a Stro...
...,...,...,...,...,...,...
3698,"(640410da5d816a4649bc4c5c,)",[],20-03-30,,How to unlock Terraforming - Animal Crossing: ...,How to unlock Terraforming - Animal Crossing: ...
3699,"(640410da5d816a4649bc4c5d,)","[March, Am Edt, Bnk Invest]",2020-03-30 00:00:00,,Looking at the underlying holdings of the ETFs...,How The Parts Add Up: HDV Targets $97
3700,"(640410db5d816a4649bc4c5e,)",[],20-03-30,,"Internet providers are among a unique, and rat...",Vodafone share price history: reaping benefits...
3701,"(640410db5d816a4649bc4c5f,)",[],20-03-30,,The underworld is an unfun place for the most ...,ShackStream: Indie-licious takes on the underw...


### Preprocess Data

In [7]:
def guess_date(string):
    for fmt in ["%Y-%m-%d %X","%Y-%m-%d %X.%f%z", "%Y-%m-%d %X%z", "%Y-%m-%d", "%y-%m-%d"]:
        try:
            return datetime.strptime(string, fmt).date()
        except ValueError:
            continue
    raise ValueError(string)

In [8]:
news_df.drop(['_id', 'authors', 'summary'], axis=1, inplace=True)

In [9]:
news_df.reset_index(drop=True, inplace=True)

In [10]:
news_df

Unnamed: 0,published_date,text,title
0,2019-01-01 00:00:00,"Originally published on EVANNEX.Let’s face it,...",This Hypnotic Tesla Model X ASMR Video Has 2½ ...
1,2019-01-01 00:00:00,Originally published on EVANNEX.Buying a car c...,Tesla’s Direct Sales Model An “Inspiration” Fo...
2,19-01-01,"Resident Evil 2 remake release date PS4, Xbox ...","Resident Evil 2 remake release date PS4, Xbox ..."
3,2019-01-01 16:58:53+00:00,"ByUsing the “Organaut”, a 3D bioprinter design...",First living tissue 3D printed in space aboard...
4,2019-01-01 00:00:00,Large Cap Value fund seekers should consider t...,Is American Funds Mutual Fund A (AMRMX) a Stro...
...,...,...,...
3698,20-03-30,How to unlock Terraforming - Animal Crossing: ...,How to unlock Terraforming - Animal Crossing: ...
3699,2020-03-30 00:00:00,Looking at the underlying holdings of the ETFs...,How The Parts Add Up: HDV Targets $97
3700,20-03-30,"Internet providers are among a unique, and rat...",Vodafone share price history: reaping benefits...
3701,20-03-30,The underworld is an unfun place for the most ...,ShackStream: Indie-licious takes on the underw...


In [11]:
news_df['published_date'] = news_df['published_date'].map(guess_date)

In [12]:
news_df

Unnamed: 0,published_date,text,title
0,2019-01-01,"Originally published on EVANNEX.Let’s face it,...",This Hypnotic Tesla Model X ASMR Video Has 2½ ...
1,2019-01-01,Originally published on EVANNEX.Buying a car c...,Tesla’s Direct Sales Model An “Inspiration” Fo...
2,2019-01-01,"Resident Evil 2 remake release date PS4, Xbox ...","Resident Evil 2 remake release date PS4, Xbox ..."
3,2019-01-01,"ByUsing the “Organaut”, a 3D bioprinter design...",First living tissue 3D printed in space aboard...
4,2019-01-01,Large Cap Value fund seekers should consider t...,Is American Funds Mutual Fund A (AMRMX) a Stro...
...,...,...,...
3698,2020-03-30,How to unlock Terraforming - Animal Crossing: ...,How to unlock Terraforming - Animal Crossing: ...
3699,2020-03-30,Looking at the underlying holdings of the ETFs...,How The Parts Add Up: HDV Targets $97
3700,2020-03-30,"Internet providers are among a unique, and rat...",Vodafone share price history: reaping benefits...
3701,2020-03-30,The underworld is an unfun place for the most ...,ShackStream: Indie-licious takes on the underw...


In [14]:
news_df.rename(columns={'published_date': 'Published date', 'title': 'Title', 'text': 'Article'}, inplace=True)

In [15]:
# Droping all rows that contains then message'Are you a robot?' in the tittle
news_df = news_df[~news_df['Title'].isin(['Are you a robot?'])]

### Clean text data

In [17]:
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [18]:
stop_words = stopwords.words('english') #English language's stop words

def textpreprocess(s):
    """This function cleans the text
    Input: string to be cleaned
    Return: string after cleaning
    """
    words = [] # empty list
    
    s = s.strip().lower() # lower the string
    s = re.sub('\[.*?\]', '', s) # removes symbols (.*?\)
    s = re.sub('https?://\S+|www\.\S+', '', s) # remove URLS
    s = re.sub('<.*?>+', '', s)
    s = re.sub('[%s]' % re.escape(string.punctuation), '', s) # remove punctuations
    s = re.sub('\n', '', s) # remove next line character
    s = re.sub('\w*\d\w*', '', s)
    
    words = s.split() # split the string into list of words
    words = [word for word in words if word not in stop_words] # removing stop words
    s = ' '.join(words) # converting list to string
    
    return s

In [19]:
ColAux1 = []
sentences = list(news_df['Title'])
for sen in sentences:
    ColAux1.append(textpreprocess(sen))
ColAux2 = []
sentences = list(news_df['Article'])
for sen in sentences:
    ColAux2.append(textpreprocess(sen))

In [30]:
news_df.drop('Title', axis=1, inplace=True)
news_df.drop('Article', axis=1, inplace=True)
news_df['Title'] = ColAux1
news_df['Article'] = ColAux2
news_df.dropna(axis=0,inplace=True)

In [21]:
news_df.head(10)

Unnamed: 0,Published date,Title,Article
0,2019-01-01,hypnotic tesla model x asmr video million views,originally published evannexlet’s face tesla v...
1,2019-01-01,tesla’s direct sales model “inspiration” big a...,originally published evannexbuying car gruelin...
2,2019-01-01,resident evil remake release date xbox one pc,resident evil remake release date xbox one pc ...
3,2019-01-01,first living tissue printed space aboard inter...,byusing “organaut” bioprinter designed microgr...
4,2019-01-01,american funds mutual fund amrmx strong mutual...,large cap value fund seekers consider taking l...
5,2019-01-02,tesla institutional investors return nasdaqtsla,teslas nasdaqtsla institutional investors meas...
6,2019-01-02,analysts react teslas delivery miss tax credits,tesla inc nasdaq tsla investors missed market ...
7,2019-01-02,tesla’s delivery production report model deliv...,bytesla released production delivery figures f...
8,2019-01-02,tesla model x easily tows chevy silverado supe...,bythere denying tesla’s electric cars bound po...
9,2019-01-02,clarksville lacrosse club celebrates statewide...,clarksville tenn espn clarksville – clarksvill...


### Sentiment analysis

In [22]:
# Load the pre-trained model and tokenizer
tokenizer = RobertaTokenizer.from_pretrained('siebert/sentiment-roberta-large-english')
model = RobertaForSequenceClassification.from_pretrained('siebert/sentiment-roberta-large-english')

# Set maximum input length
max_length = 512

# Define function to preprocess text and truncate to max length
def preprocess(text):
    tokens = tokenizer(text, truncation=True, max_length=max_length, padding='max_length')
    return tokens

# Define function to predict sentiment
def predict_sentiment(text):
    tokens = preprocess(text)
    input_ids = torch.tensor(tokens['input_ids']).unsqueeze(0)
    attention_mask = torch.tensor(tokens['attention_mask']).unsqueeze(0)
    outputs = model(input_ids, attention_mask)
    _, predicted = torch.max(outputs[0], 1)
    sentiment = predicted.item()
    if sentiment == 0:
        return 'negative'
    elif sentiment == 1:
        return 'positive'


# Load your dataframe

# Apply the predict_sentiment function to each row of the DataFrame
#df['sentiment'] = df['Article'].apply(predict_sentiment)

Downloading (…)olve/main/vocab.json: 100%|██████████| 798k/798k [00:00<00:00, 2.17MB/s]
Downloading (…)olve/main/merges.txt: 100%|██████████| 456k/456k [00:00<00:00, 2.34MB/s]
Downloading (…)cial_tokens_map.json: 100%|██████████| 150/150 [00:00<00:00, 62.7kB/s]
Downloading (…)okenizer_config.json: 100%|██████████| 256/256 [00:00<00:00, 96.7kB/s]
Downloading (…)lve/main/config.json: 100%|██████████| 687/687 [00:00<00:00, 218kB/s]
Downloading pytorch_model.bin: 100%|██████████| 1.42G/1.42G [05:25<00:00, 4.37MB/s]


In [28]:
articles=news_df['Article'].tolist()
sentiments=[]
for article in articles:
    sentiment=predict_sentiment(article)
    sentiments.append(sentiment)

### Append sentiment column to dataframe

In [29]:
news_df['sentiment']=sentiments

In [25]:
news_df.head()

Unnamed: 0,Published date,Title,Article,sentiment
0,2019-01-01,hypnotic tesla model x asmr video million views,originally published evannexlet’s face tesla v...,positive
1,2019-01-01,tesla’s direct sales model “inspiration” big a...,originally published evannexbuying car gruelin...,positive
2,2019-01-01,resident evil remake release date xbox one pc,resident evil remake release date xbox one pc ...,positive
3,2019-01-01,first living tissue printed space aboard inter...,byusing “organaut” bioprinter designed microgr...,positive
4,2019-01-01,american funds mutual fund amrmx strong mutual...,large cap value fund seekers consider taking l...,positive


In [None]:
news_df.to_csv('cleaned_df_with_sentiment01.csv')

In [26]:
news_df['sentiment'].value_counts()

positive    2900
negative     582
Name: sentiment, dtype: int64

In [27]:
column_dict = news_df.dtypes.apply(lambda x: x.name).to_dict()
column_dict

{'Published date': 'object',
 'Title': 'object',
 'Article': 'object',
 'sentiment': 'object'}

### Create news table

In [None]:
try:
    conn = msql.connect(host='mysql-server', database='tesla_db', user='root', password='Secret1234')
    if conn.is_connected():
        cursor = conn.cursor()
        cursor.execute('DROP TABLE IF EXISTS news;')
        print('Creating table...')
        cursor.execute("CREATE TABLE news ( Published_Date DATE, Title VARCHAR(1024), Article TEXT, Sentiment VARCHAR(10)); ")
        conn.commit()
        cursor.close()
        print("price table is created...")
    conn.close()
except Error as e:
    print("Error while connecting to MySQL", e)

### Insert data in news table

In [None]:
try:
    conn = msql.connect(host='mysql-server', database='tesla_db', user='root', password='Secret1234')
    if conn.is_connected():
        cursor = conn.cursor()
        for i, row in news_df.iterrows():
            sql = "INSERT INTO tesla_db.news VALUES (%s,%s,%s,%s)"
            cursor.execute(sql, tuple(row))
            conn.commit()
        print("{} records added".format(i))
        cursor.close()
    conn.close()
except Error as e:
    print("Error while connecting to MySQL", e)