Mining information about Crypto-Currencies from the Web.

by Holger Büch & Kevin Hendel

Module "Web & Social Media Analytics"
by Prof. Dr. Stephan Wilczek, Prof. Dr. Jan Kirenz
Master Data Science & Business Analytics
University of Media Stuttgart, Germany

Idea & Planning

  • Topic: Ongoing Hype on Crypto-Currencies in 2017 and 2018
  • Idea A: Correlate development of Tweets and Stock-Values over time.
  • Idea B: Provide additional information that helps to interpret those developments.
  • Idea C: Automatically buy/sell stocks based on prediction. (not done)
  • Try out new technologies
  • Gain experience as a reward for the overhead in DevOps

Data Sources

  • Twitter Stream
  • Crypto-Stock-Market Stream
  • Crypto-Prices API
  • News (planned)

  • Correlation coefficient between Stock-Values and (Sentiment of) Tweets
  • Delay between events in Stock-Values and on Twitter

Project Setup

Github for collaboration

  • Feature Branches & Pull requests
  • Ticketing / Bugs Tracking
  • Slides (gitpitch)

AWS for Hosting

  • EC2 t2.medium instance running Ubuntu
  • Access via SSH

  • Docker based virtualized Microservices

Docker based Microservices

  • virtualized Containers for each part of the software
  • independent from host system
  • deployable locally and in the cloud
  • brings all dependencies



FROM continuumio/miniconda3

RUN conda install -y pymongo pyyaml
RUN conda install -c conda-forge -y tweepy
RUN conda install -c gomss-nowcast schedule

RUN git clone

WORKDIR /home/CryptoCrawler/crypto-price-crawler

CMD while true; do python; done

Get defined base image online
Install additional packages
Clone project into the container
Run python script in a loop in case it exits with an error

  • one Microservice each for single (or small set of) functions
  • every Microservice is independent and stateless
  • restarting of single services or the system without breaking it
  • internal and external networking

+++ @title[Docker Compose]

Docker Compose

Configuration file (docker-compose.yaml)

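version: '3'

networks:
  backend:
    internal: true
  frontend:
    internal: false

services:
  crypto-mongo:
    image: mongo:jessie
    volumes:
      - /data/mongodb:/data/db:rw
    networks:
      - backend
    command:
      - --storageEngine
      - mmapv1

  jupyter:  # service name assumed, not shown on the slide
    volumes:
      - /data/notebooks:/home/jovyan/work
    networks:
      - backend
      - frontend
    ports:
      - 8888:8888
    build:
      context: ./jupyter
    depends_on:
      - crypto-mongo
    command:
      # [...] start script not shown on the slide
      - --NotebookApp.password='sha1:f6a0093ff7ca:be25a6064ba30e37265b0f800cbb925c636cc4fe'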

docker-compose version

Define virtual networks for the containers
Internal backend network for the Database. Not accessible from outside the containers
Hybrid network. Accessible by the containers and from outside via mapped ports

Definition of two example microservices

Base Image
Connect external volumes
Define network connection
Start script inside the container and additional parameters

Jupyter Notebook
Connect external volumes
Define network connections
Map internal to external ports
Dependencies to specify build and start order
Start script inside the container and additional parameters

In Action

  • docker-compose build
  • docker-compose up

Combined log output

twitter-listener_1       | INFO - 01/30/2018 15:32:36: 374000 Tweets received. Still listening...
crypto-price-listener_1  | INFO - 01/30/2018 15:32:36: Prices are {'BTC': {'USD': 10407.7, 'EUR': 8416.89}, 'ETH': {'USD': 1115.7, 'EUR': 907.42}, 'IOT': {'USD': 2.39, 'EUR': 1.95}}
crypto-price-listener_1  | INFO - 01/30/2018 15:32:36: Trying to save the prices for timestamp 1517326356797 to mongo
crypto-price-listener_1  | INFO - 01/30/2018 15:32:36: Saved prices. Waiting until next call
crypto-dash_1            | INFO - 01/30/2018 15:32:38: - - [30/Jan/2018 15:32:38] "POST /_dash-update-component HTTP/1.1" 200 -
crypto-dash_1            | INFO - 01/30/2018 15:32:43: - - [30/Jan/2018 15:32:43] "POST /_dash-update-component HTTP/1.1" 200 -
crypto-price-listener_1  | INFO - 01/30/2018 15:32:46: Running job Every 10 seconds do startListening({'mongodb': {'host': 'crypto-mongo', 'port': 27017, 'db': 'cryptocrawl'}, 'cryptocompare': {'coinlist': '', 'price': ''}, 'collections': {'generalcrypto': {'keywords': ['blockchain', 'crypto', 'altcoins', 'altcoin']}, 'bitcoin': {'keywords': ['bitcoin', 'bitcoins'], 'currencycode': 'BTC'}, 'ethereum': {'keywords': ['ethereum'], 'currencycode': 'ETH'}, 'iota': {'keywords': ['iota', 'iotas'], 'currencycode': 'IOT'}, 'trump': {'keywords': ['trump']}, 'car2go': {'keywords': ['car2go']}}, 'dash': {'live': {'default': ['bitcoin', 'ethereum', 'iota'], 'interval': 5}}}, Database(MongoClient(host=['crypto-mongo:27017'], document_class=dict, tz_aware=False, connect=True), 'cryptocrawl')) (last run: 2018-01-30 15:32:36, next run: 2018-01-30 15:32:46)
crypto-price-listener_1  | INFO - 01/30/2018 15:32:46: Starting Currency Listener
crypto-price-listener_1  | INFO - 01/30/2018 15:32:47: Valid coins are ['BTC', 'ETH', 'IOT']
crypto-price-listener_1  | INFO - 01/30/2018 15:32:47: The coin string is BTC,ETH,IOT
crypto-price-listener_1  | INFO - 01/30/2018 15:32:48: Prices are {'BTC': {'USD': 10403.83, 'EUR': 8412.23}, 'ETH': {'USD': 1115.97, 'EUR': 903.88}, 'IOT': {'USD': 2.39, 'EUR': 1.94}}
crypto-price-listener_1  | INFO - 01/30/2018 15:32:48: Trying to save the prices for timestamp 1517326368073 to mongo
crypto-price-listener_1  | INFO - 01/30/2018 15:32:48: Saved prices. Waiting until next call
crypto-dash_1            | INFO - 01/30/2018 15:32:48: - - [30/Jan/2018 15:32:48] "POST /_dash-update-component HTTP/1.1" 200 -
crypto-dash_1            | INFO - 01/30/2018 15:32:53: - - [30/Jan/2018 15:32:53] "POST /_dash-update-component HTTP/1.1" 200 -

Centralized Configuration

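  • one configuration file, so every change happens in a single place

mongodb:
    host: crypto-mongo
    port: 27017
    db:   cryptocrawl

cryptocompare:
    coinlist: ''   # URL omitted
    price: ''      # URL omitted

collections:
    generalcrypto:
        keywords:
            - blockchain
            - crypto
            - altcoins
            - altcoin
    bitcoin:
        keywords:
            - bitcoin
            - bitcoins
        currencycode: BTC
    ethereum:
        keywords:
            - ethereum
        currencycode: ETH
    iota:
        keywords:
            - iota
            - iotas
        currencycode: IOT
    trump:
        keywords:
            - trump
    car2go:
        keywords:
            - car2go

dash:
    live:
        default:
            - bitcoin
            - ethereum
            - iota
        interval: 5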

MongoDB connection information
URLs to the CryptoCompare API
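Because all collections and their keywords live in this one file, the listeners can derive what they need from the loaded config. A minimal sketch, assuming `conf` is the dict produced by loading config.yaml (the same structure as in the log output above); `track_keywords()` and `route_tweet()` are hypothetical helpers, not the project's code, and the simple word matching only approximates Twitter's own server-side keyword matching:

```python
import re


def track_keywords(conf):
    """All keywords of all collections, without duplicates, in config order."""
    keywords = []
    for settings in conf['collections'].values():
        for keyword in settings.get('keywords', []):
            if keyword not in keywords:
                keywords.append(keyword)
    return keywords


def route_tweet(text, conf):
    """Names of all collections whose keywords occur as words in `text`."""
    # lowercase and split into alphanumeric words, so '#Bitcoin' matches 'bitcoin'
    words = set(re.findall(r'[a-z0-9]+', text.lower()))
    return [name for name, settings in conf['collections'].items()
            if words & set(settings.get('keywords', []))]
```

For example, `stream.filter(track=track_keywords(conf))` would subscribe to all keywords at once, and `route_tweet()` would decide into which collection(s) an incoming tweet is stored.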

Microservice 1

Mongo DB

  • Document-based Database
  • A Document contains a JSON-like object (stored as BSON)
  • Multiple Documents grouped to Collections
  • Documents can be queried using JSON-based Syntax

+++ @title[Store Documents]

Storing Documents with Python

from pymongo import MongoClient

client = MongoClient('crypto-mongo', 27017)
db = client['cryptocrawl']

json_obj = {
    'timestamp_ms': 1517343098,
    'text': 'Something...'
}

db['bitcoin'].insert_one(json_obj)  # collection name as in the query example

Import Module (has to be installed)
Initialize Client-Connection to MongoDB
Select Database
MongoDB ♥ JSON Documents
Write JSON as Document in a Collection

+++ @title[Query Documents]

Query Documents with Python

import pandas
from pymongo import MongoClient

client = MongoClient('crypto-mongo', 27017)
db = client['cryptocrawl']

query = {'timestamp_ms': {'$gt': 1517343098}}
projection = {'text': 1, 'timestamp_ms': 1}

cursor = db['bitcoin'].find(query, projection).limit(100)

df = pandas.DataFrame(list(cursor))

Import Module, Initialize Client-Connection to MongoDB
Define Filter (similar to WHERE in SQL)
Define Fields to return (similar to SELECT in SQL)
find() returns a cursor object (here also limited to 100 results)
Cursor can be converted into a list and transformed into a pandas DataFrame

*Show in Robo 3T*

+++ @title[Problem with Speed - 1]

Slow Queries over Timestamp

  • We often query for a specified Range of the Timestamp, e.g.:
query = {'timestamp_ms': {'$gt': 1517243098, '$lt': 1517343098}}
  • Performance was weak
  • CPU-Usage on VM peaked

+++ @title[Problem with Speed - 2]

Create Index on Timestamp attribute

$ mongo
Mongo > use cryptocrawl
Mongo > show collections
> bitcoin
> ethereum
> generalcrypto
> iota
Mongo > db.bitcoin.createIndex({"timestamp_ms": 1}, {background:true})
Mongo > db.bitcoin.totalIndexSize()

Start MongoDB CLI client
Open Database with Name "cryptocrawl"
List all Collections of this DB
Create Index on Attribute 'timestamp_ms'. Repeat for all Collections.
Show Size of Indexes. Should fit in RAM.
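Repeating `createIndex` for every collection by hand is error-prone; the same step can be scripted. A sketch assuming a PyMongo `Database` object `db` (`list_collection_names()` needs PyMongo 3.7 or newer; older versions offer `collection_names()`); the helper name is hypothetical. Note that MongoDB 4.2 and later ignore the `background` option.

```python
def index_all_collections(db, field='timestamp_ms'):
    """Create an ascending index on `field` in every collection of `db`.

    Returns a dict mapping collection name -> index name as reported by
    create_index(). The direction 1 equals pymongo.ASCENDING.
    """
    created = {}
    for name in db.list_collection_names():
        created[name] = db[name].create_index([(field, 1)], background=True)
    return created
```

create_index() is a no-op if an identical index already exists, so the script could safely run on every deployment.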

+++ @title[Problem with Aggregation - 1]

Aggregate by Timestamp in Milliseconds

  • We need to aggregate on Time-Intervals, e.g. for Tweets per Hour
  • MongoDB can aggregate DateTime objects on Intervals
  • But we get Timestamps in Milliseconds from Twitter
  • How to aggregate Integer with Milliseconds on hourly Interval?

+++ @title[Problem with Aggregation - 2]

Aggregate using Math

  • To aggregate Milliseconds by Hours, get the Milliseconds per Hour: |
1 Hour is 1000 ms * 60 sec * 60 min = 3,600,000 ms
  • Then divide the Timestamp by this Value and round down (floor): |
floor(timestamp / 3,600,000)
  • All Timestamps from the same Hour will result in the same Value |
  • Then Aggregation can be done on this Value |
  • Older MongoDB versions have no floor operator ($floor only exists since 3.2) |
  • But the modulo operator ($mod) works too, since x - (x mod 1) = floor(x) for positive x: |
timestamp/3,600,000 - ((timestamp/3,600,000) mod 1)
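The same arithmetic can be checked in plain Python and expressed as an aggregation pipeline. A sketch, assuming `timestamp_ms` is stored as a number (the raw Twitter payload delivers it as a string); the function names are hypothetical:

```python
HOUR_MS = 1000 * 60 * 60  # 3,600,000 ms per hour


def hour_bucket(timestamp_ms):
    """Hour index of a millisecond timestamp, using the modulo trick."""
    hours = timestamp_ms / HOUR_MS
    return hours - (hours % 1)  # equals floor(hours) for positive values


def tweets_per_hour_pipeline(from_ts, to_ts):
    """Aggregation pipeline counting documents per hour (sketch)."""
    hours = {'$divide': ['$timestamp_ms', HOUR_MS]}
    return [
        {'$match': {'timestamp_ms': {'$gte': from_ts, '$lt': to_ts}}},
        {'$group': {
            # same formula as above: hours - (hours mod 1)
            '_id': {'$subtract': [hours, {'$mod': [hours, 1]}]},
            'count': {'$sum': 1},
        }},
        {'$sort': {'_id': 1}},
    ]
```

`db['bitcoin'].aggregate(tweets_per_hour_pipeline(from_ts, to_ts))` would then return one document per hour, where `_id` is the hour index; multiplying it by 3,600,000 gives back the start of that hour in milliseconds.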

+++ @title[Problem with Aggregation - 3]

Alternative Solutions

  • Cast to DateTime during Query
  • Convert & store Milliseconds as DateTime-Value

Would those be faster?
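For the second alternative, the conversion could happen once when a tweet is stored. A sketch under that assumption; the field name `created_at_dt` and the helper are hypothetical. BSON dates have millisecond precision, so no information is lost:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def with_datetime(doc, field='created_at_dt'):
    """Copy of `doc` with timestamp_ms additionally stored as a UTC datetime."""
    enriched = dict(doc)  # do not mutate the caller's document
    ms = int(doc['timestamp_ms'])  # accepts an int or the string Twitter sends
    enriched[field] = EPOCH + timedelta(milliseconds=ms)
    return enriched


# With a real date field, MongoDB's date operators do the hourly bucketing:
HOURLY_GROUP = {'$group': {
    '_id': {'$dateToString': {'format': '%Y-%m-%d %H:00',
                              'date': '$created_at_dt'}},
    'count': {'$sum': 1},
}}
```

Whether this is faster than the arithmetic approach would still have to be measured on the real data.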

---?image=assets/bg-twitterlistener.png @title[Twitter Stream Listener]

Microservice 2

Twitter Stream Listener

+++ @title[Tweepy]

Using Tweepy to listen to Twitter Streaming API

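import time
import tweepy  # tweepy 3.x API (StreamListener was removed in tweepy 4)

class MyStreamListener(tweepy.StreamListener):
    def __init__(self, conf=None, api=None):
        super().__init__(api=api)
        self.conf = conf  # project configuration (config.yaml)
    def on_status(self, status):
        print(status.text)  # Then store the tweets...
    def on_error(self, status_code):
        if status_code == 420:  # rate limited: back off before reconnecting
            time.sleep(300) # Then reconnect ...

# api_key, api_secret, access_token, access_secret: loaded from config (not shown)
auth = tweepy.OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)

stream_listener = MyStreamListener(conf=conf)
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
# tweepy < 3.7 called this parameter async (a reserved word since Python 3.7)
stream.filter(track=list(['bitcoin','iota','...']), is_async=True)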

Import Module
Inherit StreamListener class
Define what to do if tweet arrives
Handle API Error, especially 420 to avoid penalty
Set credentials and create API object
Instantiate class, start listening to Tweets with keywords

+++ @title[Information overload - 1]

Too much information

  • Over 500 MB of Data during the first two Hours
  • Over 900 Tweets per Minute

Figures: Tweets after two hours; Tweets per 15 min (crypto topics only)

+++ @title[Information overload - 2]

Filter Tweets

  • Exclude non-English Tweets
  • Exclude Retweets

Store subset of Attributes

  • TweetID
  • AuthorID
  • Text
  • Timestamps
  • Geo-Information
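Both filters and the attribute selection can be applied to the raw tweet JSON before storing it. A sketch assuming Twitter API v1.1 field names (`lang`, `retweeted_status`, `id_str`, `user.id_str`, `timestamp_ms`, `created_at`, `coordinates`, `place`), e.g. taken from `status._json` in Tweepy; the output key names and helper names are hypothetical. The language filter could alternatively be pushed to Twitter with `stream.filter(..., languages=['en'])`.

```python
def is_relevant(tweet):
    """Keep English tweets that are not retweets."""
    # retweets carry the original tweet in 'retweeted_status'
    return tweet.get('lang') == 'en' and 'retweeted_status' not in tweet


def reduce_tweet(tweet):
    """Keep only the subset of attributes listed above."""
    return {
        'id': tweet['id_str'],
        'author_id': tweet['user']['id_str'],
        'text': tweet['text'],
        'timestamp_ms': int(tweet['timestamp_ms']),  # string in the raw payload
        'created_at': tweet['created_at'],
        'coordinates': tweet.get('coordinates'),  # geo information, often None
        'place': tweet.get('place'),
    }
```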

+++ @title[Tweepy Bug - 1]

Bug in Tweepy Module

Tweepy kept raising Exceptions after some Days of running:

File "", line 109, in myStream.userstream("with=following")
File "/tweepy/", line 394, in userstream
File "/tweepy/", line 361, in _start
File "/tweepy/", line 294, in _run
raise exception
AttributeError: 'NoneType' object has no attribute 'strip'

tweepy/tweepy#869 (open since March 2017)

+++ @title[Tweepy Bug - 2]

Implement Workaround

Handle Exceptions and reconnect Tweepy:

def startListening():
    """Start listening to twitter streams."""
    try:
        stream_listener = MyStreamListener(conf=conf)
        # [...]
    except Exception as e:
        logger.error('Exception raised: %s', e)

Auto-restart Microservice on exit (just in case...):

while true; do python; done
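The workaround can also live inside Python: restart the listener whenever it raises and wait longer after each failure, so a persistent error does not hammer the API. A minimal sketch; `run_with_restarts()` is hypothetical, and it only helps if `start()` blocks (e.g. `stream.filter(...)` without `is_async=True`), because exceptions raised in Tweepy's background thread never reach the caller:

```python
import logging
import time

logger = logging.getLogger(__name__)


def run_with_restarts(start, max_restarts=None, base_delay=5, max_delay=300,
                      sleep=time.sleep):
    """Call start() and call it again whenever it raises.

    Waits base_delay seconds before the first restart and doubles the wait
    after each further failure (capped at max_delay). Returns the number of
    restarts once start() returns normally; re-raises the last exception
    when max_restarts is exceeded.
    """
    restarts = 0
    delay = base_delay
    while True:
        try:
            start()
            return restarts
        except Exception:
            restarts += 1
            if max_restarts is not None and restarts > max_restarts:
                raise
            logger.exception('Listener crashed, restart %d in %s s', restarts, delay)
            sleep(delay)
            delay = min(delay * 2, max_delay)
```

The shell loop remains useful as a last line of defense, e.g. if the process is killed by something Python cannot catch.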

Microservice 3

Crypto Price Crawler


Crypto Price Crawler

  • receive prices in a defined interval
  • call the CryptoCompare API to get the current values of all currencies
  • save the values to the database, each currency in a separate collection


Scheduled listener

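import schedule
import yaml
from pymongo import MongoClient

def init():
    with open('../config.yaml', 'r') as stream:
        conf = yaml.safe_load(stream)
    # Open a connection to mongo:
    client = MongoClient(conf['mongodb']['host'], conf['mongodb']['port'])
    db = client[conf['mongodb']['db']]
    # startListening() is defined elsewhere; schedule.run_pending() must be called in a loop
    schedule.every(10).seconds.do(startListening, conf, db)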

Load config file
Connect to the database
Schedule listener to run every 10 seconds



import json
import logging
import requests

logger = logging.getLogger(__name__)

def getPricesOnce(currencies, conf):
    """Gets the current price for the currencies defined in the config.yaml"""
    payload = { 'fsyms': currencies, 'tsyms': 'USD,EUR' }
    response = requests.get(conf['cryptocompare']['price'], params=payload)
    if response.status_code == 200:
        prices = json.loads(response.content)
        logger.info('Prices are {}'.format(prices))
        return prices
    else:
        logger.warning('Could not get prices. Error code {}'.format(response.status_code))
        return False

Crypto currencies as a comma-separated string and conf (config) as parameters
Build payload for the REST-Call
Send request and save response
Handle successful request and parse content
Handle error case
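The log output shows two more steps around `getPricesOnce()`: validating the configured coins ("Valid coins are ...", "The coin string is BTC,ETH,IOT") and saving one record per currency. A sketch of both, assuming the coin-list response has a `Data` dict keyed by currency symbol and that each record holds the timestamp plus the USD/EUR prices; both helper names and the document layout are hypothetical:

```python
import time


def valid_coin_string(conf, coinlist):
    """Comma-separated currency codes from the config that CryptoCompare knows."""
    known = coinlist.get('Data', {})  # assumed: coin list keyed by symbol
    codes = [settings['currencycode']
             for settings in conf['collections'].values()
             if 'currencycode' in settings]
    return ','.join(code for code in codes if code in known)


def price_documents(prices, timestamp_ms=None):
    """One document per currency code, ready for insert_one()."""
    if not prices:  # getPricesOnce() returns False on errors
        return {}
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    # e.g. {'BTC': {'USD': ..., 'EUR': ...}} -> {'BTC': {'timestamp_ms': ..., 'USD': ..., 'EUR': ...}}
    return {code: {'timestamp_ms': timestamp_ms, **values}
            for code, values in prices.items()}
```

Each returned document could then be written with `insert_one()` into the collection of its currency.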

Microservice 4

Crypto API Wrapper


Crypto API Wrapper

  • serve an API as an interface between the dashboard and the database or external APIs

  • benefits

    • prevent direct database access
    • filter data and adapt its format to later needs
    • less logic in the frontend
  • using the Flask framework to build the actual API


Random tweets endpoint

  • return random tweets about certain topics in a defined timeframe

Example: /tweets?topics=bitcoin,ethereum,iota&amount=5&from=1516110498&to=1516290284

import random

# db (the MongoDB database) and parseTopics() are defined elsewhere in the service
def getTweetsForTopics(topicstring, amount, fromTs, toTs):
    topicList = parseTopics(topicstring)
    randomTweets = []
    for topic in topicList:
        cursor = db[topic].aggregate([
                { '$match': { 'timestamp_ms': {'$gt': fromTs , '$lt': toTs }}},
                { '$sample': { 'size': amount } },
                { '$project' : { '_id': 0 } }
        ])
        tweetListForTopic = list(cursor)
        identifiedTweetListForTopic = []
        for singleTweet in tweetListForTopic:
            identifiedTweet = { 'topic': topic, 'tweet': singleTweet }
            identifiedTweetListForTopic.append(identifiedTweet)
        randomTweets = randomTweets + identifiedTweetListForTopic
    if len(randomTweets) >= amount:
        randomListFinal = random.sample(randomTweets, amount)
    else:
        randomListFinal = randomTweets
    resultDict = {'tweets': randomListFinal }
    return resultDict

Get URL parameters as input
Get list of topics from the comma separated topicstring
Do one request for each topic because every topic is in a separate mongo collection
Build mongo aggregation
Match all entries between the start and the end timestamps
Get a random sample with the specified size from the matched entries
Remove the _id information from the results
Add the topic to each tweet
From the combined samples of all collections, draw a final sample of the specified amount


Add endpoint in Flask

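import time
from flask import Flask, jsonify, request
from flask_restful import Api, Resource

app = Flask(__name__)
api = Api(app)

# handleTs(), parseAmount() and getTweetsForTopics() are defined elsewhere
class RandomTweets(Resource):
    def get(self):
        now = int(time.time())

        fromTs = handleTs(request.args.get('from'), now) * 1000
        toTs = handleTs(request.args.get('to'), now) * 1000

        topicstring = request.args.get('topics')
        amount = parseAmount(request.args.get('amount'))

        result = getTweetsForTopics(topicstring, amount, fromTs, toTs)
        return jsonify(result)

api.add_resource(RandomTweets, '/tweets')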

Get current timestamp for timeframe calculations
Handle the beginning of the timeframe. Check that it is not in the future. Calculate milliseconds from seconds
Handle the end of the timeframe. Check that it is not in the future. Calculate milliseconds from seconds. Set now as default if not set.
Get the passed topicstring
Handle the amount
Do the actual request with all parameters
Return the result as a json
Add the endpoint to Flask at the route /tweets
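The helpers `parseTopics()`, `parseAmount()` and `handleTs()` are not shown on the slides. A sketch of what they could look like, following the annotations above (default to now, never in the future); the default amount of 5, the cap of 100 and the `allowed` whitelist are assumptions:

```python
DEFAULT_AMOUNT = 5  # hypothetical default
MAX_AMOUNT = 100    # hypothetical upper bound


def parseTopics(topicstring, allowed=None):
    """Split 'bitcoin,ethereum' into a list, optionally keeping only known topics."""
    if not topicstring:
        return []
    topics = [t.strip().lower() for t in topicstring.split(',') if t.strip()]
    if allowed is not None:
        topics = [t for t in topics if t in allowed]
    return topics


def parseAmount(value, default=DEFAULT_AMOUNT, maximum=MAX_AMOUNT):
    """Parse the amount parameter; $sample needs a positive size."""
    try:
        amount = int(value)
    except (TypeError, ValueError):  # missing or not a number
        return default
    return max(1, min(amount, maximum))


def handleTs(value, now):
    """Parse a Unix timestamp in seconds; default to now and never exceed now."""
    try:
        ts = int(value)
    except (TypeError, ValueError):
        return now
    return min(ts, now)
```

Restricting topics to a whitelist of known collections keeps the endpoint from reading arbitrary collections, which fits the goal of preventing direct database access.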


Additional historical price endpoint

  • endpoint to retrieve historical prices from the CryptoCompare API
  • takes a timeframe and converts it into the parameters expected by the CryptoCompare API
  • returns the daily, hourly or per-minute prices for a crypto currency
  • planned to support or replace the price listener and to fill gaps caused by system outages
  • not yet integrated into the dashboard
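A sketch of the parameter mapping, assuming CryptoCompare's `histominute`, `histohour` and `histoday` endpoints with the `fsym`, `tsym`, `limit` and `toTs` parameters and a maximum of 2000 data points per request; check the current CryptoCompare documentation before relying on these. `history_request()` is hypothetical:

```python
BASE_URL = 'https://min-api.cryptocompare.com/data/'  # assumed endpoint base

# (endpoint, seconds per data point), finest granularity first
GRANULARITIES = [
    ('histominute', 60),
    ('histohour', 60 * 60),
    ('histoday', 24 * 60 * 60),
]


def history_request(fsym, tsym, from_ts, to_ts, max_points=2000):
    """URL and query parameters for prices between two Unix timestamps (s).

    Picks the finest granularity whose number of data points fits into
    max_points; longer ranges are truncated to the last max_points days.
    """
    if to_ts <= from_ts:
        raise ValueError('to_ts must be later than from_ts')
    span = to_ts - from_ts
    for endpoint, step in GRANULARITIES:
        limit = -(-span // step)  # ceiling division: intervals in the range
        if limit <= max_points:
            break
    limit = min(limit, max_points)  # only reached unclamped for histoday
    params = {'fsym': fsym, 'tsym': tsym, 'limit': limit, 'toTs': to_ts}
    return BASE_URL + endpoint, params
```

The result could be passed straight to `requests.get(url, params=params)`.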

Microservice 5

Anomaly Detection

+++ @title[Idea]

Detect 'unusual' Events in:

  • Amount of Tweets received
  • Sentiment
  • Prices of Crypto-Currencies

Use this information for:

  • Visualization in Dashboard
  • Easing the Interpretation of the Data
  • Searching for News in those Time-Ranges (not done)

  • Research on Algorithms |
  • Tested ARIMA Model first (in Jupyter Notebook) |
  • The data does not cover a long enough time span |
  • Twitter published its own algorithm (R package AnomalyDetection) |
  • Implemented only in R (the Python ports we found looked questionable) |
  • Solution: Implement very simplified Version |

Twitter's Algorithm explained

+++ @title[Step 1]

Step 1: Seasonal Decomposition

Seasonal Decomposition

+++ @title[Step 2]

Step 2: Extreme Studentized Deviate test (ESD)

Detect Outliers in univariate Data that is approximately normally distributed (this has to be tested first!).

  • Set Parameter for the maximal Number of Outliers |
  • Set Parameter for the Significance Level (p_value) |
  • ESD test removes the largest Outlier |
  • Calculates test statistic R_1 and critical value Lambda_1 |
  • ESD test removes the 2nd largest Outlier |
  • Calculates R_2 and Lambda_2 |
  • ... |
  • Number of Outliers = largest i with R_i > Lambda_i |
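The selection rule in the last step is easy to get wrong: the number of outliers is the largest i with R_i > Lambda_i, not the point where the comparison first fails. A stdlib sketch of the test statistics and of that rule (function names hypothetical; the Lambda_i formula follows Rosner's generalized ESD test and needs SciPy for the t quantile). Applied to the values in the results table, the rule yields 7 outliers although R_5 and R_6 are below their critical values.

```python
import math
import statistics


def esd_statistics(values, max_outliers):
    """Test statistics R_1..R_r of the generalized ESD test.

    Each round records the point farthest from the current mean, in units
    of the current sample standard deviation, and then removes it.
    Returns (original_index, R_i) pairs in removal order.
    """
    remaining = list(enumerate(values))
    result = []
    for _ in range(max_outliers):
        xs = [x for _, x in remaining]
        mean = statistics.mean(xs)
        sd = statistics.stdev(xs)  # needs >= 2 points and sd > 0
        idx, x = max(remaining, key=lambda p: abs(p[1] - mean))
        result.append((idx, abs(x - mean) / sd))
        remaining.remove((idx, x))
    return result


def esd_critical_value(n, i, alpha=0.05):
    """Lambda_i = (n-i) t / sqrt((n-i-1+t^2)(n-i+1)), t = t_{p, n-i-1}."""
    from scipy.stats import t  # imported lazily, only this function needs SciPy
    p = 1 - alpha / (2 * (n - i + 1))
    t_val = t.ppf(p, n - i - 1)
    return (n - i) * t_val / math.sqrt((n - i - 1 + t_val ** 2) * (n - i + 1))


def number_of_outliers(r_values, lambdas):
    """Largest i with R_i > Lambda_i (0 if there is none)."""
    count = 0
    for i, (r, lam) in enumerate(zip(r_values, lambdas), start=1):
        if r > lam:
            count = i
    return count
```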

+++ @title[Results]

Test for normal distribution

Normal Distribution

(Tested already during ARIMA Modelling approach)

+++ @title[Results]

Results of ESD Test

Number of outliers:  7
Indices of outliers:  [73, 63, 111, 119, 87, 117, 118]
        R      Lambda                   R      Lambda
 1   3.92406   2.85719          6   2.82568   2.84412
 2   3.36138   2.85462          7   2.92393   2.84144
 3   2.95692   2.85203          8   2.76510   2.83873
 4   2.96757   2.84941          9   2.51757   2.83600
 5   2.80918   2.84678          10  2.54265   2.83325


+++ @title[Implementation in Python]

Implementation in Python

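import statsmodels.api as sm
from PyAstronomy import pyasl

# Seasonal Decomposition (ary, freq, max_anoms, p_value are set elsewhere;
# newer statsmodels versions call the freq parameter period)
model = sm.tsa.seasonal_decompose(ary, freq=freq)
resid = model.resid

# [...] clean/transform resid values

# returns the number of outliers and their indices
n_outliers, outlier_indices = pyasl.generalizedESD(resid, max_anoms, p_value)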

Load Modules
Seasonal Decomposition
We only need the residuals (resid)
Some Transformation for next Step (e.g. remove NaN)
Apply ESD

Whole thing wrapped as Flask API (Show in Postman)

Microservice 6

Topic Modelling

+++ @title[Idea]

Identify Topics in Tweet-Texts

  • Aggregated view on what the Tweets are about
  • Add Information to Dashboard
  • Search for those Topics in a News API (not done)

+++ @title[Method]

Latent Dirichlet Allocation (LDA)

  • A "Document" contains some Topics with different Weights |
  • A "Topic" is a Probability Distribution over all Words in the Corpus |
  • A "Word" can be assigned to more than one Topic |

Does this work for Documents as short as Tweets? Let's try!

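import string
from nltk.corpus import stopwords  # needs nltk.download('stopwords') once

# note: 'x' removes the letter x from every word, not only a stray symbol
exclude_custom = '“”…‘’x'
exclude = set(string.punctuation + exclude_custom)

stop_custom = ['rt', 'bitcoin', 'bitcoins', 'iota', 'ethereum', 'btc',
               'eth', 'iot', 'ltc', 'litecoin', 'litecoins', 'iotas',
               'ltc', 'cryptocurrency', 'crypto', 'cryptocurrencies',
               # [...] further custom stopwords
               ]
stop = set(stopwords.words('english') + stop_custom)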

# Remove punctuation
doc = ''.join(ch for ch in doc
                      if ch not in exclude)

# Remove URLS
doc = ' '.join([i for i in doc.lower().split()
                      if not i.startswith('http')])

# Remove anything containing numbers
doc = ' '.join([i for i in doc.lower().split()
                      if not any(char.isdigit() for char in i)])

# Remove short words
doc = ' '.join([i for i in doc.lower().split()
                      if len(i) >= 4])

# Lemmatize (disabled; lemma is a lemmatizer defined elsewhere)
# doc = ' '.join(lemma.lemmatize(word)
#     for word in doc.split())

# Remove Stopwords
doc = ' '.join([i for i in doc.lower().split()
                      if i not in stop])

Define Chars to remove
Define Stopwords
Remove Punctuation and special Chars
Remove URLs
Remove Words containing Numbers
Remove Words below 4 Chars
Remove Stopwords

+++ @title[LDA in Python]

LDA in Python

import gensim
from gensim import corpora

# docs: list of token lists, e.g. [doc.split() for doc in cleaned_docs]
dictionary = corpora.Dictionary(docs)

doc_term_matrix = [dictionary.doc2bow(doc) for doc in docs]

Lda = gensim.models.ldamodel.LdaModel
ldamodel = Lda(doc_term_matrix, num_topics=num_topics,
               id2word=dictionary, passes=20)

# [...] Converts LDA model into nice list

Import Gensim Module for Vector Space Modelling
Build a Dictionary (Word ↔ ID mapping) from the tokenized Documents
Prepare Document Term Matrix
Do Modelling with Parameter for Number of Topics and Passes used
Convert Results into consumable List

Whole thing wrapped as Flask API (Show in Postman)

---?image=assets/bg-senti.png @title[Microservice - Sentiment Analysis]

Microservice 7

Sentiment Analysis

+++ @title[Method]

Simple Wordlist-based approach

  • Research about Sentiment Analysis in the financial Sector
  • Found the financial Sentiment Word Lists by Loughran & McDonald (2011)
  • Decided to use a simple Wordlist based approach

+++ @title[Implementation]


  • As a scheduled Service
  • Runs every 5 minutes
  • Searches for Tweets without Sentiment in MongoDB
  • Counts positive & negative Words contained in the Tweet
  • Sentiment Score = Positive - Negative
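The scoring rule can be sketched in a few lines. The word sets below are tiny placeholders for illustration, not the Loughran & McDonald lists, and the tokenizer is an assumption:

```python
import re

# Placeholder word sets for illustration only; the service uses the
# Loughran & McDonald (2011) word lists instead.
POSITIVE = {'gain', 'gains', 'profit', 'strong', 'improve'}
NEGATIVE = {'loss', 'losses', 'crash', 'weak', 'fraud'}


def sentiment_score(text, positive=POSITIVE, negative=NEGATIVE):
    """Sentiment Score = number of positive words - number of negative words."""
    words = re.findall(r"[a-z']+", text.lower())
    pos = sum(1 for w in words if w in positive)
    neg = sum(1 for w in words if w in negative)
    return pos - neg
```

To find tweets that still need a score, the scheduled job could query e.g. `{'sentiment': {'$exists': False}}` (field name hypothetical).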

---?image=assets/bg-jupyter.png @title[Microservice - Jupyter Notebook]

Microservice 8

Jupyter Notebook


Integrate an instance of Jupyter notebook

  • Jupyter notebook inside its own container
  • connection to the database and all other services
  • test environment for database and API calls
  • quick feedback without redeploying any of the components

---?image=assets/bg-dash.png @title[Microservice - Dash]

Microservice 9

Dashboard (Plotly Dash)


Show random tweets from the selected timeframe

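html.Div([
    html.Div([
        html.H3([html.Span('€', className='icon'),  # heading tag assumed
                 'Avg. Stock Prices per Hour'])
    ], className='title'),
    html.Div([
        dcc.Graph(id='...',  # graph component assumed; id not shown on the slide
                  style={'width': '878px', 'height': '250px'}),
    ], className='content')
], className='box'),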

Surrounding div element
Array of inner elements
Stock graph

Wrap up

+++ @title[What we have learned]

What we have learned

  • Plotly Dash is nice, but Data Management & Cross-Selection is quite difficult.
  • Take more care about Exception Handling, especially for critical Services (e.g. Stream Listener)
  • Docker(-Compose) is really cool for Development & Deployment!
  • Putting the right Data into Models is crucial (e.g. Missing Data in Anomaly-Detection)
  • Choosing the right Model for the Data is not easy (e.g. LDA for Tweets)

+++ @title[What we would improve]

What we would improve (1)

Architecture & Code

  • Connect the Frontend through one single API
  • Refactor Dashboard-Code

Topic Modelling

  • Search for a better Model for short Texts
  • Research whether LDA can be sped up

Anomaly Detection

  • More complete Twitter Algorithm Implementation
  • Or build an R Microservice

+++ @title[What we would improve]

What we would improve (2)

Dashboard UX / UI

  • Non-blocking Interactions
  • Add loading Indicators
  • Improve Cross-Selection
  • Improve Performance, e.g. on Data-Loading

Additional Features

  • Leverage News APIs
  • Calculate and display the Correlation Coefficient
  • Show Tweets on Map

Thank you for your Attention!