# NoSQL solution: MongoDB

![alt text](https://img.icons8.com/color/48/000000/mongodb.png "MongoDB")

### Installation on local machine (MacOS)

* Install brew package manager: [brew website](https://brew.sh/#install)
* From terminal issue the following commands:
```shell
brew tap mongodb/brew
```
* Install the latest available release of MongoDB coommunity server
```shell
brew install mongodb-community
```

# Data Sources

### 1. Unstructured dataset: Archive.org Tweets

[Link to dataset](https://archive.org/details/archiveteam-twitter-stream-2019-06)

The tweets were scraped from the "Spritzer" Twitter stream, which is a realtime stream of a random sample of `1%` of all tweets. There are approximately `500 million` tweets for each day.
The dataset considered refers to the month of June.


The decompressed dataset contains `151 million` tweets and occupies `449.5 GiB`.

We tackled the relatively big dimension of the dataset by automating the exctraction of the compressed folders and the insertion into a MongoDB storing data in an external hard drive.

#### Connection to MongoDB running on local machine with pymongo module

In [None]:
from pymongo import MongoClient
from bson.son import SON
from bson.regex import Regex

# Connecting to MongoDB
client = MongoClient()
client = MongoClient('localhost', 27017)

# Creation of DB and collection
db = client.finalmongotweets
collection = db.collection

### Decompressig Files

In [None]:
import bz2
import os
import json


tweets = "/Volumes/G-HDD/Tweets/"

dirs = [f.path for f in os.scandir(tweets) if f.is_dir()]
#29
for loop1 in range(19,29): # range(len(dirs)):
    file_counter = 0
    subdirs = [f.path for f in os.scandir(dirs[loop1]) if f.is_dir()]
    for loop2 in range(len(subdirs)):
        print(f"Currently going through {subdirs[loop2]}")
        files = [f for f in os.listdir(subdirs[loop2]) if os.path.isfile(os.path.join(subdirs[loop2], f))]
        for i in files:
            if i.endswith('.json.bz2'):
                file_counter += 1
                filepath = os.path.join(subdirs[loop2], i)
                newfilepath = os.path.join(subdirs[loop2], "{0}.json".format(file_counter))
                with open(newfilepath, 'wb') as new_file, bz2.BZ2File(filepath, 'rb') as file:
                    for data in iter(lambda : file.read(100 * 1024), b''):
                        new_file.write(data)

### Uploading Files

In [None]:
tweets = "/Volumes/G-HDD/Tweets/"

dirs = [f.path for f in os.scandir(tweets) if f.is_dir()]

for loop1 in range(3): # range(len(dirs)):
    subdirs = [f.path for f in os.scandir(dirs[loop1]) if f.is_dir()]
    for loop2 in range(3): # range(len(subdirs)):
        print(f"Currently going through {subdirs[loop2]}")
        files = [f for f in os.listdir(subdirs[loop2]) if os.path.isfile(os.path.join(subdirs[loop2], f))]
        for i in files:
            if i.endswith('.json'):
                print(f"Currently adding {i}")
                data = [json.loads(line) for line in open(os.path.join(subdirs[loop2], i), 'r')]
                db.collection.insert_many(data)

### 2. Structured dataset: Scraping Twitter users' timeline

This code returns .csv files containing the latest 3200 tweets (Twitter API limitation) from each account.

Accounts considered following [this article](https://www.forbes.com/sites/alapshah/2017/11/16/the-100-best-twitter-accounts-for-finance/):
```
['John_Hempton', 'BarbarianCap', 'muddywatersre', 'AlderLaneeggs', 'CitronResearch', 'BrattleStCap', 'KerrisdaleCap', 'modestproposal1', 'marketfolly', 'EventDrivenMgr', 'ActivistShorts', 'Carl_C_Icahn', 'LongShortTrader', 'DonutShorts', 'sprucepointcap', 'BluegrassCap', 'SIRF_Report', 'NoonSixCap', 'WallStCynic', 'GothamResearch', 'herbgreenberg', 'Valuetrap13', 'valuewalk', 'UnionSquareGrp', 'PlanMaestro', 'ReformedBroker', 'SkeleCap', 'FatTailCapital', 'ShortSightedCap', 'footnoted', 'Mega_Man_2', 'JacobWolinsky', 'zerohedge', 'FundyLongShort', 'MugatuCapital', 'DumbLuckCapital', 'Hedge_FundGirl', 'PresciencePoint', 'DavidSchawel', 'pmarca', 'fundiescapital', 'ActAccordingly', 'EquityNYC', 'nosunkcosts', 'MicroFundy', 'BergenCapital', 'marginalidea', 'Keubiko', 'Jesse_Livermore', 'PainCapital']
```

In [None]:
# Authentication, after creating a Twitter developer account

import csv
import tweepy as tw

consumer_key = 'mykey'
consumer_secret = 'mysecret'
access_token = 'mytoken'
access_token_secret = 'mytokensecret'
auth = tw.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tw.API(auth)

# List of accounts that are considered experts in the financial
accounts = ['marketfolly', 'EventDrivenMgr', 'ActivistShorts', 'Carl_C_Icahn', 'LongShortTrader', 'DonutShorts', 'sprucepointcap', 'BluegrassCap', 'SIRF_Report', 'NoonSixCap', 'WallStCynic', 'GothamResearch', 'herbgreenberg', 'Valuetrap13', 'valuewalk', 'UnionSquareGrp', 'PlanMaestro', 'ReformedBroker', 'SkeleCap', 'FatTailCapital', 'ShortSightedCap', 'footnoted', 'Mega_Man_2', 'JacobWolinsky', 'zerohedge', 'FundyLongShort', 'MugatuCapital', 'DumbLuckCapital', 'Hedge_FundGirl', 'PresciencePoint', 'DavidSchawel', 'pmarca', 'fundiescapital', 'ActAccordingly', 'EquityNYC', 'nosunkcosts', 'MicroFundy', 'BergenCapital', 'marginalidea', 'Keubiko', 'Jesse_Livermore', 'PainCapital']

# Scraping the accounts
for account in accounts:
    twdata = []
    print(f"Currently scraping {account} twitter account.")

    for tweet in tw.Cursor(api.user_timeline, id=account, tweet_mode="extended").items(3200):
        user = tweet.user.screen_name
        tweets = tweet.full_text
        tweet_id = tweet.id
        date = tweet.created_at
        rts = tweet.retweet_count
        fav = tweet.favorite_count
        hashtags = ','.join([hashtag["text"] for hashtag in tweet.entities["hashtags"]])
        source = tweet.source
        twdata.append([user, tweets, tweet_id, date, rts, fav, hashtags, source])
        
    # Saving to csv
    csv_file = account + "_tweets.csv"
    print(f">>> Saving {csv_file}.")

    with open(csv_file, 'w+') as csvFile:
        writer = csv.writer(csvFile, delimiter=',')
        writer.writerow(['Username', 'Tweets', 'Tweet ID', 'Created At', 'Retweets', 'Favorites', 'Hashtags', 'Source'])
        writer.writerows(twdata)

### 3. Financial data

Historical financial data were retrieved from [Yahoo Finance](https://finance.yahoo.com/). In order to retrieve more data points, like intraday prices, World Trading Data API can be used, however the number of free requests were very limited.

In [None]:
# Requesting single stock historical data from WorldTradingData API

import requests
import json

url = 'https://api.worldtradingdata.com/api/v1/history' # for intraday: https://intraday.worldtradingdata.com/api/v1/intraday
params = {
  'symbol': 'TSLA',
  'api_token': 'mykey'
  'date_from': '2019-01-01'
  'date_to': '2020-01-01'
}
response = requests.request('GET', url, params=params)
response.json()

### Queries

1. Returns all the tweets in english that contain `tsla` or `tesla` in their text.

In [None]:
use tweets;
db.myTry.find(
    { 
        "lang" : "en", 
        "text" : /.tesla./i, /.tsla./i
    });

2. Counts the number of tweets posted per language

In [None]:
use tweets;
db.getCollection("myTry").aggregate(
    [ { 
            "$group" : { 
                "_id" : { 
                    "lang" : "$lang"
                }, 
                "COUNT(*)" : { 
                    "$sum" : NumberInt(1)
                }
            }
        }, 
        { 
            "$project" : { 
                "lang" : "$_id.lang", 
                "COUNT(*)" : "$COUNT(*)", 
                "_id" : NumberInt(0)
            }
        }
    ]);


3. Counts the total number of tweets per person

In [None]:
use tweets;
db.getCollection("myTry").aggregate(
    [
        { 
            "$group" : { 
                "_id" : { 
                    "id" : "$id"
                }, 
                "COUNT(*)" : { 
                    "$sum" : NumberInt(1)
                }
            }
        }, 
        { 
            "$project" : { 
                "id" : "$_id.id", 
                "COUNT(*)" : "$COUNT(*)", 
                "_id" : NumberInt(0)
            }
        }
    ]);


4. Counts the number of tweets per day

* Add a new field to convert 'created_at' field from string to date datatype; save the new information with the whole collection into a view (convertDate)

In [None]:
use tweets;

dateConversionStage = {
   $addFields: {
      convertedDate: { $toDate: "$created_at" }
   }
};

#using view support from studio 3T
db.myTry.aggregate([
    dateConversionStage
])

* From previous view extract the most important data and split the converted date into date and time. This data are the ones which are going to be used in our analysis. Saved fields:

In [None]:
#create view using support from studio 3T
db.getCollection("convertDate").aggregate([   
    { 
        "$match" : { 
            "lang" : "en",
            "text" : /^.* tesla .*$/i
        }
    },
    { 
        "$project" : { 
            "user" : "$user.screen_name", 
            "source" : "$source", 
            "geo" : "$geo", 
            "coordinates" : "coordinates", 
            "place" : "$place", 
            "text" : "$text", 
            "lang" : "$lang", 
            "date" : { 
                "$dateToString" : { 
                    "format" : "%Y-%m-%d", 
                    "date" : "$convertedDate"
                }
            }, 
            "time" : { 
                "$dateToString" : { 
                    "format" : "%H:%M:%S:%L", 
                    "date" : "$convertedDate"
                }
            }
        }
    }
], 
{ 
    "allowDiskUse" : false
}
);

* From ImpData group and count the number of tweets posted per each day 

In [None]:
use tweets;
db.getCollection("ImpData").aggregate(
    [   { 
            "$group" : { 
                "_id" : { 
                    "date" : "$date"
                }, 
                "COUNT(*)" : { 
                    "$sum" : NumberInt(1)
                }
            }
        }, 
        { 
            "$project" : { 
                "date" : "$_id.date", 
                "COUNT(*)" : "$COUNT(*)", 
                "_id" : NumberInt(0)
            }
        }
    ], 
    { 
        "allowDiskUse" : true
    }
);
