# Storing Data from Hadoop in Hive Databases

The goal of this Jupyter notebook is to connect to Hadoop, read the data with Spark, and store it in a proper database, in this case an Apache Hive database.

First, we import the libraries used throughout the notebook.

In [3]:
# Libraries from Spark
from pyspark.sql import SparkSession  # entry point for reading files from Hadoop and querying Hive
from pyspark.sql.functions import lit  # wraps a constant value as a literal column
from pyspark.sql.functions import col, sum  # column references and the SQL SUM aggregate (note: shadows Python's built-in sum)

## Configure Spark

We configure the Spark session with Hive support enabled so that it can connect to the Hive database.

In [12]:
# Configuration before creating the Spark session
spark = SparkSession.builder \
    .appName("StockTweetsToHive") \
    .enableHiveSupport() \
    .getOrCreate()

## Directories in Hadoop

We define the paths in Hadoop where our data is located, in this case as CSV files.

In [14]:
# Directories
# tweets
dir_tweets = '/data/tweets/stocktweet.csv'

# stock prices: (CSV path, company ticker) pairs
set_cvs = [
    ("/data/companies/BAC.csv", "BAC"),
    ("/data/companies/DIS.csv", "DIS"),
    ("/data/companies/PG.csv", "PG"),
    ("/data/companies/TSLA.csv", "TSLA"),
    ("/data/companies/WMT.csv", "WMT")
]
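If more companies are added later, the list of (path, ticker) pairs could be generated from the tickers instead of being written by hand. This is a small sketch, assuming every price file keeps the /data/companies/<TICKER>.csv naming pattern used above; the `tickers` list is an illustrative name, not part of the original notebook.

```python
# Assumption: every price file is stored as /data/companies/<TICKER>.csv
tickers = ["BAC", "DIS", "PG", "TSLA", "WMT"]  # illustrative list, same tickers as above

# Build the same (path, company) pairs as set_cvs from the tickers
set_cvs = [(f"/data/companies/{ticker}.csv", ticker) for ticker in tickers]

for path, ticker in set_cvs:
    print(ticker, path)
```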

## Reading tweets

Some tweets contain commas, quotes, or line breaks that can break Spark's default CSV parsing, so we configure the reader explicitly: `multiLine` allows a quoted field to span several lines, and the encoding and delimiter are also set explicitly.
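To illustrate why these options matter, the sketch below parses a made-up two-tweet sample (hypothetical data) with Python's standard `csv` module, not with Spark. Splitting on line breaks and commas turns one quoted tweet into fragments, much like the malformed rows counted later, while a quote-aware parser keeps it as a single record. Note that Spark's CSV reader uses a backslash as its default escape character; files that escape quotes by doubling them (`""`) are often read with `.option("escape", "\"")` as well, although that option is not tested in this notebook.

```python
import csv
import io

# Hypothetical sample in the same layout as stocktweet.csv (id,date,ticker,tweet).
# The second tweet is quoted because it contains commas, a line break and doubled quotes.
sample = (
    "id,date,ticker,tweet\n"
    "100001,01/01/2020,AMZN,$AMZN first tweet\n"
    '100002,01/01/2020,TSLA,"$TSLA Well, there you go,\nsell ""now"""\n'
)

# Naive parsing: one physical line = one record, split on every comma
naive_rows = [line.split(",") for line in sample.splitlines()[1:]]

# Quote-aware parsing: quoted fields may hold commas and newlines,
# and "" inside a quoted field stands for one quote character
rows = list(csv.DictReader(io.StringIO(sample)))

print(len(naive_rows))  # 3 fragments for 2 tweets
print(len(rows))        # 2 records
print(rows[1]["tweet"])
```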

In [15]:
# header: first line holds the column names; inferSchema: detect column types
# multiLine: a quoted tweet may contain line breaks
df_tweets = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("encoding", "UTF-8") \
    .option("delimiter", ",") \
    .csv(dir_tweets)

df_tweets.show()
print(df_tweets.count())

+------+----------+------+--------------------+
|    id|      date|ticker|               tweet|
+------+----------+------+--------------------+
|100001|01/01/2020|  AMZN|$AMZN Dow futures...|
|100002|01/01/2020|  TSLA|$TSLA Daddy's dri...|
|100003|01/01/2020|  AAPL|$AAPL We’ll been ...|
|100004|01/01/2020|  TSLA|$TSLA happy new y...|
|100005|01/01/2020|  TSLA|"$TSLA haha just ...|
|100006|01/01/2020|  TSLA|$TSLA NOBODY: Gas...|
|100007|02/01/2020|  AAPL|$AAPL $300 calls ...|
|100008|02/01/2020|  AAPL|$AAPL Remember, i...|
|100009|02/01/2020|  AAPL|$AAPL called it, ...|
|100010|02/01/2020|    HD|$HD Bought more a...|
|100011|02/01/2020|  AAPL|Apple is taking t...|
|100012|02/01/2020|  AAPL|$AAPL not a bad d...|
|100013|02/01/2020|  AAPL|$AAPL where are a...|
|100014|03/01/2020|  NVDA|$NVDA This should...|
|100015|03/01/2020|  AAPL|$AAPL tomorrow bu...|
|100016|03/01/2020|  AAPL|$AAPL Thanks for ...|
|100017|03/01/2020|  AAPL|$AAPL leave enoug...|
|100018|03/01/2020|  AAPL|$AAPL short 

## Counting Nulls

Because the reader does not parse every record correctly, formatting problems in the file produce some rows with null values. The cell below counts the nulls in each column.

In [16]:
# Count the null values in each column of df_tweets:
# isNull() gives true/false, cast("int") turns it into 1/0, and sum adds them up
null_counts = df_tweets.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_tweets.columns])

# Show the count of null values for each column
null_counts.show()


+---+----+------+-----+
| id|date|ticker|tweet|
+---+----+------+-----+
|  0|   2|     2|    3|
+---+----+------+-----+
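The counting trick above (turn each null test into 1 or 0, then add) can be reproduced in plain Python. This sketch uses hypothetical rows shaped like the malformed records; it calls `builtins.sum` explicitly because the notebook imported pyspark's `sum`, which shadows Python's built-in.

```python
import builtins

# Hypothetical rows shaped like the malformed records above (None stands for null)
rows = [
    {"id": "100001", "date": "01/01/2020", "ticker": "AMZN", "tweet": "tweet text"},
    {"id": "fragment of a tweet", "date": None, "ticker": None, "tweet": None},
    {"id": "fragment", "date": "fragment", "ticker": "fragment", "tweet": None},
]
columns = ["id", "date", "ticker", "tweet"]

# Same idea as sum(col(c).isNull().cast("int")): each null test becomes 1 or 0, then add.
# builtins.sum is used because pyspark's sum may shadow the built-in in this notebook.
null_counts = {c: builtins.sum(int(row[c] is None) for row in rows) for c in columns}
print(null_counts)  # {'id': 0, 'date': 1, 'ticker': 1, 'tweet': 2}
```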



## Showing Nulls

Here we inspect the rows that contain nulls in each column.

In [17]:
df_tweet_nulls_date = df_tweets.filter(df_tweets['date'].isNull())
df_tweet_nulls_ticker = df_tweets.filter(df_tweets['ticker'].isNull())
df_tweet_nulls_comment = df_tweets.filter(df_tweets['tweet'].isNull())

In [19]:
# Show the result
df_tweet_nulls_date.show()
print(df_tweet_nulls_date.count())
df_tweet_nulls_ticker.show()
print(df_tweet_nulls_ticker.count())
df_tweet_nulls_comment.show()
print(df_tweet_nulls_comment.count())

+--------------------+----+------+-----+
|                  id|date|ticker|tweet|
+--------------------+----+------+-----+
|And what did the ...|null|  null| null|
|             😷😷😷"|null|  null| null|
+--------------------+----+------+-----+

2
+--------------------+----+------+-----+
|                  id|date|ticker|tweet|
+--------------------+----+------+-----+
|And what did the ...|null|  null| null|
|             😷😷😷"|null|  null| null|
+--------------------+----+------+-----+

2
+--------------------+-------------+--------------------+-----+
|                  id|         date|              ticker|tweet|
+--------------------+-------------+--------------------+-----+
|And what did the ...|         null|                null| null|
|             😷😷😷"|         null|                null| null|
|                Well| there you go| crash accordingl...| null|
+--------------------+-------------+--------------------+-----+

3


## Dropping nulls

These null rows are fragments of malformed records rather than real tweets, so we drop every row that contains at least one null.
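`dropna()` defaults to `how="any"`, which drops a row if any of its columns is null. Since the id column never contains nulls here, `how="all"` would keep every malformed row. A plain-Python sketch of the two rules on hypothetical rows; `drop_nulls` is an illustrative helper, not a Spark function.

```python
# Hypothetical rows: None marks a null value
rows = [
    ("100001", "01/01/2020", "AMZN", "tweet text"),
    ("text fragment", None, None, None),
    ("Well", "text fragment", "text fragment", None),
]

def drop_nulls(rows, how="any"):
    """Mimic the row filter of DataFrame.dropna(how=...) on plain tuples."""
    if how == "any":
        # keep a row only if none of its values is null
        return [r for r in rows if all(v is not None for v in r)]
    if how == "all":
        # drop a row only if every value is null
        return [r for r in rows if any(v is not None for v in r)]
    raise ValueError("how must be 'any' or 'all'")

print(len(drop_nulls(rows, "any")))  # 1
print(len(drop_nulls(rows, "all")))  # 3
```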

In [20]:
df_tweets = df_tweets.dropna()
df_tweets.show()
print(df_tweets.count())

+------+----------+------+--------------------+
|    id|      date|ticker|               tweet|
+------+----------+------+--------------------+
|100001|01/01/2020|  AMZN|$AMZN Dow futures...|
|100002|01/01/2020|  TSLA|$TSLA Daddy's dri...|
|100003|01/01/2020|  AAPL|$AAPL We’ll been ...|
|100004|01/01/2020|  TSLA|$TSLA happy new y...|
|100005|01/01/2020|  TSLA|"$TSLA haha just ...|
|100006|01/01/2020|  TSLA|$TSLA NOBODY: Gas...|
|100007|02/01/2020|  AAPL|$AAPL $300 calls ...|
|100008|02/01/2020|  AAPL|$AAPL Remember, i...|
|100009|02/01/2020|  AAPL|$AAPL called it, ...|
|100010|02/01/2020|    HD|$HD Bought more a...|
|100011|02/01/2020|  AAPL|Apple is taking t...|
|100012|02/01/2020|  AAPL|$AAPL not a bad d...|
|100013|02/01/2020|  AAPL|$AAPL where are a...|
|100014|03/01/2020|  NVDA|$NVDA This should...|
|100015|03/01/2020|  AAPL|$AAPL tomorrow bu...|
|100016|03/01/2020|  AAPL|$AAPL Thanks for ...|
|100017|03/01/2020|  AAPL|$AAPL leave enoug...|
|100018|03/01/2020|  AAPL|$AAPL short 

## Reading Prices


The share prices are stored in one CSV file per company, so while reading each file we add a new column, company, that identifies which file each record came from.

In [21]:
# Read each company's CSV file and tag its rows with a 'company' column
dataframes = []

for csv_path, company in set_cvs:
    # Load the DataFrame from the CSV file
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    
    # Add a 'company' column holding the company ticker
    df_with_company = df.withColumn("company", lit(company))
    
    # Add the DataFrame with the new column to the list
    dataframes.append(df_with_company)

# Union all DataFrames into one (unionByName matches columns by name, not by position)
df_stock = dataframes[0]
for df in dataframes[1:]:
    df_stock = df_stock.unionByName(df)

# Display the final DataFrame
df_stock.show()
print(df_stock.count())


+----------+------------------+------------------+------------------+------------------+------------------+--------+-------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|company|
+----------+------------------+------------------+------------------+------------------+------------------+--------+-------+
|2019-12-31|35.029998779296875|  35.2599983215332|34.970001220703125|35.220001220703125|31.893484115600586|29630100|    BAC|
|2020-01-02|35.349998474121094| 35.65999984741211|35.290000915527344| 35.63999938964844|32.273807525634766|37614200|    BAC|
|2020-01-03| 34.97999954223633|35.150001525878906|  34.7599983215332|34.900001525878906|31.603710174560547|50357900|    BAC|
|2020-01-06| 34.40999984741211|34.900001525878906|34.369998931884766|34.849998474121094|31.558433532714844|42185000|    BAC|
|2020-01-07| 34.70000076293945| 34.91999816894531|34.529998779296875|34.619998931884766| 31.35015296936035|34149000|    BAC|
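Conceptually, the loop above tags each file's rows with their company and then stacks them. The same pattern in plain Python, using hypothetical in-memory files with made-up values (`files` and `combined` are illustrative names):

```python
import csv
import io

# Hypothetical in-memory stand-ins for two of the per-company CSV files
files = {
    "BAC": "Date,Close\n2020-01-02,10.0\n2020-01-03,11.0\n",
    "TSLA": "Date,Close\n2020-01-02,20.0\n2020-01-03,21.0\n",
}

combined = []
for company, text in files.items():
    for row in csv.DictReader(io.StringIO(text)):
        row["company"] = company  # plays the role of withColumn("company", lit(company))
        combined.append(row)      # appending plays the role of the union

print(len(combined))                             # 4
print(sorted({r["company"] for r in combined}))  # ['BAC', 'TSLA']
```

Because every file has the same columns, the combined row count is simply the sum of the per-file row counts.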


## Saving Records in the Hive Database

Before saving the data, we must make sure that Hive is running and correctly configured, and that the target database exists.

### Showing databases 

In [22]:
spark.sql("Show databases;").show()

+---------+
|namespace|
+---------+
|benchmark|
|      ca2|
|  default|
| testhive|
+---------+



### Creating a Database

In [12]:
spark.sql("CREATE DATABASE IF NOT EXISTS ca2")

DataFrame[]

In [13]:
spark.sql("Show databases;").show()

+---------+
|namespace|
+---------+
|benchmark|
|      ca2|
|  default|
| testhive|
+---------+



### Using the Created Database

In [25]:
spark.sql("USE ca2")



### Writing the Tweets to the Hive Database

In [26]:
df_tweets.write \
    .mode("overwrite") \
    .saveAsTable("ca2.tweets")  # Name of the database and table in Hive
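The writer uses `mode("overwrite")`, so re-running the cell replaces the table instead of adding duplicate rows. Spark's other save modes are `append`, `ignore`, and `error`/`errorifexists` (the default). A plain-Python sketch of those rules on a dictionary standing in for the Hive catalog; `save_as_table` and `catalog` are hypothetical names, not Spark APIs.

```python
VALID_MODES = {"overwrite", "append", "ignore", "error", "errorifexists"}

def save_as_table(catalog, name, rows, mode="errorifexists"):
    """Mimic the DataFrameWriter save modes on a plain dict (illustrative only)."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    exists = name in catalog
    if mode == "overwrite" or not exists:
        catalog[name] = list(rows)   # create the table, or replace its rows
    elif mode == "append":
        catalog[name].extend(rows)   # keep existing rows and add the new ones
    elif mode == "ignore":
        pass                         # table exists: leave it untouched
    else:
        raise ValueError(f"table {name} already exists")  # error / errorifexists

catalog = {}
save_as_table(catalog, "ca2.tweets", ["t1", "t2", "t3"], mode="overwrite")
save_as_table(catalog, "ca2.tweets", ["t1", "t2", "t3"], mode="overwrite")  # re-run
print(len(catalog["ca2.tweets"]))  # 3: overwrite does not duplicate rows
save_as_table(catalog, "ca2.tweets", ["t4"], mode="append")
print(len(catalog["ca2.tweets"]))  # 4
```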

                                                                                

In [16]:
spark.sql("Select * from tweets;").show()

+------+----------+------+--------------------+
|    id|      date|ticker|               tweet|
+------+----------+------+--------------------+
|100001|01/01/2020|  AMZN|$AMZN Dow futures...|
|100002|01/01/2020|  TSLA|$TSLA Daddy's dri...|
|100003|01/01/2020|  AAPL|$AAPL We’ll been ...|
|100004|01/01/2020|  TSLA|$TSLA happy new y...|
|100005|01/01/2020|  TSLA|"$TSLA haha just ...|
|100006|01/01/2020|  TSLA|$TSLA NOBODY: Gas...|
|100007|02/01/2020|  AAPL|$AAPL $300 calls ...|
|100008|02/01/2020|  AAPL|$AAPL Remember, i...|
|100009|02/01/2020|  AAPL|$AAPL called it, ...|
|100010|02/01/2020|    HD|$HD Bought more a...|
|100011|02/01/2020|  AAPL|Apple is taking t...|
|100012|02/01/2020|  AAPL|$AAPL not a bad d...|
|100013|02/01/2020|  AAPL|$AAPL where are a...|
|100014|03/01/2020|  NVDA|$NVDA This should...|
|100015|03/01/2020|  AAPL|$AAPL tomorrow bu...|
|100016|03/01/2020|  AAPL|$AAPL Thanks for ...|
|100017|03/01/2020|  AAPL|$AAPL leave enoug...|
|100018|03/01/2020|  AAPL|$AAPL short 

In [31]:
spark.sql("Select COUNT(*) from ca2.tweets;").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



### Writing the Prices to the Hive Database

In [27]:
df_stock.write \
    .mode("overwrite") \
    .saveAsTable("ca2.prices")  # Name of the database and table in Hive

24/12/10 15:27:38 WARN MemoryManager: Total allocation exceeds 50.00% (522,977,280 bytes) of heap memory
Scaling row group sizes to 97.41% for 4 writers
24/12/10 15:27:38 WARN MemoryManager: Total allocation exceeds 50.00% (522,977,280 bytes) of heap memory
Scaling row group sizes to 77.93% for 5 writers
24/12/10 15:27:38 WARN MemoryManager: Total allocation exceeds 50.00% (522,977,280 bytes) of heap memory
Scaling row group sizes to 97.41% for 4 writers


### Showing the Data from the Database

In [29]:
spark.sql("Select * from ca2.prices;").show()

+----------+------------------+------------------+------------------+------------------+------------------+---------+-------+
|      Date|              Open|              High|               Low|             Close|         Adj Close|   Volume|company|
+----------+------------------+------------------+------------------+------------------+------------------+---------+-------+
|2019-12-31|              27.0|28.086000442504883| 26.80533218383789|  27.8886661529541|  27.8886661529541|154285500|   TSLA|
|2020-01-02|28.299999237060547|28.713333129882812| 28.11400032043457| 28.68400001525879| 28.68400001525879|142981500|   TSLA|
|2020-01-03|29.366666793823242|30.266666412353516|29.128000259399414|29.534000396728516|29.534000396728516|266677500|   TSLA|
|2020-01-06|29.364667892456055|30.104000091552734|29.333332061767578|  30.1026668548584|  30.1026668548584|151995000|   TSLA|
|2020-01-07|30.760000228881836|31.441999435424805|30.224000930786133|31.270666122436523|31.270666122436523|268231500| 

In [20]:
spark.sql("Select COUNT(*) from ca2.prices;").show()

+--------+
|count(1)|
+--------+
|    1270|
+--------+

